This is an automated email from the ASF dual-hosted git repository.
codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 73a21092 [HUDI-3732] Fixing rollback validation (#5157)
73a21092 is described below
commit 73a21092f8f708d6c90ff2ab95b677b58237be9a
Author: Sivabalan Narayanan <[email protected]>
AuthorDate: Thu Mar 31 04:55:24 2022 -0700
[HUDI-3732] Fixing rollback validation (#5157)
* Fixing rollback validation
* Adding tests
---
.../action/restore/BaseRestoreActionExecutor.java | 3 ++
.../rollback/BaseRollbackActionExecutor.java | 7 ++++-
.../TestHoodieClientOnCopyOnWriteStorage.java | 35 ++++++++++++++++++++++
.../table/timeline/HoodieActiveTimeline.java | 5 ++++
4 files changed, 49 insertions(+), 1 deletion(-)
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/restore/BaseRestoreActionExecutor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/restore/BaseRestoreActionExecutor.java
index 9025623..1fac279 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/restore/BaseRestoreActionExecutor.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/restore/BaseRestoreActionExecutor.java
@@ -136,6 +136,9 @@ public abstract class BaseRestoreActionExecutor<T extends HoodieRecordPayload, I
.filter(instant ->
HoodieActiveTimeline.GREATER_THAN.test(instant.getTimestamp(),
restoreInstantTime))
.collect(Collectors.toList());
instantsToRollback.forEach(entry -> {
+ if (entry.isCompleted()) {
+ table.getActiveTimeline().deleteCompletedRollback(entry);
+ }
table.getActiveTimeline().deletePending(new
HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.ROLLBACK_ACTION,
entry.getTimestamp()));
table.getActiveTimeline().deletePending(new
HoodieInstant(HoodieInstant.State.REQUESTED, HoodieTimeline.ROLLBACK_ACTION,
entry.getTimestamp()));
});
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/BaseRollbackActionExecutor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/BaseRollbackActionExecutor.java
index d44ba55..8e34f0f 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/BaseRollbackActionExecutor.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/BaseRollbackActionExecutor.java
@@ -186,7 +186,12 @@ public abstract class BaseRollbackActionExecutor<T extends HoodieRecordPayload,
}
}
- List<String> inflights =
inflightAndRequestedCommitTimeline.getInstants().map(HoodieInstant::getTimestamp)
+ List<String> inflights =
inflightAndRequestedCommitTimeline.getInstants().filter(instant -> {
+ if (!instant.getAction().equals(HoodieTimeline.REPLACE_COMMIT_ACTION))
{
+ return true;
+ }
+ return
!ClusteringUtils.isPendingClusteringInstant(table.getMetaClient(), instant);
+ }).map(HoodieInstant::getTimestamp)
.collect(Collectors.toList());
if ((instantTimeToRollback != null) && !inflights.isEmpty()
&& (inflights.indexOf(instantTimeToRollback) != inflights.size() -
1)) {
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java
index 3b78954..a6a3703 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java
@@ -1399,6 +1399,41 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
testInsertAndClustering(clusteringConfig, populateMetaFields, true, false,
SqlQueryEqualityPreCommitValidator.class.getName(),
COUNT_SQL_QUERY_FOR_VALIDATION, "");
}
+ @Test
+ public void testRolblackOfRegularCommitWithPendingReplaceCommitInTimeline()
throws Exception {
+ HoodieClusteringConfig clusteringConfig =
HoodieClusteringConfig.newBuilder().withClusteringMaxNumGroups(10)
+
.withClusteringTargetPartitions(0).withInlineClusteringNumCommits(1).withInlineClustering(true)
+ .withPreserveHoodieCommitMetadata(true).build();
+ // trigger clustering, but do not complete
+ testInsertAndClustering(clusteringConfig, true, false, false,
SqlQueryEqualityPreCommitValidator.class.getName(),
COUNT_SQL_QUERY_FOR_VALIDATION, "");
+
+ // trigger another partial commit, followed by valid commit. rollback of
partial commit should succeed.
+ HoodieWriteConfig.Builder cfgBuilder =
getConfigBuilder().withAutoCommit(false);
+ SparkRDDWriteClient client = getHoodieWriteClient(cfgBuilder.build());
+ String commitTime1 = HoodieActiveTimeline.createNewInstantTime();
+ List<HoodieRecord> records1 = dataGen.generateInserts(commitTime1, 200);
+ client.startCommitWithTime(commitTime1);
+ JavaRDD<HoodieRecord> insertRecordsRDD1 = jsc.parallelize(records1, 2);
+ JavaRDD<WriteStatus> statuses = client.upsert(insertRecordsRDD1,
commitTime1);
+ List<WriteStatus> statusList = statuses.collect();
+ assertNoWriteErrors(statusList);
+
+ HoodieTableMetaClient metaClient =
HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(basePath).build();
+ assertEquals(2,
metaClient.getActiveTimeline().getCommitsTimeline().filterInflightsAndRequested().countInstants());
+
+ // trigger another commit. this should rollback latest partial commit.
+ records1 = dataGen.generateInserts(commitTime1, 200);
+ client.startCommitWithTime(commitTime1);
+ insertRecordsRDD1 = jsc.parallelize(records1, 2);
+ statuses = client.upsert(insertRecordsRDD1, commitTime1);
+ statusList = statuses.collect();
+ assertNoWriteErrors(statusList);
+ client.commit(commitTime1, statuses);
+ metaClient.reloadActiveTimeline();
+ // rollback should have succeeded. Essentially, the pending clustering
should not hinder the rollback of regular commits.
+ assertEquals(1,
metaClient.getActiveTimeline().getCommitsTimeline().filterInflightsAndRequested().countInstants());
+ }
+
@ParameterizedTest
@ValueSource(booleans = {true, false})
public void testInlineScheduleClustering(boolean scheduleInlineClustering)
throws IOException {
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java
index 36dd536..f7dc2f6 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java
@@ -201,6 +201,11 @@ public class HoodieActiveTimeline extends HoodieDefaultTimeline {
deleteInstantFile(instant);
}
+ public void deleteCompletedRollback(HoodieInstant instant) {
+ ValidationUtils.checkArgument(instant.isCompleted());
+ deleteInstantFile(instant);
+ }
+
public static void deleteInstantFile(FileSystem fs, String metaPath,
HoodieInstant instant) {
try {
fs.delete(new Path(metaPath, instant.getFileName()), false);