This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 29574af  [HUDI-2573] Fixing double locking with multi-writers (#3827)
29574af is described below

commit 29574af239ae4596034a17999484ed069ec7123f
Author: Sivabalan Narayanan <[email protected]>
AuthorDate: Fri Oct 29 12:14:39 2021 -0400

    [HUDI-2573] Fixing double locking with multi-writers (#3827)
    
    - There are two code paths where the lock is acquired twice. This was
introduced when data table locks were added for updating the metadata table.
These flows now skip locking if a parent transaction has already acquired the lock.
---
 .../hudi/client/AbstractHoodieWriteClient.java     |  80 ++++++++++---
 .../java/org/apache/hudi/table/HoodieTable.java    |   7 +-
 .../table/action/clean/CleanActionExecutor.java    |  22 +++-
 .../restore/CopyOnWriteRestoreActionExecutor.java  |   1 +
 .../restore/MergeOnReadRestoreActionExecutor.java  |   1 +
 .../rollback/BaseRollbackActionExecutor.java       |  26 ++--
 .../CopyOnWriteRollbackActionExecutor.java         |  10 +-
 .../MergeOnReadRollbackActionExecutor.java         |  10 +-
 .../hudi/table/HoodieFlinkCopyOnWriteTable.java    |   7 +-
 .../hudi/table/HoodieFlinkMergeOnReadTable.java    |   6 +-
 .../hudi/table/HoodieJavaCopyOnWriteTable.java     |   7 +-
 .../apache/hudi/client/SparkRDDWriteClient.java    |   3 +-
 .../hudi/table/HoodieSparkCopyOnWriteTable.java    |  10 +-
 .../hudi/table/HoodieSparkMergeOnReadTable.java    |   5 +-
 .../functional/TestHoodieBackedMetadata.java       | 133 ++++++++++++++++++++-
 .../java/org/apache/hudi/table/TestCleaner.java    |   2 +-
 .../TestCopyOnWriteRollbackActionExecutor.java     |   6 +-
 .../TestMergeOnReadRollbackActionExecutor.java     |   6 +-
 18 files changed, 281 insertions(+), 61 deletions(-)

diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java
index b65060f..3e6b7ab 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java
@@ -501,7 +501,7 @@ public abstract class AbstractHoodieWriteClient<T extends 
HoodieRecordPayload, I
       } else {
         // Do not reuse instantTime for clean as metadata table requires all 
changes to have unique instant timestamps.
         LOG.info("Auto cleaning is enabled. Running cleaner now");
-        clean();
+        clean(true);
       }
     }
   }
@@ -570,16 +570,22 @@ public abstract class AbstractHoodieWriteClient<T extends 
HoodieRecordPayload, I
     SavepointHelpers.validateSavepointRestore(table, savepointTime);
   }
 
+  @Deprecated
+  public boolean rollback(final String commitInstantTime) throws 
HoodieRollbackException {
+    return rollback(commitInstantTime, false);
+  }
+
   /**
    * @Deprecated
    * Rollback the inflight record changes with the given commit time. This
    * will be removed in future in favor of {@link 
AbstractHoodieWriteClient#restoreToInstant(String)}
    *
    * @param commitInstantTime Instant time of the commit
+   * @param skipLocking if this is triggered by another parent transaction, 
locking can be skipped.
    * @throws HoodieRollbackException if rollback cannot be performed 
successfully
    */
   @Deprecated
