nsivabalan commented on a change in pull request #3646:
URL: https://github.com/apache/hudi/pull/3646#discussion_r790390998
##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java
##########
@@ -535,6 +540,11 @@ public Builder retainCommits(int commitsRetained) {
return this;
}
+ public Builder retainNumberOfHours(int cleanerHoursRetained) {
Review comment:
cleanerNumHoursRetained
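For clarity, a minimal sketch of the suggested rename (illustrative only; the real builder lives in `HoodieCompactionConfig` and sets the underlying config property rather than a plain field):

```java
// Hypothetical standalone sketch of the rename suggested above
// (parameter cleanerHoursRetained -> cleanerNumHoursRetained).
public class Builder {
    private int cleanerNumHoursRetained;

    // Builder method keeps its public name; only the parameter/field is renamed.
    public Builder retainNumberOfHours(int cleanerNumHoursRetained) {
        this.cleanerNumHoursRetained = cleanerNumHoursRetained;
        return this;
    }

    public int getCleanerNumHoursRetained() {
        return cleanerNumHoursRetained;
    }
}
```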
##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java
##########
@@ -330,6 +349,19 @@ public CleanPlanner(HoodieEngineContext context, HoodieTable<T, I, K, O> hoodieT
}
return deletePaths;
}
+
+ /**
+ * This method finds the files to be cleaned based on the number of hours. If {@code config.getCleanerHoursRetained()} is set to 5,
+ * all the files with commit time older than 5 hours will be removed. Also, the latest file for any file group is retained.
+ * This policy gives much more flexibility to users for retaining data for running incremental queries as compared to the
+ * KEEP_LATEST_COMMITS cleaning policy. The default number of hours is 5.
+ * @param partitionPath partition path to check
+ * @return list of files to clean
+ */
+ private List<CleanFileInfo> getFilesToCleanKeepingLatestHours(String partitionPath) {
+ int commitsToRetain = 0;
Review comment:
Do we need this? If not, can we remove it?
##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java
##########
@@ -76,6 +76,11 @@
      .withDocumentation("Number of commits to retain, without cleaning. This will be retained for num_of_commits * time_between_commits "
          + "(scheduled). This also directly translates into how much data retention the table supports for incremental queries.");
+  public static final ConfigProperty<String> CLEANER_HOURS_RETAINED = ConfigProperty.key("hoodie.cleaner.hours.retained")
+      .defaultValue("24")
+      .withDocumentation("Number of hours for which commits need to be retained. This config provides a more flexible option as "
+          + "compared to the number of commits retained for the cleaning service");
Review comment:
I feel this policy is not standalone in itself. For example, numCommitsRetained and numFileVersionsRetained are self-contained. But here, we follow logic similar to what we do for retaining num commits. I mean, we clean up commits and not file versions. If you think about it, in addition to configuring hours retained, users probably have to choose whether they want to clean up files based on commits or based on file versions.
Let me know wdyt.
Either way, we need to add more documentation here.
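The equivalence this comment leans on can be sketched as follows. It mirrors the existing documentation's "num_of_commits * time_between_commits" framing; the names here are illustrative, not Hudi APIs:

```java
// Rough sketch of the relationship the comment describes: retaining N hours
// behaves like retaining roughly (N * 60 / minutes_between_commits) commits,
// which is why hours-based cleaning is not a fully standalone policy.
public class RetentionEquivalence {
    static int approxCommitsRetained(int hoursRetained, int minutesBetweenCommits) {
        return (hoursRetained * 60) / minutesBetweenCommits;
    }

    public static void main(String[] args) {
        // 24 hours retained with a commit every 30 minutes keeps roughly 48 commits.
        System.out.println(approxCommitsRetained(24, 30)); // prints 48
    }
}
```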
##########
File path: hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestCleaner.java
##########
@@ -1240,6 +1244,154 @@ public void testKeepLatestCommits(boolean simulateFailureRetry, boolean enableIn
assertTrue(testTable.baseFileExists(p0, "00000000000005", file3P0C2));
}
+ /**
+ * Test the cleaning policy based on number of hours retained. This test case covers the case when files will not be cleaned.
+ */
+ @ParameterizedTest
+ @MethodSource("argumentsForTestKeepLatestCommits")
+ public void testKeepXHoursNoCleaning(boolean simulateFailureRetry, boolean enableIncrementalClean, boolean enableBootstrapSourceClean) throws Exception {
Review comment:
sg
##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java
##########
@@ -402,9 +436,16 @@ private String getLatestVersionBeforeCommit(List<FileSlice> fileSliceList, Hoodi
  public Option<HoodieInstant> getEarliestCommitToRetain() {
    Option<HoodieInstant> earliestCommitToRetain = Option.empty();
    int commitsRetained = config.getCleanerCommitsRetained();
+   int hoursRetained = config.getCleanerHoursRetained();
    if (config.getCleanerPolicy() == HoodieCleaningPolicy.KEEP_LATEST_COMMITS && commitTimeline.countInstants() > commitsRetained) {
-     earliestCommitToRetain = commitTimeline.nthInstant(commitTimeline.countInstants() - commitsRetained);
+     earliestCommitToRetain = commitTimeline.nthInstant(commitTimeline.countInstants() - commitsRetained); // 15 instants total, 10 commits to retain, this gives 6th instant in the list
+   } else if (config.getCleanerPolicy() == HoodieCleaningPolicy.KEEP_LATEST_BY_HOURS) {
+     Instant instant = Instant.now();
+     ZonedDateTime commitDateTime = ZonedDateTime.ofInstant(instant, ZoneId.systemDefault());
+     String retainInstantsAfter = HoodieActiveTimeline.formatDate(Date.from(commitDateTime.minusHours(hoursRetained).toInstant()));
Review comment:
retainInstantsAfter: maybe we can rename this to earliestTimeToRetain.
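A self-contained sketch of the cutoff computation with the naming suggested in this review (the `yyyyMMddHHmmss` commit-time format is an assumption standing in for `HoodieActiveTimeline.formatDate`):

```java
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;

public class EarliestTimeToRetain {
    // Assumed commit-time format, mirroring Hudi's yyyyMMddHHmmss-style
    // commit timestamps for illustration only.
    private static final DateTimeFormatter COMMIT_FORMATTER =
        DateTimeFormatter.ofPattern("yyyyMMddHHmmss").withZone(ZoneId.systemDefault());

    // Variable names follow the review suggestion: currentDateTime for "now",
    // earliestTimeToRetain for the cutoff before which instants may be cleaned.
    static String earliestTimeToRetain(int hoursRetained) {
        ZonedDateTime currentDateTime = ZonedDateTime.ofInstant(Instant.now(), ZoneId.systemDefault());
        return COMMIT_FORMATTER.format(currentDateTime.minusHours(hoursRetained).toInstant());
    }

    public static void main(String[] args) {
        System.out.println(earliestTimeToRetain(24));
    }
}
```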
##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java
##########
@@ -299,14 +308,24 @@ public CleanPlanner(HoodieEngineContext context, HoodieTable<T, I, K, O> hoodieT
          // do not clean up a savepoint data file
          continue;
        }
-       // Dont delete the latest commit and also the last commit before the earliest commit we
-       // are retaining
-       // The window of commit retain == max query run time. So a query could be running which
-       // still
-       // uses this file.
-       if (fileCommitTime.equals(lastVersion) || (fileCommitTime.equals(lastVersionBeforeEarliestCommitToRetain))) {
-         // move on to the next file
-         continue;
+
+       if (policy == HoodieCleaningPolicy.KEEP_LATEST_COMMITS) {
+         // Dont delete the latest commit and also the last commit before the earliest commit we
+         // are retaining
+         // The window of commit retain == max query run time. So a query could be running which
+         // still
+         // uses this file.
+         if (fileCommitTime.equals(lastVersion) || (fileCommitTime.equals(lastVersionBeforeEarliestCommitToRetain))) {
+           // move on to the next file
+           continue;
+         }
+       } else if (policy == HoodieCleaningPolicy.KEEP_LATEST_BY_HOURS) {
+         // This block corresponds to KEEP_LATEST_BY_HOURS policy
+         // Do not delete the latest commit.
+         if (fileCommitTime.equals(lastVersion)) {
Review comment:
Do we know why we have (fileCommitTime.equals(lastVersionBeforeEarliestCommitToRetain)) in L318? I would prefer to keep the same logic unless we have strong reasons to remove it. As I mentioned, cleaning based on num hours is just a different way of conveying "retain N commits". So, let's try to keep the logic similar across both. Easier to maintain.
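The shared guard the comment argues for can be sketched as a small predicate, identical for both policies (a hypothetical helper, not the actual CleanPlanner code):

```java
public class CleanGuard {
    // Hypothetical sketch of keeping the same guard in both policies: retain
    // the latest version of a file group, and also the last version written
    // before the earliest commit to retain (a query may still be reading it).
    static boolean shouldRetain(String fileCommitTime, String lastVersion,
                                String lastVersionBeforeEarliestCommitToRetain) {
        return fileCommitTime.equals(lastVersion)
            || fileCommitTime.equals(lastVersionBeforeEarliestCommitToRetain);
    }

    public static void main(String[] args) {
        // A file from the latest commit is retained:
        System.out.println(shouldRetain("005", "005", "002")); // prints true
        // A file that is neither the latest nor the boundary version is cleanable:
        System.out.println(shouldRetain("001", "005", "002")); // prints false
    }
}
```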
##########
File path: hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestCleaner.java
##########
@@ -1267,6 +1271,154 @@ public void testKeepLatestCommits(boolean simulateFailureRetry, boolean enableIn
assertTrue(testTable.baseFileExists(p0, "00000000000005", file3P0C2));
}
+ /**
+ * Test the cleaning policy based on number of hours retained. This test case covers the case when files will not be cleaned.
+ */
+ @ParameterizedTest
+ @MethodSource("argumentsForTestKeepLatestCommits")
+ public void testKeepXHoursNoCleaning(boolean simulateFailureRetry, boolean enableIncrementalClean, boolean enableBootstrapSourceClean) throws Exception {
+    HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(basePath)
+        .withMetadataConfig(HoodieMetadataConfig.newBuilder().withAssumeDatePartitioning(true).build())
+        .withCompactionConfig(HoodieCompactionConfig.newBuilder()
+            .withIncrementalCleaningMode(enableIncrementalClean)
+            .withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.EAGER)
+            .withCleanBootstrapBaseFileEnabled(enableBootstrapSourceClean)
+            .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_BY_HOURS).retainNumberOfHours(2).build())
+        .build();
+
+    HoodieTestTable testTable = HoodieTestTable.of(metaClient);
+    String p0 = "2020/01/01";
+    String p1 = "2020/01/02";
+    Map<String, List<BootstrapFileMapping>> bootstrapMapping = enableBootstrapSourceClean ? generateBootstrapIndexAndSourceData(p0, p1) : null;
+
+    String file1P0C0 = enableBootstrapSourceClean ? bootstrapMapping.get(p0).get(0).getFileId() : UUID.randomUUID().toString();
+    String file1P1C0 = enableBootstrapSourceClean ? bootstrapMapping.get(p1).get(0).getFileId() : UUID.randomUUID().toString();
+    Instant instant = Instant.now();
+    ZonedDateTime commitDateTime = ZonedDateTime.ofInstant(instant, ZoneId.systemDefault());
+    int minutesForFirstCommit = 90;
+    String firstCommitTs = HoodieActiveTimeline.formatDate(Date.from(commitDateTime.minusMinutes(minutesForFirstCommit).toInstant()));
+    testTable.addInflightCommit(firstCommitTs).withBaseFilesInPartition(p0, file1P0C0).withBaseFilesInPartition(p1, file1P1C0);
+
+    HoodieCommitMetadata commitMetadata = generateCommitMetadata(
+        Collections.unmodifiableMap(new HashMap<String, List<String>>() {
+          {
+            put(p0, CollectionUtils.createImmutableList(file1P0C0));
+            put(p1, CollectionUtils.createImmutableList(file1P1C0));
+          }
+        })
+    );
+    metaClient.getActiveTimeline().saveAsComplete(
+        new HoodieInstant(State.INFLIGHT, HoodieTimeline.COMMIT_ACTION, firstCommitTs),
+        Option.of(commitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8)));
+
+    metaClient = HoodieTableMetaClient.reload(metaClient);
+
+    List<HoodieCleanStat> hoodieCleanStatsOne = runCleaner(config, simulateFailureRetry);
+    assertEquals(0, hoodieCleanStatsOne.size(), "Must not scan any partitions and clean any files");
+    assertTrue(testTable.baseFileExists(p0, firstCommitTs, file1P0C0));
+    assertTrue(testTable.baseFileExists(p1, firstCommitTs, file1P1C0));
+
+    // make next commit, with 1 insert & 1 update per partition
+    int minutesForSecondCommit = 40;
+    String secondCommitTs = HoodieActiveTimeline.formatDate(Date.from(commitDateTime.minusMinutes(minutesForSecondCommit).toInstant()));
+    Map<String, String> partitionAndFileId002 = testTable.addInflightCommit(secondCommitTs).getFileIdsWithBaseFilesInPartitions(p0, p1);
+    String file2P0C1 = partitionAndFileId002.get(p0);
+    String file2P1C1 = partitionAndFileId002.get(p1);
+    testTable.forCommit(secondCommitTs).withBaseFilesInPartition(p0, file1P0C0).withBaseFilesInPartition(p1, file1P1C0);
+    commitMetadata = generateCommitMetadata(new HashMap<String, List<String>>() {
+      {
+        put(p0, CollectionUtils.createImmutableList(file1P0C0, file2P0C1));
+        put(p1, CollectionUtils.createImmutableList(file1P1C0, file2P1C1));
+      }
+    });
+    metaClient.getActiveTimeline().saveAsComplete(
+        new HoodieInstant(State.INFLIGHT, HoodieTimeline.COMMIT_ACTION, secondCommitTs),
+        Option.of(commitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8)));
+    List<HoodieCleanStat> hoodieCleanStatsTwo = runCleaner(config, simulateFailureRetry);
+    assertEquals(0, hoodieCleanStatsTwo.size(), "Must not scan any partitions and clean any files");
+    assertTrue(testTable.baseFileExists(p0, secondCommitTs, file2P0C1));
+    assertTrue(testTable.baseFileExists(p1, secondCommitTs, file2P1C1));
+    assertTrue(testTable.baseFileExists(p0, secondCommitTs, file1P0C0));
+    assertTrue(testTable.baseFileExists(p1, secondCommitTs, file1P1C0));
+  }
Review comment:
Wondering, do we need the next test? Can't we add another commit here so that the first commit is eligible for cleaning, and do some assertions?
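To make the suggestion concrete: in the test above the first commit is 90 minutes old and `retainNumberOfHours(2)` gives a 2-hour window, so nothing is cleanable yet. A minimal sketch of the eligibility arithmetic (an illustrative helper, not a Hudi API):

```java
public class RetentionWindowCheck {
    // A commit made `minutesAgo` minutes ago becomes eligible for cleaning
    // once it falls outside the hours-retained window (ignoring the extra
    // rule that the latest file version is always kept).
    static boolean eligibleForCleaning(int minutesAgo, int hoursRetained) {
        return minutesAgo > hoursRetained * 60;
    }

    public static void main(String[] args) {
        // First commit in the test above: 90 minutes old, 2-hour window.
        System.out.println(eligibleForCleaning(90, 2));  // prints false
        // A third, older commit (e.g. 150 minutes ago) would be eligible.
        System.out.println(eligibleForCleaning(150, 2)); // prints true
    }
}
```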
##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java
##########
@@ -402,9 +436,16 @@ private String getLatestVersionBeforeCommit(List<FileSlice> fileSliceList, Hoodi
  public Option<HoodieInstant> getEarliestCommitToRetain() {
    Option<HoodieInstant> earliestCommitToRetain = Option.empty();
    int commitsRetained = config.getCleanerCommitsRetained();
+   int hoursRetained = config.getCleanerHoursRetained();
    if (config.getCleanerPolicy() == HoodieCleaningPolicy.KEEP_LATEST_COMMITS && commitTimeline.countInstants() > commitsRetained) {
-     earliestCommitToRetain = commitTimeline.nthInstant(commitTimeline.countInstants() - commitsRetained);
+     earliestCommitToRetain = commitTimeline.nthInstant(commitTimeline.countInstants() - commitsRetained); // 15 instants total, 10 commits to retain, this gives 6th instant in the list
+   } else if (config.getCleanerPolicy() == HoodieCleaningPolicy.KEEP_LATEST_BY_HOURS) {
+     Instant instant = Instant.now();
+     ZonedDateTime commitDateTime = ZonedDateTime.ofInstant(instant, ZoneId.systemDefault());
Review comment:
commitDateTime: maybe we can rename to currentDateTime.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]