This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
     new 103b8222887 [HUDI-7988] ListingBasedRollbackStrategy support log compact (#11631)
103b8222887 is described below
commit 103b82228874079443d7bf2d3bd5b604fdd606e2
Author: KnightChess <[email protected]>
AuthorDate: Wed Jul 17 15:41:46 2024 +0800
[HUDI-7988] ListingBasedRollbackStrategy support log compact (#11631)
---
.../rollback/ListingBasedRollbackStrategy.java | 8 ++
.../TestMergeOnReadRollbackActionExecutor.java | 96 +++++++++++++++++++++-
.../table/timeline/MetadataConversionUtils.java | 4 -
3 files changed, 103 insertions(+), 5 deletions(-)
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackStrategy.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackStrategy.java
index 5f9b3dcfb22..af3a88643a2 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackStrategy.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackStrategy.java
@@ -103,6 +103,10 @@ public class ListingBasedRollbackStrategy implements BaseRollbackPlanActionExecu
       if (commitMetadataOptional.isPresent()) {
         isCompaction.set(commitMetadataOptional.get().getOperationType() == WriteOperationType.COMPACT);
       }
+      AtomicBoolean isLogCompaction = new AtomicBoolean(false);
+      if (commitMetadataOptional.isPresent()) {
+        isLogCompaction.set(commitMetadataOptional.get().getOperationType() == WriteOperationType.LOG_COMPACT);
+      }
       return context.flatMap(partitionPaths, partitionPath -> {
         List<HoodieRollbackRequest> hoodieRollbackRequests = new ArrayList<>(partitionPaths.size());
@@ -125,6 +129,9 @@ public class ListingBasedRollbackStrategy implements BaseRollbackPlanActionExecu
           if (isCompaction.get()) { // compaction's action in hoodie instant will be "commit". So, we might need to override.
             action = HoodieTimeline.COMPACTION_ACTION;
           }
+          if (isLogCompaction.get()) {
+            action = HoodieTimeline.LOG_COMPACTION_ACTION;
+          }
           switch (action) {
             case HoodieTimeline.COMMIT_ACTION:
             case HoodieTimeline.REPLACE_COMMIT_ACTION:
@@ -152,6 +159,7 @@ public class ListingBasedRollbackStrategy implements BaseRollbackPlanActionExecu
               }
               break;
             case HoodieTimeline.DELTA_COMMIT_ACTION:
+            case HoodieTimeline.LOG_COMPACTION_ACTION:
               // In case all data was inserts and the commit failed, delete the file belonging to that commit
               // We do not know fileIds for inserts (first inserts are either log files or base files),
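Taken together, the two hunks above make the listing-based strategy inspect the commit metadata's operation type and route a pending log compaction to the delta-commit handling in the switch. A minimal standalone sketch of that resolution step, assuming a hypothetical helper (the patch inlines this logic rather than extracting a method):

    // Sketch only: resolveRollbackAction is illustrative, not part of the patch.
    static String resolveRollbackAction(String instantAction,
                                        Option<HoodieCommitMetadata> commitMetadataOptional) {
      if (commitMetadataOptional.isPresent()) {
        WriteOperationType opType = commitMetadataOptional.get().getOperationType();
        if (opType == WriteOperationType.COMPACT) {
          // compaction instants carry the "commit" action on the timeline, so override
          return HoodieTimeline.COMPACTION_ACTION;
        }
        if (opType == WriteOperationType.LOG_COMPACT) {
          // new in this change: log compaction gets its own timeline action
          return HoodieTimeline.LOG_COMPACTION_ACTION;
        }
      }
      return instantAction;
    }

Because LOG_COMPACTION_ACTION is added as a fall-through case label above the DELTA_COMMIT_ACTION body, a failed log compaction is rolled back the same way as a failed delta commit: the log files written by that instant are deleted.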
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/TestMergeOnReadRollbackActionExecutor.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/TestMergeOnReadRollbackActionExecutor.java
index 4113dddda93..da43f5b1578 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/TestMergeOnReadRollbackActionExecutor.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/TestMergeOnReadRollbackActionExecutor.java
@@ -18,9 +18,11 @@
package org.apache.hudi.table.action.rollback;
+import org.apache.hudi.avro.model.HoodieRollbackMetadata;
import org.apache.hudi.avro.model.HoodieRollbackPartitionMetadata;
import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.config.HoodieStorageConfig;
import org.apache.hudi.common.fs.ConsistencyGuardConfig;
import org.apache.hudi.common.model.FileSlice;
@@ -36,12 +38,14 @@ import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
import org.apache.hudi.common.table.view.SyncableFileSystemView;
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
+import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieIndexConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.table.action.compact.CompactionTriggerStrategy;
import org.apache.hudi.table.marker.WriteMarkersFactory;
import org.apache.hudi.testutils.Assertions;
import org.apache.hudi.testutils.MetadataMergeWriteStatus;
@@ -152,6 +156,96 @@ public class TestMergeOnReadRollbackActionExecutor extends HoodieClientRollbackT
     assertFalse(WriteMarkersFactory.get(cfg.getMarkersType(), table, "002").doesMarkerDirExist());
}
+ @ParameterizedTest
+ @ValueSource(booleans = {false, true})
+  public void testMergeOnReadRollbackLogCompactActionExecutorWithListingStrategy(boolean isComplete) throws IOException {
+ //1. prepare data and assert data result
+ List<FileSlice> firstPartitionCommit2FileSlices = new ArrayList<>();
+ List<FileSlice> secondPartitionCommit2FileSlices = new ArrayList<>();
+ HoodieWriteConfig cfg = getConfigBuilder()
+ .withRollbackUsingMarkers(false).withAutoCommit(false)
+ .withMetadataConfig(
+ HoodieMetadataConfig.newBuilder().enable(true).build())
+ .build();
+    twoUpsertCommitDataWithTwoPartitions(firstPartitionCommit2FileSlices, secondPartitionCommit2FileSlices, cfg, true);
+ List<HoodieLogFile> firstPartitionCommit2LogFiles = new ArrayList<>();
+ List<HoodieLogFile> secondPartitionCommit2LogFiles = new ArrayList<>();
+    firstPartitionCommit2FileSlices.get(0).getLogFiles().collect(Collectors.toList()).forEach(logFile -> firstPartitionCommit2LogFiles.add(logFile));
+ assertEquals(1, firstPartitionCommit2LogFiles.size());
+    secondPartitionCommit2FileSlices.get(0).getLogFiles().collect(Collectors.toList()).forEach(logFile -> secondPartitionCommit2LogFiles.add(logFile));
+ assertEquals(1, secondPartitionCommit2LogFiles.size());
+
+ //2. log compact
+ cfg = getConfigBuilder()
+ .withCompactionConfig(HoodieCompactionConfig.newBuilder()
+ .withLogCompactionBlocksThreshold(1)
+ .withMaxNumDeltaCommitsBeforeCompaction(1)
+        .withInlineCompactionTriggerStrategy(CompactionTriggerStrategy.NUM_COMMITS).build())
+ .withRollbackUsingMarkers(false).withAutoCommit(false)
+ .withMetadataConfig(
+ HoodieMetadataConfig.newBuilder().enable(false).build())
+ .build();
+
+ String action = HoodieTimeline.LOG_COMPACTION_ACTION;
+ if (isComplete) {
+ cfg.setValue(HoodieWriteConfig.AUTO_COMMIT_ENABLE.key(), "true");
+ action = HoodieTimeline.DELTA_COMMIT_ACTION;
+ }
+ SparkRDDWriteClient client = getHoodieWriteClient(cfg);
+ client.scheduleLogCompactionAtInstant("003", Option.empty());
+ client.logCompact("003");
+
+ //3. rollback log compact
+ metaClient.reloadActiveTimeline();
+    HoodieInstant rollBackInstant = new HoodieInstant(!isComplete, action, "003");
+ HoodieTable table = this.getHoodieTable(metaClient, cfg);
+ BaseRollbackPlanActionExecutor mergeOnReadRollbackPlanActionExecutor =
+ new BaseRollbackPlanActionExecutor(context, cfg, table, "004",
rollBackInstant, false,
+ cfg.shouldRollbackUsingMarkers(), false);
+ mergeOnReadRollbackPlanActionExecutor.execute().get();
+    MergeOnReadRollbackActionExecutor mergeOnReadRollbackActionExecutor = new MergeOnReadRollbackActionExecutor(
+ context,
+ cfg,
+ table,
+ "004",
+ rollBackInstant,
+ true,
+ false);
+ //4. assert the rollback stat
+    final HoodieRollbackMetadata execute = mergeOnReadRollbackActionExecutor.execute();
+    Map<String, HoodieRollbackPartitionMetadata> rollbackMetadata = execute.getPartitionMetadata();
+ assertEquals(2, rollbackMetadata.size());
+
+    for (Map.Entry<String, HoodieRollbackPartitionMetadata> entry : rollbackMetadata.entrySet()) {
+ HoodieRollbackPartitionMetadata meta = entry.getValue();
+ assertEquals(0, meta.getFailedDeleteFiles().size());
+ assertEquals(1, meta.getSuccessDeleteFiles().size());
+ }
+
+    //5. assert file group after rollback, and compare to the rollback stat
+ // assert the first partition data and log file size
+ metaClient.reloadActiveTimeline();
+ table = this.getHoodieTable(metaClient, cfg);
+    List<HoodieFileGroup> firstPartitionRollBack1FileGroups = table.getFileSystemView().getAllFileGroups(DEFAULT_FIRST_PARTITION_PATH).collect(Collectors.toList());
+ assertEquals(1, firstPartitionRollBack1FileGroups.size());
+    List<FileSlice> firstPartitionRollBack1FileSlices = firstPartitionRollBack1FileGroups.get(0).getAllFileSlices().collect(Collectors.toList());
+ assertEquals(1, firstPartitionRollBack1FileSlices.size());
+    FileSlice firstPartitionRollBack1FileSlice = firstPartitionRollBack1FileSlices.get(0);
+    List<HoodieLogFile> firstPartitionRollBackLogFiles = firstPartitionRollBack1FileSlice.getLogFiles().collect(Collectors.toList());
+ assertEquals(1, firstPartitionRollBackLogFiles.size());
+
+ // assert the second partition data and log file size
+    List<HoodieFileGroup> secondPartitionRollBack1FileGroups = table.getFileSystemView().getAllFileGroups(DEFAULT_SECOND_PARTITION_PATH).collect(Collectors.toList());
+ assertEquals(1, secondPartitionRollBack1FileGroups.size());
+    List<FileSlice> secondPartitionRollBack1FileSlices = secondPartitionRollBack1FileGroups.get(0).getAllFileSlices().collect(Collectors.toList());
+ assertEquals(1, secondPartitionRollBack1FileSlices.size());
+    FileSlice secondPartitionRollBack1FileSlice = secondPartitionRollBack1FileSlices.get(0);
+    List<HoodieLogFile> secondPartitionRollBackLogFiles = secondPartitionRollBack1FileSlice.getLogFiles().collect(Collectors.toList());
+ assertEquals(1, secondPartitionRollBackLogFiles.size());
+
+    assertFalse(WriteMarkersFactory.get(cfg.getMarkersType(), table, "003").doesMarkerDirExist());
+ }
+
@Test
public void testMergeOnReadRestoreCompactionCommit() throws IOException {
boolean isUsingMarkers = false;
@@ -159,7 +253,7 @@ public class TestMergeOnReadRollbackActionExecutor extends HoodieClientRollbackT
// 1. ingest data to partition 3.
//just generate two partitions
-    HoodieTestDataGenerator dataGenPartition3 = new HoodieTestDataGenerator(new String[]{DEFAULT_THIRD_PARTITION_PATH});
+    HoodieTestDataGenerator dataGenPartition3 = new HoodieTestDataGenerator(new String[] {DEFAULT_THIRD_PARTITION_PATH});
     HoodieTestDataGenerator.writePartitionMetadataDeprecated(storage,
         new String[] {DEFAULT_THIRD_PARTITION_PATH}, basePath);
SparkRDDWriteClient client = getHoodieWriteClient(cfg);
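The new parameterized test covers rolling back both an inflight and a completed log compaction. At the write-client level, the scenario corresponds roughly to this sketch (assuming a client built with hoodie.rollback.using.markers=false so the listing-based strategy is chosen; the test drives the plan/action executors directly rather than calling rollback on the client):

    // Schedule and run a log compaction at instant "003"; with auto-commit
    // disabled the instant stays inflight on the timeline.
    client.scheduleLogCompactionAtInstant("003", Option.empty());
    client.logCompact("003");

    // Rolling back "003" now resolves to LOG_COMPACTION_ACTION and deletes
    // the log files the log compaction wrote.
    client.rollback("003");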
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/MetadataConversionUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/MetadataConversionUtils.java
index 7bd4068dab3..ed741acf365 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/MetadataConversionUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/MetadataConversionUtils.java
@@ -29,7 +29,6 @@ import org.apache.hudi.avro.model.HoodieSavepointMetadata;
import org.apache.hudi.common.model.ActionType;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieReplaceCommitMetadata;
-import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.util.CleanerUtils;
import org.apache.hudi.common.util.CompactionUtils;
@@ -262,9 +261,6 @@ public class MetadataConversionUtils {
}
hoodieCommitMetadata.getPartitionToWriteStats().remove(null);
     org.apache.hudi.avro.model.HoodieCommitMetadata avroMetaData = JsonUtils.getObjectMapper().convertValue(hoodieCommitMetadata, org.apache.hudi.avro.model.HoodieCommitMetadata.class);
- if (hoodieCommitMetadata.getCompacted()) {
- avroMetaData.setOperationType(WriteOperationType.COMPACT.name());
- }
return (T) avroMetaData;
}
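The removed branch forced the Avro operation type to COMPACT for any commit metadata marked compacted. With log compaction now handled distinctly by the rollback strategy, that override would mislabel a LOG_COMPACT instant as COMPACT during metadata conversion, so the converted metadata is left with whatever operation type convertValue carries over. A rough before/after sketch (values are illustrative, not from the patch):

    HoodieCommitMetadata meta = new HoodieCommitMetadata(true /* compacted */);
    meta.setOperationType(WriteOperationType.LOG_COMPACT);
    org.apache.hudi.avro.model.HoodieCommitMetadata avro =
        JsonUtils.getObjectMapper().convertValue(meta,
            org.apache.hudi.avro.model.HoodieCommitMetadata.class);
    // before: getCompacted() == true rewrote avro's operationType to "COMPACT",
    //         erasing the COMPACT vs. LOG_COMPACT distinction
    // after:  avro keeps the operation type produced by the conversion itself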