Zouxxyy commented on code in PR #9416:
URL: https://github.com/apache/hudi/pull/9416#discussion_r1324756063


##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieTimelineArchiver.java:
##########
@@ -388,60 +387,40 @@ private Map<String, Boolean> deleteFilesParallelize(HoodieTableMetaClient metaCl
         paths);
   }
 
-  private Stream<HoodieInstant> getCleanInstantsToArchive() {
-    HoodieTimeline cleanAndRollbackTimeline = table.getActiveTimeline()
-        .getTimelineOfActions(CollectionUtils.createSet(HoodieTimeline.CLEAN_ACTION, HoodieTimeline.ROLLBACK_ACTION)).filterCompletedInstants();
-    return cleanAndRollbackTimeline.getInstantsAsStream()
-        .collect(Collectors.groupingBy(HoodieInstant::getAction)).values().stream()
-        .map(hoodieInstants -> {
-          if (hoodieInstants.size() > this.maxInstantsToKeep) {
-            return hoodieInstants.subList(0, hoodieInstants.size() - this.minInstantsToKeep);
-          } else {
-            return new ArrayList<HoodieInstant>();
-          }
-        }).flatMap(Collection::stream);
-  }
-
-  private Stream<HoodieInstant> getCommitInstantsToArchive() throws IOException {
-    // TODO (na) : Add a way to return actions associated with a timeline and then merge/unify
-    // with logic above to avoid Stream.concat
-    HoodieTimeline commitTimeline = table.getCompletedCommitsTimeline();
-
-    // Get the oldest inflight instant and a completed commit before this inflight instant.
-    Option<HoodieInstant> oldestPendingInstant = table.getActiveTimeline()
-        .getWriteTimeline()
-        .filter(instant -> !instant.isCompleted())
-        .firstInstant();
-
-    // Oldest commit to retain is the greatest completed commit, that is less than the oldest pending instant.
-    // In some cases when inflight is the lowest commit then oldest commit to retain will be equal to oldest
-    // inflight commit.
-    Option<HoodieInstant> oldestCommitToRetain;
-    if (oldestPendingInstant.isPresent()) {
-      Option<HoodieInstant> completedCommitBeforeOldestPendingInstant =
-          Option.fromJavaOptional(commitTimeline.getReverseOrderedInstants()
-              .filter(instant -> HoodieTimeline.compareTimestamps(instant.getTimestamp(),
-                  LESSER_THAN, oldestPendingInstant.get().getTimestamp())).findFirst());
-      // Check if the completed instant is higher than the oldest inflight instant
-      // in that case update the oldestCommitToRetain to oldestInflight commit time.
-      if (!completedCommitBeforeOldestPendingInstant.isPresent()
-          || HoodieTimeline.compareTimestamps(oldestPendingInstant.get().getTimestamp(),
-          LESSER_THAN, completedCommitBeforeOldestPendingInstant.get().getTimestamp())) {
-        oldestCommitToRetain = oldestPendingInstant;
+  private List<HoodieInstant> getCommitInstantsToArchive() throws IOException {
+    HoodieTimeline completedCommitsTimeline = table.getCompletedCommitsTimeline();
+
+    if (completedCommitsTimeline.countInstants() > maxInstantsToKeep) {
+      // Step1: Get all candidates of oldestInstantToRetain.
+      List<Option<HoodieInstant>> oldestInstantToRetainCandidates = new ArrayList<>();
+

Review Comment:
   In fact, I made two refactorings here:
   
   1. Moved the filter logic from the original `getInstantsToArchive` into `getCommitInstantsToArchive` (see points "4. If metadata table is enabled..." and "5. If this is a metadata table...").
   2. Since there are now 5 pieces of logic that produce `oldestInstantToRetainCandidates`, and each of them acts as the same kind of boundary for picking instants, it is natural to take their minimum value.
   
   The first one is necessary because the previous logic was wrong: all filters for commit instants should be put together. The second one is natural, I think; otherwise we would have to write 5 filters like this:
   ```java
             }).filter(s -> {
               // oldestCommitToRetain is the highest completed commit instant that is less than the oldest inflight instant.
               // By filtering out any commit >= oldestCommitToRetain, we can ensure there are no gaps in the timeline
               // when inflight commits are present.
               return oldestCommitToRetain
                   .map(instant -> compareTimestamps(instant.getTimestamp(), GREATER_THAN, s.getTimestamp()))
                   .orElse(true);
             }).filter(s ->
                 oldestInstantToRetainForCompaction.map(instantToRetain ->
                         compareTimestamps(s.getTimestamp(), LESSER_THAN, instantToRetain.getTimestamp()))
                     .orElse(true)
             ).filter(s ->
                 oldestInstantToRetainForClustering.map(instantToRetain ->
                         HoodieTimeline.compareTimestamps(s.getTimestamp(), LESSER_THAN, instantToRetain.getTimestamp()))
                     .orElse(true)
             );
   ```
   The refactorings above do not change the logic, and I have checked them carefully. What do you think?
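
To illustrate why taking the minimum is equivalent to chaining one filter per candidate, here is a minimal standalone sketch. It does not use the Hudi API: the class `OldestInstantBoundary` and its methods are hypothetical, and instant timestamps are modeled as plain fixed-width strings compared lexicographically, which is an assumption made only for this example.

```java
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical illustration only; none of these names exist in Hudi.
public class OldestInstantBoundary {

  // Returns the smallest candidate that is present, or empty if none is present.
  static Optional<String> earliestBoundary(List<Optional<String>> candidates) {
    return candidates.stream()
        .filter(Optional::isPresent)
        .map(Optional::get)
        .min(Comparator.naturalOrder());
  }

  // Single-boundary version: keep instants strictly before the minimum candidate.
  // If no candidate is present, every instant is kept.
  static List<String> archivable(List<String> instants, List<Optional<String>> candidates) {
    Optional<String> boundary = earliestBoundary(candidates);
    return instants.stream()
        .filter(ts -> boundary.map(b -> ts.compareTo(b) < 0).orElse(true))
        .collect(Collectors.toList());
  }

  // Chained version: one "strictly before" filter per candidate.
  static List<String> archivableWithChainedFilters(List<String> instants,
                                                   List<Optional<String>> candidates) {
    Stream<String> stream = instants.stream();
    for (Optional<String> candidate : candidates) {
      stream = stream.filter(ts -> candidate.map(b -> ts.compareTo(b) < 0).orElse(true));
    }
    return stream.collect(Collectors.toList());
  }

  public static void main(String[] args) {
    List<String> instants = Arrays.asList("001", "002", "003", "004", "005", "006");
    // Five candidates, some of them absent, mirroring the five sources in the comment.
    List<Optional<String>> candidates = Arrays.asList(
        Optional.of("004"), Optional.empty(), Optional.of("003"),
        Optional.of("005"), Optional.empty());

    // Both approaches select the same instants.
    System.out.println(archivable(instants, candidates));
    System.out.println(archivableWithChainedFilters(instants, candidates));
  }
}
```

Because every candidate applies the same "strictly earlier than" comparison, an instant passes all of the chained filters exactly when it is earlier than the smallest present candidate, so a single comparison against the minimum is enough.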


