shangxinli opened a new pull request, #18802:

URL: https://github.com/apache/hudi/pull/18802

### Describe the issue this Pull Request addresses

Part of #17512 (Phase 2 of the reconcile plan). Builds on #18778 (Phase 1), which exposed a per-partition event-time rollup on `HoodieCommitMetadata` and decoupled watermark tracking from `EVENT_TIME_ORDERING`.

Phase 2 closes the remaining ask from the issue discussion. When a derived table is built from a Hudi source via `HoodieIncrSource`, the upstream commits' per-partition (min, max) event time is propagated into the **downstream** commit's per-partition write stats. Derived tables then inherit upstream freshness without having to retain the event-time column in their schema.

### Summary and Changelog

The propagation is gated on a new opt-in config, `hoodie.write.track.event.time.propagate.from.upstream` (default `false`, advanced, since 1.2.0). When it is enabled, `HoodieIncrSource` reads the per-partition rollup from each upstream commit in the read range (via the Phase 1 `HoodieCommitMetadata.getMinAndMaxEventTimePerPartition()` API), folds the rollups across commits, and exposes the result through a new `Source` hook. `StreamSync` reads the propagated map from the `InputBatch` and folds it into the downstream commit's per-partition write stats via the existing `extraPreCommitFunc` BiConsumer hook on `BaseHoodieWriteClient.commit(...)`. That argument was previously always `Option.empty()`; it now carries the propagation closure whenever there are watermarks to fold.

Fold semantics are inherited from Phase 1: `HoodieWriteStat.setMinEventTime` / `setMaxEventTime` already perform null-aware `Math.min` / `Math.max` folds. Propagation therefore fills in any field the per-record path left unset, and where a per-record value already exists, the more extreme of the two values is kept. As a result, back-fills cannot regress max watermarks at the per-commit level. Upstream partitions with no matching downstream stat are silently dropped (the common projection / aggregation case).

Changes:

- `HoodieWriteConfig.TRACK_EVENT_TIME_PROPAGATE_FROM_UPSTREAM`: new opt-in config.
- `ConfigUtils.isPropagatingEventTimeFromUpstream`: accessor mirroring the existing `isTrackingEventTimeWatermark`.
- `UpstreamEventTimeWatermarkExtractor`: helper that folds per-partition min/max across a set of upstream `HoodieInstant`s.
- `Source.getUpstreamEventTimeWatermarks()`: new EVOLVING hook whose default returns an empty map. Concrete sources override it when they have batch-scoped propagated watermarks.
- `InputBatch`: optional `Map<String, Pair<Long, Long>> upstreamEventTimeWatermarks` field, plumbed through `RowSource`. Backward-compatible constructor overloads keep existing callers intact.
- `HoodieIncrSource`: when the config is on, computes folded watermarks for both the completion-time path (Hudi 1.x source tables) and the request-time path (legacy / Hudi 0.x source tables) and exposes them via the new `Source` hook. This is best-effort: read failures are logged and skipped rather than failing the source.
- `StreamSync.buildUpstreamWatermarkPreCommitFunc`: BiConsumer pre-commit hook that folds the propagated values into per-partition write stats. Returns `Option.empty()` when there is nothing to propagate (the most common case), leaving the existing commit path unchanged.

Tests:

- `TestUpstreamEventTimeWatermarkExtractor` (8 tests): single-commit rollup, multi-commit fold, partial min/max, commit without watermark, read-failure tolerance, empty cases.
- `TestStreamSyncUpstreamWatermarkPropagation` (7 tests): BiConsumer fold semantics including the never-regress invariant, unmatched partitions in both directions, multiple stats per partition.

Regression: `TestHoodieCommitMetadata` (18), `TestHoodieWriteHandle` (11), `TestHoodieIncrSource` (21), and `TestStreamSync` (32) all pass locally.

### Impact

Public API additions:

- `Source.getUpstreamEventTimeWatermarks()` (EVOLVING): external `Source` subclasses outside the repo are unaffected (the default returns an empty map, i.e. a no-op).
- New constructor overload on `InputBatch` carrying the propagated map. All existing constructors are preserved.
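
To make the fold semantics described in the Summary concrete, here is a minimal, self-contained Java sketch. It is illustrative only and uses no Hudi classes: `MinMax`, `WriteStat`, `foldAcrossCommits`, and `buildPreCommitFunc` are hypothetical stand-ins for `Pair<Long, Long>`, `HoodieWriteStat`, `UpstreamEventTimeWatermarkExtractor`, and `StreamSync.buildUpstreamWatermarkPreCommitFunc`. The pre-commit hook is simplified to a `Consumer` over the stats rather than the real BiConsumer signature, and the sketch assumes the null-aware `Math.min` / `Math.max` behavior stated above.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Consumer;

// Hypothetical sketch of the upstream event-time fold; none of these types are Hudi APIs.
public class EventTimeWatermarkFoldSketch {

  // Stand-in for Pair<Long, Long>: per-partition (min, max) event time; either side may be null.
  static final class MinMax {
    final Long min;
    final Long max;

    MinMax(Long min, Long max) {
      this.min = min;
      this.max = max;
    }
  }

  // Stand-in for a per-partition write stat with nullable min/max event-time fields.
  static final class WriteStat {
    final String partitionPath;
    Long minEventTime;
    Long maxEventTime;

    WriteStat(String partitionPath, Long minEventTime, Long maxEventTime) {
      this.partitionPath = partitionPath;
      this.minEventTime = minEventTime;
      this.maxEventTime = maxEventTime;
    }

    // Null-aware folds: an unset side is filled in, otherwise the more extreme value wins.
    void setMinEventTime(Long candidate) {
      minEventTime = nullAwareMin(minEventTime, candidate);
    }

    void setMaxEventTime(Long candidate) {
      maxEventTime = nullAwareMax(maxEventTime, candidate);
    }
  }

  static Long nullAwareMin(Long a, Long b) {
    if (a == null) return b;
    if (b == null) return a;
    return Math.min(a, b);
  }

  static Long nullAwareMax(Long a, Long b) {
    if (a == null) return b;
    if (b == null) return a;
    return Math.max(a, b);
  }

  // Extractor role: fold every upstream commit's per-partition rollup into a single map.
  static Map<String, MinMax> foldAcrossCommits(List<Map<String, MinMax>> perCommitRollups) {
    Map<String, MinMax> folded = new HashMap<>();
    for (Map<String, MinMax> rollup : perCommitRollups) {
      for (Map.Entry<String, MinMax> e : rollup.entrySet()) {
        MinMax prev = folded.get(e.getKey());
        MinMax next = e.getValue();
        folded.put(e.getKey(), prev == null ? next
            : new MinMax(nullAwareMin(prev.min, next.min), nullAwareMax(prev.max, next.max)));
      }
    }
    return folded;
  }

  // Pre-commit hook role: empty when there is nothing to propagate, so the commit path is unchanged.
  static Optional<Consumer<List<WriteStat>>> buildPreCommitFunc(Map<String, MinMax> upstream) {
    if (upstream == null || upstream.isEmpty()) {
      return Optional.empty();
    }
    Consumer<List<WriteStat>> hook = stats -> {
      for (WriteStat stat : stats) {
        MinMax mm = upstream.get(stat.partitionPath);
        if (mm == null) {
          continue; // downstream partition without an upstream watermark: left as-is
        }
        stat.setMinEventTime(mm.min);
        stat.setMaxEventTime(mm.max);
      }
      // Upstream partitions with no matching downstream stat are never looked up, i.e. dropped.
    };
    return Optional.of(hook);
  }
}
```

For example, folding two upstream commits with `2024/01/01 -> (100, 200)` and `2024/01/01 -> (50, null)` yields `(50, 200)`. Applying that to a downstream stat for `2024/01/01` that already carries `maxEventTime = 500` sets its min to 50 but leaves its max at 500, which is the never-regress invariant. An empty upstream map yields `Optional.empty()` and no hook at all.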

Behavior change for opted-in pipelines: `HoodieIncrSource` users who set `hoodie.write.track.event.time.propagate.from.upstream=true` and read from an upstream Hudi table that was itself written with `hoodie.write.track.event.time.watermark=true` will see per-partition `minEventTime` / `maxEventTime` populated on the downstream commit's write stats, inherited from upstream. Tables that have not set the new flag see no change.

No performance impact: the extractor walks at most N upstream commits per batch (the same commits the source already reads) and reads each commit's metadata once; the BiConsumer fold is a small in-memory loop over the downstream commit's stats. There are no new RDD transformations and no broadcast variables.

### Risk Level

low

The new config is `false` by default. The new `Source` hook returns an empty map by default, and the `StreamSync` hook returns `Option.empty()` when the map is empty, so the `extraPreCommitFunc` argument at that call site is identical to the previous `Option.empty()`. The `InputBatch` additions are constructor overloads.

Verified by running the relevant test classes in hudi-common and hudi-utilities locally with no regressions; the existing `TestHoodieIncrSource` (21 tests) and `TestStreamSync` (32 tests) both pass against the modified files.

### Documentation Update

- The new `hoodie.write.track.event.time.propagate.from.upstream` config is documented inline (via its `ConfigProperty.withDocumentation(...)`), so it will show up on the Hudi configurations page in the website's auto-generation pass.
- A user-facing website page covering the end-to-end propagation story (raw → derived freshness) can land alongside the planned follow-up PRs that wire up the Spark SQL Hudi source and the Flink source.

### Follow-ups

Called out in the Phase 2 plan in #17512 and deliberately out of scope here:

- Spark SQL Hudi source propagation (Spark Datasource v2 path).
- Flink source propagation.
- Two-hop end-to-end test fixture (write a raw table with an event-time field, run the streamer raw → derived with propagation on, and assert that the derived commit inherits the per-partition min/max).

The propagation surface is small and well covered at the unit level; an E2E fixture is most useful once the Spark SQL / Flink paths land, so that it can exercise all three engines.

### Contributor's checklist

- [x] Read through [contributor's guide](https://hudi.apache.org/contribute/how-to-contribute)
- [x] Enough context is provided in the sections above
- [x] Adequate tests were added if applicable
