yihua commented on code in PR #9416:
URL: https://github.com/apache/hudi/pull/9416#discussion_r1328020402
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/timeline/HoodieTimelineArchiver.java:
##########
@@ -281,16 +243,62 @@ private Stream<ActiveAction> getInstantsToArchive() throws IOException {
// CLEAN or ROLLBACK instant can block the archive of metadata table timeline and causes
// the active timeline of metadata table to be extremely long, leading to performance issues
// for loading the timeline.
- if (qualifiedEarliestInstant.isPresent()) {
- instants = instants.filter(instant ->
- compareTimestamps(
- instant.getTimestamp(),
- HoodieTimeline.LESSER_THAN,
- qualifiedEarliestInstant.get().getTimestamp()));
- }
+ earliestInstantToRetainCandidates.add(qualifiedEarliestInstant);
}
- return instants.map(hoodieInstant -> {
+ // Choose the instant in earliestInstantToRetainCandidates with the smallest
+ // timestamp as earliestInstantToRetain.
+ java.util.Optional<HoodieInstant> earliestInstantToRetain = earliestInstantToRetainCandidates
+ .stream()
+ .filter(Option::isPresent)
+ .map(Option::get)
+ .min(HoodieInstant.COMPARATOR);
+
+ // Step2: We cannot archive any commits which are made after the first savepoint present,
+ // unless HoodieArchivalConfig#ARCHIVE_BEYOND_SAVEPOINT is enabled.
+ Option<HoodieInstant> firstSavepoint = table.getCompletedSavepointTimeline().firstInstant();
+ Set<String> savepointTimestamps = table.getSavepointTimestamps();
+
+ Stream<HoodieInstant> instantToArchiveStream = completedCommitsTimeline.getInstantsAsStream()
+ .filter(s -> {
+ if (config.shouldArchiveBeyondSavepoint()) {
+ // skip savepoint commits and proceed further
+ return !savepointTimestamps.contains(s.getTimestamp());
+ } else {
+ // if no savepoint present, then don't filter
+ // stop at first savepoint commit
+ return !firstSavepoint.isPresent() || compareTimestamps(s.getTimestamp(), LESSER_THAN, firstSavepoint.get().getTimestamp());
+ }
+ }).filter(s -> earliestInstantToRetain
+ .map(instant -> compareTimestamps(s.getTimestamp(), LESSER_THAN, instant.getTimestamp()))
+ .orElse(true));
+ return instantToArchiveStream.limit(completedCommitsTimeline.countInstants() - minInstantsToKeep)
+ .collect(Collectors.toList());
+ }
+
+ private Stream<ActiveAction> getInstantsToArchive() throws IOException {
+ if (config.isMetaserverEnabled()) {
+ return Stream.empty();
+ }
+
+ // First get commit instants to archive.
+ List<HoodieInstant> instantsToArchive = getCommitInstantsToArchive();
+ if (!instantsToArchive.isEmpty()) {
+ HoodieInstant latestCommitInstantToArchive = instantsToArchive.get(instantsToArchive.size() - 1);
+ // Then get clean and rollback instants to archive.
+ List<HoodieInstant> cleanAndRollbackInstantsToArchive =
+ getCleanAndRollbackInstantsToArchive(latestCommitInstantToArchive);
+ instantsToArchive.addAll(cleanAndRollbackInstantsToArchive);
+ instantsToArchive.sort(HoodieInstant.COMPARATOR);
+ }
Review Comment:
nit: I think the logic can be further simplified by treating all instants to
archive uniformly, i.e., archiving every instant before the earliest commit to
retain, regardless of action type. We can follow up in a separate PR.
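
To make the suggestion concrete, here is a minimal sketch of what "treating all instants the same" could look like. It is not Hudi code: `UniformArchivalSketch`, its nested `Instant` class, and `instantsToArchive` are hypothetical stand-ins, and the sketch assumes instant timestamps are strings that order lexicographically. It also leaves out the savepoint handling and the `minInstantsToKeep` limit from the diff above.

```java
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical sketch, not Hudi code: Instant stands in for HoodieInstant.
final class UniformArchivalSketch {

  static final class Instant {
    final String action;     // e.g. "commit", "clean", "rollback"
    final String timestamp;  // assumed to order lexicographically

    Instant(String action, String timestamp) {
      this.action = action;
      this.timestamp = timestamp;
    }
  }

  // Selects every instant strictly older than earliestCommitToRetain,
  // whatever its action type, ordered by timestamp.
  static List<Instant> instantsToArchive(List<Instant> activeTimeline,
                                         String earliestCommitToRetain) {
    return activeTimeline.stream()
        .filter(i -> i.timestamp.compareTo(earliestCommitToRetain) < 0)
        .sorted(Comparator.comparing((Instant i) -> i.timestamp))
        .collect(Collectors.toList());
  }

  public static void main(String[] args) {
    List<Instant> timeline = Arrays.asList(
        new Instant("commit", "001"),
        new Instant("clean", "002"),
        new Instant("commit", "003"),
        new Instant("rollback", "004"),
        new Instant("commit", "005"));
    for (Instant i : instantsToArchive(timeline, "004")) {
      System.out.println(i.timestamp + " " + i.action);
    }
  }
}
```

With "004" as the earliest commit to retain, the sketch selects the commit at 001, the clean at 002, and the commit at 003 in a single pass, so there is no separate clean/rollback lookup followed by a merge and re-sort.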