This is an automated email from the ASF dual-hosted git repository. leesf pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push: new f745e645735 [HUDI-5341] CleanPlanner retains earliest commits must not be later than earliest pending commit (#7568) f745e645735 is described below commit f745e6457353804359b20575c597b38507237aba Author: Nicholas Jiang <programg...@163.com> AuthorDate: Fri Jan 6 20:29:29 2023 +0800 [HUDI-5341] CleanPlanner retains earliest commits must not be later than earliest pending commit (#7568) --- .../apache/hudi/client/HoodieTimelineArchiver.java | 16 ++++- .../hudi/table/action/clean/CleanPlanner.java | 19 +++++- .../apache/hudi/common/util/ClusteringUtils.java | 37 ++++++++++++ .../hudi/common/util/TestClusteringUtils.java | 69 ++++++++++++++++++++++ 4 files changed, 137 insertions(+), 4 deletions(-) diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieTimelineArchiver.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieTimelineArchiver.java index 6d632417a42..f4937de943e 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieTimelineArchiver.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieTimelineArchiver.java @@ -47,6 +47,7 @@ import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.common.table.timeline.TimelineMetadataUtils; import org.apache.hudi.common.table.view.FileSystemViewStorageConfig; +import org.apache.hudi.common.util.ClusteringUtils; import org.apache.hudi.common.util.CollectionUtils; import org.apache.hudi.common.util.CompactionUtils; import org.apache.hudi.common.util.FileIOUtils; @@ -399,7 +400,7 @@ public class HoodieTimelineArchiver<T extends HoodieAvroPayload, I, K, O> { }).flatMap(Collection::stream); } - private Stream<HoodieInstant> getCommitInstantsToArchive() { + private Stream<HoodieInstant> getCommitInstantsToArchive() throws IOException { // TODO (na) : Add a way to return actions associated with a timeline and then merge/unify // with logic above to avoid Stream.concat HoodieTimeline commitTimeline = table.getCompletedCommitsTimeline(); @@ -432,6 +433,11 @@ public class HoodieTimelineArchiver<T extends HoodieAvroPayload, I, K, O> { table.getActiveTimeline(), config.getInlineCompactDeltaCommitMax()) : Option.empty(); + // The clustering commit instant can not be archived unless we ensure that the replaced files have been cleaned, + // without the replaced files metadata on the timeline, the fs view would expose duplicates for readers. + Option<HoodieInstant> oldestInstantToRetainForClustering = + ClusteringUtils.getOldestInstantToRetainForClustering(table.getActiveTimeline(), table.getMetaClient()); + // Actually do the commits Stream<HoodieInstant> instantToArchiveStream = commitTimeline.getInstantsAsStream() .filter(s -> { @@ -444,7 +450,7 @@ public class HoodieTimelineArchiver<T extends HoodieAvroPayload, I, K, O> { return !(firstSavepoint.isPresent() && compareTimestamps(firstSavepoint.get().getTimestamp(), LESSER_THAN_OR_EQUALS, s.getTimestamp())); } }).filter(s -> { - // Ensure commits >= oldest pending compaction commit is retained + // Ensure commits >= the oldest pending compaction commit is retained return oldestPendingCompactionAndReplaceInstant .map(instant -> compareTimestamps(instant.getTimestamp(), GREATER_THAN, s.getTimestamp())) .orElse(true); @@ -461,6 +467,10 @@ public class HoodieTimelineArchiver<T extends HoodieAvroPayload, I, K, O> { oldestInstantToRetainForCompaction.map(instantToRetain -> compareTimestamps(s.getTimestamp(), LESSER_THAN, instantToRetain.getTimestamp())) .orElse(true) + ).filter(s -> + oldestInstantToRetainForClustering.map(instantToRetain -> + HoodieTimeline.compareTimestamps(s.getTimestamp(), LESSER_THAN, instantToRetain.getTimestamp())) + .orElse(true) ); return instantToArchiveStream.limit(commitTimeline.countInstants() - minInstantsToKeep); } else { @@ -468,7 +478,7 @@ public class HoodieTimelineArchiver<T extends HoodieAvroPayload, I, K, O> { } } - private Stream<HoodieInstant> getInstantsToArchive() { + private Stream<HoodieInstant> getInstantsToArchive() throws IOException { Stream<HoodieInstant> instants = Stream.concat(getCleanInstantsToArchive(), getCommitInstantsToArchive()); if (config.isMetastoreEnabled()) { return Stream.empty(); diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java index 5de92af3258..982800cc246 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java @@ -487,7 +487,24 @@ public class CleanPlanner<T, I, K, O> implements Serializable { int hoursRetained = config.getCleanerHoursRetained(); if (config.getCleanerPolicy() == HoodieCleaningPolicy.KEEP_LATEST_COMMITS && commitTimeline.countInstants() > commitsRetained) { - earliestCommitToRetain = commitTimeline.nthInstant(commitTimeline.countInstants() - commitsRetained); //15 instants total, 10 commits to retain, this gives 6th instant in the list + Option<HoodieInstant> earliestPendingCommits = hoodieTable.getMetaClient() + .getActiveTimeline() + .getCommitsTimeline() + .filter(s -> !s.isCompleted()).firstInstant(); + if (earliestPendingCommits.isPresent()) { + // Earliest commit to retain must not be later than the earliest pending commit + earliestCommitToRetain = + commitTimeline.nthInstant(commitTimeline.countInstants() - commitsRetained).map(nthInstant -> { + if (nthInstant.compareTo(earliestPendingCommits.get()) <= 0) { + return Option.of(nthInstant); + } else { + return commitTimeline.findInstantsBefore(earliestPendingCommits.get().getTimestamp()).lastInstant(); + } + }).orElse(Option.empty()); + } else { + earliestCommitToRetain = commitTimeline.nthInstant(commitTimeline.countInstants() + - commitsRetained); //15 instants total, 10 commits to retain, this gives 6th instant in the list + } } else if (config.getCleanerPolicy() == HoodieCleaningPolicy.KEEP_LATEST_BY_HOURS) { Instant instant = Instant.now(); ZonedDateTime currentDateTime = ZonedDateTime.ofInstant(instant, ZoneId.systemDefault()); diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/ClusteringUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/ClusteringUtils.java index 3b7d43cb6db..c669de76b21 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/util/ClusteringUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/ClusteringUtils.java @@ -30,6 +30,7 @@ import org.apache.hudi.common.model.HoodieFileGroupId; import org.apache.hudi.common.model.HoodieLogFile; import org.apache.hudi.common.model.WriteOperationType; import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.common.table.timeline.TimelineMetadataUtils; @@ -224,4 +225,40 @@ public class ClusteringUtils { public static boolean isPendingClusteringInstant(HoodieTableMetaClient metaClient, HoodieInstant instant) { return getClusteringPlan(metaClient, instant).isPresent(); } + + /** + * Checks whether the latest clustering instant has a subsequent cleaning action. Returns + * the clustering instant if there is such cleaning action or empty. + * + * @param activeTimeline The active timeline + * @param metaClient The meta client + * @return the oldest instant to retain for clustering + */ + public static Option<HoodieInstant> getOldestInstantToRetainForClustering( + HoodieActiveTimeline activeTimeline, HoodieTableMetaClient metaClient) throws IOException { + HoodieTimeline replaceTimeline = activeTimeline.getCompletedReplaceTimeline(); + if (!replaceTimeline.empty()) { + Option<HoodieInstant> cleanInstantOpt = + activeTimeline.getCleanerTimeline().filter(instant -> !instant.isCompleted()).firstInstant(); + if (cleanInstantOpt.isPresent()) { + // The first clustering instant of which timestamp is greater than or equal to the earliest commit to retain of + // the clean metadata. + HoodieInstant cleanInstant = cleanInstantOpt.get(); + String earliestCommitToRetain = + CleanerUtils.getCleanerPlan(metaClient, + cleanInstant.isRequested() + ? cleanInstant + : HoodieTimeline.getCleanRequestedInstant(cleanInstant.getTimestamp())) + .getEarliestInstantToRetain().getTimestamp(); + return StringUtils.isNullOrEmpty(earliestCommitToRetain) + ? Option.empty() + : replaceTimeline.filter(instant -> + HoodieTimeline.compareTimestamps(instant.getTimestamp(), + HoodieTimeline.GREATER_THAN_OR_EQUALS, + earliestCommitToRetain)) + .firstInstant(); + } + } + return Option.empty(); + } } diff --git a/hudi-common/src/test/java/org/apache/hudi/common/util/TestClusteringUtils.java b/hudi-common/src/test/java/org/apache/hudi/common/util/TestClusteringUtils.java index 0948cb615be..5235183d10f 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/util/TestClusteringUtils.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/util/TestClusteringUtils.java @@ -18,17 +18,21 @@ package org.apache.hudi.common.util; +import org.apache.hudi.avro.model.HoodieActionInstant; +import org.apache.hudi.avro.model.HoodieCleanerPlan; import org.apache.hudi.avro.model.HoodieClusteringPlan; import org.apache.hudi.avro.model.HoodieRequestedReplaceMetadata; import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.model.FileSlice; import org.apache.hudi.common.model.HoodieBaseFile; +import org.apache.hudi.common.model.HoodieCleaningPolicy; import org.apache.hudi.common.model.HoodieFileGroupId; import org.apache.hudi.common.model.HoodieTableType; import org.apache.hudi.common.model.WriteOperationType; import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.common.table.timeline.TimelineMetadataUtils; +import org.apache.hudi.common.table.timeline.versioning.clean.CleanPlanV2MigrationHandler; import org.apache.hudi.common.testutils.HoodieCommonTestHarness; import org.junit.jupiter.api.BeforeEach; @@ -44,6 +48,7 @@ import java.util.UUID; import java.util.stream.Collectors; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; /** * Tests for {@link ClusteringUtils}. @@ -115,6 +120,70 @@ public class TestClusteringUtils extends HoodieCommonTestHarness { assertEquals(requestedClusteringPlan, inflightClusteringPlan); } + @Test + public void testGetOldestInstantToRetainForClustering() throws IOException { + String partitionPath1 = "partition1"; + List<String> fileIds1 = new ArrayList<>(); + fileIds1.add(UUID.randomUUID().toString()); + String clusterTime1 = "1"; + HoodieInstant requestedInstant1 = createRequestedReplaceInstant(partitionPath1, clusterTime1, fileIds1); + HoodieInstant inflightInstant1 = metaClient.getActiveTimeline().transitionReplaceRequestedToInflight(requestedInstant1, Option.empty()); + HoodieInstant completedInstant1 = metaClient.getActiveTimeline().transitionReplaceInflightToComplete(inflightInstant1, Option.empty()); + List<String> fileIds2 = new ArrayList<>(); + fileIds2.add(UUID.randomUUID().toString()); + fileIds2.add(UUID.randomUUID().toString()); + String clusterTime2 = "2"; + HoodieInstant requestedInstant2 = createRequestedReplaceInstant(partitionPath1, clusterTime2, fileIds2); + HoodieInstant inflightInstant2 = metaClient.getActiveTimeline().transitionReplaceRequestedToInflight(requestedInstant2, Option.empty()); + metaClient.getActiveTimeline().transitionReplaceInflightToComplete(inflightInstant2, Option.empty()); + List<String> fileIds3 = new ArrayList<>(); + fileIds3.add(UUID.randomUUID().toString()); + fileIds3.add(UUID.randomUUID().toString()); + fileIds3.add(UUID.randomUUID().toString()); + String clusterTime3 = "3"; + HoodieInstant requestedInstant3 = createRequestedReplaceInstant(partitionPath1, clusterTime3, fileIds3); + HoodieInstant inflightInstant3 = metaClient.getActiveTimeline().transitionReplaceRequestedToInflight(requestedInstant3, Option.empty()); + HoodieInstant completedInstant3 = metaClient.getActiveTimeline().transitionReplaceInflightToComplete(inflightInstant3, Option.empty()); + metaClient.reloadActiveTimeline(); + Option<HoodieInstant> actual = ClusteringUtils.getOldestInstantToRetainForClustering(metaClient.getActiveTimeline(), metaClient); + assertFalse(actual.isPresent()); + // test first uncompleted clean instant is requested. + String cleanTime1 = "4"; + HoodieInstant requestedInstant4 = new HoodieInstant(HoodieInstant.State.REQUESTED, HoodieTimeline.CLEAN_ACTION, cleanTime1); + HoodieCleanerPlan cleanerPlan1 = HoodieCleanerPlan.newBuilder() + .setEarliestInstantToRetainBuilder(HoodieActionInstant.newBuilder() + .setAction(completedInstant1.getAction()) + .setTimestamp(completedInstant1.getTimestamp()) + .setState(completedInstant1.getState().name())) + .setPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS.name()) + .setFilesToBeDeletedPerPartition(new HashMap<>()) + .setVersion(CleanPlanV2MigrationHandler.VERSION) + .build(); + metaClient.getActiveTimeline().saveToCleanRequested(requestedInstant4, TimelineMetadataUtils.serializeCleanerPlan(cleanerPlan1)); + metaClient.reloadActiveTimeline(); + actual = ClusteringUtils.getOldestInstantToRetainForClustering(metaClient.getActiveTimeline(), metaClient); + assertEquals(clusterTime1, actual.get().getTimestamp()); + HoodieInstant inflightInstant4 = metaClient.getActiveTimeline().transitionCleanRequestedToInflight(requestedInstant4, Option.empty()); + metaClient.getActiveTimeline().transitionCleanInflightToComplete(inflightInstant4, Option.empty()); + // test first uncompleted clean instant is inflight. + String cleanTime2 = "5"; + HoodieInstant requestedInstant5 = new HoodieInstant(HoodieInstant.State.REQUESTED, HoodieTimeline.CLEAN_ACTION, cleanTime2); + HoodieCleanerPlan cleanerPlan2 = HoodieCleanerPlan.newBuilder() + .setEarliestInstantToRetainBuilder(HoodieActionInstant.newBuilder() + .setAction(completedInstant3.getAction()) + .setTimestamp(completedInstant3.getTimestamp()) + .setState(completedInstant3.getState().name())) + .setPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS.name()) + .setFilesToBeDeletedPerPartition(new HashMap<>()) + .setVersion(CleanPlanV2MigrationHandler.VERSION) + .build(); + metaClient.getActiveTimeline().saveToCleanRequested(requestedInstant5, TimelineMetadataUtils.serializeCleanerPlan(cleanerPlan2)); + metaClient.getActiveTimeline().transitionCleanRequestedToInflight(requestedInstant5, Option.empty()); + metaClient.reloadActiveTimeline(); + actual = ClusteringUtils.getOldestInstantToRetainForClustering(metaClient.getActiveTimeline(), metaClient); + assertEquals(clusterTime3, actual.get().getTimestamp()); + } + private void validateClusteringInstant(List<String> fileIds, String partitionPath, String expectedInstantTime, Map<HoodieFileGroupId, HoodieInstant> fileGroupToInstantMap) { for (String fileId : fileIds) {