manojpec commented on a change in pull request #4363:
URL: https://github.com/apache/hudi/pull/4363#discussion_r771839171



##########
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/TransactionManager.java
##########
@@ -35,49 +35,64 @@
 public class TransactionManager implements Serializable {
 
   private static final Logger LOG = 
LogManager.getLogger(TransactionManager.class);
-
   private final LockManager lockManager;
-  private Option<HoodieInstant> currentTxnOwnerInstant;
-  private Option<HoodieInstant> lastCompletedTxnOwnerInstant;
-  private boolean supportsOptimisticConcurrency;
+  private final boolean isOptimisticConcurrencyControlEnabled;
+  private Option<HoodieInstant> currentTxnOwnerInstant = Option.empty();
+  private Option<HoodieInstant> lastCompletedTxnOwnerInstant = Option.empty();
 
   public TransactionManager(HoodieWriteConfig config, FileSystem fs) {
     this.lockManager = new LockManager(config, fs);
-    this.supportsOptimisticConcurrency = 
config.getWriteConcurrencyMode().supportsOptimisticConcurrencyControl();
+    this.isOptimisticConcurrencyControlEnabled = 
config.getWriteConcurrencyMode().supportsOptimisticConcurrencyControl();
   }
 
-  public synchronized void beginTransaction() {
-    if (supportsOptimisticConcurrency) {
+  public void beginTransaction() {
+    if (isOptimisticConcurrencyControlEnabled) {
       LOG.info("Transaction starting without a transaction owner");
       lockManager.lock();
-      LOG.info("Transaction started");
+      LOG.info("Transaction started without a transaction owner");
     }
   }
 
-  public synchronized void beginTransaction(Option<HoodieInstant> 
currentTxnOwnerInstant, Option<HoodieInstant> lastCompletedTxnOwnerInstant) {
-    if (supportsOptimisticConcurrency) {
-      this.lastCompletedTxnOwnerInstant = lastCompletedTxnOwnerInstant;
-      lockManager.setLatestCompletedWriteInstant(lastCompletedTxnOwnerInstant);
-      LOG.info("Latest completed transaction instant " + 
lastCompletedTxnOwnerInstant);
-      this.currentTxnOwnerInstant = currentTxnOwnerInstant;
-      LOG.info("Transaction starting with transaction owner " + 
currentTxnOwnerInstant);
+  public void beginTransaction(Option<HoodieInstant> newTxnOwnerInstant,
+                               Option<HoodieInstant> 
lastCompletedTxnOwnerInstant) {
+    if (isOptimisticConcurrencyControlEnabled) {
+      LOG.info("Transaction starting for " + newTxnOwnerInstant
+          + " with latest completed transaction instant " + 
lastCompletedTxnOwnerInstant);
       lockManager.lock();
-      LOG.info("Transaction started");
+      reset(currentTxnOwnerInstant, newTxnOwnerInstant, 
lastCompletedTxnOwnerInstant);
+      LOG.info("Transaction started for " + newTxnOwnerInstant
+          + " with latest completed transaction instant " + 
lastCompletedTxnOwnerInstant);
+    }
+  }
+
+  public void endTransaction() {
+    if (isOptimisticConcurrencyControlEnabled) {
+      LOG.info("Transaction ending without a transaction owner");
+      lockManager.unlock();
+      LOG.info("Transaction ended without a transaction owner");
     }
   }
 
-  public synchronized void endTransaction() {
-    if (supportsOptimisticConcurrency) {
+  public void endTransaction(Option<HoodieInstant> currentTxnOwnerInstant) {
+    if (isOptimisticConcurrencyControlEnabled) {
       LOG.info("Transaction ending with transaction owner " + 
currentTxnOwnerInstant);
+      reset(currentTxnOwnerInstant, Option.empty(), Option.empty());
       lockManager.unlock();
-      LOG.info("Transaction ended");
-      this.lastCompletedTxnOwnerInstant = Option.empty();
-      lockManager.resetLatestCompletedWriteInstant();
+      LOG.info("Transaction ended with transaction owner " + 
currentTxnOwnerInstant);
+    }
+  }
+
+  private synchronized void reset(Option<HoodieInstant> callerInstant,
+                                  Option<HoodieInstant> newTxnOwnerInstant,
+                                  Option<HoodieInstant> 
lastCompletedTxnOwnerInstant) {
+    if (!this.currentTxnOwnerInstant.isPresent() || 
this.currentTxnOwnerInstant == callerInstant) {

Review comment:
       Filed https://issues.apache.org/jira/browse/HUDI-3064 to track the CI 
failing test TestHoodieClientMultiWriter flakiness . The test is buggy and 
FileSystemBasedLockProviderTestClass doesn't do the right tryLock. Even with 
the above TransactionManager fix, the test can fail.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to