SteNicholas commented on code in PR #7891:
URL: https://github.com/apache/hudi/pull/7891#discussion_r1119934910


##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieTimelineArchiver.java:
##########
@@ -473,6 +473,33 @@ private Stream<HoodieInstant> getCommitInstantsToArchive() throws IOException {
                       HoodieTimeline.compareTimestamps(s.getTimestamp(), LESSER_THAN, instantToRetain.getTimestamp()))
                   .orElse(true)
           );
+
+      // When inline or async clustering is enabled, we need to ensure that there is a commit in the active timeline
+      // to check whether the file slice generated in pending clustering after archive isn't committed
+      // via {@code HoodieFileGroup#isFileSliceCommitted(slice)}
+      boolean isOldestPendingReplaceInstant =
+          oldestPendingCompactionAndReplaceInstant.map(instant ->
+              HoodieTimeline.REPLACE_COMMIT_ACTION.equals(instant.getAction())).orElse(false);
+      if (isOldestPendingReplaceInstant) {
+        List<HoodieInstant> instantsToArchive = instantToArchiveStream.collect(Collectors.toList());
+        Option<HoodieInstant> latestInstantRetainForReplace = Option.fromJavaOptional(
+            instantsToArchive.stream()
+                .filter(s -> HoodieTimeline.compareTimestamps(
+                    s.getTimestamp(),
+                    LESSER_THAN,
+                    oldestPendingCompactionAndReplaceInstant.get().getTimestamp()))
+                .reduce((i1, i2) -> i2));
+        if (latestInstantRetainForReplace.isPresent()) {
+          LOG.info(String.format(
+              "Retaining the archived instant %s before the inflight replacecommit %s.",
+              latestInstantRetainForReplace.get().getTimestamp(),
+              oldestPendingCompactionAndReplaceInstant.get().getTimestamp()));
+        }
+        instantToArchiveStream = instantsToArchive.stream()
+            .filter(s -> latestInstantRetainForReplace.map(instant -> s.compareTo(instant) != 0)
+                .orElse(true));
+      }
+

Review Comment:
   @zhuanshenbsj1, I don't think these changes can be moved into 
`ClusteringUtils#getOldestInstantToRetainForClustering`, because 
`instantsToArchive` retains only one commit before the oldest pending 
clustering instant; the other commits can still be archived rather than retained.
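
   To illustrate the point, here is a minimal standalone sketch, not the Hudi implementation. It assumes timestamps are plain strings that sort lexicographically and that the candidate list is already in ascending order; the class `RetainOneBeforeClustering` and the method `instantsToArchive` are hypothetical names used only for this example.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;

public class RetainOneBeforeClustering {

  // Hypothetical helper: plain string timestamps stand in for HoodieInstant.
  // Assumes `candidates` is sorted in ascending timestamp order.
  static List<String> instantsToArchive(List<String> candidates, String oldestPendingReplace) {
    // Pick the latest candidate strictly before the pending replacecommit
    // (the reduce keeps the last matching element).
    Optional<String> retained = candidates.stream()
        .filter(ts -> ts.compareTo(oldestPendingReplace) < 0)
        .reduce((first, second) -> second);
    // Archive every candidate except that single retained instant.
    return candidates.stream()
        .filter(ts -> retained.map(r -> !ts.equals(r)).orElse(true))
        .collect(Collectors.toList());
  }

  public static void main(String[] args) {
    List<String> candidates = Arrays.asList("001", "002", "003");
    // With a pending replacecommit at "004", only "003" is held back.
    System.out.println(instantsToArchive(candidates, "004"));
  }
}
```

   In this sketch only `003` stays in the active timeline while `001` and `002` are still archived, which is why a single "oldest instant to retain" boundary would hold back more commits than necessary.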