-  public boolean rollback(final String commitInstantTime) throws 
HoodieRollbackException {
+  public boolean rollback(final String commitInstantTime, boolean skipLocking) 
throws HoodieRollbackException {
     LOG.info("Begin rollback of instant " + commitInstantTime);
     final String rollbackInstantTime = 
HoodieActiveTimeline.createNewInstantTime();
     final Timer.Context timerContext = this.metrics.getRollbackCtx();
@@ -590,10 +596,12 @@ public abstract class AbstractHoodieWriteClient<T extends 
HoodieRecordPayload, I
           .findFirst());
       if (commitInstantOpt.isPresent()) {
         LOG.info("Scheduling Rollback at instant time :" + 
rollbackInstantTime);
-        Option<HoodieRollbackPlan> rollbackPlanOption = 
table.scheduleRollback(context, rollbackInstantTime, commitInstantOpt.get(), 
false);
+        Option<HoodieRollbackPlan> rollbackPlanOption = 
table.scheduleRollback(context, rollbackInstantTime,
+            commitInstantOpt.get(), false);
         if (rollbackPlanOption.isPresent()) {
           // execute rollback
-          HoodieRollbackMetadata rollbackMetadata = table.rollback(context, 
rollbackInstantTime, commitInstantOpt.get(), true);
+          HoodieRollbackMetadata rollbackMetadata = table.rollback(context, 
rollbackInstantTime, commitInstantOpt.get(), true,
+              skipLocking);
           if (timerContext != null) {
             long durationInMs = metrics.getDurationInMs(timerContext.stop());
             metrics.updateRollbackMetrics(durationInMs, 
rollbackMetadata.getTotalFilesDeleted());
@@ -644,7 +652,19 @@ public abstract class AbstractHoodieWriteClient<T extends 
HoodieRecordPayload, I
    * cleaned)
    */
   public HoodieCleanMetadata clean(String cleanInstantTime) throws 
HoodieIOException {
-    return clean(cleanInstantTime, true);
+    return clean(cleanInstantTime, true, false);
+  }
+
+  /**
+   * Clean up any stale/old files/data lying around (either on file storage or 
index storage) based on the
+   * configurations and CleaningPolicy used. (typically files that no longer 
can be used by a running query can be
+   * cleaned)
+   * @param cleanInstantTime instant time for clean.
+   * @param skipLocking if this is triggered by another parent transaction, 
locking can be skipped.
+   * @return instance of {@link HoodieCleanMetadata}.
+   */
+  public HoodieCleanMetadata clean(String cleanInstantTime, boolean 
skipLocking) throws HoodieIOException {
+    return clean(cleanInstantTime, true, skipLocking);
   }
 
   /**
@@ -653,8 +673,11 @@ public abstract class AbstractHoodieWriteClient<T extends 
HoodieRecordPayload, I
    * cleaned). This API provides the flexibility to schedule clean instant 
asynchronously via
    * {@link AbstractHoodieWriteClient#scheduleTableService(String, Option, 
TableServiceType)} and disable inline scheduling
    * of clean.
+   * @param cleanInstantTime instant time for clean.
+   * @param scheduleInline true if needs to be scheduled inline. false 
otherwise.
+   * @param skipLocking if this is triggered by another parent transaction, 
locking can be skipped.
    */
-  public HoodieCleanMetadata clean(String cleanInstantTime, boolean 
scheduleInline) throws HoodieIOException {
+  public HoodieCleanMetadata clean(String cleanInstantTime, boolean 
scheduleInline, boolean skipLocking) throws HoodieIOException {
     if (scheduleInline) {
       scheduleTableServiceInternal(cleanInstantTime, Option.empty(), 
TableServiceType.CLEAN);
     }
@@ -662,8 +685,8 @@ public abstract class AbstractHoodieWriteClient<T extends 
HoodieRecordPayload, I
     final Timer.Context timerContext = metrics.getCleanCtx();
     LOG.info("Cleaned failed attempts if any");
     CleanerUtils.rollbackFailedWrites(config.getFailedWritesCleanPolicy(),
-        HoodieTimeline.CLEAN_ACTION, () -> rollbackFailedWrites());
-    HoodieCleanMetadata metadata = createTable(config, 
hadoopConf).clean(context, cleanInstantTime);
+        HoodieTimeline.CLEAN_ACTION, () -> rollbackFailedWrites(skipLocking));
+    HoodieCleanMetadata metadata = createTable(config, 
hadoopConf).clean(context, cleanInstantTime, skipLocking);
     if (timerContext != null && metadata != null) {
       long durationMs = metrics.getDurationInMs(timerContext.stop());
       metrics.updateCleanMetrics(durationMs, metadata.getTotalFilesDeleted());
@@ -675,7 +698,17 @@ public abstract class AbstractHoodieWriteClient<T extends 
HoodieRecordPayload, I
   }
 
   public HoodieCleanMetadata clean() {
-    return clean(HoodieActiveTimeline.createNewInstantTime());
+    return clean(false);
+  }
+
+  /**
+   * Triggers clean for the table. This cleans up any stale/old 
files/data lying around (either on file storage or index storage) based on the
+   * configurations and CleaningPolicy used.
+   * @param skipLocking if this is triggered by another parent transaction, 
locking can be skipped.
+   * @return instance of {@link HoodieCleanMetadata}.
+   */
+  public HoodieCleanMetadata clean(boolean skipLocking) {
+    return clean(HoodieActiveTimeline.createNewInstantTime(), skipLocking);
   }
 
   /**
@@ -797,20 +830,29 @@ public abstract class AbstractHoodieWriteClient<T extends 
HoodieRecordPayload, I
    * Rollback all failed writes.
    */
   public Boolean rollbackFailedWrites() {
+    return rollbackFailedWrites(false);
+  }
+
+  /**
+   * Rollback all failed writes.
+   * @param skipLocking if this is triggered by another parent transaction, 
locking can be skipped.
+   */
+  public Boolean rollbackFailedWrites(boolean skipLocking) {
     HoodieTable<T, I, K, O> table = createTable(config, hadoopConf);
-    List<String> instantsToRollback = 
getInstantsToRollback(table.getMetaClient(), 
config.getFailedWritesCleanPolicy());
-    rollbackFailedWrites(instantsToRollback);
+    List<String> instantsToRollback = 
getInstantsToRollback(table.getMetaClient(), 
config.getFailedWritesCleanPolicy(),
+        Option.empty());
+    rollbackFailedWrites(instantsToRollback, skipLocking);
     return true;
   }
 
-  protected void rollbackFailedWrites(List<String> instantsToRollback) {
+  protected void rollbackFailedWrites(List<String> instantsToRollback, boolean 
skipLocking) {
     for (String instant : instantsToRollback) {
       if (HoodieTimeline.compareTimestamps(instant, 
HoodieTimeline.LESSER_THAN_OR_EQUALS,
           HoodieTimeline.FULL_BOOTSTRAP_INSTANT_TS)) {
         rollbackFailedBootstrap();
         break;
       } else {
-        rollback(instant);
+        rollback(instant, skipLocking);
       }
     }
     // Delete any heartbeat files for already rolled back commits
@@ -822,11 +864,17 @@ public abstract class AbstractHoodieWriteClient<T extends 
HoodieRecordPayload, I
     }
   }
 
-  protected List<String> getInstantsToRollback(HoodieTableMetaClient 
metaClient, HoodieFailedWritesCleaningPolicy cleaningPolicy) {
+  protected List<String> getInstantsToRollback(HoodieTableMetaClient 
metaClient, HoodieFailedWritesCleaningPolicy cleaningPolicy, Option<String> 
curInstantTime) {
     Stream<HoodieInstant> inflightInstantsStream = 
getInflightTimelineExcludeCompactionAndClustering(metaClient)
         .getReverseOrderedInstants();
     if (cleaningPolicy.isEager()) {
-      return 
inflightInstantsStream.map(HoodieInstant::getTimestamp).collect(Collectors.toList());
+      return 
inflightInstantsStream.map(HoodieInstant::getTimestamp).filter(entry -> {
+        if (curInstantTime.isPresent()) {
+          return !entry.equals(curInstantTime.get());
+        } else {
+          return true;
+        }
+      }).collect(Collectors.toList());
     } else if (cleaningPolicy.isLazy()) {
       return inflightInstantsStream.filter(instant -> {
         try {
@@ -975,7 +1023,7 @@ public abstract class AbstractHoodieWriteClient<T extends 
HoodieRecordPayload, I
   protected void rollbackInflightClustering(HoodieInstant inflightInstant, 
HoodieTable<T, I, K, O> table) {
     String commitTime = HoodieActiveTimeline.createNewInstantTime();
     table.scheduleRollback(context, commitTime, inflightInstant, false);
-    table.rollback(context, commitTime, inflightInstant, false);
+    table.rollback(context, commitTime, inflightInstant, false, false);
     
table.getActiveTimeline().revertReplaceCommitInflightToRequested(inflightInstant);
   }
 
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
index 6de40a7..a6c14e6 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
@@ -424,7 +424,7 @@ public abstract class HoodieTable<T extends 
HoodieRecordPayload, I, K, O> implem
    *
    * @return information on cleaned file slices
    */
-  public abstract HoodieCleanMetadata clean(HoodieEngineContext context, 
String cleanInstantTime);
+  public abstract HoodieCleanMetadata clean(HoodieEngineContext context, 
String cleanInstantTime, boolean skipLocking);
 
   /**
    * Schedule rollback for the instant time.
@@ -452,7 +452,8 @@ public abstract class HoodieTable<T extends 
HoodieRecordPayload, I, K, O> implem
   public abstract HoodieRollbackMetadata rollback(HoodieEngineContext context,
                                                   String rollbackInstantTime,
                                                   HoodieInstant commitInstant,
-                                                  boolean deleteInstants);
+                                                  boolean deleteInstants,
+                                                  boolean skipLocking);
 
   /**
    * Create a savepoint at the specified instant, so that the table can be 
restored
@@ -480,7 +481,7 @@ public abstract class HoodieTable<T extends 
HoodieRecordPayload, I, K, O> implem
   public void rollbackInflightCompaction(HoodieInstant inflightInstant) {
     String commitTime = HoodieActiveTimeline.createNewInstantTime();
     scheduleRollback(context, commitTime, inflightInstant, false);
-    rollback(context, commitTime, inflightInstant, false);
+    rollback(context, commitTime, inflightInstant, false, false);
     getActiveTimeline().revertCompactionInflightToRequested(inflightInstant);
   }
 
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanActionExecutor.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanActionExecutor.java
index 1b229ca..a445fd3 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanActionExecutor.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanActionExecutor.java
@@ -60,10 +60,16 @@ public class CleanActionExecutor<T extends 
HoodieRecordPayload, I, K, O> extends
   private static final long serialVersionUID = 1L;
   private static final Logger LOG = 
LogManager.getLogger(CleanActionExecutor.class);
   private final TransactionManager txnManager;
+  private final boolean skipLocking;
 
   public CleanActionExecutor(HoodieEngineContext context, HoodieWriteConfig 
config, HoodieTable<T, I, K, O> table, String instantTime) {
+    this(context, config, table, instantTime, false);
+  }
+
+  public CleanActionExecutor(HoodieEngineContext context, HoodieWriteConfig 
config, HoodieTable<T, I, K, O> table, String instantTime, boolean skipLocking) 
{
     super(context, config, table, instantTime);
     this.txnManager = new TransactionManager(config, 
table.getMetaClient().getFs());
+    this.skipLocking = skipLocking;
   }
 
   static Boolean deleteFileAndGetResult(FileSystem fs, String deletePathStr) 
throws IOException {
@@ -214,11 +220,17 @@ public class CleanActionExecutor<T extends 
HoodieRecordPayload, I, K, O> extends
    * @param cleanMetadata instance of {@link HoodieCleanMetadata} to be 
applied to metadata.
    */
   private void writeMetadata(HoodieCleanMetadata cleanMetadata) {
-    try {
-      this.txnManager.beginTransaction(Option.empty(), Option.empty());
-      writeTableMetadata(cleanMetadata);
-    } finally {
-      this.txnManager.endTransaction();
+    if (config.isMetadataTableEnabled()) {
+      try {
+        if (!skipLocking) {
+          this.txnManager.beginTransaction(Option.empty(), Option.empty());
+        }
+        writeTableMetadata(cleanMetadata);
+      } finally {
+        if (!skipLocking) {
+          this.txnManager.endTransaction();
+        }
+      }
     }
   }
 
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/restore/CopyOnWriteRestoreActionExecutor.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/restore/CopyOnWriteRestoreActionExecutor.java
index 2e3b148..1116ef9 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/restore/CopyOnWriteRestoreActionExecutor.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/restore/CopyOnWriteRestoreActionExecutor.java
@@ -58,6 +58,7 @@ public class CopyOnWriteRestoreActionExecutor<T extends 
HoodieRecordPayload, I,
         instantToRollback,
         true,
         true,
+        false,
         false);
     return rollbackActionExecutor.execute();
   }
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/restore/MergeOnReadRestoreActionExecutor.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/restore/MergeOnReadRestoreActionExecutor.java
index 58663b6..db6fbc2 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/restore/MergeOnReadRestoreActionExecutor.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/restore/MergeOnReadRestoreActionExecutor.java
@@ -62,6 +62,7 @@ public class MergeOnReadRestoreActionExecutor<T extends 
HoodieRecordPayload, I,
         instantToRollback,
         true,
         true,
+        false,
         false);
 
     // TODO : Get file status and create a rollback stat and file
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/BaseRollbackActionExecutor.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/BaseRollbackActionExecutor.java
index 5749f2b..ff50a29 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/BaseRollbackActionExecutor.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/BaseRollbackActionExecutor.java
@@ -59,15 +59,17 @@ public abstract class BaseRollbackActionExecutor<T extends 
HoodieRecordPayload,
   protected final boolean skipTimelinePublish;
   protected final boolean useMarkerBasedStrategy;
   private final TransactionManager txnManager;
+  private final boolean skipLocking;
 
   public BaseRollbackActionExecutor(HoodieEngineContext context,
       HoodieWriteConfig config,
       HoodieTable<T, I, K, O> table,
       String instantTime,
       HoodieInstant instantToRollback,
-      boolean deleteInstants) {
+      boolean deleteInstants,
+      boolean skipLocking) {
     this(context, config, table, instantTime, instantToRollback, 
deleteInstants,
-        false, config.shouldRollbackUsingMarkers());
+        false, config.shouldRollbackUsingMarkers(), skipLocking);
   }
 
   public BaseRollbackActionExecutor(HoodieEngineContext context,
@@ -77,7 +79,8 @@ public abstract class BaseRollbackActionExecutor<T extends 
HoodieRecordPayload,
       HoodieInstant instantToRollback,
       boolean deleteInstants,
       boolean skipTimelinePublish,
-      boolean useMarkerBasedStrategy) {
+      boolean useMarkerBasedStrategy,
+      boolean skipLocking) {
     super(context, config, table, instantTime);
     this.instantToRollback = instantToRollback;
     this.deleteInstants = deleteInstants;
@@ -87,6 +90,7 @@ public abstract class BaseRollbackActionExecutor<T extends 
HoodieRecordPayload,
       ValidationUtils.checkArgument(!instantToRollback.isCompleted(),
           "Cannot use marker based rollback strategy on completed instant:" + 
instantToRollback);
     }
+    this.skipLocking = skipLocking;
     this.txnManager = new TransactionManager(config, 
table.getMetaClient().getFs());
   }
 
@@ -265,11 +269,17 @@ public abstract class BaseRollbackActionExecutor<T 
extends HoodieRecordPayload,
    * @param rollbackMetadata instance of {@link HoodieRollbackMetadata} to be 
applied to metadata.
    */
   private void writeToMetadata(HoodieRollbackMetadata rollbackMetadata) {
-    try {
-      this.txnManager.beginTransaction(Option.empty(), Option.empty());
-      writeTableMetadata(rollbackMetadata);
-    } finally {
-      this.txnManager.endTransaction();
+    if (config.isMetadataTableEnabled()) {
+      try {
+        if (!skipLocking) {
+          this.txnManager.beginTransaction(Option.empty(), Option.empty());
+        }
+        writeTableMetadata(rollbackMetadata);
+      } finally {
+        if (!skipLocking) {
+          this.txnManager.endTransaction();
+        }
+      }
     }
   }
 
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/CopyOnWriteRollbackActionExecutor.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/CopyOnWriteRollbackActionExecutor.java
index 9187179..5e11354 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/CopyOnWriteRollbackActionExecutor.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/CopyOnWriteRollbackActionExecutor.java
@@ -43,8 +43,9 @@ public class CopyOnWriteRollbackActionExecutor<T extends 
HoodieRecordPayload, I,
                                            HoodieTable<T, I, K, O> table,
                                            String instantTime,
                                            HoodieInstant commitInstant,
-                                           boolean deleteInstants) {
-    super(context, config, table, instantTime, commitInstant, deleteInstants);
+                                           boolean deleteInstants,
+                                           boolean skipLocking) {
+    super(context, config, table, instantTime, commitInstant, deleteInstants, 
skipLocking);
   }
 
   public CopyOnWriteRollbackActionExecutor(HoodieEngineContext context,
@@ -54,8 +55,9 @@ public class CopyOnWriteRollbackActionExecutor<T extends 
HoodieRecordPayload, I,
                                            HoodieInstant commitInstant,
                                            boolean deleteInstants,
                                            boolean skipTimelinePublish,
-                                           boolean useMarkerBasedStrategy) {
-    super(context, config, table, instantTime, commitInstant, deleteInstants, 
skipTimelinePublish, useMarkerBasedStrategy);
+                                           boolean useMarkerBasedStrategy,
+                                           boolean skipLocking) {
+    super(context, config, table, instantTime, commitInstant, deleteInstants, 
skipTimelinePublish, useMarkerBasedStrategy, skipLocking);
   }
 
   @Override
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/MergeOnReadRollbackActionExecutor.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/MergeOnReadRollbackActionExecutor.java
index 23af445..c2b25ff 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/MergeOnReadRollbackActionExecutor.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/MergeOnReadRollbackActionExecutor.java
@@ -43,8 +43,9 @@ public class MergeOnReadRollbackActionExecutor<T extends 
HoodieRecordPayload, I,
                                            HoodieTable<T, I, K, O> table,
                                            String instantTime,
                                            HoodieInstant commitInstant,
-                                           boolean deleteInstants) {
-    super(context, config, table, instantTime, commitInstant, deleteInstants);
+                                           boolean deleteInstants,
+                                           boolean skipLocking) {
+    super(context, config, table, instantTime, commitInstant, deleteInstants, 
skipLocking);
   }
 
   public MergeOnReadRollbackActionExecutor(HoodieEngineContext context,
@@ -54,8 +55,9 @@ public class MergeOnReadRollbackActionExecutor<T extends 
HoodieRecordPayload, I,
                                            HoodieInstant commitInstant,
                                            boolean deleteInstants,
                                            boolean skipTimelinePublish,
-                                           boolean useMarkerBasedStrategy) {
-    super(context, config, table, instantTime, commitInstant, deleteInstants, 
skipTimelinePublish, useMarkerBasedStrategy);
+                                           boolean useMarkerBasedStrategy,
+                                           boolean skipLocking) {
+    super(context, config, table, instantTime, commitInstant, deleteInstants, 
skipTimelinePublish, useMarkerBasedStrategy, skipLocking);
   }
 
   @Override
diff --git 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java
 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java
index e30f2d4..ae0ced2 100644
--- 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java
+++ 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java
@@ -309,13 +309,14 @@ public class HoodieFlinkCopyOnWriteTable<T extends 
HoodieRecordPayload>
   }
 
   @Override
-  public HoodieCleanMetadata clean(HoodieEngineContext context, String 
cleanInstantTime) {
+  public HoodieCleanMetadata clean(HoodieEngineContext context, String 
cleanInstantTime, boolean skipLocking) {
     return new CleanActionExecutor(context, config, this, 
cleanInstantTime).execute();
   }
 
   @Override
-  public HoodieRollbackMetadata rollback(HoodieEngineContext context, String 
rollbackInstantTime, HoodieInstant commitInstant, boolean deleteInstants) {
-    return new CopyOnWriteRollbackActionExecutor(context, config, this, 
rollbackInstantTime, commitInstant, deleteInstants).execute();
+  public HoodieRollbackMetadata rollback(HoodieEngineContext context, String 
rollbackInstantTime, HoodieInstant commitInstant,
+                                         boolean deleteInstants, boolean 
skipLocking) {
+    return new CopyOnWriteRollbackActionExecutor(context, config, this, 
rollbackInstantTime, commitInstant, deleteInstants, skipLocking).execute();
   }
 
   @Override
diff --git 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkMergeOnReadTable.java
 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkMergeOnReadTable.java
index b165c84..56a14da 100644
--- 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkMergeOnReadTable.java
+++ 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkMergeOnReadTable.java
@@ -119,8 +119,10 @@ public class HoodieFlinkMergeOnReadTable<T extends 
HoodieRecordPayload>
   }
 
   @Override
-  public HoodieRollbackMetadata rollback(HoodieEngineContext context, String 
rollbackInstantTime, HoodieInstant commitInstant, boolean deleteInstants) {
-    return new MergeOnReadRollbackActionExecutor(context, config, this, 
rollbackInstantTime, commitInstant, deleteInstants).execute();
+  public HoodieRollbackMetadata rollback(HoodieEngineContext context, String 
rollbackInstantTime, HoodieInstant commitInstant,
+                                         boolean deleteInstants, boolean 
skipLocking) {
+    return new MergeOnReadRollbackActionExecutor(context, config, this, 
rollbackInstantTime, commitInstant, deleteInstants,
+        skipLocking).execute();
   }
 }
 
diff --git 
a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaCopyOnWriteTable.java
 
b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaCopyOnWriteTable.java
index 99cf413..9d96ca1 100644
--- 
a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaCopyOnWriteTable.java
+++ 
b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaCopyOnWriteTable.java
@@ -192,7 +192,7 @@ public class HoodieJavaCopyOnWriteTable<T extends 
HoodieRecordPayload> extends H
 
   @Override
   public HoodieCleanMetadata clean(HoodieEngineContext context,
-                                   String cleanInstantTime) {
+                                   String cleanInstantTime, boolean 
skipLocking) {
     return new CleanActionExecutor(context, config, this, 
cleanInstantTime).execute();
   }
 
@@ -200,9 +200,10 @@ public class HoodieJavaCopyOnWriteTable<T extends 
HoodieRecordPayload> extends H
   public HoodieRollbackMetadata rollback(HoodieEngineContext context,
                                          String rollbackInstantTime,
                                          HoodieInstant commitInstant,
-                                         boolean deleteInstants) {
+                                         boolean deleteInstants,
+                                         boolean skipLocking) {
     return new CopyOnWriteRollbackActionExecutor(
-        context, config, this, rollbackInstantTime, commitInstant, 
deleteInstants).execute();
+        context, config, this, rollbackInstantTime, commitInstant, 
deleteInstants, skipLocking).execute();
   }
 
   @Override
diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
index a1a5c85..4100b04 100644
--- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
@@ -424,7 +424,7 @@ public class SparkRDDWriteClient<T extends 
HoodieRecordPayload> extends
         this.txnManager.beginTransaction();
         try {
           // Ensure no inflight commits by setting EAGER policy and explicitly 
cleaning all failed commits
-          this.rollbackFailedWrites(getInstantsToRollback(metaClient, 
HoodieFailedWritesCleaningPolicy.EAGER));
+          this.rollbackFailedWrites(getInstantsToRollback(metaClient, 
HoodieFailedWritesCleaningPolicy.EAGER, Option.of(instantTime)), true);
           new UpgradeDowngrade(
               metaClient, config, context, 
SparkUpgradeDowngradeHelper.getInstance())
               .run(HoodieTableVersion.current(), instantTime);
@@ -434,6 +434,7 @@ public class SparkRDDWriteClient<T extends 
HoodieRecordPayload> extends
       } else {
         upgradeDowngrade.run(HoodieTableVersion.current(), instantTime);
       }
+      metaClient.reloadActiveTimeline();
     }
     metaClient.validateTableProperties(config.getProps(), operationType);
     return getTableAndInitCtx(metaClient, operationType, instantTime);
diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java
index 6f5611f..e458d84 100644
--- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java
@@ -256,13 +256,15 @@ public class HoodieSparkCopyOnWriteTable<T extends 
HoodieRecordPayload>
   }
 
   @Override
-  public HoodieCleanMetadata clean(HoodieEngineContext context, String 
cleanInstantTime) {
-    return new CleanActionExecutor(context, config, this, 
cleanInstantTime).execute();
+  public HoodieCleanMetadata clean(HoodieEngineContext context, String 
cleanInstantTime, boolean skipLocking) {
+    return new CleanActionExecutor(context, config, this, cleanInstantTime, 
skipLocking).execute();
   }
 
   @Override
-  public HoodieRollbackMetadata rollback(HoodieEngineContext context, String 
rollbackInstantTime, HoodieInstant commitInstant, boolean deleteInstants) {
-    return new CopyOnWriteRollbackActionExecutor((HoodieSparkEngineContext) 
context, config, this, rollbackInstantTime, commitInstant, 
deleteInstants).execute();
+  public HoodieRollbackMetadata rollback(HoodieEngineContext context, String 
rollbackInstantTime, HoodieInstant commitInstant,
+                                         boolean deleteInstants, boolean 
skipLocking) {
+    return new CopyOnWriteRollbackActionExecutor((HoodieSparkEngineContext) 
context, config, this, rollbackInstantTime, commitInstant,
+        deleteInstants, skipLocking).execute();
   }
 
   @Override
diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkMergeOnReadTable.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkMergeOnReadTable.java
index 30984e0..d0bc969 100644
--- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkMergeOnReadTable.java
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkMergeOnReadTable.java
@@ -159,8 +159,9 @@ public class HoodieSparkMergeOnReadTable<T extends 
HoodieRecordPayload> extends
   public HoodieRollbackMetadata rollback(HoodieEngineContext context,
                                          String rollbackInstantTime,
                                          HoodieInstant commitInstant,
-                                         boolean deleteInstants) {
-    return new MergeOnReadRollbackActionExecutor(context, config, this, 
rollbackInstantTime, commitInstant, deleteInstants).execute();
+                                         boolean deleteInstants,
+                                         boolean skipLocking) {
+    return new MergeOnReadRollbackActionExecutor(context, config, this, 
rollbackInstantTime, commitInstant, deleteInstants, skipLocking).execute();
   }
 
   @Override
diff --git 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
index e0c61e1..de757a0 100644
--- 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
+++ 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
@@ -264,7 +264,7 @@ public class TestHoodieBackedMetadata extends 
TestHoodieMetadataBase {
   @EnumSource(HoodieTableType.class)
   public void testInsertUpsertCluster(HoodieTableType tableType) throws 
Exception {
     init(tableType);
-    doWriteOperation(testTable,"0000001",  INSERT);
+    doWriteOperation(testTable, "0000001", INSERT);
     doWriteOperation(testTable, "0000002");
     doClusterAndValidate(testTable, "0000003");
     if (tableType == MERGE_ON_READ) {
@@ -639,6 +639,51 @@ public class TestHoodieBackedMetadata extends 
TestHoodieMetadataBase {
   }
 
   /**
+   * Tests that when inline cleaning is enabled and with auto commit set to 
true, there is no double locking.
+   * This is because auto clean is triggered within post commit, which is already 
happening within a lock.
+   *
+   * @throws Exception
+   */
+  @Test
+  public void testMultiWriterForDoubleLocking() throws Exception {
+    init(HoodieTableType.COPY_ON_WRITE);
+    HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(jsc);
+
+    Properties properties = new Properties();
+    properties.setProperty(FILESYSTEM_LOCK_PATH_PROP_KEY, basePath + 
"/.hoodie/.locks");
+    
properties.setProperty(LockConfiguration.LOCK_ACQUIRE_CLIENT_NUM_RETRIES_PROP_KEY,
 "3");
+    
properties.setProperty(LockConfiguration.LOCK_ACQUIRE_CLIENT_RETRY_WAIT_TIME_IN_MILLIS_PROP_KEY,
 "5000");
+    HoodieWriteConfig writeConfig = getWriteConfigBuilder(true, true, false)
+        .withCompactionConfig(HoodieCompactionConfig.newBuilder()
+            
.withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY).withAutoClean(true).retainCommits(4).build())
+        .withAutoCommit(false)
+        
.withWriteConcurrencyMode(WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL)
+        
.withLockConfig(HoodieLockConfig.newBuilder().withLockProvider(FileSystemBasedLockProviderTestClass.class).build())
+        .withProperties(properties)
+        .build();
+
+    SparkRDDWriteClient writeClient = new SparkRDDWriteClient(engineContext, 
writeConfig);
+    String partitionPath = dataGen.getPartitionPaths()[0];
+    for (int j = 0; j < 6; j++) {
+      String newCommitTime = "000000" + j;
+      List<HoodieRecord> records = 
dataGen.generateInsertsForPartition(newCommitTime, 100, partitionPath);
+      writeClient.startCommitWithTime(newCommitTime);
+      JavaRDD writeStatuses = writeClient.insert(jsc.parallelize(records, 1), 
newCommitTime);
+      writeClient.commit(newCommitTime, writeStatuses);
+    }
+
+    // Ensure all commits were synced to the Metadata Table
+    HoodieTableMetaClient metadataMetaClient = 
HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(metadataTableBasePath).build();
+    LOG.warn("total commits in metadata table " + 
metadataMetaClient.getActiveTimeline().getCommitsTimeline().countInstants());
+
+    // 6 commits and 2 cleaner commits.
+    
assertEquals(metadataMetaClient.getActiveTimeline().getDeltaCommitTimeline().filterCompletedInstants().countInstants(),
 8);
+    
assertTrue(metadataMetaClient.getActiveTimeline().getCommitTimeline().filterCompletedInstants().countInstants()
 <= 1);
+    // Validation
+    validateMetadata(writeClient);
+  }
+
+  /**
    * Lets say clustering commit succeeded in metadata table, but failed before 
committing to datatable.
    * Next time, when clustering kicks in, hudi will rollback pending 
clustering and re-attempt the clustering with same instant time.
    * So, this test ensures the 2nd attempt succeeds with metadata enabled.
@@ -925,6 +970,92 @@ public class TestHoodieBackedMetadata extends 
TestHoodieMetadataBase {
   }
 
   /**
+   * When the table needs to be upgraded and multi-writer is enabled, Hudi 
rolls back partial commits. The upgrade itself happens
+   * within a lock, and hence the rollback should not lock again.
+   *
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  @Test
+  public void testRollbackDuringUpgradeForDoubleLocking() throws IOException, 
InterruptedException {
+    init(HoodieTableType.COPY_ON_WRITE, false);
+    HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(jsc);
+
+    // Perform a commit. This should bootstrap the metadata table with latest 
version.
+    List<HoodieRecord> records;
+    JavaRDD<WriteStatus> writeStatuses;
+    String commitTimestamp = HoodieActiveTimeline.createNewInstantTime();
+    Properties properties = new Properties();
+    properties.setProperty(FILESYSTEM_LOCK_PATH_PROP_KEY, basePath + 
"/.hoodie/.locks");
+    
properties.setProperty(LockConfiguration.LOCK_ACQUIRE_CLIENT_NUM_RETRIES_PROP_KEY,
 "3");
+    
properties.setProperty(LockConfiguration.LOCK_ACQUIRE_CLIENT_RETRY_WAIT_TIME_IN_MILLIS_PROP_KEY,
 "5000");
+    HoodieWriteConfig writeConfig = getWriteConfigBuilder(false, true, false)
+        .withCompactionConfig(HoodieCompactionConfig.newBuilder()
+            
.withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY).withAutoClean(false).build())
+        
.withWriteConcurrencyMode(WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL)
+        
.withLockConfig(HoodieLockConfig.newBuilder().withLockProvider(FileSystemBasedLockProviderTestClass.class).build())
+        .withProperties(properties)
+        .build();
+    try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, 
writeConfig)) {
+      records = dataGen.generateInserts(commitTimestamp, 5);
+      client.startCommitWithTime(commitTimestamp);
+      writeStatuses = client.insert(jsc.parallelize(records, 1), 
commitTimestamp);
+      client.commit(commitTimestamp, writeStatuses);
+    }
+
+    // Metadata table should have been bootstrapped
+    assertTrue(fs.exists(new Path(metadataTableBasePath)), "Metadata table 
should exist");
+    FileStatus oldStatus = fs.getFileStatus(new Path(metadataTableBasePath));
+
+    // trigger partial commit
+    metaClient.reloadActiveTimeline();
+    commitTimestamp = HoodieActiveTimeline.createNewInstantTime();
+    try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, 
writeConfig)) {
+      records = dataGen.generateInserts(commitTimestamp, 5);
+      client.startCommitWithTime(commitTimestamp);
+      writeStatuses = client.insert(jsc.parallelize(records, 1), 
commitTimestamp);
+    }
+
+    // set hoodie.table.version to 2 in hoodie.properties file
+    changeTableVersion(HoodieTableVersion.TWO);
+    writeConfig = getWriteConfigBuilder(true, true, false)
+        .withRollbackUsingMarkers(false)
+        .withCompactionConfig(HoodieCompactionConfig.newBuilder()
+            
.withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY).withAutoClean(false).build())
+        
.withWriteConcurrencyMode(WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL)
+        
.withLockConfig(HoodieLockConfig.newBuilder().withLockProvider(FileSystemBasedLockProviderTestClass.class).build())
+        .withProperties(properties)
+        .build();
+
+    // With next commit the table should be deleted (as part of upgrade) and 
partial commit should be rolled back.
+    metaClient.reloadActiveTimeline();
+    commitTimestamp = HoodieActiveTimeline.createNewInstantTime();
+    try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, 
writeConfig)) {
+      records = dataGen.generateInserts(commitTimestamp, 5);
+      client.startCommitWithTime(commitTimestamp);
+      writeStatuses = client.insert(jsc.parallelize(records, 1), 
commitTimestamp);
+      assertNoWriteErrors(writeStatuses.collect());
+    }
+    assertFalse(fs.exists(new Path(metadataTableBasePath)), "Metadata table 
should not exist");
+
+    // With next commit the table should be re-bootstrapped (currently in the 
constructor. To be changed)
+    commitTimestamp = HoodieActiveTimeline.createNewInstantTime();
+    try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, 
writeConfig)) {
+      records = dataGen.generateInserts(commitTimestamp, 5);
+      client.startCommitWithTime(commitTimestamp);
+      writeStatuses = client.insert(jsc.parallelize(records, 1), 
commitTimestamp);
+      assertNoWriteErrors(writeStatuses.collect());
+    }
+    assertTrue(fs.exists(new Path(metadataTableBasePath)), "Metadata table 
should exist");
+
+    initMetaClient();
+    assertEquals(metaClient.getTableConfig().getTableVersion().versionCode(), 
HoodieTableVersion.THREE.versionCode());
+    assertTrue(fs.exists(new Path(metadataTableBasePath)), "Metadata table 
should exist");
+    FileStatus newStatus = fs.getFileStatus(new Path(metadataTableBasePath));
+    assertTrue(oldStatus.getModificationTime() < 
newStatus.getModificationTime());
+  }
+
+  /**
    * Test various error scenarios.
    */
   //@Test
diff --git 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestCleaner.java
 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestCleaner.java
index 063b556..72f6a07 100644
--- 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestCleaner.java
+++ 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestCleaner.java
@@ -1308,7 +1308,7 @@ public class TestCleaner extends HoodieClientTestBase {
     metaClient.reloadActiveTimeline();
     HoodieInstant rollbackInstant = new HoodieInstant(State.INFLIGHT, 
HoodieTimeline.COMMIT_ACTION, "000");
     table.scheduleRollback(context, "001", rollbackInstant, false);
-    table.rollback(context, "001", rollbackInstant, true);
+    table.rollback(context, "001", rollbackInstant, true, false);
     final int numTempFilesAfter = testTable.listAllFilesInTempFolder().length;
     assertEquals(0, numTempFilesAfter, "All temp files are deleted.");
   }
diff --git 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/TestCopyOnWriteRollbackActionExecutor.java
 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/TestCopyOnWriteRollbackActionExecutor.java
index 2e93602..3225dcd 100644
--- 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/TestCopyOnWriteRollbackActionExecutor.java
+++ 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/TestCopyOnWriteRollbackActionExecutor.java
@@ -88,7 +88,8 @@ public class TestCopyOnWriteRollbackActionExecutor extends 
HoodieClientRollbackT
     BaseRollbackPlanActionExecutor copyOnWriteRollbackPlanActionExecutor =
         new BaseRollbackPlanActionExecutor(context, table.getConfig(), table, 
"003", needRollBackInstant, false);
     HoodieRollbackPlan rollbackPlan = (HoodieRollbackPlan) 
copyOnWriteRollbackPlanActionExecutor.execute().get();
-    CopyOnWriteRollbackActionExecutor copyOnWriteRollbackActionExecutor = new 
CopyOnWriteRollbackActionExecutor(context, table.getConfig(), table, "003", 
needRollBackInstant, true);
+    CopyOnWriteRollbackActionExecutor copyOnWriteRollbackActionExecutor = new 
CopyOnWriteRollbackActionExecutor(context, table.getConfig(), table, "003", 
needRollBackInstant, true,
+        false);
     List<HoodieRollbackStat> hoodieRollbackStats = 
copyOnWriteRollbackActionExecutor.executeRollback(rollbackPlan);
 
     // assert hoodieRollbackStats
@@ -169,7 +170,8 @@ public class TestCopyOnWriteRollbackActionExecutor extends 
HoodieClientRollbackT
     BaseRollbackPlanActionExecutor copyOnWriteRollbackPlanActionExecutor =
         new BaseRollbackPlanActionExecutor(context, table.getConfig(), table, 
"003", commitInstant, false);
     HoodieRollbackPlan hoodieRollbackPlan = (HoodieRollbackPlan) 
copyOnWriteRollbackPlanActionExecutor.execute().get();
-    CopyOnWriteRollbackActionExecutor copyOnWriteRollbackActionExecutor = new 
CopyOnWriteRollbackActionExecutor(context, cfg, table, "003", commitInstant, 
false);
+    CopyOnWriteRollbackActionExecutor copyOnWriteRollbackActionExecutor = new 
CopyOnWriteRollbackActionExecutor(context, cfg, table, "003", commitInstant, 
false,
+        false);
     Map<String, HoodieRollbackPartitionMetadata> rollbackMetadata = 
copyOnWriteRollbackActionExecutor.execute().getPartitionMetadata();
 
     //3. assert the rollback stat
diff --git 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/TestMergeOnReadRollbackActionExecutor.java
 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/TestMergeOnReadRollbackActionExecutor.java
index 06f70f2..38be873 100644
--- 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/TestMergeOnReadRollbackActionExecutor.java
+++ 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/TestMergeOnReadRollbackActionExecutor.java
@@ -99,7 +99,8 @@ public class TestMergeOnReadRollbackActionExecutor extends 
HoodieClientRollbackT
         table,
         "003",
         rollBackInstant,
-        true);
+        true,
+        false);
     //3. assert the rollback stat
     Map<String, HoodieRollbackPartitionMetadata> rollbackMetadata = 
mergeOnReadRollbackActionExecutor.execute().getPartitionMetadata();
     assertEquals(2, rollbackMetadata.size());
@@ -148,7 +149,8 @@ public class TestMergeOnReadRollbackActionExecutor extends 
HoodieClientRollbackT
           rollBackInstant,
           true,
           true,
-          true).execute();
+          true,
+          false).execute();
     });
   }
 

Reply via email to