boneanxs commented on code in PR #8876:
URL: https://github.com/apache/hudi/pull/8876#discussion_r1217398502


##########
hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineUtils.java:
##########
@@ -295,4 +296,27 @@ public static Option<HoodieInstant> getEarliestInstantForMetadataArchival(
       return Option.empty();
     }
   }
+
+  /**
+   * Find the earliest incomplete commit, deltacommit, or non-clustering replacecommit,
+   * so that the incremental pulls should be strictly before this instant.
+   * This is to guard around multi-writer scenarios where a commit starting later than
+   * another commit from a concurrent writer can finish earlier, leaving an inflight commit
+   * before a completed commit.
+   */
+  public static HoodieTimeline filterTimelineForIncrementalQueryIfNeeded(
+      HoodieTableMetaClient metaClient, HoodieTimeline completedCommitTimeline, boolean useStateTransitionTime) {
+    if (useStateTransitionTime) {
+      return completedCommitTimeline;
+    }
+    Option<HoodieInstant> firstIncompleteCommit = metaClient.getCommitsTimeline()
+        .filterInflightsAndRequested()
+        .filter(instant ->
+            !HoodieTimeline.REPLACE_COMMIT_ACTION.equals(instant.getAction())
+                || !ClusteringUtils.getClusteringPlan(metaClient, instant).isPresent())

Review Comment:
   Does `!ClusteringUtils.getClusteringPlan(metaClient, instant).isPresent()` mean the replace instant is inflight?
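   For readers following the thread: since the stream was already narrowed by `filterInflightsAndRequested()`, the predicate keeps every incomplete instant except replacecommits that carry a clustering plan. A minimal sketch of that predicate, using simplified stand-in types (`Instant` and `hasClusteringPlan` here are hypothetical models, not Hudi's API):

```java
public class FilterSketch {
    // Hypothetical stand-in for HoodieInstant, reduced to the two fields the
    // predicate actually inspects.
    record Instant(String action, boolean hasClusteringPlan) {}

    static final String REPLACE_COMMIT_ACTION = "replacecommit";

    // Mirrors the filter in the diff (applied after filterInflightsAndRequested()):
    // keep any incomplete instant unless it is a clustering replacecommit.
    static boolean keep(Instant instant) {
        return !REPLACE_COMMIT_ACTION.equals(instant.action())
                || !instant.hasClusteringPlan();
    }

    public static void main(String[] args) {
        System.out.println(keep(new Instant("commit", false)));              // true: plain commit caps the window
        System.out.println(keep(new Instant(REPLACE_COMMIT_ACTION, true)));  // false: inflight clustering is skipped
        System.out.println(keep(new Instant(REPLACE_COMMIT_ACTION, false))); // true: non-clustering replacecommit caps it
    }
}
```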



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala:
##########
@@ -60,11 +60,14 @@ case class MergeOnReadIncrementalRelation(override val sqlContext: SQLContext,
 
   override protected def timeline: HoodieTimeline = {
     if (fullTableScan) {
-      metaClient.getCommitsAndCompactionTimeline
+      filterTimelineForIncrementalQueryIfNeeded(metaClient,

Review Comment:
   If `fullTableScan` is used, changing the timeline could also change `queryTimestamp` in `HoodieBaseRelation`, and this could affect fetching the latest FileSlices in `listLatestFileSlices`.
   
   I think we can still use `metaClient.getCommitsAndCompactionTimeline` for `fullTableScan`.
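   To make the concern concrete: if `queryTimestamp` is derived from the last instant of whatever timeline the relation exposes, then capping the timeline before an incomplete commit also moves `queryTimestamp` backwards. A hedged sketch of that coupling, with the timeline modeled as a plain list of timestamps (not Hudi's actual types):

```java
import java.util.List;
import java.util.Optional;

public class QueryTimestampSketch {
    // Hypothetical model: queryTimestamp is the last instant of the timeline
    // the relation uses, so swapping in a filtered timeline moves it.
    static Optional<String> queryTimestamp(List<String> timeline) {
        return timeline.isEmpty() ? Optional.empty()
                : Optional.of(timeline.get(timeline.size() - 1));
    }

    public static void main(String[] args) {
        List<String> fullTimeline = List.of("001", "002", "003");
        // Suppose "002" is the first incomplete commit: the filtered timeline
        // is capped strictly before it.
        List<String> filteredTimeline = List.of("001");

        System.out.println(queryTimestamp(fullTimeline));     // Optional[003]
        System.out.println(queryTimestamp(filteredTimeline)); // Optional[001]
        // Under the filtered timeline, listLatestFileSlices would resolve file
        // slices as of "001" rather than "003" -- the reviewer's concern.
    }
}
```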



##########
hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineUtils.java:
##########
@@ -295,4 +296,27 @@ public static Option<HoodieInstant> getEarliestInstantForMetadataArchival(
       return Option.empty();
     }
   }
+
+  /**
+   * Find the earliest incomplete commit, deltacommit, or non-clustering replacecommit,
+   * so that the incremental pulls should be strictly before this instant.
+   * This is to guard around multi-writer scenarios where a commit starting later than
+   * another commit from a concurrent writer can finish earlier, leaving an inflight commit
+   * before a completed commit.
+   */
+  public static HoodieTimeline filterTimelineForIncrementalQueryIfNeeded(
+      HoodieTableMetaClient metaClient, HoodieTimeline completedCommitTimeline, boolean useStateTransitionTime) {
+    if (useStateTransitionTime) {
+      return completedCommitTimeline;
+    }
+    Option<HoodieInstant> firstIncompleteCommit = metaClient.getCommitsTimeline()
+        .filterInflightsAndRequested()
+        .filter(instant ->
+            !HoodieTimeline.REPLACE_COMMIT_ACTION.equals(instant.getAction())
+                || !ClusteringUtils.getClusteringPlan(metaClient, instant).isPresent())
+        .firstInstant();
+    return firstIncompleteCommit.map(

Review Comment:
   Just want to make clear here that this could block downstream incremental jobs if a clustering job or one of the writers takes too long to finish (or fails and the owner doesn't fix it in time; this can happen especially for clustering, since it is usually lower priority). Now the table owner needs to take more care of this.
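   To illustrate the blocking scenario: because the incremental window is capped strictly before the first incomplete instant, a stuck writer hides every later completed commit from incremental readers until it completes or is cleaned up. A minimal simulation, with instants modeled as timestamp strings rather than Hudi's types:

```java
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;

public class BlockingSketch {
    // Hypothetical model of the capping rule: incremental readers only see
    // completed commits strictly before the first incomplete instant.
    static List<String> visibleToIncremental(List<String> completed,
                                             Optional<String> firstIncomplete) {
        return firstIncomplete
                .map(cap -> completed.stream()
                        .filter(ts -> ts.compareTo(cap) < 0)
                        .collect(Collectors.toList()))
                .orElse(completed);
    }

    public static void main(String[] args) {
        // "002" is e.g. a clustering job that started long ago and never finished.
        Optional<String> stuck = Optional.of("002");
        List<String> completed = List.of("001", "003", "004", "005");

        System.out.println(visibleToIncremental(completed, stuck));
        // [001] -- 003..005 stay hidden until 002 completes or is rolled back
        System.out.println(visibleToIncremental(completed, Optional.empty()));
        // [001, 003, 004, 005]
    }
}
```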
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
