This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 47356a57930 [HUDI-5862] Strengthen MARKER based rollback for log files 
with a single commit (#8077)
47356a57930 is described below

commit 47356a57930687c1bdfa66d1a62421d8a5fc0b29
Author: voonhous <[email protected]>
AuthorDate: Wed Mar 1 11:05:20 2023 +0800

    [HUDI-5862] Strengthen MARKER based rollback for log files with a single 
commit (#8077)
---
 .../rollback/MarkerBasedRollbackStrategy.java      | 12 ++++-
 .../TestMarkerBasedRollbackStrategy.java           | 54 ++++++++++++++++++++--
 2 files changed, 61 insertions(+), 5 deletions(-)

diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/MarkerBasedRollbackStrategy.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/MarkerBasedRollbackStrategy.java
index 000ea21af98..45870501cab 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/MarkerBasedRollbackStrategy.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/MarkerBasedRollbackStrategy.java
@@ -93,7 +93,7 @@ public class MarkerBasedRollbackStrategy<T, I, K, O> 
implements BaseRollbackPlan
             //          - Base file's file-id
             //          - Base file's commit instant
             //          - Partition path
-            return 
getRollbackRequestForAppend(WriteMarkers.stripMarkerSuffix(markerFilePath));
+            return getRollbackRequestForAppend(instantToRollback, 
WriteMarkers.stripMarkerSuffix(markerFilePath));
           default:
             throw new HoodieRollbackException("Unknown marker type, during 
rollback of " + instantToRollback);
         }
@@ -103,7 +103,7 @@ public class MarkerBasedRollbackStrategy<T, I, K, O> 
implements BaseRollbackPlan
     }
   }
 
