This is an automated email from the ASF dual-hosted git repository.
leesf pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new f745e645735 [HUDI-5341] CleanPlanner retains earliest commits must not be later than earliest pending commit (#7568)
f745e645735 is described below
commit f745e6457353804359b20575c597b38507237aba
Author: Nicholas Jiang <[email protected]>
AuthorDate: Fri Jan 6 20:29:29 2023 +0800
[HUDI-5341] CleanPlanner retains earliest commits must not be later than earliest pending commit (#7568)
---
.../apache/hudi/client/HoodieTimelineArchiver.java | 16 ++++-
.../hudi/table/action/clean/CleanPlanner.java | 19 +++++-
.../apache/hudi/common/util/ClusteringUtils.java | 37 ++++++++++++
.../hudi/common/util/TestClusteringUtils.java | 69 ++++++++++++++++++++++
4 files changed, 137 insertions(+), 4 deletions(-)
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieTimelineArchiver.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieTimelineArchiver.java
index 6d632417a42..f4937de943e 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieTimelineArchiver.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieTimelineArchiver.java
@@ -47,6 +47,7 @@ import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
+import org.apache.hudi.common.util.ClusteringUtils;
import org.apache.hudi.common.util.CollectionUtils;
import org.apache.hudi.common.util.CompactionUtils;
import org.apache.hudi.common.util.FileIOUtils;
@@ -399,7 +400,7 @@ public class HoodieTimelineArchiver<T extends
HoodieAvroPayload, I, K, O> {
}).flatMap(Collection::stream);
}
- private Stream<HoodieInstant> getCommitInstantsToArchive() {
+ private Stream<HoodieInstant> getCommitInstantsToArchive() throws
IOException {
// TODO (na) : Add a way to return actions associated with a timeline and
then merge/unify
// with logic above to avoid Stream.concat
HoodieTimeline commitTimeline = table.getCompletedCommitsTimeline();
@@ -432,6 +433,11 @@ public class HoodieTimelineArchiver<T extends
HoodieAvroPayload, I, K, O> {
table.getActiveTimeline(),
config.getInlineCompactDeltaCommitMax())
: Option.empty();
+ // The clustering commit instant can not be archived unless we ensure
that the replaced files have been cleaned,
+ // without the replaced files metadata on the timeline, the fs view
would expose duplicates for readers.
+ Option<HoodieInstant> oldestInstantToRetainForClustering =
+
ClusteringUtils.getOldestInstantToRetainForClustering(table.getActiveTimeline(),
table.getMetaClient());
+
// Actually do the commits
Stream<HoodieInstant> instantToArchiveStream =
commitTimeline.getInstantsAsStream()
.filter(s -> {
@@ -444,7 +450,7 @@ public class HoodieTimelineArchiver<T extends
HoodieAvroPayload, I, K, O> {
return !(firstSavepoint.isPresent() &&
compareTimestamps(firstSavepoint.get().getTimestamp(), LESSER_THAN_OR_EQUALS,
s.getTimestamp()));
}
}).filter(s -> {
- // Ensure commits >= oldest pending compaction commit is retained
+ // Ensure commits >= the oldest pending compaction commit is
retained
return oldestPendingCompactionAndReplaceInstant
.map(instant -> compareTimestamps(instant.getTimestamp(),
GREATER_THAN, s.getTimestamp()))
.orElse(true);
@@ -461,6 +467,10 @@ public class HoodieTimelineArchiver<T extends
HoodieAvroPayload, I, K, O> {
oldestInstantToRetainForCompaction.map(instantToRetain ->
compareTimestamps(s.getTimestamp(), LESSER_THAN,
instantToRetain.getTimestamp()))
.orElse(true)
+ ).filter(s ->
+ oldestInstantToRetainForClustering.map(instantToRetain ->
+ HoodieTimeline.compareTimestamps(s.getTimestamp(),
LESSER_THAN, instantToRetain.getTimestamp()))
+ .orElse(true)
);
return instantToArchiveStream.limit(commitTimeline.countInstants() -
minInstantsToKeep);
} else {
@@ -468,7 +478,7 @@ public class HoodieTimelineArchiver<T extends
HoodieAvroPayload, I, K, O> {
}
}
- private Stream<HoodieInstant> getInstantsToArchive() {
+ private Stream<HoodieInstant> getInstantsToArchive() throws IOException {
Stream<HoodieInstant> instants =
Stream.concat(getCleanInstantsToArchive(), getCommitInstantsToArchive());
if (config.isMetastoreEnabled()) {
return Stream.empty();
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java
index 5de92af3258..982800cc246 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java
@@ -487,7 +487,24 @@ public class CleanPlanner<T, I, K, O> implements
Serializable {
int hoursRetained = config.getCleanerHoursRetained();
if (config.getCleanerPolicy() == HoodieCleaningPolicy.KEEP_LATEST_COMMITS
&& commitTimeline.countInstants() > commitsRetained) {
- earliestCommitToRetain =
commitTimeline.nthInstant(commitTimeline.countInstants() - commitsRetained);
//15 instants total, 10 commits to retain, this gives 6th instant in the list
+ Option<HoodieInstant> earliestPendingCommits =
hoodieTable.getMetaClient()
+ .getActiveTimeline()
+ .getCommitsTimeline()
+ .filter(s -> !s.isCompleted()).firstInstant();
+ if (earliestPendingCommits.isPresent()) {
+ // Earliest commit to retain must not be later than the earliest
pending commit
+ earliestCommitToRetain =
+ commitTimeline.nthInstant(commitTimeline.countInstants() -
commitsRetained).map(nthInstant -> {
+ if (nthInstant.compareTo(earliestPendingCommits.get()) <= 0) {
+ return Option.of(nthInstant);
+ } else {
+ return
commitTimeline.findInstantsBefore(earliestPendingCommits.get().getTimestamp()).lastInstant();
+ }
+ }).orElse(Option.empty());
+ } else {
+ earliestCommitToRetain =
commitTimeline.nthInstant(commitTimeline.countInstants()
+ - commitsRetained); //15 instants total, 10 commits to retain,
this gives 6th instant in the list
+ }
} else if (config.getCleanerPolicy() ==
HoodieCleaningPolicy.KEEP_LATEST_BY_HOURS) {
Instant instant = Instant.now();
ZonedDateTime currentDateTime = ZonedDateTime.ofInstant(instant,
ZoneId.systemDefault());
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/util/ClusteringUtils.java
b/hudi-common/src/main/java/org/apache/hudi/common/util/ClusteringUtils.java
index 3b7d43cb6db..c669de76b21 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/ClusteringUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/ClusteringUtils.java
@@ -30,6 +30,7 @@ import org.apache.hudi.common.model.HoodieFileGroupId;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
@@ -224,4 +225,40 @@ public class ClusteringUtils {
public static boolean isPendingClusteringInstant(HoodieTableMetaClient
metaClient, HoodieInstant instant) {
return getClusteringPlan(metaClient, instant).isPresent();
}
+
+ /**
+ * Checks whether the latest clustering instant has a subsequent cleaning
action. Returns
+ * the clustering instant if there is such cleaning action or empty.
+ *
+ * @param activeTimeline The active timeline
+ * @param metaClient The meta client
+ * @return the oldest instant to retain for clustering
+ */
+ public static Option<HoodieInstant> getOldestInstantToRetainForClustering(
+ HoodieActiveTimeline activeTimeline, HoodieTableMetaClient metaClient)
throws IOException {
+ HoodieTimeline replaceTimeline =
activeTimeline.getCompletedReplaceTimeline();
+ if (!replaceTimeline.empty()) {
+ Option<HoodieInstant> cleanInstantOpt =
+ activeTimeline.getCleanerTimeline().filter(instant ->
!instant.isCompleted()).firstInstant();
+ if (cleanInstantOpt.isPresent()) {
+ // The first clustering instant of which timestamp is greater than or
equal to the earliest commit to retain of
+ // the clean metadata.
+ HoodieInstant cleanInstant = cleanInstantOpt.get();
+ String earliestCommitToRetain =
+ CleanerUtils.getCleanerPlan(metaClient,
+ cleanInstant.isRequested()
+ ? cleanInstant
+ :
HoodieTimeline.getCleanRequestedInstant(cleanInstant.getTimestamp()))
+ .getEarliestInstantToRetain().getTimestamp();
+ return StringUtils.isNullOrEmpty(earliestCommitToRetain)
+ ? Option.empty()
+ : replaceTimeline.filter(instant ->
+ HoodieTimeline.compareTimestamps(instant.getTimestamp(),
+ HoodieTimeline.GREATER_THAN_OR_EQUALS,
+ earliestCommitToRetain))
+ .firstInstant();
+ }
+ }
+ return Option.empty();
+ }
}
diff --git
a/hudi-common/src/test/java/org/apache/hudi/common/util/TestClusteringUtils.java
b/hudi-common/src/test/java/org/apache/hudi/common/util/TestClusteringUtils.java
index 0948cb615be..5235183d10f 100644
---
a/hudi-common/src/test/java/org/apache/hudi/common/util/TestClusteringUtils.java
+++
b/hudi-common/src/test/java/org/apache/hudi/common/util/TestClusteringUtils.java
@@ -18,17 +18,21 @@
package org.apache.hudi.common.util;
+import org.apache.hudi.avro.model.HoodieActionInstant;
+import org.apache.hudi.avro.model.HoodieCleanerPlan;
import org.apache.hudi.avro.model.HoodieClusteringPlan;
import org.apache.hudi.avro.model.HoodieRequestedReplaceMetadata;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.model.HoodieCleaningPolicy;
import org.apache.hudi.common.model.HoodieFileGroupId;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
+import
org.apache.hudi.common.table.timeline.versioning.clean.CleanPlanV2MigrationHandler;
import org.apache.hudi.common.testutils.HoodieCommonTestHarness;
import org.junit.jupiter.api.BeforeEach;
@@ -44,6 +48,7 @@ import java.util.UUID;
import java.util.stream.Collectors;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
/**
* Tests for {@link ClusteringUtils}.
@@ -115,6 +120,70 @@ public class TestClusteringUtils extends
HoodieCommonTestHarness {
assertEquals(requestedClusteringPlan, inflightClusteringPlan);
}
+ @Test
+ public void testGetOldestInstantToRetainForClustering() throws IOException {
+ String partitionPath1 = "partition1";
+ List<String> fileIds1 = new ArrayList<>();
+ fileIds1.add(UUID.randomUUID().toString());
+ String clusterTime1 = "1";
+ HoodieInstant requestedInstant1 =
createRequestedReplaceInstant(partitionPath1, clusterTime1, fileIds1);
+ HoodieInstant inflightInstant1 =
metaClient.getActiveTimeline().transitionReplaceRequestedToInflight(requestedInstant1,
Option.empty());
+ HoodieInstant completedInstant1 =
metaClient.getActiveTimeline().transitionReplaceInflightToComplete(inflightInstant1,
Option.empty());
+ List<String> fileIds2 = new ArrayList<>();
+ fileIds2.add(UUID.randomUUID().toString());
+ fileIds2.add(UUID.randomUUID().toString());
+ String clusterTime2 = "2";
+ HoodieInstant requestedInstant2 =
createRequestedReplaceInstant(partitionPath1, clusterTime2, fileIds2);
+ HoodieInstant inflightInstant2 =
metaClient.getActiveTimeline().transitionReplaceRequestedToInflight(requestedInstant2,
Option.empty());
+
metaClient.getActiveTimeline().transitionReplaceInflightToComplete(inflightInstant2,
Option.empty());
+ List<String> fileIds3 = new ArrayList<>();
+ fileIds3.add(UUID.randomUUID().toString());
+ fileIds3.add(UUID.randomUUID().toString());
+ fileIds3.add(UUID.randomUUID().toString());
+ String clusterTime3 = "3";
+ HoodieInstant requestedInstant3 =
createRequestedReplaceInstant(partitionPath1, clusterTime3, fileIds3);
+ HoodieInstant inflightInstant3 =
metaClient.getActiveTimeline().transitionReplaceRequestedToInflight(requestedInstant3,
Option.empty());
+ HoodieInstant completedInstant3 =
metaClient.getActiveTimeline().transitionReplaceInflightToComplete(inflightInstant3,
Option.empty());
+ metaClient.reloadActiveTimeline();
+ Option<HoodieInstant> actual =
ClusteringUtils.getOldestInstantToRetainForClustering(metaClient.getActiveTimeline(),
metaClient);
+ assertFalse(actual.isPresent());
+ // test first uncompleted clean instant is requested.
+ String cleanTime1 = "4";
+ HoodieInstant requestedInstant4 = new
HoodieInstant(HoodieInstant.State.REQUESTED, HoodieTimeline.CLEAN_ACTION,
cleanTime1);
+ HoodieCleanerPlan cleanerPlan1 = HoodieCleanerPlan.newBuilder()
+ .setEarliestInstantToRetainBuilder(HoodieActionInstant.newBuilder()
+ .setAction(completedInstant1.getAction())
+ .setTimestamp(completedInstant1.getTimestamp())
+ .setState(completedInstant1.getState().name()))
+ .setPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS.name())
+ .setFilesToBeDeletedPerPartition(new HashMap<>())
+ .setVersion(CleanPlanV2MigrationHandler.VERSION)
+ .build();
+ metaClient.getActiveTimeline().saveToCleanRequested(requestedInstant4,
TimelineMetadataUtils.serializeCleanerPlan(cleanerPlan1));
+ metaClient.reloadActiveTimeline();
+ actual =
ClusteringUtils.getOldestInstantToRetainForClustering(metaClient.getActiveTimeline(),
metaClient);
+ assertEquals(clusterTime1, actual.get().getTimestamp());
+ HoodieInstant inflightInstant4 =
metaClient.getActiveTimeline().transitionCleanRequestedToInflight(requestedInstant4,
Option.empty());
+
metaClient.getActiveTimeline().transitionCleanInflightToComplete(inflightInstant4,
Option.empty());
+ // test first uncompleted clean instant is inflight.
+ String cleanTime2 = "5";
+ HoodieInstant requestedInstant5 = new
HoodieInstant(HoodieInstant.State.REQUESTED, HoodieTimeline.CLEAN_ACTION,
cleanTime2);
+ HoodieCleanerPlan cleanerPlan2 = HoodieCleanerPlan.newBuilder()
+ .setEarliestInstantToRetainBuilder(HoodieActionInstant.newBuilder()
+ .setAction(completedInstant3.getAction())
+ .setTimestamp(completedInstant3.getTimestamp())
+ .setState(completedInstant3.getState().name()))
+ .setPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS.name())
+ .setFilesToBeDeletedPerPartition(new HashMap<>())
+ .setVersion(CleanPlanV2MigrationHandler.VERSION)
+ .build();
+ metaClient.getActiveTimeline().saveToCleanRequested(requestedInstant5,
TimelineMetadataUtils.serializeCleanerPlan(cleanerPlan2));
+
metaClient.getActiveTimeline().transitionCleanRequestedToInflight(requestedInstant5,
Option.empty());
+ metaClient.reloadActiveTimeline();
+ actual =
ClusteringUtils.getOldestInstantToRetainForClustering(metaClient.getActiveTimeline(),
metaClient);
+ assertEquals(clusterTime3, actual.get().getTimestamp());
+ }
+
private void validateClusteringInstant(List<String> fileIds, String
partitionPath,
String expectedInstantTime,
Map<HoodieFileGroupId, HoodieInstant> fileGroupToInstantMap) {
for (String fileId : fileIds) {