This is an automated email from the ASF dual-hosted git repository.
codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new ee5bb7c2e36 [HUDI-6370] Automatically decide archival boundary based on cleaning configs (#8972)
ee5bb7c2e36 is described below
commit ee5bb7c2e36960cb4a0889b7363cbcb164b4a941
Author: Y Ethan Guo <[email protected]>
AuthorDate: Wed Jun 14 19:51:48 2023 -0700
[HUDI-6370] Automatically decide archival boundary based on cleaning configs (#8972)
---
.../apache/hudi/client/HoodieTimelineArchiver.java | 93 +++++++++++++-----
.../org/apache/hudi/config/HoodieWriteConfig.java | 13 +--
.../hudi/table/action/clean/CleanPlanner.java | 42 ++------
.../apache/hudi/io/TestHoodieTimelineArchiver.java | 107 ++++++++++++++-------
.../org/apache/hudi/common/util/CleanerUtils.java | 42 +++++++-
5 files changed, 199 insertions(+), 98 deletions(-)
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieTimelineArchiver.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieTimelineArchiver.java
index 0a6659cb1e7..74eeeaf22f4 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieTimelineArchiver.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieTimelineArchiver.java
@@ -48,6 +48,7 @@ import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
import org.apache.hudi.common.table.timeline.TimelineUtils;
import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
+import org.apache.hudi.common.util.CleanerUtils;
import org.apache.hudi.common.util.ClusteringUtils;
import org.apache.hudi.common.util.CollectionUtils;
import org.apache.hudi.common.util.CompactionUtils;
@@ -75,11 +76,9 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.text.ParseException;
import java.time.Instant;
-import java.time.ZonedDateTime;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
-import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -88,10 +87,15 @@ import java.util.stream.Collectors;
import java.util.stream.Stream;
import static
org.apache.hudi.common.table.timeline.HoodieTimeline.GREATER_THAN;
-import static
org.apache.hudi.common.table.timeline.HoodieTimeline.GREATER_THAN_OR_EQUALS;
import static org.apache.hudi.common.table.timeline.HoodieTimeline.LESSER_THAN;
import static
org.apache.hudi.common.table.timeline.HoodieTimeline.LESSER_THAN_OR_EQUALS;
import static
org.apache.hudi.common.table.timeline.HoodieTimeline.compareTimestamps;
+import static org.apache.hudi.config.HoodieArchivalConfig.MAX_COMMITS_TO_KEEP;
+import static org.apache.hudi.config.HoodieArchivalConfig.MIN_COMMITS_TO_KEEP;
+import static
org.apache.hudi.config.HoodieCleanConfig.CLEANER_COMMITS_RETAINED;
+import static
org.apache.hudi.config.HoodieCleanConfig.CLEANER_FILE_VERSIONS_RETAINED;
+import static org.apache.hudi.config.HoodieCleanConfig.CLEANER_HOURS_RETAINED;
+import static org.apache.hudi.config.HoodieCleanConfig.CLEANER_POLICY;
/**
* Archiver to bound the growth of files under .hoodie meta path.
@@ -114,9 +118,69 @@ public class HoodieTimelineArchiver<T extends HoodieAvroPayload, I, K, O> {
this.table = table;
this.metaClient = table.getMetaClient();
this.archiveFilePath =
HoodieArchivedTimeline.getArchiveLogPath(metaClient.getArchivePath());
- this.maxInstantsToKeep = config.getMaxCommitsToKeep();
- this.minInstantsToKeep = config.getMinCommitsToKeep();
this.txnManager = new TransactionManager(config,
table.getMetaClient().getFs());
+
+ Option<HoodieInstant> earliestCommitToRetain = Option.empty();
+ HoodieTimeline completedCommitsTimeline =
table.getCompletedCommitsTimeline();
+ Option<HoodieInstant> latestCommit =
completedCommitsTimeline.lastInstant();
+ HoodieCleaningPolicy cleanerPolicy = config.getCleanerPolicy();
+ int cleanerCommitsRetained = config.getCleanerCommitsRetained();
+ int cleanerHoursRetained = config.getCleanerHoursRetained();
+
+ try {
+ earliestCommitToRetain = CleanerUtils.getEarliestCommitToRetain(
+ metaClient.getActiveTimeline().getCommitsTimeline(),
+ cleanerPolicy,
+ cleanerCommitsRetained,
+ latestCommit.isPresent()
+ ?
HoodieActiveTimeline.parseDateFromInstantTime(latestCommit.get().getTimestamp()).toInstant()
+ : Instant.now(),
+ cleanerHoursRetained,
+ metaClient.getTableConfig().getTimelineTimezone());
+ } catch (ParseException e) {
+ LOG.warn("Error parsing instant time: " +
latestCommit.get().getTimestamp());
+ }
+
+ int configuredMinInstantsToKeep = config.getMinCommitsToKeep();
+ int configuredMaxInstantsToKeep = config.getMaxCommitsToKeep();
+ if (earliestCommitToRetain.isPresent()) {
+ int minInstantsToKeepBasedOnCleaning =
+
completedCommitsTimeline.findInstantsAfter(earliestCommitToRetain.get().getTimestamp())
+ .countInstants() + 2;
+ if (configuredMinInstantsToKeep < minInstantsToKeepBasedOnCleaning) {
+ this.maxInstantsToKeep = minInstantsToKeepBasedOnCleaning
+ + configuredMaxInstantsToKeep - configuredMinInstantsToKeep;
+ this.minInstantsToKeep = minInstantsToKeepBasedOnCleaning;
+ LOG.warn("The configured archival configs {}={} is more aggressive
than the cleaning "
+ + "configs as the earliest commit to retain is {}. Adjusted
the archival configs "
+ + "to be {}={} and {}={}",
+ MIN_COMMITS_TO_KEEP.key(), configuredMinInstantsToKeep,
earliestCommitToRetain.get(),
+ MIN_COMMITS_TO_KEEP.key(), this.minInstantsToKeep,
+ MAX_COMMITS_TO_KEEP.key(), this.maxInstantsToKeep);
+ switch (cleanerPolicy) {
+ case KEEP_LATEST_COMMITS:
+ LOG.warn("Cleaning configs: {}=KEEP_LATEST_COMMITS {}={}",
CLEANER_POLICY.key(),
+ CLEANER_COMMITS_RETAINED.key(), cleanerCommitsRetained);
+ break;
+ case KEEP_LATEST_BY_HOURS:
+ LOG.warn("Cleaning configs: {}=KEEP_LATEST_BY_HOURS {}={}",
CLEANER_POLICY.key(),
+ CLEANER_HOURS_RETAINED.key(), cleanerHoursRetained);
+ break;
+ case KEEP_LATEST_FILE_VERSIONS:
+ LOG.warn("Cleaning configs: {}=CLEANER_FILE_VERSIONS_RETAINED
{}={}", CLEANER_POLICY.key(),
+ CLEANER_FILE_VERSIONS_RETAINED.key(),
config.getCleanerFileVersionsRetained());
+ break;
+ default:
+ break;
+ }
+ } else {
+ this.maxInstantsToKeep = configuredMaxInstantsToKeep;
+ this.minInstantsToKeep = configuredMinInstantsToKeep;
+ }
+ } else {
+ this.maxInstantsToKeep = configuredMaxInstantsToKeep;
+ this.minInstantsToKeep = configuredMinInstantsToKeep;
+ }
}
private Writer openWriter() {
@@ -492,24 +556,7 @@ public class HoodieTimelineArchiver<T extends HoodieAvroPayload, I, K, O> {
HoodieTimeline.compareTimestamps(s.getTimestamp(),
LESSER_THAN, instantToRetain.getTimestamp()))
.orElse(true)
);
- List<HoodieInstant> instantsToArchive =
instantToArchiveStream.limit(commitTimeline.countInstants() -
minInstantsToKeep).collect(Collectors.toList());
- // If cleaner is based on hours, lets ensure hudi does not archive
commits yet to cleaned by the cleaner.
- if (config.getCleanerPolicy() ==
HoodieCleaningPolicy.KEEP_LATEST_BY_HOURS && !instantsToArchive.isEmpty()) {
- String latestCommitToArchive =
instantsToArchive.get(instantsToArchive.size() - 1).getTimestamp();
- try {
- Instant latestCommitInstant =
HoodieActiveTimeline.parseDateFromInstantTime(commitTimeline.lastInstant().get().getTimestamp()).toInstant();
- ZonedDateTime currentDateTime =
ZonedDateTime.ofInstant(latestCommitInstant,
metaClient.getTableConfig().getTimelineTimezone().getZoneId());
- String earliestTimeToRetain =
HoodieActiveTimeline.formatDate(Date.from(currentDateTime.minusHours(config.getCleanerHoursRetained()).toInstant()));
- if (HoodieTimeline.compareTimestamps(latestCommitToArchive,
GREATER_THAN_OR_EQUALS, earliestTimeToRetain)) {
- throw new HoodieIOException("Please align your archival configs
based on cleaner configs. 'hoodie.keep.min.commits' : "
- + config.getMinCommitsToKeep() + " + should be greater than "
- + " 'hoodie.cleaner.hours.retained' : " +
config.getCleanerHoursRetained());
- }
- } catch (ParseException e) {
- throw new HoodieIOException("Failed to parse latest commit instant
time " + commitTimeline.lastInstant().get().getTimestamp() + e.getMessage());
- }
- }
- return instantsToArchive.stream();
+ return instantToArchiveStream.limit(commitTimeline.countInstants() -
minInstantsToKeep);
} else {
return Stream.empty();
}
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index 4307015aed6..cd81c4fda07 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -3103,12 +3103,13 @@ public class HoodieWriteConfig extends HoodieConfig {
"Increase %s=%d to be greater than %s=%d.",
HoodieArchivalConfig.MAX_COMMITS_TO_KEEP.key(),
maxInstantsToKeep,
HoodieArchivalConfig.MIN_COMMITS_TO_KEEP.key(),
minInstantsToKeep));
- checkArgument(minInstantsToKeep > cleanerCommitsRetained,
- String.format(
- "Increase %s=%d to be greater than %s=%d. Otherwise, there is
risk of incremental pull "
- + "missing data from few instants.",
- HoodieArchivalConfig.MIN_COMMITS_TO_KEEP.key(),
minInstantsToKeep,
- HoodieCleanConfig.CLEANER_COMMITS_RETAINED.key(),
cleanerCommitsRetained));
+ if (minInstantsToKeep <= cleanerCommitsRetained) {
+ LOG.warn("Increase {}={} to be greater than {}={} (there is risk of
incremental pull "
+ + "missing data from few instants based on the current
configuration). "
+ + "The Hudi archiver will automatically adjust the
configuration regardless.",
+ HoodieArchivalConfig.MIN_COMMITS_TO_KEEP.key(),
minInstantsToKeep,
+ HoodieCleanConfig.CLEANER_COMMITS_RETAINED.key(),
cleanerCommitsRetained);
+ }
}
boolean inlineCompact =
writeConfig.getBoolean(HoodieCompactionConfig.INLINE_COMPACT);
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java
index 751edfded59..d855edc3569 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java
@@ -32,7 +32,6 @@ import org.apache.hudi.common.model.HoodieFileGroup;
import org.apache.hudi.common.model.HoodieFileGroupId;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.model.HoodieReplaceCommitMetadata;
-import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
@@ -40,6 +39,7 @@ import org.apache.hudi.common.table.timeline.versioning.clean.CleanPlanV1Migrati
import
org.apache.hudi.common.table.timeline.versioning.clean.CleanPlanV2MigrationHandler;
import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
import org.apache.hudi.common.table.view.SyncableFileSystemView;
+import org.apache.hudi.common.util.CleanerUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
@@ -54,10 +54,8 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.io.Serializable;
import java.time.Instant;
-import java.time.ZonedDateTime;
import java.util.ArrayList;
import java.util.Collections;
-import java.util.Date;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -510,37 +508,13 @@ public class CleanPlanner<T, I, K, O> implements Serializable {
* Returns earliest commit to retain based on cleaning policy.
*/
public Option<HoodieInstant> getEarliestCommitToRetain() {
- Option<HoodieInstant> earliestCommitToRetain = Option.empty();
- int commitsRetained = config.getCleanerCommitsRetained();
- int hoursRetained = config.getCleanerHoursRetained();
- if (config.getCleanerPolicy() == HoodieCleaningPolicy.KEEP_LATEST_COMMITS
- && commitTimeline.countInstants() > commitsRetained) {
- Option<HoodieInstant> earliestPendingCommits =
hoodieTable.getMetaClient()
- .getActiveTimeline()
- .getCommitsTimeline()
- .filter(s -> !s.isCompleted()).firstInstant();
- if (earliestPendingCommits.isPresent()) {
- // Earliest commit to retain must not be later than the earliest
pending commit
- earliestCommitToRetain =
- commitTimeline.nthInstant(commitTimeline.countInstants() -
commitsRetained).map(nthInstant -> {
- if (nthInstant.compareTo(earliestPendingCommits.get()) <= 0) {
- return Option.of(nthInstant);
- } else {
- return
commitTimeline.findInstantsBefore(earliestPendingCommits.get().getTimestamp()).lastInstant();
- }
- }).orElse(Option.empty());
- } else {
- earliestCommitToRetain =
commitTimeline.nthInstant(commitTimeline.countInstants()
- - commitsRetained); //15 instants total, 10 commits to retain,
this gives 6th instant in the list
- }
- } else if (config.getCleanerPolicy() ==
HoodieCleaningPolicy.KEEP_LATEST_BY_HOURS) {
- Instant instant = Instant.now();
- ZonedDateTime currentDateTime = ZonedDateTime.ofInstant(instant,
hoodieTable.getMetaClient().getTableConfig().getTimelineTimezone().getZoneId());
- String earliestTimeToRetain =
HoodieActiveTimeline.formatDate(Date.from(currentDateTime.minusHours(hoursRetained).toInstant()));
- earliestCommitToRetain =
Option.fromJavaOptional(commitTimeline.getInstantsAsStream().filter(i ->
HoodieTimeline.compareTimestamps(i.getTimestamp(),
- HoodieTimeline.GREATER_THAN_OR_EQUALS,
earliestTimeToRetain)).findFirst());
- }
- return earliestCommitToRetain;
+ return CleanerUtils.getEarliestCommitToRetain(
+ hoodieTable.getMetaClient().getActiveTimeline().getCommitsTimeline(),
+ config.getCleanerPolicy(),
+ config.getCleanerCommitsRetained(),
+ Instant.now(),
+ config.getCleanerHoursRetained(),
+ hoodieTable.getMetaClient().getTableConfig().getTimelineTimezone());
}
/**
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiver.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiver.java
index b049c60b9e7..0399bf14242 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiver.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiver.java
@@ -54,7 +54,6 @@ import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieLockConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieException;
-import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.metadata.HoodieTableMetadata;
import org.apache.hudi.metadata.HoodieTableMetadataWriter;
import org.apache.hudi.metadata.SparkHoodieBackedTableMetadataWriter;
@@ -66,7 +65,6 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
@@ -324,17 +322,24 @@ public class TestHoodieTimelineArchiver extends HoodieClientTestHarness {
}
}
- @Test
- public void testArchivalWithCleanBasedOnHours() throws Exception {
+ @ParameterizedTest
+ @ValueSource(strings = {"KEEP_LATEST_BY_HOURS", "KEEP_LATEST_COMMITS"})
+ public void testArchivalWithAutoAdjustmentBasedOnCleanConfigs(String
cleaningPolicy) throws Exception {
+ // This test verifies that when the archival configs are more aggressive
than the cleaning
+ // configs, the archiver adjusts the min and max commits to keep
automatically based on the
+ // cleaning configs.
init();
HoodieWriteConfig config =
HoodieWriteConfig.newBuilder().withPath(basePath)
+ .withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder()
+ .withRemoteServerPort(timelineServicePort).build())
.withMetadataConfig(HoodieMetadataConfig.newBuilder().withAssumeDatePartitioning(true).build())
.withCleanConfig(HoodieCleanConfig.newBuilder()
.withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.EAGER)
-
.withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_BY_HOURS).cleanerNumHoursRetained(1)
- .retainCommits(2)
+ .withCleanerPolicy(HoodieCleaningPolicy.valueOf(cleaningPolicy))
+ .cleanerNumHoursRetained(1)
+ .retainCommits(5)
.build())
-
.withArchivalConfig(HoodieArchivalConfig.newBuilder().archiveCommitsWith(3,4).build())
+
.withArchivalConfig(HoodieArchivalConfig.newBuilder().archiveCommitsWith(3,
4).build())
.build();
HoodieTestTable testTable = HoodieTestTable.of(metaClient);
@@ -342,42 +347,56 @@ public class TestHoodieTimelineArchiver extends HoodieClientTestHarness {
String p1 = "2020/01/02";
Instant instant = Instant.now();
ZonedDateTime commitDateTime = ZonedDateTime.ofInstant(instant,
ZoneId.systemDefault());
-
- triggerCommit(p0, p1, commitDateTime, 5, testTable);
- triggerCommit(p0, p1, commitDateTime, 10, testTable);
- triggerCommit(p0, p1, commitDateTime, 20, testTable);
- triggerCommit(p0, p1, commitDateTime, 30, testTable);
- triggerCommit(p0, p1, commitDateTime, 40, testTable);
+ List<HoodieInstant> expectedAllCommits = new ArrayList<>();
+
+ // The following commits should be archived
+ expectedAllCommits.add(triggerCommit(p0, p1, commitDateTime, 90, true,
testTable));
+ expectedAllCommits.add(triggerCommit(p0, p1, commitDateTime, 80, true,
testTable));
+ expectedAllCommits.add(triggerCommit(p0, p1, commitDateTime, 70, true,
testTable));
+ // The following commits should not be archived
+ expectedAllCommits.add(triggerCommit(p0, p1, commitDateTime, 50, true,
testTable));
+ expectedAllCommits.add(triggerCommit(p0, p1, commitDateTime, 45, true,
testTable));
+ expectedAllCommits.add(triggerCommit(p0, p1, commitDateTime, 40, true,
testTable));
+ expectedAllCommits.add(triggerCommit(p0, p1, commitDateTime, 30, false,
testTable));
+ expectedAllCommits.add(triggerCommit(p0, p1, commitDateTime, 20, false,
testTable));
+ expectedAllCommits.add(triggerCommit(p0, p1, commitDateTime, 10, true,
testTable));
+ expectedAllCommits.add(triggerCommit(p0, p1, commitDateTime, 5, true,
testTable));
metaClient = HoodieTableMetaClient.reload(metaClient);
- // lets trigger archival. should fail since archival configs does not
align w/ cleaner configs.
- try {
- archiveAndGetCommitsList(config);
- Assertions.fail("Should have failed archival since archival configs are
not aligned with cleaner configs. ");
- } catch (HoodieIOException e) {
- assertTrue(e.getMessage().contains("Please align your archival configs
based on cleaner configs"));
- }
+ Pair<List<HoodieInstant>, List<HoodieInstant>> commitsList =
+ archiveAndGetCommitsList(config, true);
+ List<HoodieInstant> originalCommits = commitsList.getKey();
+ List<HoodieInstant> commitsAfterArchival = commitsList.getValue();
+ assertInstantListEquals(expectedAllCommits, originalCommits);
+ assertInstantListEquals(
+ expectedAllCommits.subList(2, expectedAllCommits.size()),
commitsAfterArchival);
}
- private void triggerCommit(String p0, String p1, ZonedDateTime curDateTime,
int minutesForCommit, HoodieTestTable testTable) throws Exception {
+ private HoodieInstant triggerCommit(
+ String p0, String p1, ZonedDateTime curDateTime, int minutesForCommit,
+ boolean isComplete, HoodieTestTable testTable) throws Exception {
String file1P0C0 = UUID.randomUUID().toString();
String file1P1C0 = UUID.randomUUID().toString();
String commitTs =
HoodieActiveTimeline.formatDate(Date.from(curDateTime.minusMinutes(minutesForCommit).toInstant()));
testTable.addInflightCommit(commitTs).withBaseFilesInPartition(p0,
file1P0C0).withBaseFilesInPartition(p1, file1P1C0);
- HoodieCommitMetadata commitMetadata = generateCommitMetadata(commitTs,
- Collections.unmodifiableMap(new HashMap<String, List<String>>() {
- {
- put(p0, CollectionUtils.createImmutableList(file1P0C0));
- put(p1, CollectionUtils.createImmutableList(file1P1C0));
- }
- })
- );
- metaClient.getActiveTimeline().saveAsComplete(
- new HoodieInstant(HoodieInstant.State.INFLIGHT,
HoodieTimeline.COMMIT_ACTION, commitTs),
-
Option.of(commitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8)));
+ if (isComplete) {
+ HoodieCommitMetadata commitMetadata = generateCommitMetadata(commitTs,
+ Collections.unmodifiableMap(new HashMap<String, List<String>>() {
+ {
+ put(p0, CollectionUtils.createImmutableList(file1P0C0));
+ put(p1, CollectionUtils.createImmutableList(file1P1C0));
+ }
+ })
+ );
+ metaClient.getActiveTimeline().saveAsComplete(
+ new HoodieInstant(State.INFLIGHT, HoodieTimeline.COMMIT_ACTION,
commitTs),
+
Option.of(commitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8)));
+ }
+ return new HoodieInstant(
+ isComplete ? State.COMPLETED : State.INFLIGHT,
HoodieTimeline.COMMIT_ACTION, commitTs);
}
protected static HoodieCommitMetadata generateCommitMetadata(
@@ -1669,13 +1688,22 @@ public class TestHoodieTimelineArchiver extends HoodieClientTestHarness {
}
private Pair<List<HoodieInstant>, List<HoodieInstant>>
archiveAndGetCommitsList(HoodieWriteConfig writeConfig) throws IOException {
+ return archiveAndGetCommitsList(writeConfig, false);
+ }
+
+ private Pair<List<HoodieInstant>, List<HoodieInstant>>
archiveAndGetCommitsList(
+ HoodieWriteConfig writeConfig, boolean includeIncompleteInstants) throws
IOException {
metaClient.reloadActiveTimeline();
- HoodieTimeline timeline =
metaClient.getActiveTimeline().reload().getAllCommitsTimeline().filterCompletedInstants();
+ HoodieTimeline timeline = includeIncompleteInstants
+ ? metaClient.getActiveTimeline().reload().getAllCommitsTimeline()
+ :
metaClient.getActiveTimeline().reload().getAllCommitsTimeline().filterCompletedInstants();
List<HoodieInstant> originalCommits = timeline.getInstants();
HoodieTable table = HoodieSparkTable.create(writeConfig, context,
metaClient);
HoodieTimelineArchiver archiver = new HoodieTimelineArchiver(writeConfig,
table);
archiver.archiveIfRequired(context);
- timeline =
metaClient.getActiveTimeline().reload().getAllCommitsTimeline().filterCompletedInstants();
+ timeline = includeIncompleteInstants
+ ? metaClient.getActiveTimeline().reload().getAllCommitsTimeline()
+ :
metaClient.getActiveTimeline().reload().getAllCommitsTimeline().filterCompletedInstants();
List<HoodieInstant> commitsAfterArchival = timeline.getInstants();
return Pair.of(originalCommits, commitsAfterArchival);
}
@@ -1762,4 +1790,15 @@ public class TestHoodieTimelineArchiver extends HoodieClientTestHarness {
}
return new HoodieInstant(inflight, "rollback", rollbackTime);
}
+
+ private void assertInstantListEquals(List<HoodieInstant> expected,
List<HoodieInstant> actual) {
+ assertEquals(expected.size(), actual.size());
+ for (int i = 0; i < expected.size(); i++) {
+ HoodieInstant expectedInstant = expected.get(i);
+ HoodieInstant actualInstant = actual.get(i);
+ assertEquals(expectedInstant.getTimestamp(),
actualInstant.getTimestamp());
+ assertEquals(expectedInstant.getAction(), actualInstant.getAction());
+ assertEquals(expectedInstant.getState(), actualInstant.getState());
+ }
+ }
}
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/CleanerUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/CleanerUtils.java
index 7d29c176c1b..2f57f5455c3 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/CleanerUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/CleanerUtils.java
@@ -24,8 +24,11 @@ import org.apache.hudi.avro.model.HoodieCleanPartitionMetadata;
import org.apache.hudi.avro.model.HoodieCleanerPlan;
import org.apache.hudi.common.HoodieCleanStat;
import org.apache.hudi.common.model.CleanFileInfo;
+import org.apache.hudi.common.model.HoodieCleaningPolicy;
import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
+import org.apache.hudi.common.model.HoodieTimelineTimeZone;
import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
@@ -38,6 +41,9 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
+import java.time.Instant;
+import java.time.ZonedDateTime;
+import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -104,9 +110,43 @@ public class CleanerUtils {
return metadataMigrator.upgradeToLatest(cleanMetadata,
cleanMetadata.getVersion());
}
+ public static Option<HoodieInstant> getEarliestCommitToRetain(
+ HoodieTimeline commitsTimeline, HoodieCleaningPolicy cleaningPolicy, int
commitsRetained,
+ Instant latestInstant, int hoursRetained, HoodieTimelineTimeZone
timeZone) {
+ HoodieTimeline completedCommitsTimeline =
commitsTimeline.filterCompletedInstants();
+ Option<HoodieInstant> earliestCommitToRetain = Option.empty();
+
+ if (cleaningPolicy == HoodieCleaningPolicy.KEEP_LATEST_COMMITS
+ && completedCommitsTimeline.countInstants() > commitsRetained) {
+ Option<HoodieInstant> earliestPendingCommits =
+ commitsTimeline.filter(s -> !s.isCompleted()).firstInstant();
+ if (earliestPendingCommits.isPresent()) {
+ // Earliest commit to retain must not be later than the earliest
pending commit
+ earliestCommitToRetain =
+
completedCommitsTimeline.nthInstant(completedCommitsTimeline.countInstants() -
commitsRetained).map(nthInstant -> {
+ if (nthInstant.compareTo(earliestPendingCommits.get()) <= 0) {
+ return Option.of(nthInstant);
+ } else {
+ return
completedCommitsTimeline.findInstantsBefore(earliestPendingCommits.get().getTimestamp()).lastInstant();
+ }
+ }).orElse(Option.empty());
+ } else {
+ earliestCommitToRetain =
completedCommitsTimeline.nthInstant(completedCommitsTimeline.countInstants()
+ - commitsRetained); //15 instants total, 10 commits to retain,
this gives 6th instant in the list
+ }
+ } else if (cleaningPolicy == HoodieCleaningPolicy.KEEP_LATEST_BY_HOURS) {
+ ZonedDateTime latestDateTime = ZonedDateTime.ofInstant(latestInstant,
timeZone.getZoneId());
+ String earliestTimeToRetain =
HoodieActiveTimeline.formatDate(Date.from(latestDateTime.minusHours(hoursRetained).toInstant()));
+ earliestCommitToRetain =
Option.fromJavaOptional(completedCommitsTimeline.getInstantsAsStream().filter(i
-> HoodieTimeline.compareTimestamps(i.getTimestamp(),
+ HoodieTimeline.GREATER_THAN_OR_EQUALS,
earliestTimeToRetain)).findFirst());
+ }
+ return earliestCommitToRetain;
+ }
+
/**
* Get Latest version of cleaner plan corresponding to a clean instant.
- * @param metaClient Hoodie Table Meta Client
+ *
+ * @param metaClient Hoodie Table Meta Client
* @param cleanInstant Instant referring to clean action
* @return Cleaner plan corresponding to clean instant
* @throws IOException