yihua commented on code in PR #9416:
URL: https://github.com/apache/hudi/pull/9416#discussion_r1328020402


##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/timeline/HoodieTimelineArchiver.java:
##########
@@ -281,16 +243,62 @@ private Stream<ActiveAction> getInstantsToArchive() 
throws IOException {
       // CLEAN or ROLLBACK instant can block the archive of metadata table 
timeline and causes
       // the active timeline of metadata table to be extremely long, leading 
to performance issues
       // for loading the timeline.
-      if (qualifiedEarliestInstant.isPresent()) {
-        instants = instants.filter(instant ->
-            compareTimestamps(
-                instant.getTimestamp(),
-                HoodieTimeline.LESSER_THAN,
-                qualifiedEarliestInstant.get().getTimestamp()));
-      }
+      earliestInstantToRetainCandidates.add(qualifiedEarliestInstant);
     }
 
-    return instants.map(hoodieInstant -> {
+    // Choose the instant in earliestInstantToRetainCandidates with the 
smallest
+    // timestamp as earliestInstantToRetain.
+    java.util.Optional<HoodieInstant> earliestInstantToRetain = 
earliestInstantToRetainCandidates
+        .stream()
+        .filter(Option::isPresent)
+        .map(Option::get)
+        .min(HoodieInstant.COMPARATOR);
+
+    // Step2: We cannot archive any commits which are made after the first 
savepoint present,
+    // unless HoodieArchivalConfig#ARCHIVE_BEYOND_SAVEPOINT is enabled.
+    Option<HoodieInstant> firstSavepoint = 
table.getCompletedSavepointTimeline().firstInstant();
+    Set<String> savepointTimestamps = table.getSavepointTimestamps();
+
+    Stream<HoodieInstant> instantToArchiveStream = 
completedCommitsTimeline.getInstantsAsStream()
+        .filter(s -> {
+          if (config.shouldArchiveBeyondSavepoint()) {
+            // skip savepoint commits and proceed further
+            return !savepointTimestamps.contains(s.getTimestamp());
+          } else {
+            // if no savepoint present, then don't filter
+            // stop at first savepoint commit
+            return !firstSavepoint.isPresent() || 
compareTimestamps(s.getTimestamp(), LESSER_THAN, 
firstSavepoint.get().getTimestamp());
+          }
+        }).filter(s -> earliestInstantToRetain
+            .map(instant -> compareTimestamps(s.getTimestamp(), LESSER_THAN, 
instant.getTimestamp()))
+            .orElse(true));
+    return 
instantToArchiveStream.limit(completedCommitsTimeline.countInstants() - 
minInstantsToKeep)
+        .collect(Collectors.toList());
+  }
+
+  private Stream<ActiveAction> getInstantsToArchive() throws IOException {
+    if (config.isMetaserverEnabled()) {
+      return Stream.empty();
+    }
+
+    // First get commit instants to archive.
+    List<HoodieInstant> instantsToArchive = getCommitInstantsToArchive();
+    if (!instantsToArchive.isEmpty()) {
+      HoodieInstant latestCommitInstantToArchive = 
instantsToArchive.get(instantsToArchive.size() - 1);
+      // Then get clean and rollback instants to archive.
+      List<HoodieInstant> cleanAndRollbackInstantsToArchive =
+          getCleanAndRollbackInstantsToArchive(latestCommitInstantToArchive);
+      instantsToArchive.addAll(cleanAndRollbackInstantsToArchive);
+      instantsToArchive.sort(HoodieInstant.COMPARATOR);
+    }

Review Comment:
   nit: I think the logic can be further simplified by treating all instants to 
archive the same, i.e., getting all instants before the earliest commit to 
retain, without differentiating the action types.  We can follow up in a 
separate PR.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to