This is an automated email from the ASF dual-hosted git repository.

codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 73a21092 [HUDI-3732] Fixing rollback validation (#5157)
73a21092 is described below

commit 73a21092f8f708d6c90ff2ab95b677b58237be9a
Author: Sivabalan Narayanan <[email protected]>
AuthorDate: Thu Mar 31 04:55:24 2022 -0700

    [HUDI-3732] Fixing rollback validation (#5157)
    
    * Fixing rollback validation
    
    * Adding tests
---
 .../action/restore/BaseRestoreActionExecutor.java  |  3 ++
 .../rollback/BaseRollbackActionExecutor.java       |  7 ++++-
 .../TestHoodieClientOnCopyOnWriteStorage.java      | 35 ++++++++++++++++++++++
 .../table/timeline/HoodieActiveTimeline.java       |  5 ++++
 4 files changed, 49 insertions(+), 1 deletion(-)

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/restore/BaseRestoreActionExecutor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/restore/BaseRestoreActionExecutor.java
index 9025623..1fac279 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/restore/BaseRestoreActionExecutor.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/restore/BaseRestoreActionExecutor.java
@@ -136,6 +136,9 @@ public abstract class BaseRestoreActionExecutor<T extends HoodieRecordPayload, I
         .filter(instant -> HoodieActiveTimeline.GREATER_THAN.test(instant.getTimestamp(), restoreInstantTime))
         .collect(Collectors.toList());
     instantsToRollback.forEach(entry -> {
+      if (entry.isCompleted()) {
+        table.getActiveTimeline().deleteCompletedRollback(entry);
+      }
       table.getActiveTimeline().deletePending(new HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.ROLLBACK_ACTION, entry.getTimestamp()));
       table.getActiveTimeline().deletePending(new HoodieInstant(HoodieInstant.State.REQUESTED, HoodieTimeline.ROLLBACK_ACTION, entry.getTimestamp()));
     });
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/BaseRollbackActionExecutor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/BaseRollbackActionExecutor.java
index d44ba55..8e34f0f 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/BaseRollbackActionExecutor.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/BaseRollbackActionExecutor.java
@@ -186,7 +186,12 @@ public abstract class BaseRollbackActionExecutor<T extends HoodieRecordPayload,
         }
       }
 
-      List<String> inflights = inflightAndRequestedCommitTimeline.getInstants().map(HoodieInstant::getTimestamp)
+      List<String> inflights = inflightAndRequestedCommitTimeline.getInstants().filter(instant -> {
+        if (!instant.getAction().equals(HoodieTimeline.REPLACE_COMMIT_ACTION)) {
+          return true;
+        }
+        return !ClusteringUtils.isPendingClusteringInstant(table.getMetaClient(), instant);
+      }).map(HoodieInstant::getTimestamp)
           .collect(Collectors.toList());
       if ((instantTimeToRollback != null) && !inflights.isEmpty()
           && (inflights.indexOf(instantTimeToRollback) != inflights.size() - 1)) {
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java
index 3b78954..a6a3703 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java
@@ -1399,6 +1399,41 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
     testInsertAndClustering(clusteringConfig, populateMetaFields, true, false, SqlQueryEqualityPreCommitValidator.class.getName(), COUNT_SQL_QUERY_FOR_VALIDATION, "");
   }
 
+  @Test
+  public void testRolblackOfRegularCommitWithPendingReplaceCommitInTimeline() throws Exception {
+    HoodieClusteringConfig clusteringConfig = HoodieClusteringConfig.newBuilder().withClusteringMaxNumGroups(10)
+        .withClusteringTargetPartitions(0).withInlineClusteringNumCommits(1).withInlineClustering(true)
+        .withPreserveHoodieCommitMetadata(true).build();
+    // trigger clustering, but do not complete
+    testInsertAndClustering(clusteringConfig, true, false, false, SqlQueryEqualityPreCommitValidator.class.getName(), COUNT_SQL_QUERY_FOR_VALIDATION, "");
+
+    // trigger another partial commit, followed by valid commit. rollback of partial commit should succeed.
+    HoodieWriteConfig.Builder cfgBuilder = getConfigBuilder().withAutoCommit(false);
+    SparkRDDWriteClient client = getHoodieWriteClient(cfgBuilder.build());
+    String commitTime1 = HoodieActiveTimeline.createNewInstantTime();
+    List<HoodieRecord> records1 = dataGen.generateInserts(commitTime1, 200);
+    client.startCommitWithTime(commitTime1);
+    JavaRDD<HoodieRecord> insertRecordsRDD1 = jsc.parallelize(records1, 2);
+    JavaRDD<WriteStatus> statuses = client.upsert(insertRecordsRDD1, commitTime1);
+    List<WriteStatus> statusList = statuses.collect();
+    assertNoWriteErrors(statusList);
+
+    HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(basePath).build();
+    assertEquals(2, metaClient.getActiveTimeline().getCommitsTimeline().filterInflightsAndRequested().countInstants());
+
+    // trigger another commit. this should rollback latest partial commit.
+    records1 = dataGen.generateInserts(commitTime1, 200);
+    client.startCommitWithTime(commitTime1);
+    insertRecordsRDD1 = jsc.parallelize(records1, 2);
+    statuses = client.upsert(insertRecordsRDD1, commitTime1);
+    statusList = statuses.collect();
+    assertNoWriteErrors(statusList);
+    client.commit(commitTime1, statuses);
+    metaClient.reloadActiveTimeline();
+    // rollback should have succeeded. Essentially, the pending clustering should not hinder the rollback of regular commits.
+    assertEquals(1, metaClient.getActiveTimeline().getCommitsTimeline().filterInflightsAndRequested().countInstants());
+  }
+
   @ParameterizedTest
   @ValueSource(booleans = {true, false})
   public void testInlineScheduleClustering(boolean scheduleInlineClustering) throws IOException {
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java
index 36dd536..f7dc2f6 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java
@@ -201,6 +201,11 @@ public class HoodieActiveTimeline extends HoodieDefaultTimeline {
     deleteInstantFile(instant);
   }
 
+  public void deleteCompletedRollback(HoodieInstant instant) {
+    ValidationUtils.checkArgument(instant.isCompleted());
+    deleteInstantFile(instant);
+  }
+
   public static void deleteInstantFile(FileSystem fs, String metaPath, HoodieInstant instant) {
     try {
       fs.delete(new Path(metaPath, instant.getFileName()), false);

Reply via email to