-  protected HoodieRollbackRequest getRollbackRequestForAppend(String 
markerFilePath) throws IOException {
+  protected HoodieRollbackRequest getRollbackRequestForAppend(HoodieInstant 
instantToRollback, String markerFilePath) throws IOException {
     Path baseFilePathForAppend = new Path(basePath, markerFilePath);
     String fileId = FSUtils.getFileIdFromFilePath(baseFilePathForAppend);
     String baseCommitTime = 
FSUtils.getCommitTime(baseFilePathForAppend.getName());
@@ -115,6 +115,14 @@ public class MarkerBasedRollbackStrategy<T, I, K, O> 
implements BaseRollbackPlan
     // TODO(HUDI-1517) use provided marker-file's path instead
     Option<HoodieLogFile> latestLogFileOption = 
FSUtils.getLatestLogFile(table.getMetaClient().getFs(), partitionPath, fileId,
         HoodieFileFormat.HOODIE_LOG.getFileExtension(), baseCommitTime);
+
+    // Log file can be deleted if the commit to rollback is also the commit 
that created the fileGroup
+    if (latestLogFileOption.isPresent() && 
baseCommitTime.equals(instantToRollback.getTimestamp())) {
+      Path fullDeletePath = new Path(partitionPath, 
latestLogFileOption.get().getFileName());
+      return new HoodieRollbackRequest(relativePartitionPath, EMPTY_STRING, 
EMPTY_STRING,
+          Collections.singletonList(fullDeletePath.toString()),
+          Collections.emptyMap());
+    }
     
     Map<String, Long> logFilesWithBlocsToRollback = new HashMap<>();
     if (latestLogFileOption.isPresent()) {
diff --git 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestMarkerBasedRollbackStrategy.java
 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestMarkerBasedRollbackStrategy.java
index 927f8f3c24b..47e93ab2e94 100644
--- 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestMarkerBasedRollbackStrategy.java
+++ 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestMarkerBasedRollbackStrategy.java
@@ -145,7 +145,7 @@ public class TestMarkerBasedRollbackStrategy extends 
HoodieClientTestBase {
     HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(jsc);
     try (SparkRDDWriteClient writeClient = new 
SparkRDDWriteClient(engineContext, writeConfig)) {
       // rollback 2nd commit and ensure stats reflect the info.
-      List<HoodieRollbackStat> stats = testRun(useFileListingMetadata, 
writeConfig, writeClient);
+      List<HoodieRollbackStat> stats = 
testUpdateAndRollback(useFileListingMetadata, writeConfig, writeClient);
 
       assertEquals(3, stats.size());
       for (HoodieRollbackStat stat : stats) {
@@ -172,7 +172,7 @@ public class TestMarkerBasedRollbackStrategy extends 
HoodieClientTestBase {
     try (SparkRDDWriteClient writeClient = new 
SparkRDDWriteClient(engineContext, writeConfig)) {
 
       // rollback 2nd commit and ensure stats reflect the info.
-      List<HoodieRollbackStat> stats = testRun(useFileListingMetadata, 
writeConfig, writeClient);
+      List<HoodieRollbackStat> stats = 
testUpdateAndRollback(useFileListingMetadata, writeConfig, writeClient);
 
       assertEquals(3, stats.size());
       for (HoodieRollbackStat stat : stats) {
@@ -184,7 +184,55 @@ public class TestMarkerBasedRollbackStrategy extends 
HoodieClientTestBase {
     }
   }
 
-  private List<HoodieRollbackStat> testRun(boolean useFileListingMetadata, 
HoodieWriteConfig writeConfig, SparkRDDWriteClient writeClient) {
+  @ParameterizedTest(name = TEST_NAME_WITH_PARAMS)
+  @MethodSource("configParams")
+  public void testMergeOnReadRollbackDeletesFirstAppendFiles(boolean 
useFileListingMetadata) throws Exception {
+    // init MERGE_ON_READ_TABLE
+    tearDown();
+    tableType = HoodieTableType.MERGE_ON_READ;
+    setUp();
+
+    HoodieWriteConfig writeConfig = 
getConfigBuilder().withRollbackUsingMarkers(true).withAutoCommit(false)
+        
.withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(useFileListingMetadata).build())
+        .withPath(basePath).build();
+
+    HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(jsc);
+    try (SparkRDDWriteClient writeClient = new 
SparkRDDWriteClient(engineContext, writeConfig)) {
+
+      // rollback 2nd commit and ensure stats reflect the info.
+      List<HoodieRollbackStat> stats = testInsertAndRollback(writeClient);
+
+      assertEquals(3, stats.size());
+      for (HoodieRollbackStat stat : stats) {
+        assertEquals(1, stat.getSuccessDeleteFiles().size());
+        assertEquals(0, stat.getFailedDeleteFiles().size());
+        assertEquals(0, stat.getCommandBlocksCount().size());
+        stat.getCommandBlocksCount().forEach((fileStatus, len) -> 
assertTrue(fileStatus.getPath().getName().contains(HoodieFileFormat.HOODIE_LOG.getFileExtension())));
+      }
+    }
+  }
+
+  private List<HoodieRollbackStat> testInsertAndRollback(SparkRDDWriteClient 
writeClient) {
+    String newCommitTime = "001";
+    writeClient.startCommitWithTime(newCommitTime);
+
+    List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 100);
+    JavaRDD<WriteStatus> writeStatuses = 
writeClient.insert(jsc.parallelize(records, 1), newCommitTime);
+    writeClient.commit(newCommitTime, writeStatuses);
+
+    writeStatuses.collect();
+
+    HoodieTable hoodieTable = HoodieSparkTable.create(getConfig(), context, 
metaClient);
+    List<HoodieRollbackRequest> rollbackRequests = new 
MarkerBasedRollbackStrategy(hoodieTable, context, getConfig(),
+        "002").getRollbackRequests(new 
HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.DELTA_COMMIT_ACTION, 
"001"));
+
+    // rollback 1st commit and ensure stats reflect the info.
+    return new BaseRollbackHelper(hoodieTable.getMetaClient(), 
getConfig()).performRollback(context,
+        new HoodieInstant(HoodieInstant.State.INFLIGHT, 
HoodieTimeline.DELTA_COMMIT_ACTION, "001"),
+        rollbackRequests);
+  }
+
+  private List<HoodieRollbackStat> testUpdateAndRollback(boolean 
useFileListingMetadata, HoodieWriteConfig writeConfig, SparkRDDWriteClient 
writeClient) {
     String newCommitTime = "001";
     writeClient.startCommitWithTime(newCommitTime);
 

Reply via email to