suryaprasanna commented on code in PR #9007:
URL: https://github.com/apache/hudi/pull/9007#discussion_r1244870841


##########
hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineDiffHelper.java:
##########
@@ -40,8 +40,8 @@ public class TimelineDiffHelper {
   public static TimelineDiffResult getNewInstantsForIncrementalSync(HoodieTimeline oldTimeline,
       HoodieTimeline newTimeline) {
 
-    HoodieTimeline oldT = oldTimeline.filterCompletedAndCompactionInstants();
-    HoodieTimeline newT = newTimeline.filterCompletedAndCompactionInstants();
+    HoodieTimeline oldT = oldTimeline.filterCompletedInstantsOrRewriteTimeline();
+    HoodieTimeline newT = newTimeline.filterCompletedInstantsOrRewriteTimeline();

Review Comment:
   Yeah, makes sense. Renaming the API.
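
   For context on the rename, here is a minimal sketch of the filter semantics the new name suggests. It assumes, based on the code comment in the later `TimelineDiffHelper` hunk, that the rewrite actions are compaction, log compaction and replacecommit. `SimpleInstant`, `RewriteTimelineFilter` and the action strings are hypothetical stand-ins, not Hudi's `HoodieInstant` or the real `filterCompletedInstantsOrRewriteTimeline` implementation.

```java
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

// Hypothetical simplified instant; NOT Hudi's HoodieInstant.
record SimpleInstant(String timestamp, String action, boolean completed) {}

final class RewriteTimelineFilter {
  private RewriteTimelineFilter() {}

  // Assumed rewrite actions, taken from the PR's code comment:
  // compaction, log compaction and replacecommit (clustering).
  static final Set<String> REWRITE_ACTIONS = Set.of("compaction", "logcompaction", "replacecommit");

  // Keeps every completed instant plus any instant of a rewrite action (pending or not);
  // other pending instants, such as an inflight commit, are dropped.
  static List<SimpleInstant> filterCompletedInstantsOrRewrite(List<SimpleInstant> timeline) {
    return timeline.stream()
        .filter(i -> i.completed() || REWRITE_ACTIONS.contains(i.action()))
        .collect(Collectors.toList());
  }
}
```

   Compared with the old `filterCompletedAndCompactionInstants` name, the only conceptual change is that pending log compaction and replacecommit instants now survive the filter alongside pending compactions.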



##########
hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java:
##########
@@ -118,6 +118,8 @@ public class HoodieTableMetaClient implements Serializable {
   private ConsistencyGuardConfig consistencyGuardConfig = ConsistencyGuardConfig.newBuilder().build();
   private FileSystemRetryConfig fileSystemRetryConfig = FileSystemRetryConfig.newBuilder().build();
   protected HoodieMetaserverConfig metaserverConfig;
+  // Tracks the status of the last incremental file sync
+  private boolean isLastIncrementalSyncSuccessful;
 

Review Comment:
   I added this for testing purposes; I will remove it after addressing all the unit test failures.



##########
hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineDiffHelper.java:
##########
@@ -74,19 +74,62 @@ public static TimelineDiffResult getNewInstantsForIncrementalSync(HoodieTimeline
 
       newTimeline.getInstantsAsStream().filter(instant -> !oldTimelineInstants.contains(instant)).forEach(newInstants::add);
 
+      // Check for log compaction commits completed or removed.
       List<Pair<HoodieInstant, HoodieInstant>> logCompactionInstants = getPendingLogCompactionTransitions(oldTimeline, newTimeline);
-      List<HoodieInstant> finishedOrRemovedLogCompactionInstants = logCompactionInstants.stream()
+      List<Pair<HoodieInstant, Boolean>> finishedOrRemovedLogCompactionInstants = logCompactionInstants.stream()
           .filter(instantPair -> !instantPair.getKey().isCompleted()
               && (instantPair.getValue() == null || instantPair.getValue().isCompleted()))
-          .map(Pair::getKey).collect(Collectors.toList());
-      return new TimelineDiffResult(newInstants, finishedCompactionInstants, finishedOrRemovedLogCompactionInstants, true);
+          .map(instantPair -> (instantPair.getValue() == null)
+              ? Pair.of(instantPair.getKey(), false) : Pair.of(instantPair.getKey(), true))
+          .collect(Collectors.toList());
+
+      // Check for replace commits completed or removed.
+      List<Pair<HoodieInstant, HoodieInstant>> replaceCommitInstants = getPendingReplaceCommitTransitions(oldTimeline, newTimeline);
+      List<Pair<HoodieInstant, Boolean>> finishedOrRemovedReplaceCommitInstants = replaceCommitInstants.stream()
+          .filter(instantPair -> !instantPair.getKey().isCompleted()
+              && (instantPair.getValue() == null || instantPair.getValue().isCompleted()))
+          .map(instantPair -> (instantPair.getValue() == null)
+              ? Pair.of(instantPair.getKey(), false) : Pair.of(instantPair.getKey(), true))
+          .collect(Collectors.toList());
+
+      // New instants will contains instants that are newly completed commits or newly created pending rewrite commits
+      // (i.e. compaction, logcompaciton, replacecommit)
+      // Finished or removed rewrite commits are handled independently.
+      return new TimelineDiffResult(newInstants, finishedCompactionInstants, finishedOrRemovedLogCompactionInstants,
+          finishedOrRemovedReplaceCommitInstants, true);
     } else {
       // One or more timelines is empty
       LOG.warn("One or more timelines is empty");
       return TimelineDiffResult.UNSAFE_SYNC_RESULT;
     }
   }
 
+  /**
+   * Get pending replacecommit transitions.
+   */
+  private static List<Pair<HoodieInstant, HoodieInstant>> getPendingReplaceCommitTransitions(HoodieTimeline oldTimeline,
+      HoodieTimeline newTimeline) {
+    Set<HoodieInstant> newTimelineInstants = newTimeline.getInstantsAsStream().collect(Collectors.toSet());
+
+    return oldTimeline.filterPendingReplaceTimeline().getInstantsAsStream().map(instant -> {
+      if (newTimelineInstants.contains(instant)) {

Review Comment:
   Yeah, I tried to avoid refactoring in this PR because the logic itself is already complex. We can have a follow-up task to address the refactoring.
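
   One possible shape for that follow-up, sketched under assumptions: the log compaction and replacecommit branches in the hunk above run the same filter/map pipeline, so it could be pulled into one generic helper. `Pair`, `SimpleInstant` and `TransitionHelper.finishedOrRemoved` below are hypothetical simplified types, not Hudi's `Pair` or `HoodieInstant`; the helper reproduces the pipeline's behavior (still-pending instants dropped, `true` for completed, `false` for removed).

```java
import java.util.List;
import java.util.function.Predicate;
import java.util.stream.Collectors;

// Hypothetical minimal pair standing in for Hudi's Pair
// (key = instant from the old timeline, value = matching instant in the new timeline, or null if gone).
record Pair<L, R>(L key, R value) {
  static <L, R> Pair<L, R> of(L key, R value) {
    return new Pair<>(key, value);
  }
}

// Hypothetical simplified instant standing in for HoodieInstant.
record SimpleInstant(String timestamp, boolean completed) {}

final class TransitionHelper {
  private TransitionHelper() {}

  // Shared version of the pipeline duplicated for log compaction and replacecommit:
  // keep instants that were pending in the old timeline and are now either removed (value == null)
  // or completed, then map them to (instant, true if completed / false if removed).
  static <I> List<Pair<I, Boolean>> finishedOrRemoved(List<Pair<I, I>> transitions,
                                                      Predicate<I> isCompleted) {
    return transitions.stream()
        .filter(p -> !isCompleted.test(p.key())
            && (p.value() == null || isCompleted.test(p.value())))
        .map(p -> Pair.of(p.key(), p.value() != null))
        .collect(Collectors.toList());
  }
}
```

   Passing `isCompleted` as a `Predicate` keeps the helper independent of the instant type, so both call sites could share it without changing their transition-lookup methods.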



##########
hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineDiffHelper.java:
##########
@@ -74,19 +74,62 @@ public static TimelineDiffResult getNewInstantsForIncrementalSync(HoodieTimeline
 
       newTimeline.getInstantsAsStream().filter(instant -> !oldTimelineInstants.contains(instant)).forEach(newInstants::add);
 
+      // Check for log compaction commits completed or removed.
       List<Pair<HoodieInstant, HoodieInstant>> logCompactionInstants = getPendingLogCompactionTransitions(oldTimeline, newTimeline);
-      List<HoodieInstant> finishedOrRemovedLogCompactionInstants = logCompactionInstants.stream()
+      List<Pair<HoodieInstant, Boolean>> finishedOrRemovedLogCompactionInstants = logCompactionInstants.stream()
           .filter(instantPair -> !instantPair.getKey().isCompleted()
               && (instantPair.getValue() == null || instantPair.getValue().isCompleted()))
-          .map(Pair::getKey).collect(Collectors.toList());
-      return new TimelineDiffResult(newInstants, finishedCompactionInstants, finishedOrRemovedLogCompactionInstants, true);
+          .map(instantPair -> (instantPair.getValue() == null)
+              ? Pair.of(instantPair.getKey(), false) : Pair.of(instantPair.getKey(), true))

Review Comment:
   Compaction plans are immutable: once a compaction plan is created, new log files are written to the new file slice even though the plan has not finished executing. Log compaction and clustering plans may also be immutable, but they can be removed, so the logic differs.
   Compaction plans are removed only during a restore operation, so it is OK to do incremental file sync in such cases, whereas for the other two action types, incremental file sync should be able to handle transitions from inflight to completed, from inflight to rolled back, etc.
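
   To make the distinction concrete, a small sketch under stated assumptions: it models only the classification described in this comment, not Hudi's actual incremental sync. `NewState`, `PendingPlanTransitions` and the action strings are hypothetical. The `Optional<Boolean>` result mirrors the `Pair<HoodieInstant, Boolean>` encoding in the hunk (`true` = completed, `false` = removed), and a still-pending instant produces nothing.

```java
import java.util.Optional;
import java.util.Set;

// Hypothetical: how an instant that was pending in the old timeline appears in the new timeline.
enum NewState { STILL_PENDING, COMPLETED, REMOVED }

final class PendingPlanTransitions {
  private PendingPlanTransitions() {}

  // Assumed action names for plans that can be completed OR removed (e.g. rolled back) between syncs.
  // Compaction is deliberately absent: its plan is immutable and is removed only by a restore.
  static final Set<String> REMOVABLE_PLAN_ACTIONS = Set.of("logcompaction", "replacecommit");

  // True if incremental file sync has to track both inflight -> completed and inflight -> removed.
  static boolean needsTransitionTracking(String action) {
    return REMOVABLE_PLAN_ACTIONS.contains(action);
  }

  // Encodes a transition like the Pair<HoodieInstant, Boolean> in the hunk:
  // empty = still pending (nothing to report), true = completed, false = removed.
  static Optional<Boolean> transitionFlag(NewState state) {
    return switch (state) {
      case COMPLETED -> Optional.of(true);
      case REMOVED -> Optional.of(false);
      case STILL_PENDING -> Optional.empty();
    };
  }
}
```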



-- 
This is an automated message from the Apache Git Service.