This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new f1afb1bf04a [HUDI-6246] Fixing restore for compaction commit (#8774)
f1afb1bf04a is described below

commit f1afb1bf04abdc94a26d61dc302f36ec2bbeb15b
Author: Sivabalan Narayanan <[email protected]>
AuthorDate: Thu Jul 6 15:47:09 2023 -0400

    [HUDI-6246] Fixing restore for compaction commit (#8774)
    
    - Before this patch, restoring a completed compaction could leave some 
dangling data files behind, although this did not cause any data consistency 
issues. After this patch, restore no longer leaves any dangling files.
---
 .../hudi/client/BaseHoodieTableServiceClient.java  |   3 +-
 .../java/org/apache/hudi/table/HoodieTable.java    |   9 +-
 .../restore/CopyOnWriteRestoreActionExecutor.java  |   2 +-
 .../restore/MergeOnReadRestoreActionExecutor.java  |   2 +-
 .../rollback/BaseRollbackActionExecutor.java       |   7 +-
 .../rollback/BaseRollbackPlanActionExecutor.java   |   7 +-
 .../CopyOnWriteRollbackActionExecutor.java         |   4 +-
 .../rollback/ListingBasedRollbackStrategy.java     |  59 ++++++++----
 .../MergeOnReadRollbackActionExecutor.java         |   4 +-
 .../table/upgrade/ZeroToOneUpgradeHandler.java     |   2 +-
 .../hudi/table/HoodieFlinkCopyOnWriteTable.java    |   4 +-
 .../hudi/table/HoodieFlinkMergeOnReadTable.java    |   4 +-
 .../hudi/table/HoodieJavaCopyOnWriteTable.java     |   4 +-
 .../hudi/table/HoodieSparkCopyOnWriteTable.java    |   5 +-
 .../hudi/table/HoodieSparkMergeOnReadTable.java    |   5 +-
 .../commit/BaseSparkCommitActionExecutor.java      |   2 +-
 .../TestSavepointRestoreCopyOnWrite.java           |   2 +-
 .../TestSavepointRestoreMergeOnRead.java           | 105 ++++++++++++++++++++-
 .../java/org/apache/hudi/table/TestCleaner.java    |   2 +-
 .../TestCopyOnWriteRollbackActionExecutor.java     |  16 ++--
 .../TestMergeOnReadRollbackActionExecutor.java     |   4 +-
 21 files changed, 189 insertions(+), 63 deletions(-)

diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java
index eabf5e02c76..5eb2a8f9ff3 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java
@@ -874,7 +874,8 @@ public abstract class BaseHoodieTableServiceClient<O> 
extends BaseHoodieClient i
                 + "(exists in active timeline: %s), with rollback plan: %s",
             rollbackInstantTime, commitInstantOpt.isPresent(), 
pendingRollbackInfo.isPresent()));
         Option<HoodieRollbackPlan> rollbackPlanOption = 
