This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 47356a57930 [HUDI-5862] Strengthen MARKER-based rollback for log files
with a single commit (#8077)
47356a57930 is described below
commit 47356a57930687c1bdfa66d1a62421d8a5fc0b29
Author: voonhous <[email protected]>
AuthorDate: Wed Mar 1 11:05:20 2023 +0800
[HUDI-5862] Strengthen MARKER-based rollback for log files with a single
commit (#8077)
---
.../rollback/MarkerBasedRollbackStrategy.java | 12 ++++-
.../TestMarkerBasedRollbackStrategy.java | 54 ++++++++++++++++++++--
2 files changed, 61 insertions(+), 5 deletions(-)
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/MarkerBasedRollbackStrategy.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/MarkerBasedRollbackStrategy.java
index 000ea21af98..45870501cab 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/MarkerBasedRollbackStrategy.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/MarkerBasedRollbackStrategy.java
@@ -93,7 +93,7 @@ public class MarkerBasedRollbackStrategy<T, I, K, O>
implements BaseRollbackPlan
// - Base file's file-id
// - Base file's commit instant
// - Partition path
- return
getRollbackRequestForAppend(WriteMarkers.stripMarkerSuffix(markerFilePath));
+ return getRollbackRequestForAppend(instantToRollback,
WriteMarkers.stripMarkerSuffix(markerFilePath));
default:
throw new HoodieRollbackException("Unknown marker type, during
rollback of " + instantToRollback);
}
@@ -103,7 +103,7 @@ public class MarkerBasedRollbackStrategy<T, I, K, O>
implements BaseRollbackPlan
}
}
- protected HoodieRollbackRequest getRollbackRequestForAppend(String
markerFilePath) throws IOException {
+ protected HoodieRollbackRequest getRollbackRequestForAppend(HoodieInstant
instantToRollback, String markerFilePath) throws IOException {
Path baseFilePathForAppend = new Path(basePath, markerFilePath);
String fileId = FSUtils.getFileIdFromFilePath(baseFilePathForAppend);
String baseCommitTime =
FSUtils.getCommitTime(baseFilePathForAppend.getName());
@@ -115,6 +115,14 @@ public class MarkerBasedRollbackStrategy<T, I, K, O>
implements BaseRollbackPlan
// TODO(HUDI-1517) use provided marker-file's path instead
Option<HoodieLogFile> latestLogFileOption =
FSUtils.getLatestLogFile(table.getMetaClient().getFs(), partitionPath, fileId,
HoodieFileFormat.HOODIE_LOG.getFileExtension(), baseCommitTime);
+
+ // Log file can be deleted if the commit to rollback is also the commit
that created the fileGroup
+ if (latestLogFileOption.isPresent() &&
baseCommitTime.equals(instantToRollback.getTimestamp())) {
+ Path fullDeletePath = new Path(partitionPath,
latestLogFileOption.get().getFileName());
+ return new HoodieRollbackRequest(relativePartitionPath, EMPTY_STRING,
EMPTY_STRING,
+ Collections.singletonList(fullDeletePath.toString()),
+ Collections.emptyMap());
+ }
Map<String, Long> logFilesWithBlocsToRollback = new HashMap<>();
if (latestLogFileOption.isPresent()) {
diff --git
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestMarkerBasedRollbackStrategy.java
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestMarkerBasedRollbackStrategy.java
index 927f8f3c24b..47e93ab2e94 100644
---
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestMarkerBasedRollbackStrategy.java
+++
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestMarkerBasedRollbackStrategy.java
@@ -145,7 +145,7 @@ public class TestMarkerBasedRollbackStrategy extends
HoodieClientTestBase {
HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(jsc);
try (SparkRDDWriteClient writeClient = new
SparkRDDWriteClient(engineContext, writeConfig)) {
// rollback 2nd commit and ensure stats reflect the info.
- List<HoodieRollbackStat> stats = testRun(useFileListingMetadata,
writeConfig, writeClient);
+ List<HoodieRollbackStat> stats =
testUpdateAndRollback(useFileListingMetadata, writeConfig, writeClient);
assertEquals(3, stats.size());
for (HoodieRollbackStat stat : stats) {
@@ -172,7 +172,7 @@ public class TestMarkerBasedRollbackStrategy extends
HoodieClientTestBase {
try (SparkRDDWriteClient writeClient = new
SparkRDDWriteClient(engineContext, writeConfig)) {
// rollback 2nd commit and ensure stats reflect the info.
- List<HoodieRollbackStat> stats = testRun(useFileListingMetadata,
writeConfig, writeClient);
+ List<HoodieRollbackStat> stats =
testUpdateAndRollback(useFileListingMetadata, writeConfig, writeClient);
assertEquals(3, stats.size());
for (HoodieRollbackStat stat : stats) {
@@ -184,7 +184,55 @@ public class TestMarkerBasedRollbackStrategy extends
HoodieClientTestBase {
}
}
- private List<HoodieRollbackStat> testRun(boolean useFileListingMetadata,
HoodieWriteConfig writeConfig, SparkRDDWriteClient writeClient) {
+ @ParameterizedTest(name = TEST_NAME_WITH_PARAMS)
+ @MethodSource("configParams")
+ public void testMergeOnReadRollbackDeletesFirstAppendFiles(boolean
useFileListingMetadata) throws Exception {
+ // init MERGE_ON_READ_TABLE
+ tearDown();
+ tableType = HoodieTableType.MERGE_ON_READ;
+ setUp();
+
+ HoodieWriteConfig writeConfig =
getConfigBuilder().withRollbackUsingMarkers(true).withAutoCommit(false)
+
.withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(useFileListingMetadata).build())
+ .withPath(basePath).build();
+
+ HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(jsc);
+ try (SparkRDDWriteClient writeClient = new
SparkRDDWriteClient(engineContext, writeConfig)) {
+
+ // rollback 2nd commit and ensure stats reflect the info.
+ List<HoodieRollbackStat> stats = testInsertAndRollback(writeClient);
+
+ assertEquals(3, stats.size());
+ for (HoodieRollbackStat stat : stats) {
+ assertEquals(1, stat.getSuccessDeleteFiles().size());
+ assertEquals(0, stat.getFailedDeleteFiles().size());
+ assertEquals(0, stat.getCommandBlocksCount().size());
+ stat.getCommandBlocksCount().forEach((fileStatus, len) ->
assertTrue(fileStatus.getPath().getName().contains(HoodieFileFormat.HOODIE_LOG.getFileExtension())));
+ }
+ }
+ }
+
+ private List<HoodieRollbackStat> testInsertAndRollback(SparkRDDWriteClient
writeClient) {
+ String newCommitTime = "001";
+ writeClient.startCommitWithTime(newCommitTime);
+
+ List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 100);
+ JavaRDD<WriteStatus> writeStatuses =
writeClient.insert(jsc.parallelize(records, 1), newCommitTime);
+ writeClient.commit(newCommitTime, writeStatuses);
+
+ writeStatuses.collect();
+
+ HoodieTable hoodieTable = HoodieSparkTable.create(getConfig(), context,
metaClient);
+ List<HoodieRollbackRequest> rollbackRequests = new
MarkerBasedRollbackStrategy(hoodieTable, context, getConfig(),
+ "002").getRollbackRequests(new
HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.DELTA_COMMIT_ACTION,
"001"));
+
+ // rollback 1st commit and ensure stats reflect the info.
+ return new BaseRollbackHelper(hoodieTable.getMetaClient(),
getConfig()).performRollback(context,
+ new HoodieInstant(HoodieInstant.State.INFLIGHT,
HoodieTimeline.DELTA_COMMIT_ACTION, "001"),
+ rollbackRequests);
+ }
+
+ private List<HoodieRollbackStat> testUpdateAndRollback(boolean
useFileListingMetadata, HoodieWriteConfig writeConfig, SparkRDDWriteClient
writeClient) {
String newCommitTime = "001";
writeClient.startCommitWithTime(newCommitTime);