This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 07164406c44 [HUDI-6423] Incremental cleaning should consider inflight
compaction instant (#9038)
07164406c44 is described below
commit 07164406c44b4092eee810710a242d092c97bd58
Author: zhuanshenbsj1 <[email protected]>
AuthorDate: Wed Jul 5 11:05:57 2023 +0800
[HUDI-6423] Incremental cleaning should consider inflight compaction
instant (#9038)
* The CleanPlanner#getEarliestCommitToRetain should consider pending
compaction instants. If a pending compaction is missed under incremental
cleaning mode, some files may never be cleaned once the cleaner moves to a
different partition:
-------- par1 ---- | ----- par2 ->
dc.1 compaction.2 dc.3 | dc.4
Assume we have 3 delta commits and 1 pending compaction commit on the
timeline. If the `EarliestCommitToRetain` was recorded as dc.3, then when
dc.4 (or a subsequent instant) triggers cleaning, the cleaner only checks the
timeline from dc.3 onward, and compaction.2 gets skipped forever if no subsequent
mutations were made to partition par1.
---------
Co-authored-by: Danny Chan <[email protected]>
---
.../action/clean/CleanPlanActionExecutor.java | 1 +
.../hudi/table/action/clean/CleanPlanner.java | 2 +-
.../java/org/apache/hudi/table/TestCleaner.java | 183 ++++++++++++++++-----
.../table/timeline/HoodieDefaultTimeline.java | 7 +
4 files changed, 148 insertions(+), 45 deletions(-)
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanActionExecutor.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanActionExecutor.java
index ba7c71b1356..b494df42b49 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanActionExecutor.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanActionExecutor.java
@@ -111,6 +111,7 @@ public class CleanPlanActionExecutor<T, I, K, O> extends
BaseActionExecutor<T, I
LOG.info("Nothing to clean here. It is already clean");
return
HoodieCleanerPlan.newBuilder().setPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS.name()).build();
}
+ LOG.info("Earliest commit to retain for clean : " +
(earliestInstant.isPresent() ? earliestInstant.get().getTimestamp() : "null"));
LOG.info("Total Partitions to clean : " + partitionsToClean.size() + ",
with policy " + config.getCleanerPolicy());
int cleanerParallelism = Math.min(partitionsToClean.size(),
config.getCleanerParallelism());
LOG.info("Using cleanerParallelism: " + cleanerParallelism);
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java
index be949fedb37..80aa7b31624 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java
@@ -509,7 +509,7 @@ public class CleanPlanner<T, I, K, O> implements
Serializable {
*/
public Option<HoodieInstant> getEarliestCommitToRetain() {
return CleanerUtils.getEarliestCommitToRetain(
- hoodieTable.getMetaClient().getActiveTimeline().getCommitsTimeline(),
+
hoodieTable.getMetaClient().getActiveTimeline().getCommitsAndCompactionTimeline(),
config.getCleanerPolicy(),
config.getCleanerCommitsRetained(),
Instant.now(),
diff --git
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestCleaner.java
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestCleaner.java
index d1e77613691..17a12dcc7ff 100644
---
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestCleaner.java
+++
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestCleaner.java
@@ -25,6 +25,7 @@ import org.apache.hudi.avro.model.HoodieCleanerPlan;
import org.apache.hudi.avro.model.HoodieRequestedReplaceMetadata;
import org.apache.hudi.avro.model.HoodieRollbackMetadata;
import org.apache.hudi.client.HoodieTimelineArchiver;
+import org.apache.hudi.client.SparkRDDReadClient;
import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
@@ -260,6 +261,97 @@ public class TestCleaner extends HoodieCleanerTestBase {
}
}
+ /**
+ * Test earliest commit to retain should be earlier than first pending
compaction in incremental cleaning scenarios.
+ *
+ * @throws IOException
+ */
+ @Test
+ public void testEarliestInstantToRetainForPendingCompaction() throws
IOException {
+ HoodieWriteConfig writeConfig = getConfigBuilder().withPath(basePath)
+ .withFileSystemViewConfig(new FileSystemViewStorageConfig.Builder()
+ .withEnableBackupForRemoteFileSystemView(false)
+ .build())
+ .withCleanConfig(HoodieCleanConfig.newBuilder()
+ .withAutoClean(false)
+
.withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS)
+ .retainCommits(1)
+ .build())
+ .withCompactionConfig(HoodieCompactionConfig.newBuilder()
+ .withInlineCompaction(false)
+ .withMaxNumDeltaCommitsBeforeCompaction(1)
+ .compactionSmallFileSize(1024 * 1024 * 1024)
+ .build())
+ .withArchivalConfig(HoodieArchivalConfig.newBuilder()
+ .withAutoArchive(false)
+ .archiveCommitsWith(2,3)
+ .build())
+ .withEmbeddedTimelineServerEnabled(false).build();
+
+ HoodieTestUtils.init(hadoopConf, basePath, HoodieTableType.MERGE_ON_READ);
+
+ try (SparkRDDWriteClient client = new SparkRDDWriteClient(context,
writeConfig)) {
+
+ final String partition1 = "2023/06/01";
+ final String partition2 = "2023/06/02";
+ String instantTime = "";
+ String earliestInstantToRetain = "";
+
+ for (int idx = 0; idx < 3; ++idx) {
+ instantTime = HoodieActiveTimeline.createNewInstantTime();
+ if (idx == 2) {
+ earliestInstantToRetain = instantTime;
+ }
+ List<HoodieRecord> records =
dataGen.generateInsertsForPartition(instantTime, 1, partition1);
+ client.startCommitWithTime(instantTime);
+ client.insert(jsc.parallelize(records, 1), instantTime).collect();
+ }
+
+
+ instantTime = HoodieActiveTimeline.createNewInstantTime();
+ HoodieTable table = HoodieSparkTable.create(writeConfig, context);
+ Option<HoodieCleanerPlan> cleanPlan = table.scheduleCleaning(context,
instantTime, Option.empty());
+
assertEquals(cleanPlan.get().getFilePathsToBeDeletedPerPartition().get(partition1).size(),
1);
+ assertEquals(earliestInstantToRetain,
cleanPlan.get().getEarliestInstantToRetain().getTimestamp(),
+ "clean until " + earliestInstantToRetain);
+ table.getMetaClient().reloadActiveTimeline();
+ table.clean(context, instantTime);
+
+
+ instantTime = HoodieActiveTimeline.createNewInstantTime();
+ List<HoodieRecord> records =
dataGen.generateInsertsForPartition(instantTime, 1, partition1);
+ client.startCommitWithTime(instantTime);
+ JavaRDD<HoodieRecord> recordsRDD = jsc.parallelize(records, 1);
+ client.insert(recordsRDD, instantTime).collect();
+
+
+ instantTime = HoodieActiveTimeline.createNewInstantTime();
+ earliestInstantToRetain = instantTime;
+ List<HoodieRecord> updatedRecords = dataGen.generateUpdates(instantTime,
records);
+ JavaRDD<HoodieRecord> updatedRecordsRDD =
jsc.parallelize(updatedRecords, 1);
+ SparkRDDReadClient readClient = new SparkRDDReadClient(context,
writeConfig);
+ JavaRDD<HoodieRecord> updatedTaggedRecordsRDD =
readClient.tagLocation(updatedRecordsRDD);
+ client.startCommitWithTime(instantTime);
+ client.upsertPreppedRecords(updatedTaggedRecordsRDD,
instantTime).collect();
+
+ table.getMetaClient().reloadActiveTimeline();
+ // pending compaction
+ String compactionInstantTime =
client.scheduleCompaction(Option.empty()).get().toString();
+
+ for (int idx = 0; idx < 3; ++idx) {
+ instantTime = HoodieActiveTimeline.createNewInstantTime();
+ records = dataGen.generateInsertsForPartition(instantTime, 1,
partition2);
+ client.startCommitWithTime(instantTime);
+ client.insert(jsc.parallelize(records, 1), instantTime).collect();
+ }
+
+ // earliest commit to retain should be earlier than first pending
compaction in incremental cleaning scenarios.
+ instantTime = HoodieActiveTimeline.createNewInstantTime();
+ cleanPlan = table.scheduleCleaning(context, instantTime, Option.empty());
+
assertEquals(earliestInstantToRetain,cleanPlan.get().getEarliestInstantToRetain().getTimestamp());
+ }
+ }
+
/**
* Tests no more than 1 clean is scheduled if hoodie.clean.allow.multiple
config is set to false.
*/
@@ -777,16 +869,17 @@ public class TestCleaner extends HoodieCleanerTestBase {
.withCleanConfig(HoodieCleanConfig.newBuilder()
.withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS).withAsyncClean(isAsync).retainCommits(2).build())
.build();
+
// Deletions:
- // . FileId Base Logs Total Retained Commits
- // FileId7 5 10 15 009, 011
- // FileId6 5 10 15 009
- // FileId5 3 6 9 005
- // FileId4 2 4 6 003
- // FileId3 1 2 3 001
- // FileId2 0 0 0 000
- // FileId1 0 0 0 000
- testPendingCompactions(config, 48, 18, false);
+ // . FileId Base Logs Total Retained_Commits Under_Compaction
+ // FileId7 1 2 3 001,003 false
+ // FileId6 1 2 3 001,003 false
+ // FileId5 1 2 3 001,003 true
+ // FileId4 1 2 3 001,003 true
+ // FileId3 1 2 3 001 true
+ // FileId2 0 0 0 000 true
+ // FileId1 0 0 0 000 false
+ testPendingCompactions(config, 15, 9, false);
}
/**
@@ -801,15 +894,16 @@ public class TestCleaner extends HoodieCleanerTestBase {
.withCleanConfig(HoodieCleanConfig.newBuilder()
.withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS).retainFileVersions(2).build())
.build();
+
// Deletions:
- // . FileId Base Logs Total Retained Commits
- // FileId7 5 10 15 009, 011
- // FileId6 4 8 12 007, 009
- // FileId5 2 4 6 003 005
- // FileId4 1 2 3 001, 003
- // FileId3 0 0 0 000, 001
- // FileId2 0 0 0 000
- // FileId1 0 0 0 000
+ // . FileId Base Logs Total Retained_Commits Under_Compaction
+ // FileId7 5 10 15 009,013 false
+ // FileId6 4 8 12 007,009 false
+ // FileId5 2 4 6 003,005 true
+ // FileId4 1 2 3 001,003 true
+ // FileId3 0 0 0 000,001 true
+ // FileId2 0 0 0 000 true
+ // FileId1 0 0 0 000 false
testPendingCompactions(config, 36, 9, retryFailure);
}
@@ -1005,23 +1099,24 @@ public class TestCleaner extends HoodieCleanerTestBase {
HoodieTableMetadataWriter metadataWriter =
SparkHoodieBackedTableMetadataWriter.create(hadoopConf, config, context);
final String partition = "2016/03/15";
+ String timePrefix = "00000000000";
Map<String, String> expFileIdToPendingCompaction = new HashMap<String,
String>() {
{
- put("fileId2", "004");
- put("fileId3", "006");
- put("fileId4", "008");
- put("fileId5", "010");
+ put("fileId2", timePrefix + "004");
+ put("fileId3", timePrefix + "006");
+ put("fileId4", timePrefix + "008");
+ put("fileId5", timePrefix + "010");
}
};
Map<String, String> fileIdToLatestInstantBeforeCompaction = new
HashMap<String, String>() {
{
- put("fileId1", "000");
- put("fileId2", "000");
- put("fileId3", "001");
- put("fileId4", "003");
- put("fileId5", "005");
- put("fileId6", "009");
- put("fileId7", "011");
+ put("fileId1", timePrefix + "000");
+ put("fileId2", timePrefix + "000");
+ put("fileId3", timePrefix + "001");
+ put("fileId4", timePrefix + "003");
+ put("fileId5", timePrefix + "005");
+ put("fileId6", timePrefix + "009");
+ put("fileId7", timePrefix + "013");
}
};
@@ -1047,60 +1142,60 @@ public class TestCleaner extends HoodieCleanerTestBase {
Map<String, List<String>> part1ToFileId = new HashMap<>();
part1ToFileId.put(partition, Arrays.asList(file1P1, file2P1, file3P1,
file4P1, file5P1, file6P1, file7P1));
// all 7 fileIds
- commitWithMdt("000", part1ToFileId, testTable, metadataWriter, true, true);
+ commitWithMdt(timePrefix + "000", part1ToFileId, testTable,
metadataWriter, true, true);
part1ToFileId = new HashMap<>();
part1ToFileId.put(partition, Arrays.asList(file3P1, file4P1, file5P1,
file6P1, file7P1));
// fileIds 3 to 7
- commitWithMdt("001", part1ToFileId, testTable, metadataWriter, true, true);
+ commitWithMdt(timePrefix + "001", part1ToFileId, testTable,
metadataWriter, true, true);
part1ToFileId = new HashMap<>();
part1ToFileId.put(partition, Arrays.asList(file4P1, file5P1, file6P1,
file7P1));
// fileIds 4 to 7
- commitWithMdt("003", part1ToFileId, testTable, metadataWriter, true, true);
+ commitWithMdt(timePrefix + "003", part1ToFileId, testTable,
metadataWriter, true, true);
// add compaction
- testTable.addRequestedCompaction("004", new FileSlice(partition, "000",
file2P1));
+ testTable.addRequestedCompaction(timePrefix + "004", new
FileSlice(partition, timePrefix + "000", file2P1));
part1ToFileId = new HashMap<>();
part1ToFileId.put(partition, Arrays.asList(file2P1));
- commitWithMdt("005", part1ToFileId, testTable, metadataWriter, false,
true);
+ commitWithMdt(timePrefix + "005", part1ToFileId, testTable,
metadataWriter, false, true);
part1ToFileId = new HashMap<>();
part1ToFileId.put(partition, Arrays.asList(file5P1, file6P1, file7P1));
- commitWithMdt("0055", part1ToFileId, testTable, metadataWriter, true,
true);
+ commitWithMdt(timePrefix + "0055", part1ToFileId, testTable,
metadataWriter, true, true);
- testTable.addRequestedCompaction("006", new FileSlice(partition, "001",
file3P1));
+ testTable.addRequestedCompaction(timePrefix + "006", new
FileSlice(partition, timePrefix + "001", file3P1));
part1ToFileId = new HashMap<>();
part1ToFileId.put(partition, Arrays.asList(file3P1));
- commitWithMdt("007", part1ToFileId, testTable, metadataWriter, false,
true);
+ commitWithMdt(timePrefix + "007", part1ToFileId, testTable,
metadataWriter, false, true);
part1ToFileId = new HashMap<>();
part1ToFileId.put(partition, Arrays.asList(file6P1, file7P1));
- commitWithMdt("0075", part1ToFileId, testTable, metadataWriter, true,
true);
+ commitWithMdt(timePrefix + "0075", part1ToFileId, testTable,
metadataWriter, true, true);
- testTable.addRequestedCompaction("008", new FileSlice(partition, "003",
file4P1));
+ testTable.addRequestedCompaction(timePrefix + "008", new
FileSlice(partition, timePrefix + "003", file4P1));
part1ToFileId = new HashMap<>();
part1ToFileId.put(partition, Arrays.asList(file4P1));
- commitWithMdt("009", part1ToFileId, testTable, metadataWriter, false,
true);
+ commitWithMdt(timePrefix + "009", part1ToFileId, testTable,
metadataWriter, false, true);
part1ToFileId = new HashMap<>();
part1ToFileId.put(partition, Arrays.asList(file6P1, file7P1));
- commitWithMdt("0095", part1ToFileId, testTable, metadataWriter, true,
true);
+ commitWithMdt(timePrefix + "0095", part1ToFileId, testTable,
metadataWriter, true, true);
- testTable.addRequestedCompaction("010", new FileSlice(partition, "005",
file5P1));
+ testTable.addRequestedCompaction(timePrefix + "010", new
FileSlice(partition, timePrefix + "005", file5P1));
part1ToFileId = new HashMap<>();
part1ToFileId.put(partition, Arrays.asList(file5P1));
- commitWithMdt("011", part1ToFileId, testTable, metadataWriter, false,
true);
+ commitWithMdt(timePrefix + "011", part1ToFileId, testTable,
metadataWriter, false, true);
part1ToFileId = new HashMap<>();
part1ToFileId.put(partition, Arrays.asList(file7P1));
- commitWithMdt("013", part1ToFileId, testTable, metadataWriter, true, true);
+ commitWithMdt(timePrefix + "013", part1ToFileId, testTable,
metadataWriter, true, true);
// Clean now
metaClient = HoodieTableMetaClient.reload(metaClient);
- List<HoodieCleanStat> hoodieCleanStats = runCleaner(config, retryFailure);
+ List<HoodieCleanStat> hoodieCleanStats = runCleaner(config, 14, true);
// Test for safety
final HoodieTableMetaClient newMetaClient =
HoodieTableMetaClient.reload(metaClient);
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieDefaultTimeline.java
b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieDefaultTimeline.java
index 8c4a5cb377e..6182bc4d4eb 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieDefaultTimeline.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieDefaultTimeline.java
@@ -267,6 +267,13 @@ public class HoodieDefaultTimeline implements
HoodieTimeline {
return getTimelineOfActions(CollectionUtils.createSet(COMMIT_ACTION,
DELTA_COMMIT_ACTION, REPLACE_COMMIT_ACTION));
}
+ /**
+ * Get all instants (commits, delta commits, replace, compaction) that
produce new data or merge file, in the active timeline.
+ */
+ public HoodieTimeline getCommitsAndCompactionTimeline() {
+ return getTimelineOfActions(CollectionUtils.createSet(COMMIT_ACTION,
DELTA_COMMIT_ACTION, REPLACE_COMMIT_ACTION, COMPACTION_ACTION));
+ }
+
/**
* Get all instants (commits, delta commits, compaction, clean, savepoint,
rollback, replace commits, index) that result in actions,
* in the active timeline.