pendingRollbackInfo.map(entry -> Option.of(entry.getRollbackPlan()))
-            .orElseGet(() -> table.scheduleRollback(context, 
rollbackInstantTime, commitInstantOpt.get(), false, 
config.shouldRollbackUsingMarkers()));
+            .orElseGet(() -> table.scheduleRollback(context, 
rollbackInstantTime, commitInstantOpt.get(), false, 
config.shouldRollbackUsingMarkers(),
+                false));
         if (rollbackPlanOption.isPresent()) {
           // There can be a case where the inflight rollback failed after the 
instant files
           // are deleted for commitInstantTime, so that commitInstantOpt is 
empty as it is
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
index 8e5371532ed..d87444143cb 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
@@ -531,7 +531,8 @@ public abstract class HoodieTable<T, I, K, O> implements 
Serializable {
   public abstract Option<HoodieRollbackPlan> 
scheduleRollback(HoodieEngineContext context,
                                                               String 
instantTime,
                                                               HoodieInstant 
instantToRollback,
-                                                              boolean 
skipTimelinePublish, boolean shouldRollbackUsingMarkers);
+                                                              boolean 
skipTimelinePublish, boolean shouldRollbackUsingMarkers,
+                                                              boolean 
isRestore);
 
   /**
    * Rollback the (inflight/committed) record changes with the given commit 
time.
@@ -634,7 +635,8 @@ public abstract class HoodieTable<T, I, K, O> implements 
Serializable {
                                        Function<String, 
Option<HoodiePendingRollbackInfo>> getPendingRollbackInstantFunc) {
     final String commitTime = 
getPendingRollbackInstantFunc.apply(inflightInstant.getTimestamp()).map(entry
         -> 
entry.getRollbackInstant().getTimestamp()).orElse(HoodieActiveTimeline.createNewInstantTime());
-    scheduleRollback(context, commitTime, inflightInstant, false, 
config.shouldRollbackUsingMarkers());
+    scheduleRollback(context, commitTime, inflightInstant, false, 
config.shouldRollbackUsingMarkers(),
+        false);
     rollback(context, commitTime, inflightInstant, false, false);
     getActiveTimeline().revertInstantFromInflightToRequested(inflightInstant);
   }
@@ -648,7 +650,8 @@ public abstract class HoodieTable<T, I, K, O> implements 
Serializable {
   public void rollbackInflightLogCompaction(HoodieInstant inflightInstant, 
Function<String, Option<HoodiePendingRollbackInfo>> 
getPendingRollbackInstantFunc) {
     final String commitTime = 
getPendingRollbackInstantFunc.apply(inflightInstant.getTimestamp()).map(entry
         -> 
entry.getRollbackInstant().getTimestamp()).orElse(HoodieActiveTimeline.createNewInstantTime());
-    scheduleRollback(context, commitTime, inflightInstant, false, 
config.shouldRollbackUsingMarkers());
+    scheduleRollback(context, commitTime, inflightInstant, false, 
config.shouldRollbackUsingMarkers(),
+        false);
     rollback(context, commitTime, inflightInstant, true, false);
   }
 
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/restore/CopyOnWriteRestoreActionExecutor.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/restore/CopyOnWriteRestoreActionExecutor.java
index eea97e1e5ca..b4cbc01e4ae 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/restore/CopyOnWriteRestoreActionExecutor.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/restore/CopyOnWriteRestoreActionExecutor.java
@@ -47,7 +47,7 @@ public class CopyOnWriteRestoreActionExecutor<T, I, K, O>
     }
     table.getMetaClient().reloadActiveTimeline();
     String newInstantTime = HoodieActiveTimeline.createNewInstantTime();
-    table.scheduleRollback(context, newInstantTime, instantToRollback, false, 
false);
+    table.scheduleRollback(context, newInstantTime, instantToRollback, false, 
false, true);
     table.getMetaClient().reloadActiveTimeline();
     CopyOnWriteRollbackActionExecutor rollbackActionExecutor = new 
CopyOnWriteRollbackActionExecutor(
         context,
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/restore/MergeOnReadRestoreActionExecutor.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/restore/MergeOnReadRestoreActionExecutor.java
index f449a120d57..78dcc349dee 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/restore/MergeOnReadRestoreActionExecutor.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/restore/MergeOnReadRestoreActionExecutor.java
@@ -51,7 +51,7 @@ public class MergeOnReadRestoreActionExecutor<T, I, K, O>
     }
     table.getMetaClient().reloadActiveTimeline();
     String instantTime = HoodieActiveTimeline.createNewInstantTime();
-    table.scheduleRollback(context, instantTime, instantToRollback, false, 
false);
+    table.scheduleRollback(context, instantTime, instantToRollback, false, 
false, true);
     table.getMetaClient().reloadActiveTimeline();
     MergeOnReadRollbackActionExecutor rollbackActionExecutor = new 
MergeOnReadRollbackActionExecutor(
         context,
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/BaseRollbackActionExecutor.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/BaseRollbackActionExecutor.java
index 04b67cf637d..43e3e814bda 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/BaseRollbackActionExecutor.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/BaseRollbackActionExecutor.java
@@ -71,8 +71,9 @@ public abstract class BaseRollbackActionExecutor<T, I, K, O> 
extends BaseActionE
                                     String instantTime,
                                     HoodieInstant instantToRollback,
                                     boolean deleteInstants,
-                                    boolean skipLocking) {
-    this(context, config, table, instantTime, instantToRollback, 
deleteInstants, false, skipLocking);
+                                    boolean skipLocking,
+                                    boolean isRestore) {
+    this(context, config, table, instantTime, instantToRollback, 
deleteInstants, false, skipLocking, isRestore);
   }
 
   public BaseRollbackActionExecutor(HoodieEngineContext context,
@@ -82,7 +83,7 @@ public abstract class BaseRollbackActionExecutor<T, I, K, O> 
extends BaseActionE
       HoodieInstant instantToRollback,
       boolean deleteInstants,
       boolean skipTimelinePublish,
-      boolean skipLocking) {
+      boolean skipLocking, boolean isRestore) {
     super(context, config, table, instantTime);
     this.instantToRollback = instantToRollback;
     this.resolvedInstant = instantToRollback;
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/BaseRollbackPlanActionExecutor.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/BaseRollbackPlanActionExecutor.java
index 32eec545b72..48af32751a3 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/BaseRollbackPlanActionExecutor.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/BaseRollbackPlanActionExecutor.java
@@ -50,6 +50,7 @@ public class BaseRollbackPlanActionExecutor<T, I, K, O> 
extends BaseActionExecut
   protected final HoodieInstant instantToRollback;
   private final boolean skipTimelinePublish;
   private final boolean shouldRollbackUsingMarkers;
+  protected final Boolean isRestore;
 
   public static final Integer ROLLBACK_PLAN_VERSION_1 = 1;
   public static final Integer LATEST_ROLLBACK_PLAN_VERSION = 
ROLLBACK_PLAN_VERSION_1;
@@ -60,11 +61,13 @@ public class BaseRollbackPlanActionExecutor<T, I, K, O> 
extends BaseActionExecut
                                         String instantTime,
                                         HoodieInstant instantToRollback,
                                         boolean skipTimelinePublish,
-                                        boolean shouldRollbackUsingMarkers) {
+                                        boolean shouldRollbackUsingMarkers,
+                                        boolean isRestore) {
     super(context, config, table, instantTime);
     this.instantToRollback = instantToRollback;
     this.skipTimelinePublish = skipTimelinePublish;
     this.shouldRollbackUsingMarkers = shouldRollbackUsingMarkers && 
!instantToRollback.isCompleted();
+    this.isRestore = isRestore;
   }
 
   /**
@@ -89,7 +92,7 @@ public class BaseRollbackPlanActionExecutor<T, I, K, O> 
extends BaseActionExecut
     if (shouldRollbackUsingMarkers) {
       return new MarkerBasedRollbackStrategy(table, context, config, 
instantTime);
     } else {
-      return new ListingBasedRollbackStrategy(table, context, config, 
instantTime);
+      return new ListingBasedRollbackStrategy(table, context, config, 
instantTime, isRestore);
     }
   }
 
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/CopyOnWriteRollbackActionExecutor.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/CopyOnWriteRollbackActionExecutor.java
index c2dd8f4629a..7abb3e7a931 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/CopyOnWriteRollbackActionExecutor.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/CopyOnWriteRollbackActionExecutor.java
@@ -44,7 +44,7 @@ public class CopyOnWriteRollbackActionExecutor<T, I, K, O> 
extends BaseRollbackA
                                            HoodieInstant commitInstant,
                                            boolean deleteInstants,
                                            boolean skipLocking) {
-    super(context, config, table, instantTime, commitInstant, deleteInstants, 
skipLocking);
+    super(context, config, table, instantTime, commitInstant, deleteInstants, 
skipLocking, false);
   }
 
   public CopyOnWriteRollbackActionExecutor(HoodieEngineContext context,
@@ -55,7 +55,7 @@ public class CopyOnWriteRollbackActionExecutor<T, I, K, O> 
extends BaseRollbackA
                                            boolean deleteInstants,
                                            boolean skipTimelinePublish,
                                            boolean skipLocking) {
-    super(context, config, table, instantTime, commitInstant, deleteInstants, 
skipTimelinePublish, skipLocking);
+    super(context, config, table, instantTime, commitInstant, deleteInstants, 
skipTimelinePublish, skipLocking, false);
   }
 
   @Override
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackStrategy.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackStrategy.java
index 0997260e912..820e998c368 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackStrategy.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackStrategy.java
@@ -50,6 +50,7 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 
@@ -72,14 +73,18 @@ public class ListingBasedRollbackStrategy implements 
BaseRollbackPlanActionExecu
 
   protected final String instantTime;
 
+  protected final Boolean isRestore;
+
   public ListingBasedRollbackStrategy(HoodieTable<?, ?, ?, ?> table,
                                       HoodieEngineContext context,
                                       HoodieWriteConfig config,
-                                      String instantTime) {
+                                      String instantTime,
+                                      boolean isRestore) {
     this.table = table;
     this.context = context;
     this.config = config;
     this.instantTime = instantTime;
+    this.isRestore = isRestore;
   }
 
   @Override
@@ -96,42 +101,46 @@ public class ListingBasedRollbackStrategy implements 
BaseRollbackPlanActionExecu
       String baseFileExtension = getBaseFileExtension(metaClient);
       Option<HoodieCommitMetadata> commitMetadataOptional = 
getHoodieCommitMetadata(metaClient, instantToRollback);
       Boolean isCommitMetadataCompleted = 
checkCommitMetadataCompleted(instantToRollback, commitMetadataOptional);
+      AtomicBoolean isCompaction = new AtomicBoolean(false);
+      if (commitMetadataOptional.isPresent()) {
+        isCompaction.set(commitMetadataOptional.get().getOperationType() == 
WriteOperationType.COMPACT);
+      }
 
       return context.flatMap(partitionPaths, partitionPath -> {
         List<HoodieRollbackRequest> hoodieRollbackRequests = new 
ArrayList<>(partitionPaths.size());
         FileStatus[] filesToDelete =
             fetchFilesFromInstant(instantToRollback, partitionPath, 
metaClient.getBasePath(), baseFileExtension,
-                metaClient.getFs(), commitMetadataOptional, 
isCommitMetadataCompleted);
+                metaClient.getFs(), commitMetadataOptional, 
isCommitMetadataCompleted, tableType);
 
         if (HoodieTableType.COPY_ON_WRITE == tableType) {
           
hoodieRollbackRequests.addAll(getHoodieRollbackRequests(partitionPath, 
filesToDelete));
         } else if (HoodieTableType.MERGE_ON_READ == tableType) {
           String commit = instantToRollback.getTimestamp();
           HoodieActiveTimeline activeTimeline = 
table.getMetaClient().reloadActiveTimeline();
-          switch (instantToRollback.getAction()) {
+          String action = instantToRollback.getAction();
+          if (isCompaction.get()) { // compaction's action in hoodie instant 
will be "commit". So, we might need to override.
+            action = HoodieTimeline.COMPACTION_ACTION;
+          }
+          switch (action) {
             case HoodieTimeline.COMMIT_ACTION:
             case HoodieTimeline.REPLACE_COMMIT_ACTION:
               
hoodieRollbackRequests.addAll(getHoodieRollbackRequests(partitionPath, 
filesToDelete));
               break;
             case HoodieTimeline.COMPACTION_ACTION:
-              // If there is no delta commit present after the current commit 
(if compaction), no action, else we
-              // need to make sure that a compaction commit rollback also 
deletes any log files written as part of the
-              // succeeding deltacommit.
-              boolean higherDeltaCommits =
-                  
!activeTimeline.getDeltaCommitTimeline().filterCompletedInstants().findInstantsAfter(commit,
 1)
-                      .empty();
-              if (higherDeltaCommits) {
-                // Rollback of a compaction action with no higher deltacommit 
means that the compaction is scheduled
+              // Depending on whether we are rolling back compaction as part 
of restore or a regular rollback, logic differs/
+              // as part of regular rollback(on re-attempting a failed 
compaction), we might have to delete/rollback only the base file that could have
+              // potentially been created. Even if there are log files added 
to the file slice of interest, we should not touch them.
+              // but if its part of a restore operation, rolling back a 
compaction should rollback entire file slice, i.e base file and all log files.
+              if (!isRestore) {
+                // Rollback of a compaction action if not for restore means 
that the compaction is scheduled
                 // and has not yet finished. In this scenario we should delete 
only the newly created base files
                 // and not corresponding base commit log files created with 
this as baseCommit since updates would
                 // have been written to the log files.
                 
hoodieRollbackRequests.addAll(getHoodieRollbackRequests(partitionPath,
-                    listFilesToBeDeleted(instantToRollback.getTimestamp(), 
baseFileExtension, partitionPath,
+                    listBaseFilesToBeDeleted(instantToRollback.getTimestamp(), 
baseFileExtension, partitionPath,
                         metaClient.getFs())));
               } else {
-                // No deltacommits present after this compaction commit 
(inflight or requested). In this case, we
-                // can also delete any log files that were created with this 
compaction commit as base
-                // commit.
+                // if this is part of a restore operation, we should 
rollback/delete entire file slice.
                 
hoodieRollbackRequests.addAll(getHoodieRollbackRequests(partitionPath, 
filesToDelete));
               }
               break;
@@ -205,8 +214,8 @@ public class ListingBasedRollbackStrategy implements 
BaseRollbackPlanActionExecu
         .collect(Collectors.toList());
   }
 
-  private FileStatus[] listFilesToBeDeleted(String commit, String 
basefileExtension, String partitionPath,
-                                            FileSystem fs) throws IOException {
+  private FileStatus[] listBaseFilesToBeDeleted(String commit, String 
basefileExtension, String partitionPath,
+                                                FileSystem fs) throws 
IOException {
     LOG.info("Collecting files to be cleaned/rolledback up for path " + 
partitionPath + " and commit " + commit);
     PathFilter filter = (path) -> {
       if (path.toString().contains(basefileExtension)) {
@@ -221,8 +230,10 @@ public class ListingBasedRollbackStrategy implements 
BaseRollbackPlanActionExecu
   private FileStatus[] fetchFilesFromInstant(HoodieInstant instantToRollback, 
String partitionPath, String basePath,
                                              String baseFileExtension, 
HoodieWrapperFileSystem fs,
                                              Option<HoodieCommitMetadata> 
commitMetadataOptional,
-                                             Boolean 
isCommitMetadataCompleted) throws IOException {
-    if (isCommitMetadataCompleted) {
+                                             Boolean isCommitMetadataCompleted,
+                                             HoodieTableType tableType) throws 
IOException {
+    // go w/ commit metadata only for COW table. for MOR, we need to get 
associated log files when commit corresponding to base file is rolledback.
+    if (isCommitMetadataCompleted && tableType == 
HoodieTableType.COPY_ON_WRITE) {
       return fetchFilesFromCommitMetadata(instantToRollback, partitionPath, 
basePath, commitMetadataOptional.get(),
           baseFileExtension, fs);
     } else {
@@ -248,6 +259,16 @@ public class ListingBasedRollbackStrategy implements 
BaseRollbackPlanActionExecu
     }).toArray(Path[]::new), pathFilter);
   }
 
+  /**
+   * returns matching base files and log files if any for the instant time of 
the commit to be rolled back.
+   * @param instantToRollback
+   * @param partitionPath
+   * @param basePath
+   * @param baseFileExtension
+   * @param fs
+   * @return
+   * @throws IOException
+   */
   private FileStatus[] fetchFilesFromListFiles(HoodieInstant 
instantToRollback, String partitionPath, String basePath,
                                                String baseFileExtension, 
HoodieWrapperFileSystem fs)
       throws IOException {
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/MergeOnReadRollbackActionExecutor.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/MergeOnReadRollbackActionExecutor.java
index c085118d030..73a7f280a01 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/MergeOnReadRollbackActionExecutor.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/MergeOnReadRollbackActionExecutor.java
@@ -44,7 +44,7 @@ public class MergeOnReadRollbackActionExecutor<T, I, K, O> 
extends BaseRollbackA
                                            HoodieInstant commitInstant,
                                            boolean deleteInstants,
                                            boolean skipLocking) {
-    super(context, config, table, instantTime, commitInstant, deleteInstants, 
skipLocking);
+    super(context, config, table, instantTime, commitInstant, deleteInstants, 
skipLocking, false);
   }
 
   public MergeOnReadRollbackActionExecutor(HoodieEngineContext context,
@@ -55,7 +55,7 @@ public class MergeOnReadRollbackActionExecutor<T, I, K, O> 
extends BaseRollbackA
                                            boolean deleteInstants,
                                            boolean skipTimelinePublish,
                                            boolean skipLocking) {
-    super(context, config, table, instantTime, commitInstant, deleteInstants, 
skipTimelinePublish, skipLocking);
+    super(context, config, table, instantTime, commitInstant, deleteInstants, 
skipTimelinePublish, skipLocking, false);
   }
 
   @Override
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/ZeroToOneUpgradeHandler.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/ZeroToOneUpgradeHandler.java
index 822aef7a82a..9096c4e05cd 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/ZeroToOneUpgradeHandler.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/ZeroToOneUpgradeHandler.java
@@ -116,7 +116,7 @@ public class ZeroToOneUpgradeHandler implements 
UpgradeHandler {
 
   List<HoodieRollbackStat> getListBasedRollBackStats(HoodieTable<?, ?, ?, ?> 
table, HoodieEngineContext context, Option<HoodieInstant> commitInstantOpt) {
     List<HoodieRollbackRequest> hoodieRollbackRequests =
-        new ListingBasedRollbackStrategy(table, context, table.getConfig(), 
commitInstantOpt.get().getTimestamp())
+        new ListingBasedRollbackStrategy(table, context, table.getConfig(), 
commitInstantOpt.get().getTimestamp(), false)
             .getRollbackRequests(commitInstantOpt.get());
     return new BaseRollbackHelper(table.getMetaClient(), table.getConfig())
         .collectRollbackStats(context, commitInstantOpt.get(), 
hoodieRollbackRequests);
diff --git 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java
 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java
index e07eff4cf3d..e33ff6ba122 100644
--- 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java
+++ 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java
@@ -334,9 +334,9 @@ public class HoodieFlinkCopyOnWriteTable<T>
 
   @Override
   public Option<HoodieRollbackPlan> scheduleRollback(HoodieEngineContext 
context, String instantTime, HoodieInstant instantToRollback,
-                                                     boolean 
skipTimelinePublish, boolean shouldRollbackUsingMarkers) {
+                                                     boolean 
skipTimelinePublish, boolean shouldRollbackUsingMarkers, boolean isRestore) {
     return new BaseRollbackPlanActionExecutor(context, config, this, 
instantTime, instantToRollback, skipTimelinePublish,
-        shouldRollbackUsingMarkers).execute();
+        shouldRollbackUsingMarkers, isRestore).execute();
   }
 
   @Override
diff --git 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkMergeOnReadTable.java
 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkMergeOnReadTable.java
index 499a68d2942..361ea7a6e6f 100644
--- 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkMergeOnReadTable.java
+++ 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkMergeOnReadTable.java
@@ -146,9 +146,9 @@ public class HoodieFlinkMergeOnReadTable<T>
 
   @Override
   public Option<HoodieRollbackPlan> scheduleRollback(HoodieEngineContext 
context, String instantTime, HoodieInstant instantToRollback,
-                                                     boolean 
skipTimelinePublish, boolean shouldRollbackUsingMarkers) {
+                                                     boolean 
skipTimelinePublish, boolean shouldRollbackUsingMarkers, boolean isRestore) {
     return new BaseRollbackPlanActionExecutor(context, config, this, 
instantTime, instantToRollback, skipTimelinePublish,
-        shouldRollbackUsingMarkers).execute();
+        shouldRollbackUsingMarkers, isRestore).execute();
   }
 
   @Override
diff --git 
a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaCopyOnWriteTable.java
 
b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaCopyOnWriteTable.java
index 5431bfd0c4f..344a7302c7c 100644
--- 
a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaCopyOnWriteTable.java
+++ 
b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaCopyOnWriteTable.java
@@ -204,9 +204,9 @@ public class HoodieJavaCopyOnWriteTable<T>
 
   @Override
   public Option<HoodieRollbackPlan> scheduleRollback(HoodieEngineContext 
context, String instantTime, HoodieInstant instantToRollback,
-                                                     boolean 
skipTimelinePublish, boolean shouldRollbackUsingMarkers) {
+                                                     boolean 
skipTimelinePublish, boolean shouldRollbackUsingMarkers, boolean isRestore) {
     return new BaseRollbackPlanActionExecutor(context, config, this, 
instantTime, instantToRollback, skipTimelinePublish,
-        shouldRollbackUsingMarkers).execute();
+        shouldRollbackUsingMarkers, isRestore).execute();
   }
 
   @Override
diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java
index b8be53eeaf9..1d265e5b91f 100644
--- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java
@@ -203,9 +203,10 @@ public class HoodieSparkCopyOnWriteTable<T>
   @Override
   public Option<HoodieRollbackPlan> scheduleRollback(HoodieEngineContext 
context,
                                                      String instantTime,
-                                                     HoodieInstant 
instantToRollback, boolean skipTimelinePublish, boolean 
shouldRollbackUsingMarkers) {
+                                                     HoodieInstant 
instantToRollback, boolean skipTimelinePublish, boolean 
shouldRollbackUsingMarkers,
+                                                     boolean isRestore) {
     return new BaseRollbackPlanActionExecutor<>(context, config, this, 
instantTime, instantToRollback, skipTimelinePublish,
-        shouldRollbackUsingMarkers).execute();
+        shouldRollbackUsingMarkers, isRestore).execute();
   }
 
   @Override
diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkMergeOnReadTable.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkMergeOnReadTable.java
index 32ded270406..f40072566aa 100644
--- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkMergeOnReadTable.java
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkMergeOnReadTable.java
@@ -169,9 +169,10 @@ public class HoodieSparkMergeOnReadTable<T> extends 
HoodieSparkCopyOnWriteTable<
   @Override
   public Option<HoodieRollbackPlan> scheduleRollback(HoodieEngineContext 
context,
                                                      String instantTime,
-                                                     HoodieInstant 
instantToRollback, boolean skipTimelinePublish, boolean 
shouldRollbackUsingMarkers) {
+                                                     HoodieInstant 
instantToRollback, boolean skipTimelinePublish, boolean 
shouldRollbackUsingMarkers,
+                                                     boolean isRestore) {
     return new BaseRollbackPlanActionExecutor<>(context, config, this, 
instantTime, instantToRollback, skipTimelinePublish,
-        shouldRollbackUsingMarkers).execute();
+        shouldRollbackUsingMarkers, isRestore).execute();
   }
 
   @Override
diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java
index d19dd069ee2..7383f428e0a 100644
--- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java
@@ -147,7 +147,7 @@ public abstract class BaseSparkCommitActionExecutor<T> 
extends
           .collect(Collectors.toSet());
       pendingClusteringInstantsToRollback.forEach(instant -> {
         String commitTime = HoodieActiveTimeline.createNewInstantTime();
-        table.scheduleRollback(context, commitTime, instant, false, 
config.shouldRollbackUsingMarkers());
+        table.scheduleRollback(context, commitTime, instant, false, 
config.shouldRollbackUsingMarkers(), false);
         table.rollback(context, commitTime, instant, true, true);
       });
       table.getMetaClient().reloadActiveTimeline();
diff --git 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestSavepointRestoreCopyOnWrite.java
 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestSavepointRestoreCopyOnWrite.java
index 8a71a01fda9..683b0fc939d 100644
--- 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestSavepointRestoreCopyOnWrite.java
+++ 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestSavepointRestoreCopyOnWrite.java
@@ -158,7 +158,7 @@ public class TestSavepointRestoreCopyOnWrite extends 
HoodieClientTestBase {
         HoodieInstant pendingInstant = 
metaClient.getActiveTimeline().filterPendingExcludingCompaction()
             .lastInstant().orElseThrow(() -> new HoodieException("Pending 
instant does not exist"));
         HoodieSparkTable.create(client.getConfig(), context)
-            .scheduleRollback(context, 
HoodieActiveTimeline.createNewInstantTime(), pendingInstant, false, true);
+            .scheduleRollback(context, 
HoodieActiveTimeline.createNewInstantTime(), pendingInstant, false, true, 
false);
       }
       Option<String> rollbackInstant = 
metaClient.reloadActiveTimeline().getRollbackTimeline().lastInstant().map(HoodieInstant::getTimestamp);
       assertTrue(rollbackInstant.isPresent(), "The latest instant should be a 
rollback");
diff --git 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestSavepointRestoreMergeOnRead.java
 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestSavepointRestoreMergeOnRead.java
index 6c1dfe5d734..5f13f5d1102 100644
--- 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestSavepointRestoreMergeOnRead.java
+++ 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestSavepointRestoreMergeOnRead.java
@@ -20,6 +20,7 @@ package org.apache.hudi.client.functional;
 
 import org.apache.hudi.client.SparkRDDWriteClient;
 import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieTableType;
@@ -29,6 +30,7 @@ import org.apache.hudi.config.HoodieCompactionConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.testutils.HoodieClientTestBase;
 
+import org.apache.hadoop.fs.PathFilter;
 import org.apache.spark.api.java.JavaRDD;
 import org.junit.jupiter.api.Tag;
 import org.junit.jupiter.api.Test;
@@ -38,6 +40,7 @@ import java.util.List;
 import java.util.Objects;
 
 import static org.apache.hudi.testutils.Assertions.assertNoWriteErrors;
+import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
 /**
@@ -53,14 +56,14 @@ public class TestSavepointRestoreMergeOnRead extends 
HoodieClientTestBase {
    *
    * <p>For example file layout,
    * FG1:
-   *   BF1(DC1), LF1(DC2), LF2(DC3), LF3(DC4)
-   *   BF5(C5), LF1(DC6), LF2(DC7)
+   * BF1(DC1), LF1(DC2), LF2(DC3), LF3(DC4)
+   * BF5(C5), LF1(DC6), LF2(DC7)
    * After restore, it becomes
-   *   BF1(DC1), LF1(DC2), LF2(DC3), LF3(DC4), LF4(RB DC4)
+   * BF1(DC1), LF1(DC2), LF2(DC3), LF3(DC4), LF4(RB DC4)
    *
    * <p>Expected behaviors:
-   *   snapshot query: total rec matches.
-   *   checking the row count by updating columns in (val4,val5,val6, val7).
+   * snapshot query: total rec matches.
+   * checking the row count by updating columns in (val4,val5,val6, val7).
    */
   @Test
   void testCleaningDeltaCommits() throws Exception {
@@ -93,6 +96,7 @@ public class TestSavepointRestoreMergeOnRead extends 
HoodieClientTestBase {
       assertRowNumberEqualsTo(30);
 
       // write another 3 delta commits
+      String compactionCommit = null;
       for (int i = 1; i <= 3; i++) {
         String newCommitTime = HoodieActiveTimeline.createNewInstantTime();
         client.startCommitWithTime(newCommitTime);
@@ -102,6 +106,7 @@ public class TestSavepointRestoreMergeOnRead extends 
HoodieClientTestBase {
         if (i == 1) {
           Option<String> compactionInstant = 
client.scheduleCompaction(Option.empty());
           assertTrue(compactionInstant.isPresent(), "A compaction plan should 
be scheduled");
+          compactionCommit = compactionInstant.get();
           client.compact(compactionInstant.get());
         }
       }
@@ -109,6 +114,96 @@ public class TestSavepointRestoreMergeOnRead extends 
HoodieClientTestBase {
       // restore
       client.restoreToSavepoint(Objects.requireNonNull(savepointCommit, 
"restore commit should not be null"));
       assertRowNumberEqualsTo(30);
+      // ensure there are no data files matching the compaction commit that 
was rolled back.
+      String finalCompactionCommit = compactionCommit;
+      PathFilter filter = (path) -> 
path.toString().contains(finalCompactionCommit);
+      for (String pPath : dataGen.getPartitionPaths()) {
+        assertEquals(0, 
fs.listStatus(FSUtils.getPartitionPath(hoodieWriteConfig.getBasePath(), pPath), 
filter).length);
+      }
+    }
+  }
+
+  @Test
+  public void testRestoreWithFileGroupCreatedWithDeltaCommits() throws 
IOException {
+    HoodieWriteConfig hoodieWriteConfig = 
getConfigBuilder(HoodieFailedWritesCleaningPolicy.EAGER) // eager cleaning
+        .withCompactionConfig(HoodieCompactionConfig.newBuilder()
+            .withMaxNumDeltaCommitsBeforeCompaction(4)
+            .withInlineCompaction(true)
+            .build())
+        .withRollbackUsingMarkers(true)
+        .build();
+    final int numRecords = 100;
+    String firstCommit;
+    String secondCommit;
+    try (SparkRDDWriteClient client = getHoodieWriteClient(hoodieWriteConfig)) 
{
+      // 1st commit insert
+      String newCommitTime = HoodieActiveTimeline.createNewInstantTime();
+      List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 
numRecords);
+      JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);
+      client.startCommitWithTime(newCommitTime);
+      client.insert(writeRecords, newCommitTime);
+      firstCommit = newCommitTime;
+
+      // 2nd commit with inserts and updates which will create new file slice 
due to small file handling.
+      newCommitTime = HoodieActiveTimeline.createNewInstantTime();
+      List<HoodieRecord> records2 = 
dataGen.generateUniqueUpdates(newCommitTime, numRecords);
+      JavaRDD<HoodieRecord> writeRecords2 = jsc.parallelize(records2, 1);
+      List<HoodieRecord> records3 = dataGen.generateInserts(newCommitTime, 30);
+      JavaRDD<HoodieRecord> writeRecords3 = jsc.parallelize(records3, 1);
+
+      client.startCommitWithTime(newCommitTime);
+      client.upsert(writeRecords2.union(writeRecords3), newCommitTime);
+      secondCommit = newCommitTime;
+      // add savepoint to 1st commit (the 2nd commit is what the restore must undo)
+      client.savepoint(firstCommit, "test user","test comment");
+    }
+    assertRowNumberEqualsTo(130);
+    // verify there are new base files created matching the 2nd commit 
timestamp.
+    PathFilter filter = (path) -> path.toString().contains(secondCommit);
+    for (String pPath : dataGen.getPartitionPaths()) {
+      assertEquals(1, 
fs.listStatus(FSUtils.getPartitionPath(hoodieWriteConfig.getBasePath(), pPath), 
filter).length);
+    }
+
+    // disable small file handling so that updates go to log files.
+    hoodieWriteConfig = 
getConfigBuilder(HoodieFailedWritesCleaningPolicy.EAGER) // eager cleaning
+        .withCompactionConfig(HoodieCompactionConfig.newBuilder()
+            .withMaxNumDeltaCommitsBeforeCompaction(4) // the 4th delta_commit 
triggers compaction
+            .withInlineCompaction(true)
+            .compactionSmallFileSize(0)
+            .build())
+        .withRollbackUsingMarkers(true)
+        .build();
+
+    // add 2 more updates which will create log files.
+    try (SparkRDDWriteClient client = getHoodieWriteClient(hoodieWriteConfig)) 
{
+      String newCommitTime = HoodieActiveTimeline.createNewInstantTime();
+      List<HoodieRecord> records = 
dataGen.generateUniqueUpdates(newCommitTime, numRecords);
+      JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);
+      client.startCommitWithTime(newCommitTime);
+      client.upsert(writeRecords, newCommitTime);
+
+      newCommitTime = HoodieActiveTimeline.createNewInstantTime();
+      records = dataGen.generateUniqueUpdates(newCommitTime, numRecords);
+      writeRecords = jsc.parallelize(records, 1);
+      client.startCommitWithTime(newCommitTime);
+      client.upsert(writeRecords, newCommitTime);
+    }
+    assertRowNumberEqualsTo(130);
+
+    // restore to the savepoint taken on the 1st commit.
+    try (SparkRDDWriteClient client = getHoodieWriteClient(hoodieWriteConfig)) 
{
+      client.restoreToSavepoint(firstCommit);
+    }
+    assertRowNumberEqualsTo(100);
+    // verify that the entire file slice created w/ base instant time of 2nd 
commit is completely rolled back.
+    filter = (path) -> path.toString().contains(secondCommit);
+    for (String pPath : dataGen.getPartitionPaths()) {
+      assertEquals(0, 
fs.listStatus(FSUtils.getPartitionPath(hoodieWriteConfig.getBasePath(), pPath), 
filter).length);
+    }
+    // ensure files matching the 1st commit are intact
+    filter = (path) -> path.toString().contains(firstCommit);
+    for (String pPath : dataGen.getPartitionPaths()) {
+      assertEquals(1, 
fs.listStatus(FSUtils.getPartitionPath(hoodieWriteConfig.getBasePath(), pPath), 
filter).length);
     }
   }
 
diff --git 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestCleaner.java
 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestCleaner.java
index 17a12dcc7ff..d8e7e20ba7f 100644
--- 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestCleaner.java
+++ 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestCleaner.java
@@ -828,7 +828,7 @@ public class TestCleaner extends HoodieCleanerTestBase {
         new HoodieInstant(State.REQUESTED, HoodieTimeline.COMMIT_ACTION, 
"001"), Option.empty());
     metaClient.reloadActiveTimeline();
     HoodieInstant rollbackInstant = new HoodieInstant(State.INFLIGHT, 
HoodieTimeline.COMMIT_ACTION, "001");
-    table.scheduleRollback(context, "002", rollbackInstant, false, 
config.shouldRollbackUsingMarkers());
+    table.scheduleRollback(context, "002", rollbackInstant, false, 
config.shouldRollbackUsingMarkers(), false);
     table.rollback(context, "002", rollbackInstant, true, false);
     final int numTempFilesAfter = testTable.listAllFilesInTempFolder().length;
     assertEquals(0, numTempFilesAfter, "All temp files are deleted.");
diff --git 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/TestCopyOnWriteRollbackActionExecutor.java
 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/TestCopyOnWriteRollbackActionExecutor.java
index 18aadaa8876..37266950c04 100644
--- 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/TestCopyOnWriteRollbackActionExecutor.java
+++ 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/TestCopyOnWriteRollbackActionExecutor.java
@@ -109,7 +109,7 @@ public class TestCopyOnWriteRollbackActionExecutor extends 
HoodieClientRollbackT
     // execute CopyOnWriteRollbackActionExecutor with filelisting mode
     BaseRollbackPlanActionExecutor copyOnWriteRollbackPlanActionExecutor =
         new BaseRollbackPlanActionExecutor(context, table.getConfig(), table, 
rollbackInstant, needRollBackInstant, false,
-            table.getConfig().shouldRollbackUsingMarkers());
+            table.getConfig().shouldRollbackUsingMarkers(), false);
     HoodieRollbackPlan rollbackPlan = (HoodieRollbackPlan) 
copyOnWriteRollbackPlanActionExecutor.execute().get();
     CopyOnWriteRollbackActionExecutor copyOnWriteRollbackActionExecutor = new 
CopyOnWriteRollbackActionExecutor(context, table.getConfig(), table, "003", 
needRollBackInstant, true,
         false);
@@ -184,7 +184,7 @@ public class TestCopyOnWriteRollbackActionExecutor extends 
HoodieClientRollbackT
     HoodieInstant needRollBackInstant = new HoodieInstant(false, 
HoodieTimeline.COMMIT_ACTION, "002");
     String rollbackInstant = "003";
 
-    ListingBasedRollbackStrategy rollbackStrategy = new 
ListingBasedRollbackStrategy(table, context, table.getConfig(), 
rollbackInstant);
+    ListingBasedRollbackStrategy rollbackStrategy = new 
ListingBasedRollbackStrategy(table, context, table.getConfig(), 
rollbackInstant, false);
     List<HoodieRollbackRequest> rollBackRequests = 
rollbackStrategy.getRollbackRequests(needRollBackInstant);
     rollBackRequests.forEach(entry -> System.out.println(" " + 
entry.getPartitionPath() + ", " + entry.getFileId() + " "
         + Arrays.toString(entry.getFilesToBeDeleted().toArray())));
@@ -199,7 +199,7 @@ public class TestCopyOnWriteRollbackActionExecutor extends 
HoodieClientRollbackT
     Mockito.when(fs.exists(any()))
         .thenThrow(new IOException("Failing exists call for " + 
rollbackRequest.getFilesToBeDeleted().get(0)));
 
-    rollbackStrategy = new ListingBasedRollbackStrategy(table, context, cfg, 
rollbackInstant);
+    rollbackStrategy = new ListingBasedRollbackStrategy(table, context, cfg, 
rollbackInstant, false);
     List<HoodieRollbackRequest> rollBackRequestsUpdated = 
rollbackStrategy.getRollbackRequests(needRollBackInstant);
     rollBackRequestsUpdated.forEach(entry -> System.out.println(" " + 
entry.getPartitionPath() + ", " + entry.getFileId() + " "
         + Arrays.toString(entry.getFilesToBeDeleted().toArray())));
@@ -266,7 +266,7 @@ public class TestCopyOnWriteRollbackActionExecutor extends 
HoodieClientRollbackT
     // Schedule rollback
     BaseRollbackPlanActionExecutor copyOnWriteRollbackPlanActionExecutor =
         new BaseRollbackPlanActionExecutor(context, table.getConfig(), table, 
"003", needRollBackInstant, false,
-            table.getConfig().shouldRollbackUsingMarkers());
+            table.getConfig().shouldRollbackUsingMarkers(), false);
     HoodieRollbackPlan hoodieRollbackPlan = (HoodieRollbackPlan) 
copyOnWriteRollbackPlanActionExecutor.execute().get();
 
     // execute CopyOnWriteRollbackActionExecutor with filelisting mode
@@ -291,7 +291,7 @@ public class TestCopyOnWriteRollbackActionExecutor extends 
HoodieClientRollbackT
 
     BaseRollbackPlanActionExecutor copyOnWriteRollbackPlanActionExecutor =
         new BaseRollbackPlanActionExecutor(context, table.getConfig(), table, 
"003", commitInstant, false,
-            table.getConfig().shouldRollbackUsingMarkers());
+            table.getConfig().shouldRollbackUsingMarkers(), false);
     HoodieRollbackPlan hoodieRollbackPlan = (HoodieRollbackPlan) 
