suryaprasanna commented on code in PR #9007:
URL: https://github.com/apache/hudi/pull/9007#discussion_r1264703653


##########
hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineDiffHelper.java:
##########
@@ -74,19 +74,62 @@ public static TimelineDiffResult 
getNewInstantsForIncrementalSync(HoodieTimeline
 
       newTimeline.getInstantsAsStream().filter(instant -> 
!oldTimelineInstants.contains(instant)).forEach(newInstants::add);
 
+      // Check for log compaction commits completed or removed.
       List<Pair<HoodieInstant, HoodieInstant>> logCompactionInstants = 
getPendingLogCompactionTransitions(oldTimeline, newTimeline);
-      List<HoodieInstant> finishedOrRemovedLogCompactionInstants = 
logCompactionInstants.stream()
+      List<Pair<HoodieInstant, Boolean>> 
finishedOrRemovedLogCompactionInstants = logCompactionInstants.stream()
           .filter(instantPair -> !instantPair.getKey().isCompleted()
               && (instantPair.getValue() == null || 
instantPair.getValue().isCompleted()))
-          .map(Pair::getKey).collect(Collectors.toList());
-      return new TimelineDiffResult(newInstants, finishedCompactionInstants, 
finishedOrRemovedLogCompactionInstants, true);
+          .map(instantPair -> (instantPair.getValue() == null)
+              ? Pair.of(instantPair.getKey(), false) : 
Pair.of(instantPair.getKey(), true))
+          .collect(Collectors.toList());
+
+      // Check for replace commits completed or removed.
+      List<Pair<HoodieInstant, HoodieInstant>> replaceCommitInstants = 
getPendingReplaceCommitTransitions(oldTimeline, newTimeline);
+      List<Pair<HoodieInstant, Boolean>> 
finishedOrRemovedReplaceCommitInstants = replaceCommitInstants.stream()
+          .filter(instantPair -> !instantPair.getKey().isCompleted()
+              && (instantPair.getValue() == null || 
instantPair.getValue().isCompleted()))
+          .map(instantPair -> (instantPair.getValue() == null)
+              ? Pair.of(instantPair.getKey(), false) : 
Pair.of(instantPair.getKey(), true))
+          .collect(Collectors.toList());
+
+      // New instants will contain instants that are newly completed commits 
or newly created pending rewrite commits
+      // (i.e. compaction, logcompaction, replacecommit)
+      // Finished or removed rewrite commits are handled independently.
+      return new TimelineDiffResult(newInstants, finishedCompactionInstants, 
finishedOrRemovedLogCompactionInstants,
+          finishedOrRemovedReplaceCommitInstants, true);
     } else {
       // One or more timelines is empty
       LOG.warn("One or more timelines is empty");
       return TimelineDiffResult.UNSAFE_SYNC_RESULT;
     }
   }
 
+  /**
+   * Get pending replacecommit transitions.
+   */
+  private static List<Pair<HoodieInstant, HoodieInstant>> 
getPendingReplaceCommitTransitions(HoodieTimeline oldTimeline,
+                                                                               
              HoodieTimeline newTimeline) {
+    Set<HoodieInstant> newTimelineInstants = 
newTimeline.getInstantsAsStream().collect(Collectors.toSet());
+
+    return 
oldTimeline.filterPendingReplaceTimeline().getInstantsAsStream().map(instant -> 
{
+      if (newTimelineInstants.contains(instant)) {

Review Comment:
   Refactored the code.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to