manojpec commented on a change in pull request #4363:
URL: https://github.com/apache/hudi/pull/4363#discussion_r771834361



##########
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/TransactionManager.java
##########
@@ -35,49 +35,64 @@
 public class TransactionManager implements Serializable {
 
   private static final Logger LOG = 
LogManager.getLogger(TransactionManager.class);
-
   private final LockManager lockManager;
-  private Option<HoodieInstant> currentTxnOwnerInstant;
-  private Option<HoodieInstant> lastCompletedTxnOwnerInstant;
-  private boolean supportsOptimisticConcurrency;
+  private final boolean isOptimisticConcurrencyControlEnabled;
+  private Option<HoodieInstant> currentTxnOwnerInstant = Option.empty();
+  private Option<HoodieInstant> lastCompletedTxnOwnerInstant = Option.empty();
 
   public TransactionManager(HoodieWriteConfig config, FileSystem fs) {
     this.lockManager = new LockManager(config, fs);
-    this.supportsOptimisticConcurrency = 
config.getWriteConcurrencyMode().supportsOptimisticConcurrencyControl();
+    this.isOptimisticConcurrencyControlEnabled = 
config.getWriteConcurrencyMode().supportsOptimisticConcurrencyControl();
   }
 
-  public synchronized void beginTransaction() {
-    if (supportsOptimisticConcurrency) {
+  public void beginTransaction() {
+    if (isOptimisticConcurrencyControlEnabled) {
       LOG.info("Transaction starting without a transaction owner");
       lockManager.lock();
-      LOG.info("Transaction started");
+      LOG.info("Transaction started without a transaction owner");
     }
   }
 
-  public synchronized void beginTransaction(Option<HoodieInstant> 
currentTxnOwnerInstant, Option<HoodieInstant> lastCompletedTxnOwnerInstant) {
-    if (supportsOptimisticConcurrency) {
-      this.lastCompletedTxnOwnerInstant = lastCompletedTxnOwnerInstant;
-      lockManager.setLatestCompletedWriteInstant(lastCompletedTxnOwnerInstant);
-      LOG.info("Latest completed transaction instant " + 
lastCompletedTxnOwnerInstant);
-      this.currentTxnOwnerInstant = currentTxnOwnerInstant;
-      LOG.info("Transaction starting with transaction owner " + 
currentTxnOwnerInstant);
+  public void beginTransaction(Option<HoodieInstant> newTxnOwnerInstant,
+                               Option<HoodieInstant> 
lastCompletedTxnOwnerInstant) {
+    if (isOptimisticConcurrencyControlEnabled) {
+      LOG.info("Transaction starting for " + newTxnOwnerInstant
+          + " with latest completed transaction instant " + 
lastCompletedTxnOwnerInstant);
       lockManager.lock();
-      LOG.info("Transaction started");
+      reset(currentTxnOwnerInstant, newTxnOwnerInstant, 
lastCompletedTxnOwnerInstant);
+      LOG.info("Transaction started for " + newTxnOwnerInstant
+          + " with latest completed transaction instant " + 
lastCompletedTxnOwnerInstant);
+    }
+  }
+
+  public void endTransaction() {
+    if (isOptimisticConcurrencyControlEnabled) {
+      LOG.info("Transaction ending without a transaction owner");
+      lockManager.unlock();
+      LOG.info("Transaction ended without a transaction owner");
     }
   }
 
-  public synchronized void endTransaction() {
-    if (supportsOptimisticConcurrency) {
+  public void endTransaction(Option<HoodieInstant> currentTxnOwnerInstant) {
+    if (isOptimisticConcurrencyControlEnabled) {
       LOG.info("Transaction ending with transaction owner " + 
currentTxnOwnerInstant);
+      reset(currentTxnOwnerInstant, Option.empty(), Option.empty());
       lockManager.unlock();
-      LOG.info("Transaction ended");
-      this.lastCompletedTxnOwnerInstant = Option.empty();
-      lockManager.resetLatestCompletedWriteInstant();
+      LOG.info("Transaction ended with transaction owner " + 
currentTxnOwnerInstant);
+    }
+  }
+
+  private synchronized void reset(Option<HoodieInstant> callerInstant,
+                                  Option<HoodieInstant> newTxnOwnerInstant,
+                                  Option<HoodieInstant> 
lastCompletedTxnOwnerInstant) {
+    if (!this.currentTxnOwnerInstant.isPresent() || 
this.currentTxnOwnerInstant == callerInstant) {

Review comment:
       @nsivabalan We need to fix this as well. Usually callers pass in the 
same instant object and so not an issue. but, for callers who are passing in 
different object with the same instant also we need to reset. 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to