bvaradar commented on a change in pull request #2388:
URL: https://github.com/apache/hudi/pull/2388#discussion_r581902402



##########
File path: hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineDiffHelper.java
##########
@@ -57,35 +57,38 @@ public static TimelineDiffResult getNewInstantsForIncrementalSync(HoodieTimeline
       List<HoodieInstant> newInstants = new ArrayList<>();
 
       // Check If any pending compaction is lost. If so, do not allow incremental timeline sync
-      List<Pair<HoodieInstant, HoodieInstant>> compactionInstants = getPendingCompactionTransitions(oldT, newT);
-      List<HoodieInstant> lostPendingCompactions = compactionInstants.stream()
+      List<Pair<HoodieInstant, HoodieInstant>> viewChangingInstants = getPendingActionTransitions(oldT, newT);
+      List<HoodieInstant> lostPendingActions = viewChangingInstants.stream()
           .filter(instantPair -> instantPair.getValue() == null).map(Pair::getKey).collect(Collectors.toList());
-      if (!lostPendingCompactions.isEmpty()) {
-        // If a compaction is unscheduled, fall back to complete refresh of fs view since some log files could have been
+      if (!lostPendingActions.isEmpty()) {
+        // If a compaction/clustering is unscheduled, fall back to complete refresh of fs view since some log files could have been
         // moved. Its unsafe to incrementally sync in that case.
-        LOG.warn("Some pending compactions are no longer in new timeline (unscheduled ?). They are :"
-            + lostPendingCompactions);
+        LOG.warn("Some pending view changing instants are no longer in new timeline (unscheduled ?). They are :"
+            + lostPendingActions);
         return TimelineDiffResult.UNSAFE_SYNC_RESULT;
       }
-      List<HoodieInstant> finishedCompactionInstants = compactionInstants.stream()
-          .filter(instantPair -> instantPair.getValue().getAction().equals(HoodieTimeline.COMMIT_ACTION)
-              && instantPair.getValue().isCompleted())
+      List<HoodieInstant> finishedViewChangingInstants = viewChangingInstants.stream()

Review comment:
       Can you construct a timeline and call timeline.viewAlteringInstants() instead, to avoid duplicating the logic?
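A minimal sketch of the idea, assuming a simplified in-memory model: the set of view-altering actions (taken here to be compaction and replacecommit, matching the actions handled in this diff) is defined once, and callers compose viewAlteringInstants() with a pending filter instead of repeating action checks. SimpleTimeline, its Instant record and filterPending() are hypothetical stand-ins, not Hudi's HoodieTimeline API; viewAlteringInstants() is only the name proposed above.

```java
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

// Hypothetical, simplified timeline model; not Hudi's HoodieTimeline API.
final class SimpleTimeline {
  // An instant: timestamp, action name, and whether it has completed.
  record Instant(String timestamp, String action, boolean completed) {}

  // Assumed set of actions whose pending instants alter the file-system view.
  static final Set<String> VIEW_ALTERING_ACTIONS = Set.of("compaction", "replacecommit");

  private final List<Instant> instants;

  SimpleTimeline(List<Instant> instants) {
    this.instants = List.copyOf(instants);
  }

  // The one place that decides which instants alter the view, so callers
  // such as the incremental-sync diff do not repeat the action checks.
  SimpleTimeline viewAlteringInstants() {
    return new SimpleTimeline(instants.stream()
        .filter(i -> VIEW_ALTERING_ACTIONS.contains(i.action()))
        .collect(Collectors.toList()));
  }

  // Keeps only instants that have not completed yet.
  SimpleTimeline filterPending() {
    return new SimpleTimeline(instants.stream()
        .filter(i -> !i.completed())
        .collect(Collectors.toList()));
  }

  List<Instant> getInstants() {
    return instants;
  }
}
```

In the diff helper this would read roughly as oldTimeline.viewAlteringInstants().filterPending(), with the list of actions living in a single place.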

##########
File path: hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineDiffHelper.java
##########
@@ -57,35 +57,38 @@ public static TimelineDiffResult getNewInstantsForIncrementalSync(HoodieTimeline
       List<HoodieInstant> newInstants = new ArrayList<>();
 
       // Check If any pending compaction is lost. If so, do not allow incremental timeline sync
-      List<Pair<HoodieInstant, HoodieInstant>> compactionInstants = getPendingCompactionTransitions(oldT, newT);
-      List<HoodieInstant> lostPendingCompactions = compactionInstants.stream()
+      List<Pair<HoodieInstant, HoodieInstant>> viewChangingInstants = getPendingActionTransitions(oldT, newT);
+      List<HoodieInstant> lostPendingActions = viewChangingInstants.stream()
           .filter(instantPair -> instantPair.getValue() == null).map(Pair::getKey).collect(Collectors.toList());
-      if (!lostPendingCompactions.isEmpty()) {
-        // If a compaction is unscheduled, fall back to complete refresh of fs view since some log files could have been
+      if (!lostPendingActions.isEmpty()) {
+        // If a compaction/clustering is unscheduled, fall back to complete refresh of fs view since some log files could have been
         // moved. Its unsafe to incrementally sync in that case.
-        LOG.warn("Some pending compactions are no longer in new timeline (unscheduled ?). They are :"
-            + lostPendingCompactions);
+        LOG.warn("Some pending view changing instants are no longer in new timeline (unscheduled ?). They are :"
+            + lostPendingActions);
         return TimelineDiffResult.UNSAFE_SYNC_RESULT;
       }
-      List<HoodieInstant> finishedCompactionInstants = compactionInstants.stream()
-          .filter(instantPair -> instantPair.getValue().getAction().equals(HoodieTimeline.COMMIT_ACTION)
-              && instantPair.getValue().isCompleted())
+      List<HoodieInstant> finishedViewChangingInstants = viewChangingInstants.stream()
+          .filter(instantPair -> instantPair.getValue().isCompleted()
+              && (HoodieTimeline.COMMIT_ACTION.equals(instantPair.getValue().getAction())
+              || HoodieTimeline.REPLACE_COMMIT_ACTION.equals(instantPair.getValue().getAction())))
           .map(Pair::getKey).collect(Collectors.toList());
 
       newT.getInstants().filter(instant -> !oldTimelineInstants.contains(instant)).forEach(newInstants::add);
-      return new TimelineDiffResult(newInstants, finishedCompactionInstants, true);
+      return new TimelineDiffResult(newInstants, finishedViewChangingInstants, true);
     } else {
       // One or more timelines is empty
       LOG.warn("One or more timelines is empty");
       return TimelineDiffResult.UNSAFE_SYNC_RESULT;
     }
   }
 
-  private static List<Pair<HoodieInstant, HoodieInstant>> getPendingCompactionTransitions(HoodieTimeline oldTimeline,
-      HoodieTimeline newTimeline) {
+  private static List<Pair<HoodieInstant, HoodieInstant>> getPendingActionTransitions(HoodieTimeline oldTimeline,
+                                                                                      HoodieTimeline newTimeline) {
     Set<HoodieInstant> newTimelineInstants = newTimeline.getInstants().collect(Collectors.toSet());
+    
+    List<Pair<HoodieInstant, HoodieInstant>> allTransitions = new ArrayList<>();
 
-    return oldTimeline.filterPendingCompactionTimeline().getInstants().map(instant -> {
+    allTransitions.addAll(oldTimeline.filterPendingCompactionTimeline().getInstants().map(instant -> {

Review comment:
       Instead of oldTimeline.filterPendingCompactionTimeline().getInstants(), is there scope to use oldTimeline.viewChangingInstants() and consolidate this statement with the one below, where we handle replace commits?
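A rough sketch of the consolidation, assuming a simplified model in which an old pending instant is matched to the new timeline by timestamp only (a real implementation would likely check the expected instant states and actions). One stream over all pending view-altering instants yields a single list of transitions: a null counterpart marks a lost (for example, unscheduled) instant, and a completed counterpart marks a finished one. TransitionSketch and its records are hypothetical, not Hudi classes.

```java
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;

// Hypothetical sketch: one pass over all pending view-altering instants
// (compaction and clustering/replacecommit) instead of one loop per action.
final class TransitionSketch {
  record Instant(String timestamp, String action, boolean completed) {}

  // Pairs an old pending instant with its counterpart in the new timeline;
  // newInstant == null means the instant was lost.
  record Transition(Instant oldInstant, Instant newInstant) {}

  static final Set<String> VIEW_ALTERING_ACTIONS = Set.of("compaction", "replacecommit");

  static List<Transition> pendingActionTransitions(List<Instant> oldTimeline, List<Instant> newTimeline) {
    // Simplification: match instants across timelines by timestamp only.
    Map<String, Instant> newByTimestamp = newTimeline.stream()
        .collect(Collectors.toMap(Instant::timestamp, Function.identity(), (a, b) -> b));
    return oldTimeline.stream()
        .filter(i -> !i.completed() && VIEW_ALTERING_ACTIONS.contains(i.action()))
        .map(i -> new Transition(i, newByTimestamp.get(i.timestamp())))
        .collect(Collectors.toList());
  }

  // Any lost instant makes incremental sync unsafe.
  static boolean anyLost(List<Transition> transitions) {
    return transitions.stream().anyMatch(t -> t.newInstant() == null);
  }

  // In this model any completed counterpart counts as finished
  // (a compaction completes as a commit, a clustering as a replacecommit).
  static List<Instant> finished(List<Transition> transitions) {
    return transitions.stream()
        .filter(t -> t.newInstant() != null && t.newInstant().completed())
        .map(Transition::oldInstant)
        .collect(Collectors.toList());
  }
}
```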

##########
File path: hudi-common/src/main/java/org/apache/hudi/common/table/view/HoodieTableFileSystemView.java
##########
@@ -223,7 +223,7 @@ void resetFileGroupsInPendingClustering(Map<HoodieFileGroupId, HoodieInstant> fg
   @Override
   void addFileGroupsInPendingClustering(Stream<Pair<HoodieFileGroupId, HoodieInstant>> fileGroups) {
     fileGroups.forEach(fileGroupInstantPair -> {
-      ValidationUtils.checkArgument(fgIdToPendingClustering.containsKey(fileGroupInstantPair.getLeft()),
+      ValidationUtils.checkArgument(!fgIdToPendingClustering.containsKey(fileGroupInstantPair.getLeft()),

Review comment:
       This is a bug in 0.7 that will cause a failure when RocksDBFileSystemView is used, right?
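For reference, a minimal sketch of the intended precondition, using String ids and instant times in place of HoodieFileGroupId and HoodieInstant, and a local checkArgument helper in place of ValidationUtils (PendingClusteringIndex is hypothetical): adding a file group to pending clustering should only succeed if it is not already tracked, which is what the negated check in the new line expresses.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of the corrected precondition; String ids and instant
// times stand in for HoodieFileGroupId and HoodieInstant.
final class PendingClusteringIndex {
  private final Map<String, String> fgIdToPendingClustering = new HashMap<>();

  // Local stand-in for a checkArgument-style precondition helper.
  private static void checkArgument(boolean condition, String message) {
    if (!condition) {
      throw new IllegalArgumentException(message);
    }
  }

  void addFileGroupsInPendingClustering(Map<String, String> fileGroups) {
    fileGroups.forEach((fgId, instantTime) -> {
      // Adding is only valid if the file group is NOT already tracked;
      // without the negation, every first-time add would be rejected.
      checkArgument(!fgIdToPendingClustering.containsKey(fgId),
          "File group already in pending clustering: " + fgId);
      fgIdToPendingClustering.put(fgId, instantTime);
    });
  }

  boolean isPendingClustering(String fgId) {
    return fgIdToPendingClustering.containsKey(fgId);
  }
}
```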

##########
File path: hudi-common/src/main/java/org/apache/hudi/common/table/view/IncrementalTimelineSyncFileSystemView.java
##########
@@ -108,17 +110,22 @@ private void runIncrementalSync(HoodieTimeline timeline, TimelineDiffResult diff
     LOG.info("Timeline Diff Result is :" + diffResult);
 
     // First remove pending compaction instants which were completed
-    diffResult.getFinishedCompactionInstants().stream().forEach(instant -> {
+    diffResult.getFinishedViewChangingInstants().stream().forEach(instant -> {
       try {
-        removePendingCompactionInstant(timeline, instant);
+        if (HoodieTimeline.COMPACTION_ACTION.equals(instant.getAction())) {

Review comment:
       Can we introduce something like HoodieInstant.isAction(String action) instead of directly checking the action names here? There could be many occurrences like this.
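A minimal sketch of such a helper, assuming a stripped-down stand-in for HoodieInstant (InstantSketch is hypothetical, and isAction is the name suggested above rather than an existing method): the action comparison lives in one place, so call sites read instant.isAction(COMPACTION_ACTION) instead of comparing strings.

```java
import java.util.Objects;

// Hypothetical stand-in for HoodieInstant; isAction(...) is the method name
// proposed in the review comment, not an existing API.
final class InstantSketch {
  static final String COMPACTION_ACTION = "compaction";
  static final String REPLACE_COMMIT_ACTION = "replacecommit";

  private final String action;
  private final String timestamp;

  InstantSketch(String action, String timestamp) {
    this.action = Objects.requireNonNull(action);
    this.timestamp = Objects.requireNonNull(timestamp);
  }

  // Single place for the action comparison; returns false for a null argument.
  boolean isAction(String expected) {
    return action.equals(expected);
  }

  String getTimestamp() {
    return timestamp;
  }
}
```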

##########
File path: hudi-common/src/main/java/org/apache/hudi/common/table/view/RemoteHoodieTableFileSystemView.java
##########
@@ -161,7 +161,7 @@ public RemoteHoodieTableFileSystemView(String server, int port, HoodieTableMetaC
 
     // Adding mandatory parameters - Last instants affecting file-slice
     timeline.lastInstant().ifPresent(instant -> builder.addParameter(LAST_INSTANT_TS, instant.getTimestamp()));
-    builder.addParameter(TIMELINE_HASH, timeline.getTimelineHash());
+    builder.addParameter(TIMELINE_HASH, timeline.filterCompletedAndCompactionInstants().getTimelineHash());

Review comment:
       Shouldn't this be filterViewChangingInstants?
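To illustrate why the choice of filter matters, a sketch that hashes only a filtered view of the timeline, assuming filterViewChangingInstants keeps completed instants plus pending compaction and pending replacecommit (clustering) instants. TimelineHashSketch and the SHA-256 hashing are assumptions for illustration, not how getTimelineHash is implemented. The point is that an inflight regular commit leaves the hash unchanged while a pending clustering changes it, and that client and server must apply the same filter for their hashes to be comparable.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical sketch: the hash sent to the timeline server is computed over a
// filtered view of the timeline, so both sides must apply the same filter.
final class TimelineHashSketch {
  record Instant(String timestamp, String action, boolean completed) {}

  // Assumed meaning of a "view changing" filter: completed instants plus
  // pending compaction and pending clustering (replacecommit) instants.
  static List<Instant> filterViewChangingInstants(List<Instant> instants) {
    return instants.stream()
        .filter(i -> i.completed()
            || i.action().equals("compaction")
            || i.action().equals("replacecommit"))
        .collect(Collectors.toList());
  }

  // SHA-256 over "timestamp_action_completed" lines, returned as lowercase hex.
  static String timelineHash(List<Instant> instants) {
    try {
      MessageDigest md = MessageDigest.getInstance("SHA-256");
      for (Instant i : instants) {
        String key = i.timestamp() + "_" + i.action() + "_" + i.completed() + "\n";
        md.update(key.getBytes(StandardCharsets.UTF_8));
      }
      StringBuilder hex = new StringBuilder();
      for (byte b : md.digest()) {
        hex.append(String.format("%02x", b));
      }
      return hex.toString();
    } catch (NoSuchAlgorithmException e) {
      throw new IllegalStateException("SHA-256 not available", e);
    }
  }
}
```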

##########
File path: hudi-common/src/main/java/org/apache/hudi/common/model/HoodieFileGroup.java
##########
@@ -77,7 +77,7 @@ public HoodieFileGroup(HoodieFileGroupId fileGroupId, HoodieTimeline timeline) {
     this.fileGroupId = fileGroupId;
     this.fileSlices = new TreeMap<>(HoodieFileGroup.getReverseCommitTimeComparator());
     this.timeline = timeline;
-    this.lastInstant = timeline.lastInstant();
+    this.lastInstant = timeline.filterCompletedAndCompactionInstants().lastInstant();

Review comment:
       We should avoid this. FileGroup should just act on the timeline it is given; that keeps them composable. Can you elaborate on why lastInstant must include only completed instants?
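A sketch of the composable alternative, under simplified, hypothetical types (ComposableSketch, Timeline, FileGroup): the file group takes lastInstant from whatever timeline it receives, and a caller that needs only completed instants filters before constructing it. Instants are assumed to be sorted by timestamp.

```java
import java.util.List;
import java.util.Optional;
import java.util.function.Predicate;
import java.util.stream.Collectors;

// Hypothetical sketch: the file group does not filter the timeline itself;
// callers pass in whichever view they need.
final class ComposableSketch {
  record Instant(String timestamp, boolean completed) {}

  // Instants are assumed to be sorted by timestamp.
  record Timeline(List<Instant> instants) {
    Timeline filter(Predicate<Instant> p) {
      return new Timeline(instants.stream().filter(p).collect(Collectors.toList()));
    }

    Optional<Instant> lastInstant() {
      return instants.isEmpty() ? Optional.empty() : Optional.of(instants.get(instants.size() - 1));
    }
  }

  static final class FileGroup {
    private final Optional<Instant> lastInstant;

    FileGroup(Timeline timeline) {
      // Uses the timeline exactly as given; no hidden filtering here.
      this.lastInstant = timeline.lastInstant();
    }

    Optional<Instant> getLastInstant() {
      return lastInstant;
    }
  }
}
```

With this split, the decision about which instants count lives with the caller that builds the timeline, and FileGroup stays reusable for any view.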




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