copyOnWriteRollbackPlanActionExecutor.execute().get();
     CopyOnWriteRollbackActionExecutor copyOnWriteRollbackActionExecutor = new 
CopyOnWriteRollbackActionExecutor(context, cfg, table, "003", commitInstant, 
false,
         false);
@@ -357,7 +357,7 @@ public class TestCopyOnWriteRollbackActionExecutor extends 
HoodieClientRollbackT
     // Create the rollback plan and perform the rollback
     BaseRollbackPlanActionExecutor copyOnWriteRollbackPlanActionExecutor =
         new BaseRollbackPlanActionExecutor(context, table.getConfig(), table, 
"003", needRollBackInstant, false,
-            table.getConfig().shouldRollbackUsingMarkers());
+            table.getConfig().shouldRollbackUsingMarkers(), false);
     copyOnWriteRollbackPlanActionExecutor.execute();
 
     CopyOnWriteRollbackActionExecutor copyOnWriteRollbackActionExecutor = new 
CopyOnWriteRollbackActionExecutor(context, table.getConfig(), table, "003",
@@ -447,7 +447,7 @@ public class TestCopyOnWriteRollbackActionExecutor extends 
HoodieClientRollbackT
     String rollbackInstant = HoodieActiveTimeline.createNewInstantTime();
     BaseRollbackPlanActionExecutor copyOnWriteRollbackPlanActionExecutor =
         new BaseRollbackPlanActionExecutor(context, table.getConfig(), table, 
rollbackInstant, needRollBackInstant, false,
-            !table.getConfig().shouldRollbackUsingMarkers());
+            !table.getConfig().shouldRollbackUsingMarkers(), false);
     copyOnWriteRollbackPlanActionExecutor.execute().get();
 
     // execute CopyOnWriteRollbackActionExecutor with filelisting mode
