shangxinli opened a new pull request, #18802:
URL: https://github.com/apache/hudi/pull/18802

   ### Describe the issue this Pull Request addresses
   
   Part of #17512 (Phase 2 of the reconcile plan). Builds on #18778 (Phase 1) 
which exposed per-partition event-time rollup on `HoodieCommitMetadata` and 
decoupled watermark tracking from `EVENT_TIME_ORDERING`.
   
   Phase 2 closes the remaining ask from the issue discussion: when a derived 
table is built from a Hudi source via `HoodieIncrSource`, propagate the 
upstream commit(s)' per-partition (min, max) event time into the **downstream** 
commit's per-partition write stats. Derived tables inherit upstream freshness 
without retaining the event-time column in their schema.
   
   ### Summary and Changelog
   
   The propagation is gated on a new opt-in config 
`hoodie.write.track.event.time.propagate.from.upstream` (default `false`, 
advanced, since 1.2.0). When enabled, `HoodieIncrSource` reads the 
per-partition rollup from each upstream commit in the read range (via the Phase 
1 `HoodieCommitMetadata.getMinAndMaxEventTimePerPartition()` API), folds across 
commits, and exposes the result through a new `Source` hook. `StreamSync` reads 
the propagated map from the `InputBatch` and folds it into the downstream 
commit's per-partition write stats via the existing `extraPreCommitFunc` 
BiConsumer hook on `BaseHoodieWriteClient.commit(...)` — which was previously 
passed `Option.empty()` and now carries the propagation closure when there are 
watermarks to fold.
   
   Fold semantics are inherited from Phase 1: `HoodieWriteStat.setMinEventTime` 
/ `setMaxEventTime` already do null-aware `Math.min` / `Math.max` folds. So 
propagation **only fills in fields the per-record path left unset**, and 
per-record extreme values are preserved when more extreme than the upstream 
value. Back-fills cannot regress max watermarks at the per-commit level. 
Upstream partitions with no matching downstream stat are dropped silently (the 
common projection / aggregation case).
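
The fold semantics above can be sketched in isolation. This is a minimal illustration, not Hudi's code: `WriteStatSketch` is a hypothetical stand-in for `HoodieWriteStat`, and it assumes a null-aware setter ignores a null input and otherwise keeps the more extreme of the stored and incoming values.

```java
/** Hypothetical stand-in for a write stat; NOT Hudi's HoodieWriteStat. */
final class WriteStatSketch {
    private Long minEventTime; // null means "unset"
    private Long maxEventTime; // null means "unset"

    // Assumed null-aware fold: ignore null input, otherwise keep the smaller value.
    void setMinEventTime(Long candidate) {
        if (candidate == null) {
            return;
        }
        minEventTime = (minEventTime == null) ? candidate : Math.min(minEventTime, candidate);
    }

    // Assumed null-aware fold: ignore null input, otherwise keep the larger value.
    void setMaxEventTime(Long candidate) {
        if (candidate == null) {
            return;
        }
        maxEventTime = (maxEventTime == null) ? candidate : Math.max(maxEventTime, candidate);
    }

    Long getMinEventTime() {
        return minEventTime;
    }

    Long getMaxEventTime() {
        return maxEventTime;
    }

    public static void main(String[] args) {
        WriteStatSketch stat = new WriteStatSketch();
        stat.setMaxEventTime(200L); // per-record path saw max = 200, min left unset
        stat.setMinEventTime(100L); // upstream min fills the unset field
        stat.setMaxEventTime(150L); // upstream max is less extreme, so 200 is kept
        System.out.println(stat.getMinEventTime() + " " + stat.getMaxEventTime());
    }
}
```

In this sketch an upstream max of 150 folded into a stat that already holds 200 leaves 200 in place, which is the never-regress behavior described above.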
   
   Changes:
   - `HoodieWriteConfig.TRACK_EVENT_TIME_PROPAGATE_FROM_UPSTREAM`: new opt-in 
config.
   - `ConfigUtils.isPropagatingEventTimeFromUpstream`: accessor mirroring the 
existing `isTrackingEventTimeWatermark`.
   - `UpstreamEventTimeWatermarkExtractor`: helper that folds per-partition 
min/max across a set of upstream `HoodieInstant`s.
   - `Source.getUpstreamEventTimeWatermarks()`: new EVOLVING hook, default 
returns empty map. Concrete sources override when they have batch-scoped 
propagated watermarks.
   - `InputBatch`: optional `Map<String, Pair<Long, Long>> 
upstreamEventTimeWatermarks` field, plumbed through `RowSource`. 
Backward-compatible constructor overloads keep existing callers intact.
   - `HoodieIncrSource`: when the config is on, computes folded watermarks for 
both the completion-time path (Hudi 1.x source tables) and the request-time 
path (legacy/Hudi 0.x source tables) and exposes them via the new `Source` hook. 
This is best-effort: read failures are logged and skipped rather than failing 
the source.
   - `StreamSync.buildUpstreamWatermarkPreCommitFunc`: BiConsumer pre-commit 
hook that folds the propagated values into per-partition write stats. Returns 
`Option.empty()` when there is nothing to propagate (most common case), keeping 
the existing commit path unchanged.
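
The data flow through the extractor and the pre-commit hook can be sketched as follows. This is a simplified model under stated assumptions, not the PR's implementation: `UpstreamWatermarkSketch`, `Range`, `foldAcrossCommits`, and `buildPreCommitHook` are hypothetical names, `java.util.Optional` stands in for Hudi's `Option`, and the downstream commit is modeled as one `Range` per partition rather than a list of write stats.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Consumer;

/** Hypothetical model of the fold; names and types are illustrative only. */
final class UpstreamWatermarkSketch {

    /** A (min, max) pair; either side may be null if a commit recorded only one bound. */
    static final class Range {
        final Long min;
        final Long max;

        Range(Long min, Long max) {
            this.min = min;
            this.max = max;
        }

        // Null-aware merge: a null side defers to the other range.
        Range merge(Range other) {
            return new Range(fold(min, other.min, true), fold(max, other.max, false));
        }

        private static Long fold(Long a, Long b, boolean takeMin) {
            if (a == null) {
                return b;
            }
            if (b == null) {
                return a;
            }
            return takeMin ? Math.min(a, b) : Math.max(a, b);
        }
    }

    // Extractor step: fold per-partition ranges across all upstream commits in the read range.
    static Map<String, Range> foldAcrossCommits(List<Map<String, Range>> perCommit) {
        Map<String, Range> folded = new HashMap<>();
        for (Map<String, Range> commit : perCommit) {
            commit.forEach((partition, range) -> folded.merge(partition, range, Range::merge));
        }
        return folded;
    }

    // Pre-commit step: empty when there is nothing to propagate, so the commit path is unchanged.
    static Optional<Consumer<Map<String, Range>>> buildPreCommitHook(Map<String, Range> upstream) {
        if (upstream.isEmpty()) {
            return Optional.empty();
        }
        // Only partitions already present downstream are updated;
        // upstream partitions with no downstream match are dropped.
        Consumer<Map<String, Range>> hook = downstream ->
            downstream.replaceAll((partition, range) -> {
                Range up = upstream.get(partition);
                return up == null ? range : range.merge(up);
            });
        return Optional.of(hook);
    }
}
```

Returning an empty optional when the map is empty mirrors the design choice described above: pipelines with nothing to propagate take exactly the code path they took before.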
   
   Tests:
   - `TestUpstreamEventTimeWatermarkExtractor` (8 tests): single-commit rollup, 
multi-commit fold, partial min/max, commit without watermark, read-failure 
tolerance, empty cases.
   - `TestStreamSyncUpstreamWatermarkPropagation` (7 tests): BiConsumer fold 
semantics including the never-regress invariant, unmatched partitions in both 
directions, multiple stats per partition.
   
   Regression: `TestHoodieCommitMetadata` (18), `TestHoodieWriteHandle` (11), 
`TestHoodieIncrSource` (21), `TestStreamSync` (32) all pass locally.
   
   ### Impact
   
   Public API additions:
   - `Source.getUpstreamEventTimeWatermarks()` (EVOLVING) — external `Source` 
subclasses outside the repo are unaffected (default returns empty map; no-op).
   - New constructor overload on `InputBatch` carrying the propagated map. All 
existing constructors are preserved.
   
   Behavior change for opted-in pipelines: `HoodieIncrSource` users that set 
both `hoodie.write.track.event.time.propagate.from.upstream=true` and read from 
an upstream Hudi table that itself was written with 
`hoodie.write.track.event.time.watermark=true` will see per-partition 
`minEventTime` / `maxEventTime` populated on the downstream commit's write 
stats, inherited from upstream. Tables that have not set the new flag see no 
change.
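
The two settings involved in opting in can be sketched with plain `java.util.Properties`. The config keys are the ones named above; the class and method names are hypothetical, and how the properties reach the upstream writer and the downstream streamer job is left out as it depends on the deployment.

```java
import java.util.Properties;

/** Hypothetical helper; only the two config keys come from this PR description. */
final class PropagationConfigSketch {

    // Upstream table writer: must record event-time watermarks in its commits.
    static Properties upstreamWriterProps() {
        Properties props = new Properties();
        props.setProperty("hoodie.write.track.event.time.watermark", "true");
        return props;
    }

    // Downstream job reading via HoodieIncrSource: opts in to propagation (default is false).
    static Properties downstreamStreamerProps() {
        Properties props = new Properties();
        props.setProperty("hoodie.write.track.event.time.propagate.from.upstream", "true");
        return props;
    }
}
```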
   
   No performance impact is expected: the extractor walks at most N upstream 
commits per batch (commits the source already reads) and reads each commit's 
metadata once; the BiConsumer fold is a small in-memory loop over the 
downstream commit's stats. No new RDD transformations, no broadcast variables.
   
   ### Risk Level
   
   low
   
   The new config is `false` by default. The new `Source` hook returns an empty 
map by default; the `StreamSync` hook returns `Option.empty()` when the map is 
empty, so the `extraPreCommitFunc` argument is the same `Option.empty()` that 
the call site passed before. `InputBatch` additions are constructor overloads. 
Verified by running the relevant hudi-common and hudi-utilities test classes 
locally with no regressions; the existing `TestHoodieIncrSource` (21 tests) and 
`TestStreamSync` (32 tests) both pass against the modified files.
   
   ### Documentation Update
   
   - The new `hoodie.write.track.event.time.propagate.from.upstream` config is 
documented inline (via its `ConfigProperty.withDocumentation(...)`), so it will 
show up on the Hudi configurations page after the next website auto-generation 
pass.
   - A user-facing website page covering the end-to-end propagation story (raw 
→ derived freshness) can land alongside the planned follow-up PRs that wire 
Spark SQL Hudi source + Flink source.
   
   ### Follow-ups (called out in the Phase 2 plan in #17512, deliberately out of scope here):
   
   - Spark SQL Hudi source propagation (Spark Datasource v2 path).
   - Flink source propagation.
   - Two-hop end-to-end test fixture (write raw table with event-time field, 
run streamer raw → derived with propagation on, assert derived commit inherits 
per-partition min/max). The propagation surface is small and well-tested at the 
unit level; an E2E fixture is most useful once the Spark SQL / Flink paths land 
so it can exercise all three engines.
   
   ### Contributor's checklist
   
   - [x] Read through [contributor's guide](https://hudi.apache.org/contribute/how-to-contribute)
   - [x] Enough context is provided in the sections above
   - [x] Adequate tests were added if applicable

