wombatu-kun commented on code in PR #19005:
URL: https://github.com/apache/hudi/pull/19005#discussion_r3434866192
##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelationV2.scala:
##########
@@ -158,6 +158,76 @@ case class MergeOnReadIncrementalRelationV2(override val sqlContext: SQLContext,
override def shouldIncludeLogFiles(): Boolean = fullTableScan
+ /**
+ * Collects the incremental file slices for the given modified partitions.
+ *
+ * Each returned file slice carries its base file plus all of its log files so the
+ * [[org.apache.hudi.common.table.read.HoodieFileGroupReader]] can perform a correct runtime merge.
+ * Resolving the current value of a record changed in the incremental window requires the full file
+ * slice, not just the files written within the window:
+ * - EVENT_TIME_ORDERING: a record written in the window may carry a lower ordering value than the
+ * version already in the base/earlier-log, so the existing version wins; a window-only view
+ * cannot determine the winner.
+ * - Partial updates: a window log block holds only the changed columns, so the unchanged columns
+ * must be filled in from the base file.
+ * (HUDI #18943.)
+ *
+ * The view is built from the (modified) partition listing (which includes the latest base file of
+ * each file group, honoring the metadata table when enabled) and scoped back to the file groups
+ * actually touched in the window (see [[affectedFileGroupIds]]) so untouched file groups are not
+ * read. The commit-time record filter applied during the scan still restricts the returned records
+ * to the incremental window.
+ *
+ * The view timeline is bounded to instants at or before `latestCommit` (the window's last commit)
+ * so that base/log files written by later commits are not visible. Without this bound a record
+ * updated again after the window would be merged with those later log files and its merged commit
+ * time would fall outside the window, dropping the in-window change from the result.
+ */
+ private def collectIncrementalFileSlices(partitionPaths: Seq[String], latestCommit: String): Seq[FileSlice] = {
+ val engineContext = new HoodieSparkEngineContext(new JavaSparkContext(sqlContext.sparkContext))
+ val fsView = FileSystemViewManager.createInMemoryFileSystemViewWithTimeline(
+ engineContext, metaClient, fileIndex.metadataConfig, timeline.findInstantsBeforeOrEquals(latestCommit))
+ try {
+ partitionPaths.flatMap { relativePartitionPath =>
+ fsView.getLatestMergedFileSlicesBeforeOrOn(relativePartitionPath, latestCommit).iterator().asScala
+ .filter(fs => affectedFileGroupIds.contains(fs.getFileId))
+ }
+ } finally {
+ fsView.close()
+ }
+ }
+
+ /**
+ * Builds the incremental file slices directly from the files recorded in the window's commits
+ * (`affectedFilesInCommits`), as the read path did before HUDI #18943.
+ *
+ * This is used only when [[hasMissingAffectedFiles]] is true and the full-table-scan fallback is
+ * disabled: some files referenced by the window have been removed (e.g. by cleaning), so there is
+ * no correct incremental result to produce. Listing from the recorded files (which include the
+ * missing paths) preserves the prior fail-early contract - the scan surfaces a file-not-found
+ * error pointing the user at `hoodie.datasource.read.incr.fallback.fulltablescan.enable` - instead
+ * of silently returning an empty/partial result from a fresh listing that no longer sees those
+ * files.
+ */
+ private def legacyAffectedFileSlices(partitionPaths: Seq[String], latestCommit: String): Seq[FileSlice] = {
Review Comment:
`legacyAffectedFileSlices` and `collectIncrementalFileSlices` share the same body,
`partitionPaths.flatMap { p => view.getLatestMergedFileSlicesBeforeOrOn(p, latestCommit).iterator().asScala [.filter(...)] }`
wrapped in `try/finally view.close()`, and differ only in how the view is built
and whether the file-group filter applies. Consider a private helper that takes
the view plus a `FileSlice => Boolean` predicate and owns the `try/finally close()`,
so the two call sites cannot drift on the close() contract.
##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelationV2.scala:
##########
@@ -158,6 +158,76 @@ case class MergeOnReadIncrementalRelationV2(override val sqlContext: SQLContext,
override def shouldIncludeLogFiles(): Boolean = fullTableScan
Review Comment:
This `affectedFileGroupIds` filter is the only thing scoping the
metadata-aware partition view back to the touched file groups, but no test
covers a modified partition that also holds an untouched sibling file group.
`testPartialUpdateIncrementalQueryPartitioned` writes one updated record per
partition, and with `MERGE_SMALL_FILE_GROUP_CANDIDATES_LIMIT=0` each partition
is a single file group, so this filter is a no-op there: removing it would fail
no test. Suggest a case that lands multiple file groups in one partition and
updates only one of them, asserting that the untouched sibling's file slice is
excluded from the slices read. The query output stays correct either way thanks
to the post-merge commit-time filter, so this test would guard the
scoping/perf path rather than correctness.
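A Hudi-free model of that reasoning, with hypothetical names throughout and an assumed `(start, end]` window. It is not the suggested Spark test; it only illustrates why a single-file-group partition cannot detect the filter, and why the query output is identical with or without it:

```scala
// Hypothetical model: a merged record carries the file group it was read from
// and the commit time of its winning version.
final case class MergedRecord(key: String, fileId: String, commitTime: String)

object IncrementalScopingModel {
  // Stand-in for the `affectedFileGroupIds` scoping step (modeled on records
  // rather than file slices for brevity).
  def scopeToAffected(records: Seq[MergedRecord], affected: Set[String]): Seq[MergedRecord] =
    records.filter(r => affected.contains(r.fileId))

  // Stand-in for the post-merge commit-time filter. The window is assumed to be
  // (start, end]; instant times are fixed-width digit strings, so lexicographic
  // comparison matches chronological order.
  def inWindow(records: Seq[MergedRecord], start: String, end: String): Seq[MergedRecord] =
    records.filter(r => r.commitTime > start && r.commitTime <= end)
}
```

In the single-file-group layout `scopeToAffected` returns its input unchanged, while with a second, untouched file group it drops that group; in both cases `inWindow` yields the same rows. That is why the suggested test would need to assert on the file slices that are read rather than on the query result.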