@@ -460,7 +460,7 @@ public class TestCopyOnWriteRollbackActionExecutor extends 
HoodieClientRollbackT
     rollbackInstant = HoodieActiveTimeline.createNewInstantTime();
     copyOnWriteRollbackPlanActionExecutor =
         new BaseRollbackPlanActionExecutor(context, table.getConfig(), table, 
rollbackInstant, needRollBackInstant, false,
-            table.getConfig().shouldRollbackUsingMarkers());
+            table.getConfig().shouldRollbackUsingMarkers(), false);
     HoodieRollbackPlan rollbackPlan = (HoodieRollbackPlan) 
copyOnWriteRollbackPlanActionExecutor.execute().get();
 
     // execute CopyOnWriteRollbackActionExecutor with filelisting mode
diff --git 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/TestMergeOnReadRollbackActionExecutor.java
 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/TestMergeOnReadRollbackActionExecutor.java
index 24c77c3c054..b3d10db6e52 100644
--- 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/TestMergeOnReadRollbackActionExecutor.java
+++ 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/TestMergeOnReadRollbackActionExecutor.java
@@ -107,7 +107,7 @@ public class TestMergeOnReadRollbackActionExecutor extends 
HoodieClientRollbackT
     HoodieInstant rollBackInstant = new HoodieInstant(isUsingMarkers, 
HoodieTimeline.DELTA_COMMIT_ACTION, "002");
     BaseRollbackPlanActionExecutor mergeOnReadRollbackPlanActionExecutor =
         new BaseRollbackPlanActionExecutor(context, cfg, table, "003", 
rollBackInstant, false,
-            cfg.shouldRollbackUsingMarkers());
+            cfg.shouldRollbackUsingMarkers(), false);
     mergeOnReadRollbackPlanActionExecutor.execute().get();
     MergeOnReadRollbackActionExecutor mergeOnReadRollbackActionExecutor = new 
MergeOnReadRollbackActionExecutor(
         context,
@@ -253,7 +253,7 @@ public class TestMergeOnReadRollbackActionExecutor extends 
HoodieClientRollbackT
     HoodieInstant rollBackInstant = new 
HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.DELTA_COMMIT_ACTION, 
"002");
     BaseRollbackPlanActionExecutor mergeOnReadRollbackPlanActionExecutor =
         new BaseRollbackPlanActionExecutor(context, cfg, table, "003", 
rollBackInstant, false,
-            cfg.shouldRollbackUsingMarkers());
+            cfg.shouldRollbackUsingMarkers(), false);
     mergeOnReadRollbackPlanActionExecutor.execute().get();
     MergeOnReadRollbackActionExecutor mergeOnReadRollbackActionExecutor = new 
MergeOnReadRollbackActionExecutor(
         context,

Reply via email to