This is an automated email from the ASF dual-hosted git repository. sivabalan pushed a commit to branch release-0.5.3 in repository https://gitbox.apache.org/repos/asf/hudi.git
commit ea48ca9b62a2f216460a0795ef9d519481510cd7 Author: Balaji Varadarajan <[email protected]> AuthorDate: Fri May 1 21:37:21 2020 -0700 [HUDI-850] Avoid unnecessary listings in incremental cleaning mode (#1576) --- .../java/org/apache/hudi/table/CleanHelper.java | 85 ++++++++++++++++------ .../java/org/apache/hudi/table/TestCleaner.java | 42 +++++------ 2 files changed, 85 insertions(+), 42 deletions(-) diff --git a/hudi-client/src/main/java/org/apache/hudi/table/CleanHelper.java b/hudi-client/src/main/java/org/apache/hudi/table/CleanHelper.java index 3c73c7e..38eddd7 100644 --- a/hudi-client/src/main/java/org/apache/hudi/table/CleanHelper.java +++ b/hudi-client/src/main/java/org/apache/hudi/table/CleanHelper.java @@ -38,13 +38,13 @@ import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.exception.HoodieIOException; - import org.apache.log4j.LogManager; import org.apache.log4j.Logger; import java.io.IOException; import java.io.Serializable; import java.util.ArrayList; +import java.util.Collections; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -90,34 +90,77 @@ public class CleanHelper<T extends HoodieRecordPayload<T>> implements Serializab * @throws IOException when underlying file-system throws this exception */ public List<String> getPartitionPathsToClean(Option<HoodieInstant> newInstantToRetain) throws IOException { - if (config.incrementalCleanerModeEnabled() && newInstantToRetain.isPresent() - && (HoodieCleaningPolicy.KEEP_LATEST_COMMITS == config.getCleanerPolicy())) { - Option<HoodieInstant> lastClean = - hoodieTable.getCleanTimeline().filterCompletedInstants().lastInstant(); + switch (config.getCleanerPolicy()) { + case KEEP_LATEST_COMMITS: + return getPartitionPathsForCleanByCommits(newInstantToRetain); + case KEEP_LATEST_FILE_VERSIONS: + return getPartitionPathsForFullCleaning(); + default: + throw new 
IllegalStateException("Unknown Cleaner Policy"); + } + } + + /** + * Return partition paths for cleaning by commits mode. + * @param instantToRetain Earliest Instant to retain + * @return list of partitions + * @throws IOException + */ + private List<String> getPartitionPathsForCleanByCommits(Option<HoodieInstant> instantToRetain) throws IOException { + if (!instantToRetain.isPresent()) { + LOG.info("No earliest commit to retain. No need to scan partitions !!"); + return Collections.emptyList(); + } + + if (config.incrementalCleanerModeEnabled()) { + Option<HoodieInstant> lastClean = hoodieTable.getCleanTimeline().filterCompletedInstants().lastInstant(); if (lastClean.isPresent()) { HoodieCleanMetadata cleanMetadata = AvroUtils .deserializeHoodieCleanMetadata(hoodieTable.getActiveTimeline().getInstantDetails(lastClean.get()).get()); if ((cleanMetadata.getEarliestCommitToRetain() != null) && (cleanMetadata.getEarliestCommitToRetain().length() > 0)) { - LOG.warn("Incremental Cleaning mode is enabled. Looking up partition-paths that have since changed " - + "since last cleaned at " + cleanMetadata.getEarliestCommitToRetain() - + ". 
New Instant to retain : " + newInstantToRetain); - return hoodieTable.getCompletedCommitsTimeline().getInstants().filter(instant -> HoodieTimeline.compareTimestamps(instant.getTimestamp(), cleanMetadata.getEarliestCommitToRetain(), - HoodieTimeline.GREATER_OR_EQUAL) && HoodieTimeline.compareTimestamps(instant.getTimestamp(), - newInstantToRetain.get().getTimestamp(), HoodieTimeline.LESSER)).flatMap(instant -> { - try { - HoodieCommitMetadata commitMetadata = HoodieCommitMetadata.fromBytes(hoodieTable.getActiveTimeline().getInstantDetails(instant).get(), HoodieCommitMetadata.class); - return commitMetadata.getPartitionToWriteStats().keySet().stream(); - } catch (IOException e) { - throw new HoodieIOException(e.getMessage(), e); - } - }).distinct().collect(Collectors.toList()); + return getPartitionPathsForIncrementalCleaning(cleanMetadata, instantToRetain); } } } - // Otherwise go to brute force mode of scanning all partitions - return FSUtils.getAllPartitionPaths(hoodieTable.getMetaClient().getFs(), - hoodieTable.getMetaClient().getBasePath(), config.shouldAssumeDatePartitioning()); + return getPartitionPathsForFullCleaning(); + } + + /** + * Use Incremental Mode for finding partition paths. + * @param cleanMetadata metadata of the last completed clean; its earliest commit to retain is the inclusive lower bound + * @param newInstantToRetain earliest instant to retain for this clean (exclusive upper bound); must be present + * @return distinct partition paths written by the completed commits within that range + */ + private List<String> getPartitionPathsForIncrementalCleaning(HoodieCleanMetadata cleanMetadata, + Option<HoodieInstant> newInstantToRetain) { + LOG.warn("Incremental Cleaning mode is enabled. Looking up partition-paths that have since changed " + + "since last cleaned at " + cleanMetadata.getEarliestCommitToRetain() + + ". 
New Instant to retain : " + newInstantToRetain); + return hoodieTable.getCompletedCommitsTimeline().getInstants().filter( + instant -> HoodieTimeline.compareTimestamps(instant.getTimestamp(), cleanMetadata.getEarliestCommitToRetain(), + HoodieTimeline.GREATER_OR_EQUAL) && HoodieTimeline.compareTimestamps(instant.getTimestamp(), + newInstantToRetain.get().getTimestamp(), HoodieTimeline.LESSER)).flatMap(instant -> { + try { + HoodieCommitMetadata commitMetadata = HoodieCommitMetadata + .fromBytes(hoodieTable.getActiveTimeline().getInstantDetails(instant).get(), + HoodieCommitMetadata.class); + return commitMetadata.getPartitionToWriteStats().keySet().stream(); + } catch (IOException e) { + throw new HoodieIOException(e.getMessage(), e); + } + }).distinct().collect(Collectors.toList()); + } + + /** + * Scan and list all partitions for cleaning. + * @return all partition paths for the dataset. + * @throws IOException if listing the partitions under the table's base path fails + */ + private List<String> getPartitionPathsForFullCleaning() throws IOException { + // Go to brute force mode of scanning all partitions + return FSUtils.getAllPartitionPaths(hoodieTable.getMetaClient().getFs(), hoodieTable.getMetaClient().getBasePath(), + config.shouldAssumeDatePartitioning()); } /** diff --git a/hudi-client/src/test/java/org/apache/hudi/table/TestCleaner.java b/hudi-client/src/test/java/org/apache/hudi/table/TestCleaner.java index f6ad230..dee545a 100644 --- a/hudi-client/src/test/java/org/apache/hudi/table/TestCleaner.java +++ b/hudi-client/src/test/java/org/apache/hudi/table/TestCleaner.java @@ -108,7 +108,8 @@ public class TestCleaner extends TestHoodieClientBase { */ private String insertFirstBigBatchForClientCleanerTest(HoodieWriteConfig cfg, HoodieWriteClient client, Function2<List<HoodieRecord>, String, Integer> recordGenFunction, - Function3<JavaRDD<WriteStatus>, HoodieWriteClient, JavaRDD<HoodieRecord>, String> 
insertFn, + HoodieCleaningPolicy cleaningPolicy) throws Exception { /* * do a big insert (this is basically same as insert part of upsert, just adding it here so we can catch breakages @@ -131,10 +132,15 @@ public class TestCleaner extends TestHoodieClientBase { HoodieTable table = HoodieTable.getHoodieTable(metaClient, client.getConfig(), jsc); assertFalse(table.getCompletedCommitsTimeline().empty()); - String commitTime = table.getCompletedCommitsTimeline().getInstants().findFirst().get().getTimestamp(); - assertFalse(table.getCompletedCleanTimeline().empty()); - assertEquals("The clean instant should be the same as the commit instant", commitTime, - table.getCompletedCleanTimeline().getInstants().findFirst().get().getTimestamp()); + if (cleaningPolicy.equals(HoodieCleaningPolicy.KEEP_LATEST_COMMITS)) { + // We no longer write empty cleaner plans when there are not enough commits present + assertTrue(table.getCompletedCleanTimeline().empty()); + } else { + String instantTime = table.getCompletedCommitsTimeline().getInstants().findFirst().get().getTimestamp(); + assertFalse(table.getCompletedCleanTimeline().empty()); + assertEquals("The clean instant should be the same as the commit instant", instantTime, + table.getCompletedCleanTimeline().getInstants().findFirst().get().getTimestamp()); + } HoodieIndex index = HoodieIndex.createIndex(cfg, jsc); List<HoodieRecord> taggedRecords = index.tagLocation(jsc.parallelize(records, 1), jsc, table).collect(); @@ -205,7 +211,8 @@ public class TestCleaner extends TestHoodieClientBase { final Function2<List<HoodieRecord>, String, Integer> recordUpsertGenWrappedFunction = generateWrapRecordsFn(isPreppedAPI, cfg, dataGen::generateUniqueUpdates); - insertFirstBigBatchForClientCleanerTest(cfg, client, recordInsertGenWrappedFunction, insertFn); + insertFirstBigBatchForClientCleanerTest(cfg, client, recordInsertGenWrappedFunction, insertFn, + HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS); Map<HoodieFileGroupId, FileSlice> 
compactionFileIdToLatestFileSlice = new HashMap<>(); metaClient = HoodieTableMetaClient.reload(metaClient); @@ -353,7 +360,7 @@ public class TestCleaner extends TestHoodieClientBase { int maxCommits = 3; // keep upto 3 commits from the past HoodieWriteConfig cfg = getConfigBuilder() .withCompactionConfig(HoodieCompactionConfig.newBuilder() - .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS).retainCommits(maxCommits).build()) + .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS).retainCommits(maxCommits).build()) .withParallelism(1, 1).withBulkInsertParallelism(1).withFinalizeWriteParallelism(1) .withConsistencyGuardConfig(ConsistencyGuardConfig.newBuilder().withConsistencyCheckEnabled(true).build()) .build(); @@ -365,7 +372,8 @@ public class TestCleaner extends TestHoodieClientBase { final Function2<List<HoodieRecord>, String, Integer> recordUpsertGenWrappedFunction = generateWrapRecordsFn(isPreppedAPI, cfg, dataGen::generateUniqueUpdates); - insertFirstBigBatchForClientCleanerTest(cfg, client, recordInsertGenWrappedFunction, insertFn); + insertFirstBigBatchForClientCleanerTest(cfg, client, recordInsertGenWrappedFunction, insertFn, + HoodieCleaningPolicy.KEEP_LATEST_COMMITS); // Keep doing some writes and clean inline. Make sure we have expected number of files remaining. HoodieTestUtils.monotonicIncreasingCommitTimestamps(8, 1).forEach(newCommitTime -> { @@ -380,7 +388,9 @@ public class TestCleaner extends TestHoodieClientBase { metaClient = HoodieTableMetaClient.reload(metaClient); HoodieTable table1 = HoodieTable.getHoodieTable(metaClient, cfg, jsc); HoodieTimeline activeTimeline = table1.getCompletedCommitsTimeline(); - Option<HoodieInstant> earliestRetainedCommit = activeTimeline.nthFromLastInstant(maxCommits - 1); + // NOTE: See CleanPlanner#getFilesToCleanKeepingLatestCommits. 
We explicitly keep one commit before earliest + // commit + Option<HoodieInstant> earliestRetainedCommit = activeTimeline.nthFromLastInstant(maxCommits); Set<HoodieInstant> acceptableCommits = activeTimeline.getInstants().collect(Collectors.toSet()); if (earliestRetainedCommit.isPresent()) { acceptableCommits @@ -753,12 +763,7 @@ public class TestCleaner extends TestHoodieClientBase { metaClient = HoodieTableMetaClient.reload(metaClient); List<HoodieCleanStat> hoodieCleanStatsOne = runCleaner(config, simulateFailureRetry); - assertEquals("Must not clean any files", 0, - getCleanStat(hoodieCleanStatsOne, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH).getSuccessDeleteFiles() - .size()); - assertEquals("Must not clean any files", 0, - getCleanStat(hoodieCleanStatsOne, HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH).getSuccessDeleteFiles() - .size()); + assertEquals("Must not scan any partitions and clean any files", 0, hoodieCleanStatsOne.size()); assertTrue(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "000", file1P0C0)); assertTrue(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH, "000", @@ -788,12 +793,7 @@ public class TestCleaner extends TestHoodieClientBase { new HoodieInstant(State.INFLIGHT, HoodieTimeline.COMMIT_ACTION, "001"), Option.of(commitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8))); List<HoodieCleanStat> hoodieCleanStatsTwo = runCleaner(config, simulateFailureRetry); - assertEquals("Must not clean any files", 0, - getCleanStat(hoodieCleanStatsTwo, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH).getSuccessDeleteFiles() - .size()); - assertEquals("Must not clean any files", 0, - getCleanStat(hoodieCleanStatsTwo, HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH).getSuccessDeleteFiles() - .size()); + assertEquals("Must not scan any partitions and clean any files", 0, hoodieCleanStatsTwo.size()); 
assertTrue(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "001", file2P0C1)); assertTrue(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH, "001",
