lokeshj1703 commented on code in PR #11440:
URL: https://github.com/apache/hudi/pull/11440#discussion_r1681475904
##########
hudi-common/src/main/java/org/apache/hudi/common/util/ClusteringUtils.java:
##########
@@ -376,9 +381,75 @@ public static Option<HoodieInstant> getEarliestInstantToRetainForClustering(
oldestInstantToRetain = replaceOrClusterTimeline.firstInstant();
}
}
+    oldestInstantToRetain = Option.ofNullable(
+        HoodieTimeline.minTimestampInstant(oldestInstantToRetain.orElse(null), earliestSavepointInClean.orElse(null)));
return oldestInstantToRetain;
}
+  public static Option<HoodieInstant> getEarliestSavepointInClean(HoodieActiveTimeline activeTimeline,
+                                                                  HoodieTableMetaClient metaClient,
+                                                                  HoodieCleaningPolicy cleanerPolicy,
+                                                                  Option<HoodieInstant> cleanInstantOpt,
+                                                                  boolean shouldArchiveBeyondSavepoint) throws IOException {
+    // EarliestSavepoint in clean is required to block archival when a savepoint is deleted.
+    // This ensures that archival stays blocked until clean has cleaned up the files retained due to the savepoint.
+    // If this guard is not present, archiving commits can lead to duplicates. Here is a scenario
+    // illustrating this, assuming the EarliestSavepoint guard is not present:
+    // c1.dc - f1 (c1 deltacommit creates file with id f1)
+    // c2.dc - f2 (c2 deltacommit creates file with id f2)
+    // c2.sp - Savepoint at c2
+    // c3.rc (replacing f2 -> f3) (Replace commit replacing file id f2 with f3)
+    // c4.dc
+    //
+    // Let's say the incremental cleaner moved past c3.rc without cleaning f2, since a savepoint exists at c2.
+    // Archival is blocked at c2 since there is a savepoint at c2.
+    // Now let's say the savepoint at c2 is deleted. Archival would archive c3.rc since it is unblocked.
+    // Since c3 is archived and f2 has not been cleaned, the table view would consider f2 a valid
+    // file id. This causes duplicates.
+    if (shouldArchiveBeyondSavepoint) {
+      // When archive beyond savepoint is enabled, we do not block archival based on the cleaner's earliestSavepoint
+      return Option.empty();
+    }
+    // explicitly check the savepoint timeline and guard against the first one
+    Option<String> firstSavepointOpt = activeTimeline.getSavePointTimeline().filterCompletedInstants().firstInstant().map(HoodieInstant::getTimestamp);
+    String earliestSavepointTs = firstSavepointOpt.orElse(null);
+    Option<String> cleanerEarliestSavepoint = Option.empty();
+    Option<HoodieActionInstant> cleanerEarliestToRetain = Option.empty();
+
+    if (cleanerPolicy != HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS && cleanInstantOpt.isPresent()) {
Review Comment:
@nsivabalan Is specific handling for `KEEP_LATEST_FILE_VERSIONS` still
required, or can we keep the logic the same for all cleaner policies? It
seems like `SAVEPOINTED_TIMESTAMPS` would always be created.
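For context, the guard added in the first hunk boils down to taking the minimum of two instant timestamps and blocking archival there. A minimal standalone sketch of that idea (the class name and `minTimestamp` helper are hypothetical stand-ins for `HoodieTimeline.minTimestampInstant`, and the timestamps are made up; Hudi instant times are numeric strings, so lexicographic comparison orders them):

```java
// Hedged sketch: illustrates the min-timestamp guard, not the actual Hudi code.
public class MinTimestampGuardSketch {

  // Stand-in for HoodieTimeline.minTimestampInstant: returns the earlier of two
  // instant timestamps; null means "no constraint" on that side.
  public static String minTimestamp(String a, String b) {
    if (a == null) {
      return b;
    }
    if (b == null) {
      return a;
    }
    return a.compareTo(b) <= 0 ? a : b;
  }

  public static void main(String[] args) {
    // Hypothetical timestamps for the scenario in the comment: archival must
    // stop at whichever is earlier -- the clustering retention bound or the
    // earliest savepoint tracked by clean.
    String oldestToRetainForClustering = "20240705120000"; // e.g. c3.rc
    String earliestSavepointInClean = "20240701090000";    // e.g. c2.sp
    String guard = minTimestamp(oldestToRetainForClustering, earliestSavepointInClean);
    System.out.println(guard); // prints 20240701090000, keeping archival blocked at c2
  }
}
```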
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]