wombatu-kun commented on code in PR #19005:
URL: https://github.com/apache/hudi/pull/19005#discussion_r3434867783
##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelationV2.scala:
##########
@@ -158,6 +158,76 @@ case class MergeOnReadIncrementalRelationV2(override val sqlContext: SQLContext,
override def shouldIncludeLogFiles(): Boolean = fullTableScan
+ /**
+ * Collects the incremental file slices for the given modified partitions.
+ *
+ * Each returned file slice carries its base file plus all of its log files so the
+ * [[org.apache.hudi.common.table.read.HoodieFileGroupReader]] can perform a correct runtime merge.
+ * Resolving the current value of a record changed in the incremental window requires the full file
+ * slice, not just the files written within the window:
+ * - EVENT_TIME_ORDERING: a record written in the window may carry a lower ordering value than the
+ * version already in the base/earlier-log, so the existing version wins; a window-only view
+ * cannot determine the winner.
+ * - Partial updates: a window log block holds only the changed columns, so the unchanged columns
+ * must be filled in from the base file.
+ * (HUDI #18943.)
+ *
+ * The view is built from the (modified) partition listing (which includes the latest base file of
+ * each file group, honoring the metadata table when enabled) and scoped back to the file groups
+ * actually touched in the window (see [[affectedFileGroupIds]]) so untouched file groups are not
+ * read. The commit-time record filter applied during the scan still restricts the returned records
+ * to the incremental window.
+ *
+ * The view timeline is bounded to instants at or before `latestCommit` (the window's last commit)
+ * so that base/log files written by later commits are not visible. Without this bound a record
+ * updated again after the window would be merged with those later log files and its merged commit
+ * time would fall outside the window, dropping the in-window change from the result.
+ */
+ private def collectIncrementalFileSlices(partitionPaths: Seq[String], latestCommit: String): Seq[FileSlice] = {
+ val engineContext = new HoodieSparkEngineContext(new JavaSparkContext(sqlContext.sparkContext))
+ val fsView = FileSystemViewManager.createInMemoryFileSystemViewWithTimeline(
+ engineContext, metaClient, fileIndex.metadataConfig, timeline.findInstantsBeforeOrEquals(latestCommit))
+ try {
+ partitionPaths.flatMap { relativePartitionPath =>
+ fsView.getLatestMergedFileSlicesBeforeOrOn(relativePartitionPath, latestCommit).iterator().asScala
+ .filter(fs => affectedFileGroupIds.contains(fs.getFileId))
+ }
+ } finally {
+ fsView.close()
+ }
+ }
Review Comment:
Confirmed that this fix is V2-only, and that this is intentional: the commit message scopes the listing fix to table version 8+. V1 (`MergeOnReadIncrementalRelationV1.scala:114-115` and `:136`) still builds the window-only `HoodieTableFileSystemView(metaClient, timeline, affectedFilesInCommits)`, so a MOR incremental read on a table with table version < 8 still has the partial-update and event-time-ordering gap described in the Scaladoc above. If incremental MOR reads on table version < 8 are still supported, this deserves a tracked follow-up rather than being left implicit.
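To make the V1 gap concrete, here is a minimal Scala sketch under heavy simplification: a single record key in one file group, commit times as fixed-width strings that sort lexically, and hypothetical `Version`, `mergeEventTime`, `mergePartial`, `windowOnly` and `boundedFullSlice` helpers that stand in for Hudi's record mergers and file-system views. None of these are Hudi APIs. The sketch contrasts a window-only view (roughly what V1 builds) with the full slice bounded at the window's last commit (what the V2 Scaladoc describes), merging first and applying the commit-time filter afterwards.

```scala
// Hypothetical, simplified model: one record key inside one file group.
// None of these names are Hudi APIs; they only illustrate the merge semantics.
final case class Version(commitTime: String, orderingVal: Long, cols: Map[String, String])

object IncrementalMergeSketch {
  // EVENT_TIME_ORDERING-style merge: the highest ordering value wins; the later
  // commit time breaks ties (a simplifying assumption of this sketch).
  def mergeEventTime(vs: Seq[Version]): Option[Version] =
    vs.sortBy(v => (v.orderingVal, v.commitTime)).lastOption

  // Partial-update merge: replay versions in commit order, overlaying only the
  // columns each version carries; the result keeps the newest commit time.
  def mergePartial(vs: Seq[Version]): Option[Version] =
    vs.sortBy(_.commitTime).reduceOption((older, newer) => newer.copy(cols = older.cols ++ newer.cols))

  // Commit-time window (startTs, endTs], compared as fixed-width instant strings.
  def inWindow(v: Version, startTs: String, endTs: String): Boolean =
    v.commitTime > startTs && v.commitTime <= endTs

  // V1-like view: only the versions written by commits inside the window.
  def windowOnly(vs: Seq[Version], startTs: String, endTs: String): Seq[Version] =
    vs.filter(inWindow(_, startTs, endTs))

  // V2-like view: base plus all logs, bounded to the window's last commit.
  def boundedFullSlice(vs: Seq[Version], endTs: String): Seq[Version] =
    vs.filter(_.commitTime <= endTs)

  // Merge the view first, then apply the commit-time record filter.
  def incrementalRead(view: Seq[Version], merge: Seq[Version] => Option[Version],
                      startTs: String, endTs: String): Option[Version] =
    merge(view).filter(inWindow(_, startTs, endTs))

  def main(args: Array[String]): Unit = {
    val (startTs, endTs) = ("002", "004")

    // Event time: the in-window write carries a LOWER ordering value than the base.
    val evt = Seq(
      Version("001", 10, Map("a" -> "a1", "b" -> "b1")),
      Version("003", 5, Map("a" -> "a3", "b" -> "b3")))
    println(incrementalRead(windowOnly(evt, startTs, endTs), mergeEventTime, startTs, endTs)) // stale 003 version emitted
    println(incrementalRead(boundedFullSlice(evt, endTs), mergeEventTime, startTs, endTs))    // None: base still wins

    // Partial update: commit 003 rewrites only column a; commit 005 lands after the window.
    val partial = Seq(
      Version("001", 0, Map("a" -> "a1", "b" -> "b1")),
      Version("003", 0, Map("a" -> "a3")),
      Version("005", 0, Map("b" -> "b5")))
    println(incrementalRead(windowOnly(partial, startTs, endTs), mergePartial, startTs, endTs)) // column b missing
    println(incrementalRead(boundedFullSlice(partial, endTs), mergePartial, startTs, endTs))    // a3 plus b1 from base
    println(incrementalRead(partial, mergePartial, startTs, endTs))                             // None: unbounded view drops the change
  }
}
```

In this model the window-only view emits a stale event-time version and a partial record missing column `b`, the bounded full slice gets both cases right, and removing the `latestCommit`-style bound pushes the merged commit time past the window so the in-window change is filtered out.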