SteNicholas commented on code in PR #7891:
URL: https://github.com/apache/hudi/pull/7891#discussion_r1119934910


##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieTimelineArchiver.java:
##########
@@ -473,6 +473,33 @@ private Stream<HoodieInstant> getCommitInstantsToArchive() throws IOException {
                       HoodieTimeline.compareTimestamps(s.getTimestamp(), LESSER_THAN, instantToRetain.getTimestamp()))
                   .orElse(true)
           );
+
+      // When inline or async clustering is enabled, we need to ensure that there is a commit in the active timeline
+      // to check whether the file slice generated in pending clustering after archive isn't committed
+      // via {@code HoodieFileGroup#isFileSliceCommitted(slice)}
+      boolean isOldestPendingReplaceInstant =
+          oldestPendingCompactionAndReplaceInstant.map(instant ->
+              HoodieTimeline.REPLACE_COMMIT_ACTION.equals(instant.getAction())).orElse(false);
+      if (isOldestPendingReplaceInstant) {
+        List<HoodieInstant> instantsToArchive = instantToArchiveStream.collect(Collectors.toList());
+        Option<HoodieInstant> latestInstantRetainForReplace = Option.fromJavaOptional(
+            instantsToArchive.stream()
+                .filter(s -> HoodieTimeline.compareTimestamps(
+                    s.getTimestamp(),
+                    LESSER_THAN,
+                    oldestPendingCompactionAndReplaceInstant.get().getTimestamp()))
+                .reduce((i1, i2) -> i2));
+        if (latestInstantRetainForReplace.isPresent()) {
+          LOG.info(String.format(
+              "Retaining the archived instant %s before the inflight replacecommit %s.",
+              latestInstantRetainForReplace.get().getTimestamp(),
+              oldestPendingCompactionAndReplaceInstant.get().getTimestamp()));
+        }
+        instantToArchiveStream = instantsToArchive.stream()
+            .filter(s -> latestInstantRetainForReplace.map(instant -> s.compareTo(instant) != 0)
+                .orElse(true));
+      }
+

Review Comment:
   @zhuanshenbsj1, I don't think these changes can be moved into
`ClusteringUtils#getOldestInstantToRetainForClustering`, because
`instantsToArchive` retains only one commit before the oldest pending clustering
instant; the other commits can still be archived rather than retained.

   Here is an example timeline to illustrate the idea: commit1, commit2,
replacecommit1.inflight, commit3, commit4. In this timeline, only commit2 must
be kept out of the archive so that we can verify whether replacecommit1 has
completed; the other commits (commit1, commit3, commit4) can be archived.
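
   To make the example concrete, here is a minimal standalone sketch of the same
selection in plain Java. It does not use any Hudi classes: `RetainSketch`,
`Instant`, and `instantsToArchive` are hypothetical stand-ins, and the integer
timestamps are made up for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;

class RetainSketch {
  // Hypothetical stand-in for HoodieInstant: a name plus an orderable timestamp.
  static final class Instant {
    final String name;
    final int ts;

    Instant(String name, int ts) {
      this.name = name;
      this.ts = ts;
    }

    @Override
    public String toString() {
      return name;
    }
  }

  // Returns the candidates minus the latest one that precedes the pending
  // replacecommit. Candidates are assumed to be sorted by timestamp.
  static List<Instant> instantsToArchive(List<Instant> candidates, Instant pendingReplace) {
    Optional<Instant> retained = candidates.stream()
        .filter(i -> i.ts < pendingReplace.ts)
        .reduce((first, second) -> second); // keeps the last matching element
    List<Instant> result = new ArrayList<>(candidates);
    retained.ifPresent(result::remove); // removes that one instance, if any
    return result;
  }

  public static void main(String[] args) {
    List<Instant> candidates = Arrays.asList(
        new Instant("commit1", 1),
        new Instant("commit2", 2),
        new Instant("commit3", 4),
        new Instant("commit4", 5));
    Instant pending = new Instant("replacecommit1", 3);
    // prints [commit1, commit3, commit4]; commit2 stays in the active timeline
    System.out.println(instantsToArchive(candidates, pending));
  }
}
```

   In this sketch only commit2 is held back, which is why the logic sits on the
list of instants to archive rather than on a single "oldest instant to retain"
cutoff.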



