This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 103b8222887 [HUDI-7988] ListingBasedRollbackStrategy support log compact (#11631)
103b8222887 is described below

commit 103b82228874079443d7bf2d3bd5b604fdd606e2
Author: KnightChess <[email protected]>
AuthorDate: Wed Jul 17 15:41:46 2024 +0800

    [HUDI-7988] ListingBasedRollbackStrategy support log compact (#11631)
---
 .../rollback/ListingBasedRollbackStrategy.java     |  8 ++
 .../TestMergeOnReadRollbackActionExecutor.java     | 96 +++++++++++++++++++++-
 .../table/timeline/MetadataConversionUtils.java    |  4 -
 3 files changed, 103 insertions(+), 5 deletions(-)

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackStrategy.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackStrategy.java
index 5f9b3dcfb22..af3a88643a2 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackStrategy.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackStrategy.java
@@ -103,6 +103,10 @@ public class ListingBasedRollbackStrategy implements BaseRollbackPlanActionExecu
       if (commitMetadataOptional.isPresent()) {
         isCompaction.set(commitMetadataOptional.get().getOperationType() == WriteOperationType.COMPACT);
       }
+      AtomicBoolean isLogCompaction = new AtomicBoolean(false);
+      if (commitMetadataOptional.isPresent()) {
+        isLogCompaction.set(commitMetadataOptional.get().getOperationType() == WriteOperationType.LOG_COMPACT);
+      }
 
       return context.flatMap(partitionPaths, partitionPath -> {
         List<HoodieRollbackRequest> hoodieRollbackRequests = new ArrayList<>(partitionPaths.size());
@@ -125,6 +129,9 @@ public class ListingBasedRollbackStrategy implements BaseRollbackPlanActionExecu
           if (isCompaction.get()) { // compaction's action in hoodie instant will be "commit". So, we might need to override.
             action = HoodieTimeline.COMPACTION_ACTION;
           }
+          if (isLogCompaction.get()) {
+            action = HoodieTimeline.LOG_COMPACTION_ACTION;
+          }
           switch (action) {
             case HoodieTimeline.COMMIT_ACTION:
             case HoodieTimeline.REPLACE_COMMIT_ACTION:
@@ -152,6 +159,7 @@ public class ListingBasedRollbackStrategy implements BaseRollbackPlanActionExecu
               }
               break;
             case HoodieTimeline.DELTA_COMMIT_ACTION:
+            case HoodieTimeline.LOG_COMPACTION_ACTION:
 
               // In case all data was inserts and the commit failed, delete the file belonging to that commit
               // We do not know fileIds for inserts (first inserts are either log files or base files),
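
For readers skimming the diff: a minimal, self-contained sketch of the action override this hunk introduces. Names and string values below are simplified assumptions rather than the exact Hudi source; the point is that compaction and log compaction instants are recorded on the timeline as "commit" and "deltacommit" respectively, so the rollback planner re-derives the action from the operation type in the commit metadata.

    import java.util.Optional;

    public class ActionOverrideSketch {
      enum WriteOperationType { UPSERT, COMPACT, LOG_COMPACT }

      // Assumed to mirror HoodieTimeline's action constants.
      static final String COMPACTION_ACTION = "compaction";
      static final String LOG_COMPACTION_ACTION = "logcompaction";

      // Picks the action the rollback plan should use for the instant.
      static String resolveAction(String timelineAction, Optional<WriteOperationType> opType) {
        if (opType.isPresent() && opType.get() == WriteOperationType.COMPACT) {
          return COMPACTION_ACTION;     // compaction is committed as "commit"
        }
        if (opType.isPresent() && opType.get() == WriteOperationType.LOG_COMPACT) {
          return LOG_COMPACTION_ACTION; // log compaction is committed as "deltacommit"
        }
        return timelineAction;          // everything else keeps its recorded action
      }

      public static void main(String[] args) {
        // A pending log compaction is rolled back under the log-compaction action.
        System.out.println(resolveAction("deltacommit", Optional.of(WriteOperationType.LOG_COMPACT)));
      }
    }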
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/TestMergeOnReadRollbackActionExecutor.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/TestMergeOnReadRollbackActionExecutor.java
index 4113dddda93..da43f5b1578 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/TestMergeOnReadRollbackActionExecutor.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/TestMergeOnReadRollbackActionExecutor.java
@@ -18,9 +18,11 @@
 
 package org.apache.hudi.table.action.rollback;
 
+import org.apache.hudi.avro.model.HoodieRollbackMetadata;
 import org.apache.hudi.avro.model.HoodieRollbackPartitionMetadata;
 import org.apache.hudi.client.SparkRDDWriteClient;
 import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.config.HoodieMetadataConfig;
 import org.apache.hudi.common.config.HoodieStorageConfig;
 import org.apache.hudi.common.fs.ConsistencyGuardConfig;
 import org.apache.hudi.common.model.FileSlice;
@@ -36,12 +38,14 @@ import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
 import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
 import org.apache.hudi.common.table.view.SyncableFileSystemView;
 import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
+import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.config.HoodieCompactionConfig;
 import org.apache.hudi.config.HoodieIndexConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.index.HoodieIndex;
 import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.table.action.compact.CompactionTriggerStrategy;
 import org.apache.hudi.table.marker.WriteMarkersFactory;
 import org.apache.hudi.testutils.Assertions;
 import org.apache.hudi.testutils.MetadataMergeWriteStatus;
@@ -152,6 +156,96 @@ public class TestMergeOnReadRollbackActionExecutor extends HoodieClientRollbackT
     assertFalse(WriteMarkersFactory.get(cfg.getMarkersType(), table, "002").doesMarkerDirExist());
   }
 
+  @ParameterizedTest
+  @ValueSource(booleans = {false, true})
+  public void testMergeOnReadRollbackLogCompactActionExecutorWithListingStrategy(boolean isComplete) throws IOException {
+    //1. prepare data and assert data result
+    List<FileSlice> firstPartitionCommit2FileSlices = new ArrayList<>();
+    List<FileSlice> secondPartitionCommit2FileSlices = new ArrayList<>();
+    HoodieWriteConfig cfg = getConfigBuilder()
+        .withRollbackUsingMarkers(false).withAutoCommit(false)
+        .withMetadataConfig(
+            HoodieMetadataConfig.newBuilder().enable(true).build())
+        .build();
+    twoUpsertCommitDataWithTwoPartitions(firstPartitionCommit2FileSlices, secondPartitionCommit2FileSlices, cfg, true);
+    List<HoodieLogFile> firstPartitionCommit2LogFiles = new ArrayList<>();
+    List<HoodieLogFile> secondPartitionCommit2LogFiles = new ArrayList<>();
+    firstPartitionCommit2FileSlices.get(0).getLogFiles().collect(Collectors.toList()).forEach(logFile -> firstPartitionCommit2LogFiles.add(logFile));
+    assertEquals(1, firstPartitionCommit2LogFiles.size());
+    secondPartitionCommit2FileSlices.get(0).getLogFiles().collect(Collectors.toList()).forEach(logFile -> secondPartitionCommit2LogFiles.add(logFile));
+    assertEquals(1, secondPartitionCommit2LogFiles.size());
+
+    //2. log compact
+    cfg = getConfigBuilder()
+        .withCompactionConfig(HoodieCompactionConfig.newBuilder()
+            .withLogCompactionBlocksThreshold(1)
+            .withMaxNumDeltaCommitsBeforeCompaction(1)
+            .withInlineCompactionTriggerStrategy(CompactionTriggerStrategy.NUM_COMMITS).build())
+        .withRollbackUsingMarkers(false).withAutoCommit(false)
+        .withMetadataConfig(
+            HoodieMetadataConfig.newBuilder().enable(false).build())
+        .build();
+
+    String action = HoodieTimeline.LOG_COMPACTION_ACTION;
+    if (isComplete) {
+      cfg.setValue(HoodieWriteConfig.AUTO_COMMIT_ENABLE.key(), "true");
+      action = HoodieTimeline.DELTA_COMMIT_ACTION;
+    }
+    SparkRDDWriteClient client = getHoodieWriteClient(cfg);
+    client.scheduleLogCompactionAtInstant("003", Option.empty());
+    client.logCompact("003");
+
+    //3. rollback log compact
+    metaClient.reloadActiveTimeline();
+    HoodieInstant rollBackInstant = new HoodieInstant(!isComplete, action, "003");
+    HoodieTable table = this.getHoodieTable(metaClient, cfg);
+    BaseRollbackPlanActionExecutor mergeOnReadRollbackPlanActionExecutor =
+        new BaseRollbackPlanActionExecutor(context, cfg, table, "004", rollBackInstant, false,
+            cfg.shouldRollbackUsingMarkers(), false);
+    mergeOnReadRollbackPlanActionExecutor.execute().get();
+    MergeOnReadRollbackActionExecutor mergeOnReadRollbackActionExecutor = new MergeOnReadRollbackActionExecutor(
+        context,
+        cfg,
+        table,
+        "004",
+        rollBackInstant,
+        true,
+        false);
+    //4. assert the rollback stats
+    final HoodieRollbackMetadata execute = mergeOnReadRollbackActionExecutor.execute();
+    Map<String, HoodieRollbackPartitionMetadata> rollbackMetadata = execute.getPartitionMetadata();
+    assertEquals(2, rollbackMetadata.size());
+
+    for (Map.Entry<String, HoodieRollbackPartitionMetadata> entry : rollbackMetadata.entrySet()) {
+      HoodieRollbackPartitionMetadata meta = entry.getValue();
+      assertEquals(0, meta.getFailedDeleteFiles().size());
+      assertEquals(1, meta.getSuccessDeleteFiles().size());
+    }
+
+    //5. assert file groups after rollback, and compare to the rollback stats
+    // assert the first partition data and log file size
+    metaClient.reloadActiveTimeline();
+    table = this.getHoodieTable(metaClient, cfg);
+    List<HoodieFileGroup> firstPartitionRollBack1FileGroups = table.getFileSystemView().getAllFileGroups(DEFAULT_FIRST_PARTITION_PATH).collect(Collectors.toList());
+    assertEquals(1, firstPartitionRollBack1FileGroups.size());
+    List<FileSlice> firstPartitionRollBack1FileSlices = firstPartitionRollBack1FileGroups.get(0).getAllFileSlices().collect(Collectors.toList());
+    assertEquals(1, firstPartitionRollBack1FileSlices.size());
+    FileSlice firstPartitionRollBack1FileSlice = firstPartitionRollBack1FileSlices.get(0);
+    List<HoodieLogFile> firstPartitionRollBackLogFiles = firstPartitionRollBack1FileSlice.getLogFiles().collect(Collectors.toList());
+    assertEquals(1, firstPartitionRollBackLogFiles.size());
+
+    // assert the second partition data and log file size
+    List<HoodieFileGroup> secondPartitionRollBack1FileGroups = table.getFileSystemView().getAllFileGroups(DEFAULT_SECOND_PARTITION_PATH).collect(Collectors.toList());
+    assertEquals(1, secondPartitionRollBack1FileGroups.size());
+    List<FileSlice> secondPartitionRollBack1FileSlices = secondPartitionRollBack1FileGroups.get(0).getAllFileSlices().collect(Collectors.toList());
+    assertEquals(1, secondPartitionRollBack1FileSlices.size());
+    FileSlice secondPartitionRollBack1FileSlice = secondPartitionRollBack1FileSlices.get(0);
+    List<HoodieLogFile> secondPartitionRollBackLogFiles = secondPartitionRollBack1FileSlice.getLogFiles().collect(Collectors.toList());
+    assertEquals(1, secondPartitionRollBackLogFiles.size());
+
+    assertFalse(WriteMarkersFactory.get(cfg.getMarkersType(), table, "003").doesMarkerDirExist());
+  }
+
   @Test
   public void testMergeOnReadRestoreCompactionCommit() throws IOException {
     boolean isUsingMarkers = false;
@@ -159,7 +253,7 @@ public class TestMergeOnReadRollbackActionExecutor extends HoodieClientRollbackT
 
     // 1. ingest data to partition 3.
     //just generate two partitions
-    HoodieTestDataGenerator dataGenPartition3 = new HoodieTestDataGenerator(new String[]{DEFAULT_THIRD_PARTITION_PATH});
+    HoodieTestDataGenerator dataGenPartition3 = new HoodieTestDataGenerator(new String[] {DEFAULT_THIRD_PARTITION_PATH});
     HoodieTestDataGenerator.writePartitionMetadataDeprecated(storage,
         new String[] {DEFAULT_THIRD_PARTITION_PATH}, basePath);
     SparkRDDWriteClient client = getHoodieWriteClient(cfg);
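
A note on the isComplete toggle in the new test above: a pending log compaction still sits on the timeline under the log-compaction action, while a completed one is committed as a delta commit, so the action of the instant being rolled back differs by state. A hypothetical helper capturing that mapping (the HoodieTimeline constants are the same ones the patch itself uses):

    // Hypothetical sketch, not part of the patch: the timeline action the
    // rollback plan targets, depending on whether the log compaction completed.
    static String rollbackActionFor(boolean isComplete) {
      return isComplete ? HoodieTimeline.DELTA_COMMIT_ACTION : HoodieTimeline.LOG_COMPACTION_ACTION;
    }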
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/MetadataConversionUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/MetadataConversionUtils.java
index 7bd4068dab3..ed741acf365 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/MetadataConversionUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/MetadataConversionUtils.java
@@ -29,7 +29,6 @@ import org.apache.hudi.avro.model.HoodieSavepointMetadata;
 import org.apache.hudi.common.model.ActionType;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
 import org.apache.hudi.common.model.HoodieReplaceCommitMetadata;
-import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.util.CleanerUtils;
 import org.apache.hudi.common.util.CompactionUtils;
@@ -262,9 +261,6 @@ public class MetadataConversionUtils {
     }
     hoodieCommitMetadata.getPartitionToWriteStats().remove(null);
     org.apache.hudi.avro.model.HoodieCommitMetadata avroMetaData = JsonUtils.getObjectMapper().convertValue(hoodieCommitMetadata, org.apache.hudi.avro.model.HoodieCommitMetadata.class);
-    if (hoodieCommitMetadata.getCompacted()) {
-      avroMetaData.setOperationType(WriteOperationType.COMPACT.name());
-    }
     return (T) avroMetaData;
   }
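
A hedged reading of this removal: the old override rewrote the operation type to COMPACT for any commit metadata flagged as compacted, which (assuming the compacted flag is also set for log compaction) would clobber LOG_COMPACT and defeat the operation-type check ListingBasedRollbackStrategy now performs. A hypothetical before/after illustration, not the Hudi source:

    // Hypothetical illustration of the removed behavior.
    String opType = "LOG_COMPACT";   // operation type recorded by the writer
    boolean compacted = true;        // assumed true for compaction and log compaction alike
    // Before this patch: if (compacted) { opType = "COMPACT"; }  // LOG_COMPACT was lost
    // After this patch: the operation type recorded in the metadata is preserved as-is.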
 
