boneanxs commented on code in PR #8876:
URL: https://github.com/apache/hudi/pull/8876#discussion_r1217398502
##########
hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineUtils.java:
##########
@@ -295,4 +296,27 @@ public static Option<HoodieInstant> getEarliestInstantForMetadataArchival(
return Option.empty();
}
}
+
+ /**
+ * Find the earliest incomplete commit, deltacommit, or non-clustering replacecommit,
+ * so that the incremental pulls should be strictly before this instant.
+ * This is to guard around multi-writer scenarios where a commit starting later than
+ * another commit from a concurrent writer can finish earlier, leaving an inflight commit
+ * before a completed commit.
+ */
+ public static HoodieTimeline filterTimelineForIncrementalQueryIfNeeded(
+ HoodieTableMetaClient metaClient, HoodieTimeline completedCommitTimeline, boolean useStateTransitionTime) {
+ if (useStateTransitionTime) {
+ return completedCommitTimeline;
+ }
+ Option<HoodieInstant> firstIncompleteCommit = metaClient.getCommitsTimeline()
+ .filterInflightsAndRequested()
+ .filter(instant ->
+ !HoodieTimeline.REPLACE_COMMIT_ACTION.equals(instant.getAction())
+ || !ClusteringUtils.getClusteringPlan(metaClient, instant).isPresent())
Review Comment:
Does `!ClusteringUtils.getClusteringPlan(metaClient, instant).isPresent()` mean that the replace instant is inflight?
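The predicate in question can be modelled in isolation. The following is a minimal sketch, not Hudi code. `PendingInstantFilter`, its `Instant` record, and the `hasClusteringPlan` flag are hypothetical stand-ins for `HoodieInstant` and `ClusteringUtils.getClusteringPlan(...).isPresent()`. The model assumes that call yields a plan only for replacecommits requested as clustering. Because `filterInflightsAndRequested()` runs before the predicate, every instant that reaches it is already incomplete. The predicate therefore only decides which incomplete replacecommits are counted.

```java
import java.util.List;
import java.util.Optional;

final class PendingInstantFilter {
  // Action name used on the timeline for replace commits.
  static final String REPLACE_COMMIT_ACTION = "replacecommit";

  // Hypothetical stand-in for HoodieInstant. hasClusteringPlan models
  // ClusteringUtils.getClusteringPlan(metaClient, instant).isPresent(),
  // assuming a plan exists only for replacecommits requested as clustering.
  record Instant(String timestamp, String action, boolean completed, boolean hasClusteringPlan) {}

  // Mirrors the chain in the diff over a timeline sorted by timestamp.
  static Optional<Instant> firstIncompleteCommit(List<Instant> sortedTimeline) {
    return sortedTimeline.stream()
        // Analogue of filterInflightsAndRequested(): only incomplete instants remain.
        .filter(i -> !i.completed())
        // Same shape as the diff: non-replace actions, or replacecommits without a clustering plan.
        .filter(i -> !REPLACE_COMMIT_ACTION.equals(i.action()) || !i.hasClusteringPlan())
        // Analogue of firstInstant().
        .findFirst();
  }
}
```

The timeline might hold a completed commit `001`, a pending replacecommit `002` with a clustering plan, a pending replacecommit `003` without one, and a pending commit `004`. For that timeline, the sketch returns `003`. The pending clustering instant is skipped, and the plan-less replacecommit is treated like any other pending write.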
##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala:
##########
@@ -60,11 +60,14 @@ case class MergeOnReadIncrementalRelation(override val sqlContext: SQLContext,
override protected def timeline: HoodieTimeline = {
if (fullTableScan) {
- metaClient.getCommitsAndCompactionTimeline
+ filterTimelineForIncrementalQueryIfNeeded(metaClient,
Review Comment:
When `fullTableScan` is used, changing the timeline could also change `queryTimestamp` in `HoodieBaseRelation`, which in turn could affect which latest FileSlices are fetched in `listLatestFileSlices`.
I think we can keep using `metaClient.getCommitsAndCompactionTimeline` for `fullTableScan`.
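The following sketch illustrates the concern about `queryTimestamp`. It is a simplified model that assumes `queryTimestamp` falls back to the last instant of `timeline` when no explicit query instant is given. `QueryTimestampSketch` and its methods are hypothetical and are not Hudi APIs. Truncating the timeline before the first incomplete instant moves that last instant earlier. In this model, the snapshot used to pick the latest file slices moves with it.

```java
import java.util.List;
import java.util.Optional;

final class QueryTimestampSketch {
  // Hypothetical instant: timestamp plus completion state only.
  record Instant(String timestamp, boolean completed) {}

  // Simplified analogue of the default query instant: the last instant of the given timeline.
  static Optional<String> queryTimestamp(List<Instant> completedTimeline) {
    return completedTimeline.isEmpty()
        ? Optional.empty()
        : Optional.of(completedTimeline.get(completedTimeline.size() - 1).timestamp());
  }

  // Keeps completed instants strictly before the earliest incomplete one, if any.
  static List<Instant> truncateBeforeFirstIncomplete(List<Instant> sortedTimeline) {
    Optional<String> firstIncomplete = sortedTimeline.stream()
        .filter(i -> !i.completed())
        .map(Instant::timestamp)
        .findFirst();
    return sortedTimeline.stream()
        .filter(Instant::completed)
        .filter(i -> firstIncomplete.map(t -> i.timestamp().compareTo(t) < 0).orElse(true))
        .toList();
  }
}
```

Consider a completed `001`, an inflight `002`, and a completed `003`. The unfiltered completed timeline gives `003` as the query instant, but the truncated one gives `001`, so file slices written by `003` would fall outside the snapshot in this model.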
##########
hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineUtils.java:
##########
@@ -295,4 +296,27 @@ public static Option<HoodieInstant> getEarliestInstantForMetadataArchival(
return Option.empty();
}
}
+
+ /**
+ * Find the earliest incomplete commit, deltacommit, or non-clustering replacecommit,
+ * so that the incremental pulls should be strictly before this instant.
+ * This is to guard around multi-writer scenarios where a commit starting later than
+ * another commit from a concurrent writer can finish earlier, leaving an inflight commit
+ * before a completed commit.
+ */
+ public static HoodieTimeline filterTimelineForIncrementalQueryIfNeeded(
+ HoodieTableMetaClient metaClient, HoodieTimeline completedCommitTimeline, boolean useStateTransitionTime) {
+ if (useStateTransitionTime) {
+ return completedCommitTimeline;
+ }
+ Option<HoodieInstant> firstIncompleteCommit = metaClient.getCommitsTimeline()
+ .filterInflightsAndRequested()
+ .filter(instant ->
+ !HoodieTimeline.REPLACE_COMMIT_ACTION.equals(instant.getAction())
+ || !ClusteringUtils.getClusteringPlan(metaClient, instant).isPresent())
+ .firstInstant();
+ return firstIncompleteCommit.map(
Review Comment:
Just want to make it clear that this could block downstream incremental jobs if a clustering job or one writer takes too long to finish (or fails and the owner doesn't fix it in time, which can happen especially with clustering, since it usually has lower priority). Owners now need to take more care of this.
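The following sketch shows the blocking effect. It is a minimal model in which an incremental pull returns completed instants after the reader's checkpoint, capped strictly before the earliest incomplete instant. `IncrementalPullSketch` and `nextBatch` are hypothetical names, and the pending instant here stands for any write the filter does not skip.

```java
import java.util.List;
import java.util.Optional;

final class IncrementalPullSketch {
  // Hypothetical instant model: timestamp plus completion state.
  record Instant(String timestamp, boolean completed) {}

  // Completed instants after the checkpoint that a pull could return,
  // capped strictly before the earliest incomplete instant on a sorted timeline.
  static List<String> nextBatch(List<Instant> sortedTimeline, String checkpoint) {
    Optional<String> firstIncomplete = sortedTimeline.stream()
        .filter(i -> !i.completed())
        .map(Instant::timestamp)
        .findFirst();
    return sortedTimeline.stream()
        .filter(Instant::completed)
        .map(Instant::timestamp)
        .filter(t -> t.compareTo(checkpoint) > 0)
        // No incomplete instant means no cap.
        .filter(t -> firstIncomplete.map(f -> t.compareTo(f) < 0).orElse(true))
        .toList();
  }
}
```

Suppose a reader is checkpointed at `001` while `002` is still pending. The pull returns nothing, even though `003` and `004` have completed. Once `002` completes, the same call returns `[002, 003, 004]`. However long the pending instant stalls, that is how long downstream jobs wait.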
--
This is an automated message from the Apache Git Service.