This is an automated email from the ASF dual-hosted git repository.
sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new f1afb1bf04a [HUDI-6246] Fixing restore for compaction commit (#8774)
f1afb1bf04a is described below
commit f1afb1bf04abdc94a26d61dc302f36ec2bbeb15b
Author: Sivabalan Narayanan <[email protected]>
AuthorDate: Thu Jul 6 15:47:09 2023 -0400
[HUDI-6246] Fixing restore for compaction commit (#8774)
- Before this patch, there could be some dangling data files even though
there won't be any data consistency issues when we are restoring a completed
compaction. After this patch, there won't be any dangling files.
---
.../hudi/client/BaseHoodieTableServiceClient.java | 3 +-
.../java/org/apache/hudi/table/HoodieTable.java | 9 +-
.../restore/CopyOnWriteRestoreActionExecutor.java | 2 +-
.../restore/MergeOnReadRestoreActionExecutor.java | 2 +-
.../rollback/BaseRollbackActionExecutor.java | 7 +-
.../rollback/BaseRollbackPlanActionExecutor.java | 7 +-
.../CopyOnWriteRollbackActionExecutor.java | 4 +-
.../rollback/ListingBasedRollbackStrategy.java | 59 ++++++++----
.../MergeOnReadRollbackActionExecutor.java | 4 +-
.../table/upgrade/ZeroToOneUpgradeHandler.java | 2 +-
.../hudi/table/HoodieFlinkCopyOnWriteTable.java | 4 +-
.../hudi/table/HoodieFlinkMergeOnReadTable.java | 4 +-
.../hudi/table/HoodieJavaCopyOnWriteTable.java | 4 +-
.../hudi/table/HoodieSparkCopyOnWriteTable.java | 5 +-
.../hudi/table/HoodieSparkMergeOnReadTable.java | 5 +-
.../commit/BaseSparkCommitActionExecutor.java | 2 +-
.../TestSavepointRestoreCopyOnWrite.java | 2 +-
.../TestSavepointRestoreMergeOnRead.java | 105 ++++++++++++++++++++-
.../java/org/apache/hudi/table/TestCleaner.java | 2 +-
.../TestCopyOnWriteRollbackActionExecutor.java | 16 ++--
.../TestMergeOnReadRollbackActionExecutor.java | 4 +-
21 files changed, 189 insertions(+), 63 deletions(-)
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java
index eabf5e02c76..5eb2a8f9ff3 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java
@@ -874,7 +874,8 @@ public abstract class BaseHoodieTableServiceClient<O>
extends BaseHoodieClient i
+ "(exists in active timeline: %s), with rollback plan: %s",
rollbackInstantTime, commitInstantOpt.isPresent(),
pendingRollbackInfo.isPresent()));
Option<HoodieRollbackPlan> rollbackPlanOption =
pendingRollbackInfo.map(entry -> Option.of(entry.getRollbackPlan()))
- .orElseGet(() -> table.scheduleRollback(context,
rollbackInstantTime, commitInstantOpt.get(), false,
config.shouldRollbackUsingMarkers()));
+ .orElseGet(() -> table.scheduleRollback(context,
rollbackInstantTime, commitInstantOpt.get(), false,
config.shouldRollbackUsingMarkers(),
+ false));
if (rollbackPlanOption.isPresent()) {
// There can be a case where the inflight rollback failed after the
instant files
// are deleted for commitInstantTime, so that commitInstantOpt is
empty as it is
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
index 8e5371532ed..d87444143cb 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
@@ -531,7 +531,8 @@ public abstract class HoodieTable<T, I, K, O> implements
Serializable {
public abstract Option<HoodieRollbackPlan>
scheduleRollback(HoodieEngineContext context,
String
instantTime,
HoodieInstant
instantToRollback,
- boolean
skipTimelinePublish, boolean shouldRollbackUsingMarkers);
+ boolean
skipTimelinePublish, boolean shouldRollbackUsingMarkers,
+ boolean
isRestore);
/**
* Rollback the (inflight/committed) record changes with the given commit
time.
@@ -634,7 +635,8 @@ public abstract class HoodieTable<T, I, K, O> implements
Serializable {
Function<String,
Option<HoodiePendingRollbackInfo>> getPendingRollbackInstantFunc) {
final String commitTime =
getPendingRollbackInstantFunc.apply(inflightInstant.getTimestamp()).map(entry
->
entry.getRollbackInstant().getTimestamp()).orElse(HoodieActiveTimeline.createNewInstantTime());
- scheduleRollback(context, commitTime, inflightInstant, false,
config.shouldRollbackUsingMarkers());
+ scheduleRollback(context, commitTime, inflightInstant, false,
config.shouldRollbackUsingMarkers(),
+ false);
rollback(context, commitTime, inflightInstant, false, false);
getActiveTimeline().revertInstantFromInflightToRequested(inflightInstant);
}
@@ -648,7 +650,8 @@ public abstract class HoodieTable<T, I, K, O> implements
Serializable {
public void rollbackInflightLogCompaction(HoodieInstant inflightInstant,
Function<String, Option<HoodiePendingRollbackInfo>>
getPendingRollbackInstantFunc) {
final String commitTime =
getPendingRollbackInstantFunc.apply(inflightInstant.getTimestamp()).map(entry
->
entry.getRollbackInstant().getTimestamp()).orElse(HoodieActiveTimeline.createNewInstantTime());
- scheduleRollback(context, commitTime, inflightInstant, false,
config.shouldRollbackUsingMarkers());
+ scheduleRollback(context, commitTime, inflightInstant, false,
config.shouldRollbackUsingMarkers(),
+ false);
rollback(context, commitTime, inflightInstant, true, false);
}
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/restore/CopyOnWriteRestoreActionExecutor.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/restore/CopyOnWriteRestoreActionExecutor.java
index eea97e1e5ca..b4cbc01e4ae 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/restore/CopyOnWriteRestoreActionExecutor.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/restore/CopyOnWriteRestoreActionExecutor.java
@@ -47,7 +47,7 @@ public class CopyOnWriteRestoreActionExecutor<T, I, K, O>
}
table.getMetaClient().reloadActiveTimeline();
String newInstantTime = HoodieActiveTimeline.createNewInstantTime();
- table.scheduleRollback(context, newInstantTime, instantToRollback, false,
false);
+ table.scheduleRollback(context, newInstantTime, instantToRollback, false,
false, true);
table.getMetaClient().reloadActiveTimeline();
CopyOnWriteRollbackActionExecutor rollbackActionExecutor = new
CopyOnWriteRollbackActionExecutor(
context,
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/restore/MergeOnReadRestoreActionExecutor.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/restore/MergeOnReadRestoreActionExecutor.java
index f449a120d57..78dcc349dee 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/restore/MergeOnReadRestoreActionExecutor.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/restore/MergeOnReadRestoreActionExecutor.java
@@ -51,7 +51,7 @@ public class MergeOnReadRestoreActionExecutor<T, I, K, O>
}
table.getMetaClient().reloadActiveTimeline();
String instantTime = HoodieActiveTimeline.createNewInstantTime();
- table.scheduleRollback(context, instantTime, instantToRollback, false,
false);
+ table.scheduleRollback(context, instantTime, instantToRollback, false,
false, true);
table.getMetaClient().reloadActiveTimeline();
MergeOnReadRollbackActionExecutor rollbackActionExecutor = new
MergeOnReadRollbackActionExecutor(
context,
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/BaseRollbackActionExecutor.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/BaseRollbackActionExecutor.java
index 04b67cf637d..43e3e814bda 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/BaseRollbackActionExecutor.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/BaseRollbackActionExecutor.java
@@ -71,8 +71,9 @@ public abstract class BaseRollbackActionExecutor<T, I, K, O>
extends BaseActionE
String instantTime,
HoodieInstant instantToRollback,
boolean deleteInstants,
- boolean skipLocking) {
- this(context, config, table, instantTime, instantToRollback,
deleteInstants, false, skipLocking);
+ boolean skipLocking,
+ boolean isRestore) {
+ this(context, config, table, instantTime, instantToRollback,
deleteInstants, false, skipLocking, isRestore);
}
public BaseRollbackActionExecutor(HoodieEngineContext context,
@@ -82,7 +83,7 @@ public abstract class BaseRollbackActionExecutor<T, I, K, O>
extends BaseActionE
HoodieInstant instantToRollback,
boolean deleteInstants,
boolean skipTimelinePublish,
- boolean skipLocking) {
+ boolean skipLocking, boolean isRestore) {
super(context, config, table, instantTime);
this.instantToRollback = instantToRollback;
this.resolvedInstant = instantToRollback;
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/BaseRollbackPlanActionExecutor.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/BaseRollbackPlanActionExecutor.java
index 32eec545b72..48af32751a3 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/BaseRollbackPlanActionExecutor.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/BaseRollbackPlanActionExecutor.java
@@ -50,6 +50,7 @@ public class BaseRollbackPlanActionExecutor<T, I, K, O>
extends BaseActionExecut
protected final HoodieInstant instantToRollback;
private final boolean skipTimelinePublish;
private final boolean shouldRollbackUsingMarkers;
+ protected final Boolean isRestore;
public static final Integer ROLLBACK_PLAN_VERSION_1 = 1;
public static final Integer LATEST_ROLLBACK_PLAN_VERSION =
ROLLBACK_PLAN_VERSION_1;
@@ -60,11 +61,13 @@ public class BaseRollbackPlanActionExecutor<T, I, K, O>
extends BaseActionExecut
String instantTime,
HoodieInstant instantToRollback,
boolean skipTimelinePublish,
- boolean shouldRollbackUsingMarkers) {
+ boolean shouldRollbackUsingMarkers,
+ boolean isRestore) {
super(context, config, table, instantTime);
this.instantToRollback = instantToRollback;
this.skipTimelinePublish = skipTimelinePublish;
this.shouldRollbackUsingMarkers = shouldRollbackUsingMarkers &&
!instantToRollback.isCompleted();
+ this.isRestore = isRestore;
}
/**
@@ -89,7 +92,7 @@ public class BaseRollbackPlanActionExecutor<T, I, K, O>
extends BaseActionExecut
if (shouldRollbackUsingMarkers) {
return new MarkerBasedRollbackStrategy(table, context, config,
instantTime);
} else {
- return new ListingBasedRollbackStrategy(table, context, config,
instantTime);
+ return new ListingBasedRollbackStrategy(table, context, config,
instantTime, isRestore);
}
}
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/CopyOnWriteRollbackActionExecutor.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/CopyOnWriteRollbackActionExecutor.java
index c2dd8f4629a..7abb3e7a931 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/CopyOnWriteRollbackActionExecutor.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/CopyOnWriteRollbackActionExecutor.java
@@ -44,7 +44,7 @@ public class CopyOnWriteRollbackActionExecutor<T, I, K, O>
extends BaseRollbackA
HoodieInstant commitInstant,
boolean deleteInstants,
boolean skipLocking) {
- super(context, config, table, instantTime, commitInstant, deleteInstants,
skipLocking);
+ super(context, config, table, instantTime, commitInstant, deleteInstants,
skipLocking, false);
}
public CopyOnWriteRollbackActionExecutor(HoodieEngineContext context,
@@ -55,7 +55,7 @@ public class CopyOnWriteRollbackActionExecutor<T, I, K, O>
extends BaseRollbackA
boolean deleteInstants,
boolean skipTimelinePublish,
boolean skipLocking) {
- super(context, config, table, instantTime, commitInstant, deleteInstants,
skipTimelinePublish, skipLocking);
+ super(context, config, table, instantTime, commitInstant, deleteInstants,
skipTimelinePublish, skipLocking, false);
}
@Override
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackStrategy.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackStrategy.java
index 0997260e912..820e998c368 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackStrategy.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackStrategy.java
@@ -50,6 +50,7 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import java.util.stream.Collectors;
@@ -72,14 +73,18 @@ public class ListingBasedRollbackStrategy implements
BaseRollbackPlanActionExecu
protected final String instantTime;
+ protected final Boolean isRestore;
+
public ListingBasedRollbackStrategy(HoodieTable<?, ?, ?, ?> table,
HoodieEngineContext context,
HoodieWriteConfig config,
- String instantTime) {
+ String instantTime,
+ boolean isRestore) {
this.table = table;
this.context = context;
this.config = config;
this.instantTime = instantTime;
+ this.isRestore = isRestore;
}
@Override
@@ -96,42 +101,46 @@ public class ListingBasedRollbackStrategy implements
BaseRollbackPlanActionExecu
String baseFileExtension = getBaseFileExtension(metaClient);
Option<HoodieCommitMetadata> commitMetadataOptional =
getHoodieCommitMetadata(metaClient, instantToRollback);
Boolean isCommitMetadataCompleted =
checkCommitMetadataCompleted(instantToRollback, commitMetadataOptional);
+ AtomicBoolean isCompaction = new AtomicBoolean(false);
+ if (commitMetadataOptional.isPresent()) {
+ isCompaction.set(commitMetadataOptional.get().getOperationType() ==
WriteOperationType.COMPACT);
+ }
return context.flatMap(partitionPaths, partitionPath -> {
List<HoodieRollbackRequest> hoodieRollbackRequests = new
ArrayList<>(partitionPaths.size());
FileStatus[] filesToDelete =
fetchFilesFromInstant(instantToRollback, partitionPath,
metaClient.getBasePath(), baseFileExtension,
- metaClient.getFs(), commitMetadataOptional,
isCommitMetadataCompleted);
+ metaClient.getFs(), commitMetadataOptional,
isCommitMetadataCompleted, tableType);
if (HoodieTableType.COPY_ON_WRITE == tableType) {
hoodieRollbackRequests.addAll(getHoodieRollbackRequests(partitionPath,
filesToDelete));
} else if (HoodieTableType.MERGE_ON_READ == tableType) {
String commit = instantToRollback.getTimestamp();
HoodieActiveTimeline activeTimeline =
table.getMetaClient().reloadActiveTimeline();
- switch (instantToRollback.getAction()) {
+ String action = instantToRollback.getAction();
+ if (isCompaction.get()) { // compaction's action in hoodie instant
will be "commit". So, we might need to override.
+ action = HoodieTimeline.COMPACTION_ACTION;
+ }
+ switch (action) {
case HoodieTimeline.COMMIT_ACTION:
case HoodieTimeline.REPLACE_COMMIT_ACTION:
hoodieRollbackRequests.addAll(getHoodieRollbackRequests(partitionPath,
filesToDelete));
break;
case HoodieTimeline.COMPACTION_ACTION:
- // If there is no delta commit present after the current commit
(if compaction), no action, else we
- // need to make sure that a compaction commit rollback also
deletes any log files written as part of the
- // succeeding deltacommit.
- boolean higherDeltaCommits =
-
!activeTimeline.getDeltaCommitTimeline().filterCompletedInstants().findInstantsAfter(commit,
1)
- .empty();
- if (higherDeltaCommits) {
- // Rollback of a compaction action with no higher deltacommit
means that the compaction is scheduled
+            // Depending on whether we are rolling back compaction as part
of restore or a regular rollback, logic differs.
+ // as part of regular rollback(on re-attempting a failed
compaction), we might have to delete/rollback only the base file that could have
+ // potentially been created. Even if there are log files added
to the file slice of interest, we should not touch them.
+            // but if it's part of a restore operation, rolling back a
compaction should rollback entire file slice, i.e., base file and all log files.
+ if (!isRestore) {
+ // Rollback of a compaction action if not for restore means
that the compaction is scheduled
// and has not yet finished. In this scenario we should delete
only the newly created base files
// and not corresponding base commit log files created with
this as baseCommit since updates would
// have been written to the log files.
hoodieRollbackRequests.addAll(getHoodieRollbackRequests(partitionPath,
- listFilesToBeDeleted(instantToRollback.getTimestamp(),
baseFileExtension, partitionPath,
+ listBaseFilesToBeDeleted(instantToRollback.getTimestamp(),
baseFileExtension, partitionPath,
metaClient.getFs())));
} else {
- // No deltacommits present after this compaction commit
(inflight or requested). In this case, we
- // can also delete any log files that were created with this
compaction commit as base
- // commit.
+ // if this is part of a restore operation, we should
rollback/delete entire file slice.
hoodieRollbackRequests.addAll(getHoodieRollbackRequests(partitionPath,
filesToDelete));
}
break;
@@ -205,8 +214,8 @@ public class ListingBasedRollbackStrategy implements
BaseRollbackPlanActionExecu
.collect(Collectors.toList());
}
- private FileStatus[] listFilesToBeDeleted(String commit, String
basefileExtension, String partitionPath,
- FileSystem fs) throws IOException {
+ private FileStatus[] listBaseFilesToBeDeleted(String commit, String
basefileExtension, String partitionPath,
+ FileSystem fs) throws
IOException {
LOG.info("Collecting files to be cleaned/rolledback up for path " +
partitionPath + " and commit " + commit);
PathFilter filter = (path) -> {
if (path.toString().contains(basefileExtension)) {
@@ -221,8 +230,10 @@ public class ListingBasedRollbackStrategy implements
BaseRollbackPlanActionExecu
private FileStatus[] fetchFilesFromInstant(HoodieInstant instantToRollback,
String partitionPath, String basePath,
String baseFileExtension,
HoodieWrapperFileSystem fs,
Option<HoodieCommitMetadata>
commitMetadataOptional,
- Boolean
isCommitMetadataCompleted) throws IOException {
- if (isCommitMetadataCompleted) {
+ Boolean isCommitMetadataCompleted,
+ HoodieTableType tableType) throws
IOException {
+    // go w/ commit metadata only for COW table. for MOR, we need to get
associated log files when commit corresponding to base file is rolled back.
+ if (isCommitMetadataCompleted && tableType ==
HoodieTableType.COPY_ON_WRITE) {
return fetchFilesFromCommitMetadata(instantToRollback, partitionPath,
basePath, commitMetadataOptional.get(),
baseFileExtension, fs);
} else {
@@ -248,6 +259,16 @@ public class ListingBasedRollbackStrategy implements
BaseRollbackPlanActionExecu
}).toArray(Path[]::new), pathFilter);
}
+ /**
+ * returns matching base files and log files if any for the instant time of
the commit to be rolled back.
+ * @param instantToRollback
+ * @param partitionPath
+ * @param basePath
+ * @param baseFileExtension
+ * @param fs
+ * @return
+ * @throws IOException
+ */
private FileStatus[] fetchFilesFromListFiles(HoodieInstant
instantToRollback, String partitionPath, String basePath,
String baseFileExtension,
HoodieWrapperFileSystem fs)
throws IOException {
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/MergeOnReadRollbackActionExecutor.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/MergeOnReadRollbackActionExecutor.java
index c085118d030..73a7f280a01 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/MergeOnReadRollbackActionExecutor.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/MergeOnReadRollbackActionExecutor.java
@@ -44,7 +44,7 @@ public class MergeOnReadRollbackActionExecutor<T, I, K, O>
extends BaseRollbackA
HoodieInstant commitInstant,
boolean deleteInstants,
boolean skipLocking) {
- super(context, config, table, instantTime, commitInstant, deleteInstants,
skipLocking);
+ super(context, config, table, instantTime, commitInstant, deleteInstants,
skipLocking, false);
}
public MergeOnReadRollbackActionExecutor(HoodieEngineContext context,
@@ -55,7 +55,7 @@ public class MergeOnReadRollbackActionExecutor<T, I, K, O>
extends BaseRollbackA
boolean deleteInstants,
boolean skipTimelinePublish,
boolean skipLocking) {
- super(context, config, table, instantTime, commitInstant, deleteInstants,
skipTimelinePublish, skipLocking);
+ super(context, config, table, instantTime, commitInstant, deleteInstants,
skipTimelinePublish, skipLocking, false);
}
@Override
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/ZeroToOneUpgradeHandler.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/ZeroToOneUpgradeHandler.java
index 822aef7a82a..9096c4e05cd 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/ZeroToOneUpgradeHandler.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/ZeroToOneUpgradeHandler.java
@@ -116,7 +116,7 @@ public class ZeroToOneUpgradeHandler implements
UpgradeHandler {
List<HoodieRollbackStat> getListBasedRollBackStats(HoodieTable<?, ?, ?, ?>
table, HoodieEngineContext context, Option<HoodieInstant> commitInstantOpt) {
List<HoodieRollbackRequest> hoodieRollbackRequests =
- new ListingBasedRollbackStrategy(table, context, table.getConfig(),
commitInstantOpt.get().getTimestamp())
+ new ListingBasedRollbackStrategy(table, context, table.getConfig(),
commitInstantOpt.get().getTimestamp(), false)
.getRollbackRequests(commitInstantOpt.get());
return new BaseRollbackHelper(table.getMetaClient(), table.getConfig())
.collectRollbackStats(context, commitInstantOpt.get(),
hoodieRollbackRequests);
diff --git
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java
index e07eff4cf3d..e33ff6ba122 100644
---
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java
+++
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java
@@ -334,9 +334,9 @@ public class HoodieFlinkCopyOnWriteTable<T>
@Override
public Option<HoodieRollbackPlan> scheduleRollback(HoodieEngineContext
context, String instantTime, HoodieInstant instantToRollback,
- boolean
skipTimelinePublish, boolean shouldRollbackUsingMarkers) {
+ boolean
skipTimelinePublish, boolean shouldRollbackUsingMarkers, boolean isRestore) {
return new BaseRollbackPlanActionExecutor(context, config, this,
instantTime, instantToRollback, skipTimelinePublish,
- shouldRollbackUsingMarkers).execute();
+ shouldRollbackUsingMarkers, isRestore).execute();
}
@Override
diff --git
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkMergeOnReadTable.java
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkMergeOnReadTable.java
index 499a68d2942..361ea7a6e6f 100644
---
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkMergeOnReadTable.java
+++
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkMergeOnReadTable.java
@@ -146,9 +146,9 @@ public class HoodieFlinkMergeOnReadTable<T>
@Override
public Option<HoodieRollbackPlan> scheduleRollback(HoodieEngineContext
context, String instantTime, HoodieInstant instantToRollback,
- boolean
skipTimelinePublish, boolean shouldRollbackUsingMarkers) {
+ boolean
skipTimelinePublish, boolean shouldRollbackUsingMarkers, boolean isRestore) {
return new BaseRollbackPlanActionExecutor(context, config, this,
instantTime, instantToRollback, skipTimelinePublish,
- shouldRollbackUsingMarkers).execute();
+ shouldRollbackUsingMarkers, isRestore).execute();
}
@Override
diff --git
a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaCopyOnWriteTable.java
b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaCopyOnWriteTable.java
index 5431bfd0c4f..344a7302c7c 100644
---
a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaCopyOnWriteTable.java
+++
b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaCopyOnWriteTable.java
@@ -204,9 +204,9 @@ public class HoodieJavaCopyOnWriteTable<T>
@Override
public Option<HoodieRollbackPlan> scheduleRollback(HoodieEngineContext
context, String instantTime, HoodieInstant instantToRollback,
- boolean
skipTimelinePublish, boolean shouldRollbackUsingMarkers) {
+ boolean
skipTimelinePublish, boolean shouldRollbackUsingMarkers, boolean isRestore) {
return new BaseRollbackPlanActionExecutor(context, config, this,
instantTime, instantToRollback, skipTimelinePublish,
- shouldRollbackUsingMarkers).execute();
+ shouldRollbackUsingMarkers, isRestore).execute();
}
@Override
diff --git
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java
index b8be53eeaf9..1d265e5b91f 100644
---
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java
+++
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java
@@ -203,9 +203,10 @@ public class HoodieSparkCopyOnWriteTable<T>
@Override
public Option<HoodieRollbackPlan> scheduleRollback(HoodieEngineContext
context,
String instantTime,
- HoodieInstant
instantToRollback, boolean skipTimelinePublish, boolean
shouldRollbackUsingMarkers) {
+ HoodieInstant
instantToRollback, boolean skipTimelinePublish, boolean
shouldRollbackUsingMarkers,
+ boolean isRestore) {
return new BaseRollbackPlanActionExecutor<>(context, config, this,
instantTime, instantToRollback, skipTimelinePublish,
- shouldRollbackUsingMarkers).execute();
+ shouldRollbackUsingMarkers, isRestore).execute();
}
@Override
diff --git
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkMergeOnReadTable.java
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkMergeOnReadTable.java
index 32ded270406..f40072566aa 100644
---
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkMergeOnReadTable.java
+++
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkMergeOnReadTable.java
@@ -169,9 +169,10 @@ public class HoodieSparkMergeOnReadTable<T> extends
HoodieSparkCopyOnWriteTable<
@Override
public Option<HoodieRollbackPlan> scheduleRollback(HoodieEngineContext
context,
String instantTime,
- HoodieInstant
instantToRollback, boolean skipTimelinePublish, boolean
shouldRollbackUsingMarkers) {
+ HoodieInstant
instantToRollback, boolean skipTimelinePublish, boolean
shouldRollbackUsingMarkers,
+ boolean isRestore) {
return new BaseRollbackPlanActionExecutor<>(context, config, this,
instantTime, instantToRollback, skipTimelinePublish,
- shouldRollbackUsingMarkers).execute();
+ shouldRollbackUsingMarkers, isRestore).execute();
}
@Override
diff --git
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java
index d19dd069ee2..7383f428e0a 100644
---
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java
+++
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java
@@ -147,7 +147,7 @@ public abstract class BaseSparkCommitActionExecutor<T>
extends
.collect(Collectors.toSet());
pendingClusteringInstantsToRollback.forEach(instant -> {
String commitTime = HoodieActiveTimeline.createNewInstantTime();
- table.scheduleRollback(context, commitTime, instant, false,
config.shouldRollbackUsingMarkers());
+ table.scheduleRollback(context, commitTime, instant, false,
config.shouldRollbackUsingMarkers(), false);
table.rollback(context, commitTime, instant, true, true);
});
table.getMetaClient().reloadActiveTimeline();
diff --git
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestSavepointRestoreCopyOnWrite.java
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestSavepointRestoreCopyOnWrite.java
index 8a71a01fda9..683b0fc939d 100644
---
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestSavepointRestoreCopyOnWrite.java
+++
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestSavepointRestoreCopyOnWrite.java
@@ -158,7 +158,7 @@ public class TestSavepointRestoreCopyOnWrite extends
HoodieClientTestBase {
HoodieInstant pendingInstant =
metaClient.getActiveTimeline().filterPendingExcludingCompaction()
.lastInstant().orElseThrow(() -> new HoodieException("Pending
instant does not exist"));
HoodieSparkTable.create(client.getConfig(), context)
- .scheduleRollback(context,
HoodieActiveTimeline.createNewInstantTime(), pendingInstant, false, true);
+ .scheduleRollback(context,
HoodieActiveTimeline.createNewInstantTime(), pendingInstant, false, true,
false);
}
Option<String> rollbackInstant =
metaClient.reloadActiveTimeline().getRollbackTimeline().lastInstant().map(HoodieInstant::getTimestamp);
assertTrue(rollbackInstant.isPresent(), "The latest instant should be a
rollback");
diff --git
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestSavepointRestoreMergeOnRead.java
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestSavepointRestoreMergeOnRead.java
index 6c1dfe5d734..5f13f5d1102 100644
---
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestSavepointRestoreMergeOnRead.java
+++
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestSavepointRestoreMergeOnRead.java
@@ -20,6 +20,7 @@ package org.apache.hudi.client.functional;
import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieTableType;
@@ -29,6 +30,7 @@ import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.testutils.HoodieClientTestBase;
+import org.apache.hadoop.fs.PathFilter;
import org.apache.spark.api.java.JavaRDD;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
@@ -38,6 +40,7 @@ import java.util.List;
import java.util.Objects;
import static org.apache.hudi.testutils.Assertions.assertNoWriteErrors;
+import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
/**
@@ -53,14 +56,14 @@ public class TestSavepointRestoreMergeOnRead extends
HoodieClientTestBase {
*
* <p>For example file layout,
* FG1:
- * BF1(DC1), LF1(DC2), LF2(DC3), LF3(DC4)
- * BF5(C5), LF1(DC6), LF2(DC7)
+ * BF1(DC1), LF1(DC2), LF2(DC3), LF3(DC4)
+ * BF5(C5), LF1(DC6), LF2(DC7)
* After restore, it becomes
- * BF1(DC1), LF1(DC2), LF2(DC3), LF3(DC4), LF4(RB DC4)
+ * BF1(DC1), LF1(DC2), LF2(DC3), LF3(DC4), LF4(RB DC4)
*
* <p>Expected behaviors:
- * snapshot query: total rec matches.
- * checking the row count by updating columns in (val4,val5,val6, val7).
+ * snapshot query: total rec matches.
+ * checking the row count by updating columns in (val4,val5,val6, val7).
*/
@Test
void testCleaningDeltaCommits() throws Exception {
@@ -93,6 +96,7 @@ public class TestSavepointRestoreMergeOnRead extends
HoodieClientTestBase {
assertRowNumberEqualsTo(30);
// write another 3 delta commits
+ String compactionCommit = null;
for (int i = 1; i <= 3; i++) {
String newCommitTime = HoodieActiveTimeline.createNewInstantTime();
client.startCommitWithTime(newCommitTime);
@@ -102,6 +106,7 @@ public class TestSavepointRestoreMergeOnRead extends
HoodieClientTestBase {
if (i == 1) {
Option<String> compactionInstant =
client.scheduleCompaction(Option.empty());
assertTrue(compactionInstant.isPresent(), "A compaction plan should
be scheduled");
+ compactionCommit = compactionInstant.get();
client.compact(compactionInstant.get());
}
}
@@ -109,6 +114,96 @@ public class TestSavepointRestoreMergeOnRead extends
HoodieClientTestBase {
// restore
client.restoreToSavepoint(Objects.requireNonNull(savepointCommit,
"restore commit should not be null"));
assertRowNumberEqualsTo(30);
+ // ensure there are no data files matching the compaction commit that
was rolled back.
+ String finalCompactionCommit = compactionCommit;
+ PathFilter filter = (path) ->
path.toString().contains(finalCompactionCommit);
+ for (String pPath : dataGen.getPartitionPaths()) {
+ assertEquals(0,
fs.listStatus(FSUtils.getPartitionPath(hoodieWriteConfig.getBasePath(), pPath),
filter).length);
+ }
+ }
+ }
+
+ @Test
+ public void testRestoreWithFileGroupCreatedWithDeltaCommits() throws
IOException {
+ HoodieWriteConfig hoodieWriteConfig =
getConfigBuilder(HoodieFailedWritesCleaningPolicy.EAGER) // eager cleaning
+ .withCompactionConfig(HoodieCompactionConfig.newBuilder()
+ .withMaxNumDeltaCommitsBeforeCompaction(4)
+ .withInlineCompaction(true)
+ .build())
+ .withRollbackUsingMarkers(true)
+ .build();
+ final int numRecords = 100;
+ String firstCommit;
+ String secondCommit;
+ try (SparkRDDWriteClient client = getHoodieWriteClient(hoodieWriteConfig))
{
+ // 1st commit insert
+ String newCommitTime = HoodieActiveTimeline.createNewInstantTime();
+ List<HoodieRecord> records = dataGen.generateInserts(newCommitTime,
numRecords);
+ JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);
+ client.startCommitWithTime(newCommitTime);
+ client.insert(writeRecords, newCommitTime);
+ firstCommit = newCommitTime;
+
+ // 2nd commit with inserts and updates which will create new file slice
due to small file handling.
+ newCommitTime = HoodieActiveTimeline.createNewInstantTime();
+ List<HoodieRecord> records2 =
dataGen.generateUniqueUpdates(newCommitTime, numRecords);
+ JavaRDD<HoodieRecord> writeRecords2 = jsc.parallelize(records2, 1);
+ List<HoodieRecord> records3 = dataGen.generateInserts(newCommitTime, 30);
+ JavaRDD<HoodieRecord> writeRecords3 = jsc.parallelize(records3, 1);
+
+ client.startCommitWithTime(newCommitTime);
+ client.upsert(writeRecords2.union(writeRecords3), newCommitTime);
+ secondCommit = newCommitTime;
+ // add savepoint to the 1st commit (restore target below)
+ client.savepoint(firstCommit, "test user","test comment");
+ }
+ assertRowNumberEqualsTo(130);
+ // verify there are new base files created matching the 2nd commit
timestamp.
+ PathFilter filter = (path) -> path.toString().contains(secondCommit);
+ for (String pPath : dataGen.getPartitionPaths()) {
+ assertEquals(1,
fs.listStatus(FSUtils.getPartitionPath(hoodieWriteConfig.getBasePath(), pPath),
filter).length);
+ }
+
+ // disable small file handling so that updates go to log files.
+ hoodieWriteConfig =
getConfigBuilder(HoodieFailedWritesCleaningPolicy.EAGER) // eager cleaning
+ .withCompactionConfig(HoodieCompactionConfig.newBuilder()
+ .withMaxNumDeltaCommitsBeforeCompaction(4) // the 4th delta_commit
triggers compaction
+ .withInlineCompaction(true)
+ .compactionSmallFileSize(0)
+ .build())
+ .withRollbackUsingMarkers(true)
+ .build();
+
+ // add 2 more updates which will create log files.
+ try (SparkRDDWriteClient client = getHoodieWriteClient(hoodieWriteConfig))
{
+ String newCommitTime = HoodieActiveTimeline.createNewInstantTime();
+ List<HoodieRecord> records =
dataGen.generateUniqueUpdates(newCommitTime, numRecords);
+ JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);
+ client.startCommitWithTime(newCommitTime);
+ client.upsert(writeRecords, newCommitTime);
+
+ newCommitTime = HoodieActiveTimeline.createNewInstantTime();
+ records = dataGen.generateUniqueUpdates(newCommitTime, numRecords);
+ writeRecords = jsc.parallelize(records, 1);
+ client.startCommitWithTime(newCommitTime);
+ client.upsert(writeRecords, newCommitTime);
+ }
+ assertRowNumberEqualsTo(130);
+
+ // restore to the savepoint at the 1st commit.
+ try (SparkRDDWriteClient client = getHoodieWriteClient(hoodieWriteConfig))
{
+ client.restoreToSavepoint(firstCommit);
+ }
+ assertRowNumberEqualsTo(100);
+ // verify that the entire file slice created w/ base instant time of 2nd
commit is completely rolled back.
+ filter = (path) -> path.toString().contains(secondCommit);
+ for (String pPath : dataGen.getPartitionPaths()) {
+ assertEquals(0,
fs.listStatus(FSUtils.getPartitionPath(hoodieWriteConfig.getBasePath(), pPath),
filter).length);
+ }
+ // ensure files matching the 1st commit are intact
+ filter = (path) -> path.toString().contains(firstCommit);
+ for (String pPath : dataGen.getPartitionPaths()) {
+ assertEquals(1,
fs.listStatus(FSUtils.getPartitionPath(hoodieWriteConfig.getBasePath(), pPath),
filter).length);
}
}
diff --git
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestCleaner.java
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestCleaner.java
index 17a12dcc7ff..d8e7e20ba7f 100644
---
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestCleaner.java
+++
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestCleaner.java
@@ -828,7 +828,7 @@ public class TestCleaner extends HoodieCleanerTestBase {
new HoodieInstant(State.REQUESTED, HoodieTimeline.COMMIT_ACTION,
"001"), Option.empty());
metaClient.reloadActiveTimeline();
HoodieInstant rollbackInstant = new HoodieInstant(State.INFLIGHT,
HoodieTimeline.COMMIT_ACTION, "001");
- table.scheduleRollback(context, "002", rollbackInstant, false,
config.shouldRollbackUsingMarkers());
+ table.scheduleRollback(context, "002", rollbackInstant, false,
config.shouldRollbackUsingMarkers(), false);
table.rollback(context, "002", rollbackInstant, true, false);
final int numTempFilesAfter = testTable.listAllFilesInTempFolder().length;
assertEquals(0, numTempFilesAfter, "All temp files are deleted.");
diff --git
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/TestCopyOnWriteRollbackActionExecutor.java
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/TestCopyOnWriteRollbackActionExecutor.java
index 18aadaa8876..37266950c04 100644
---
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/TestCopyOnWriteRollbackActionExecutor.java
+++
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/TestCopyOnWriteRollbackActionExecutor.java
@@ -109,7 +109,7 @@ public class TestCopyOnWriteRollbackActionExecutor extends
HoodieClientRollbackT
// execute CopyOnWriteRollbackActionExecutor with filelisting mode
BaseRollbackPlanActionExecutor copyOnWriteRollbackPlanActionExecutor =
new BaseRollbackPlanActionExecutor(context, table.getConfig(), table,
rollbackInstant, needRollBackInstant, false,
- table.getConfig().shouldRollbackUsingMarkers());
+ table.getConfig().shouldRollbackUsingMarkers(), false);
HoodieRollbackPlan rollbackPlan = (HoodieRollbackPlan)
copyOnWriteRollbackPlanActionExecutor.execute().get();
CopyOnWriteRollbackActionExecutor copyOnWriteRollbackActionExecutor = new
CopyOnWriteRollbackActionExecutor(context, table.getConfig(), table, "003",
needRollBackInstant, true,
false);
@@ -184,7 +184,7 @@ public class TestCopyOnWriteRollbackActionExecutor extends
HoodieClientRollbackT
HoodieInstant needRollBackInstant = new HoodieInstant(false,
HoodieTimeline.COMMIT_ACTION, "002");
String rollbackInstant = "003";
- ListingBasedRollbackStrategy rollbackStrategy = new
ListingBasedRollbackStrategy(table, context, table.getConfig(),
rollbackInstant);
+ ListingBasedRollbackStrategy rollbackStrategy = new
ListingBasedRollbackStrategy(table, context, table.getConfig(),
rollbackInstant, false);
List<HoodieRollbackRequest> rollBackRequests =
rollbackStrategy.getRollbackRequests(needRollBackInstant);
rollBackRequests.forEach(entry -> System.out.println(" " +
entry.getPartitionPath() + ", " + entry.getFileId() + " "
+ Arrays.toString(entry.getFilesToBeDeleted().toArray())));
@@ -199,7 +199,7 @@ public class TestCopyOnWriteRollbackActionExecutor extends
HoodieClientRollbackT
Mockito.when(fs.exists(any()))
.thenThrow(new IOException("Failing exists call for " +
rollbackRequest.getFilesToBeDeleted().get(0)));
- rollbackStrategy = new ListingBasedRollbackStrategy(table, context, cfg,
rollbackInstant);
+ rollbackStrategy = new ListingBasedRollbackStrategy(table, context, cfg,
rollbackInstant, false);
List<HoodieRollbackRequest> rollBackRequestsUpdated =
rollbackStrategy.getRollbackRequests(needRollBackInstant);
rollBackRequestsUpdated.forEach(entry -> System.out.println(" " +
entry.getPartitionPath() + ", " + entry.getFileId() + " "
+ Arrays.toString(entry.getFilesToBeDeleted().toArray())));
@@ -266,7 +266,7 @@ public class TestCopyOnWriteRollbackActionExecutor extends
HoodieClientRollbackT
// Schedule rollback
BaseRollbackPlanActionExecutor copyOnWriteRollbackPlanActionExecutor =
new BaseRollbackPlanActionExecutor(context, table.getConfig(), table,
"003", needRollBackInstant, false,
- table.getConfig().shouldRollbackUsingMarkers());
+ table.getConfig().shouldRollbackUsingMarkers(), false);
HoodieRollbackPlan hoodieRollbackPlan = (HoodieRollbackPlan)
copyOnWriteRollbackPlanActionExecutor.execute().get();
// execute CopyOnWriteRollbackActionExecutor with filelisting mode
@@ -291,7 +291,7 @@ public class TestCopyOnWriteRollbackActionExecutor extends
HoodieClientRollbackT
BaseRollbackPlanActionExecutor copyOnWriteRollbackPlanActionExecutor =
new BaseRollbackPlanActionExecutor(context, table.getConfig(), table,
"003", commitInstant, false,
- table.getConfig().shouldRollbackUsingMarkers());
+ table.getConfig().shouldRollbackUsingMarkers(), false);
HoodieRollbackPlan hoodieRollbackPlan = (HoodieRollbackPlan)
copyOnWriteRollbackPlanActionExecutor.execute().get();
CopyOnWriteRollbackActionExecutor copyOnWriteRollbackActionExecutor = new
CopyOnWriteRollbackActionExecutor(context, cfg, table, "003", commitInstant,
false,
false);
@@ -357,7 +357,7 @@ public class TestCopyOnWriteRollbackActionExecutor extends
HoodieClientRollbackT
// Create the rollback plan and perform the rollback
BaseRollbackPlanActionExecutor copyOnWriteRollbackPlanActionExecutor =
new BaseRollbackPlanActionExecutor(context, table.getConfig(), table,
"003", needRollBackInstant, false,
- table.getConfig().shouldRollbackUsingMarkers());
+ table.getConfig().shouldRollbackUsingMarkers(), false);
copyOnWriteRollbackPlanActionExecutor.execute();
CopyOnWriteRollbackActionExecutor copyOnWriteRollbackActionExecutor = new
CopyOnWriteRollbackActionExecutor(context, table.getConfig(), table, "003",
@@ -447,7 +447,7 @@ public class TestCopyOnWriteRollbackActionExecutor extends
HoodieClientRollbackT
String rollbackInstant = HoodieActiveTimeline.createNewInstantTime();
BaseRollbackPlanActionExecutor copyOnWriteRollbackPlanActionExecutor =
new BaseRollbackPlanActionExecutor(context, table.getConfig(), table,
rollbackInstant, needRollBackInstant, false,
- !table.getConfig().shouldRollbackUsingMarkers());
+ !table.getConfig().shouldRollbackUsingMarkers(), false);
copyOnWriteRollbackPlanActionExecutor.execute().get();
// execute CopyOnWriteRollbackActionExecutor with filelisting mode
@@ -460,7 +460,7 @@ public class TestCopyOnWriteRollbackActionExecutor extends
HoodieClientRollbackT
rollbackInstant = HoodieActiveTimeline.createNewInstantTime();
copyOnWriteRollbackPlanActionExecutor =
new BaseRollbackPlanActionExecutor(context, table.getConfig(), table,
rollbackInstant, needRollBackInstant, false,
- table.getConfig().shouldRollbackUsingMarkers());
+ table.getConfig().shouldRollbackUsingMarkers(), false);
HoodieRollbackPlan rollbackPlan = (HoodieRollbackPlan)
copyOnWriteRollbackPlanActionExecutor.execute().get();
// execute CopyOnWriteRollbackActionExecutor with filelisting mode
diff --git
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/TestMergeOnReadRollbackActionExecutor.java
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/TestMergeOnReadRollbackActionExecutor.java
index 24c77c3c054..b3d10db6e52 100644
---
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/TestMergeOnReadRollbackActionExecutor.java
+++
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/TestMergeOnReadRollbackActionExecutor.java
@@ -107,7 +107,7 @@ public class TestMergeOnReadRollbackActionExecutor extends
HoodieClientRollbackT
HoodieInstant rollBackInstant = new HoodieInstant(isUsingMarkers,
HoodieTimeline.DELTA_COMMIT_ACTION, "002");
BaseRollbackPlanActionExecutor mergeOnReadRollbackPlanActionExecutor =
new BaseRollbackPlanActionExecutor(context, cfg, table, "003",
rollBackInstant, false,
- cfg.shouldRollbackUsingMarkers());
+ cfg.shouldRollbackUsingMarkers(), false);
mergeOnReadRollbackPlanActionExecutor.execute().get();
MergeOnReadRollbackActionExecutor mergeOnReadRollbackActionExecutor = new
MergeOnReadRollbackActionExecutor(
context,
@@ -253,7 +253,7 @@ public class TestMergeOnReadRollbackActionExecutor extends
HoodieClientRollbackT
HoodieInstant rollBackInstant = new
HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.DELTA_COMMIT_ACTION,
"002");
BaseRollbackPlanActionExecutor mergeOnReadRollbackPlanActionExecutor =
new BaseRollbackPlanActionExecutor(context, cfg, table, "003",
rollBackInstant, false,
- cfg.shouldRollbackUsingMarkers());
+ cfg.shouldRollbackUsingMarkers(), false);
mergeOnReadRollbackPlanActionExecutor.execute().get();
MergeOnReadRollbackActionExecutor mergeOnReadRollbackActionExecutor = new
MergeOnReadRollbackActionExecutor(
context,