shangxinli commented on issue #17512:
URL: https://github.com/apache/hudi/issues/17512#issuecomment-4488860325

# Reconciliation proposal: build on `hoodie.write.track.event.time.watermark`, don't fork it
   
Thanks @danny0405 and @vinothchandar. After reading the current implementation, I agree the right path is to extend the existing watermark mechanism rather than add a parallel `extraMetadata.hoodie.source.freshness` blob. Here's what I found, plus a concrete reconciliation plan.
   
   ## What already exists today (1.1.0)
   
The watermark feature already records **per-write-stat** min/max event time, and write stats already carry `partitionPath`. So per-partition granularity is *latent in the data we persist*; we just don't expose it:
   
- **Config gate**: `hoodie.write.track.event.time.watermark` in `HoodieWriteConfig` (default `false`, advanced, since 1.1.0).
- **Per-record extraction**: `HoodieWriteHandle.getRecordMetadata()` (lines ~353–369) reads the configured event-time field from each record and stamps it onto the record metadata under `METADATA_EVENT_TIME_KEY`. Activation requires `eventTimeFieldName != null` **AND** record merge mode `EVENT_TIME_ORDERING` **AND** the config flag.
- **Per-stat aggregation**: `WriteStatus.markSuccess()` parses the epoch value (seconds or milliseconds) and calls `stat.setMinEventTime`/`setMaxEventTime`, which fold via `Math.min`/`Math.max`.
- **Persisted in commit Avro**: `HoodieCommitMetadata.avsc` already serializes `minEventTime`/`maxEventTime` on each `HoodieWriteStat`, alongside `partitionPath`.
- **Flink parity**: `HoodieRowDataCreateHandle` and `WriteStatMerger.merge` keep Flink writes consistent.
- **Current exposure**: only `HoodieCommitMetadata.getMinAndMaxEventTime()`, which collapses *all* partitions into a single min/max pair.
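To make the "latent per-partition data" point concrete, here is a self-contained sketch (not Hudi's code). It uses a hypothetical `Stat` class as a stand-in for `HoodieWriteStat`, keeping only `partitionPath`, `minEventTime` and `maxEventTime`. It mirrors the fold described above: each record's event time is folded into its stat with `Math.min`/`Math.max`. The hypothetical `collapseAll` then merges every stat into one pair, similar in spirit to `getMinAndMaxEventTime()`, and in doing so it loses track of which partition each bound came from.

```java
import java.util.List;

final class EventTimeFoldSketch {
  /** Hypothetical stand-in for HoodieWriteStat: only the fields relevant here. */
  static final class Stat {
    final String partitionPath;
    Long minEventTime; // null until the first record is folded in
    Long maxEventTime;

    Stat(String partitionPath) {
      this.partitionPath = partitionPath;
    }

    /** Fold one record's event time into this stat via Math.min / Math.max. */
    void fold(long eventTime) {
      minEventTime = (minEventTime == null) ? eventTime : Math.min(minEventTime, eventTime);
      maxEventTime = (maxEventTime == null) ? eventTime : Math.max(maxEventTime, eventTime);
    }
  }

  /**
   * Collapse all stats (and therefore all partitions) into one {min, max} pair.
   * Returns {Long.MAX_VALUE, Long.MIN_VALUE} when no stat has an event time.
   */
  static long[] collapseAll(List<Stat> stats) {
    long min = Long.MAX_VALUE;
    long max = Long.MIN_VALUE;
    for (Stat s : stats) {
      if (s.minEventTime != null) min = Math.min(min, s.minEventTime);
      if (s.maxEventTime != null) max = Math.max(max, s.maxEventTime);
    }
    return new long[] {min, max};
  }
}
```

Take one stat for `dt=2025-12-01` that has folded 3000 and 1000, and one stat for `dt=2025-12-02` that has folded 5000. The collapse yields (1000, 5000). The fact that `dt=2025-12-01` tops out at 3000 is still on each stat but is gone from the collapsed view.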
   
So the RFC's proposed `extraMetadata` payload is largely **redundant** with what's already on disk. The actual gaps are smaller and more targeted.
   
   ## Real gaps vs. the existing mechanism
   
| RFC intent | Already in tree? | Gap |
|---|---|---|
| Per-partition min/max in commit metadata | Yes, on each `HoodieWriteStat` | No per-partition rollup API; consumers must walk `partitionToWriteStats` themselves |
| Opt-in, additive, backward-compatible | Yes | None |
| Raw ingest (Kafka → Hudi) freshness | Yes (when `EVENT_TIME_ORDERING` + event-time field set) | Requiring `EVENT_TIME_ORDERING` merge mode is unnecessary for *pure freshness* tracking |
| **Derived/transformed tables propagate upstream freshness** | **No** | This is genuinely new |
| External tooling access | Possible via commit metadata APIs | Needs a stable, documented accessor |
   
## Proposed reconciliation (concrete deltas)
   
**1. Expose a per-partition rollup on `HoodieCommitMetadata`.** Add it alongside the existing global getter:
```java
// Proposed: keyed by partition path; each value is the (min, max) event time for that partition.
public Map<String, Pair<Option<Long>, Option<Long>>> getMinAndMaxEventTimePerPartition();
```
Pure aggregation over `partitionToWriteStats`: no new persisted bytes, no schema change. Keep `getMinAndMaxEventTime()` as the collapsed view.
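The rollup is described as pure aggregation, so its logic fits in a short sketch. The sketch simplifies the shapes involved. `partitionToWriteStats` is modeled as `Map<String, List<Stat>>`, where `Stat` is a hypothetical record with nullable `Long` bounds. `java.util.Optional` replaces Hudi's `Option`, and a hypothetical `MinMax` record replaces `Pair`. A real implementation would work on `HoodieWriteStat` and Hudi's own types.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

final class PerPartitionRollupSketch {
  /** Hypothetical stand-in for HoodieWriteStat; bounds are null when not tracked. */
  record Stat(String partitionPath, Long minEventTime, Long maxEventTime) {}

  /** Hypothetical stand-in for Pair<Option<Long>, Option<Long>>. */
  record MinMax(Optional<Long> min, Optional<Long> max) {}

  /** Fold all stats of each partition into one min/max pair; reads only existing fields. */
  static Map<String, MinMax> minAndMaxEventTimePerPartition(
      Map<String, List<Stat>> partitionToWriteStats) {
    Map<String, MinMax> result = new LinkedHashMap<>();
    for (Map.Entry<String, List<Stat>> e : partitionToWriteStats.entrySet()) {
      Long min = null;
      Long max = null;
      for (Stat s : e.getValue()) {
        if (s.minEventTime() != null) {
          min = (min == null) ? s.minEventTime() : Math.min(min, s.minEventTime());
        }
        if (s.maxEventTime() != null) {
          max = (max == null) ? s.maxEventTime() : Math.max(max, s.maxEventTime());
        }
      }
      // Partitions without tracked event time map to empty Optionals instead of being dropped.
      result.put(e.getKey(), new MinMax(Optional.ofNullable(min), Optional.ofNullable(max)));
    }
    return result;
  }
}
```

Suppose `dt=2025-12-01` has two stats with bounds (1000, 3000) and (500, 2000). The rollup reports min 500 and max 3000 for that partition. A written partition that carries no event time still appears, with empty bounds, so "not tracked" stays distinguishable from "not written in this commit".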
   
**2. Decouple watermark tracking from `EVENT_TIME_ORDERING`.** In `HoodieWriteHandle`:
   ```java
   this.isTrackingEventTimeWatermark =
       this.eventTimeFieldName != null
       && ConfigUtils.isTrackingEventTimeWatermark(config.getProps());
   // drop: && recordMergeMode == EVENT_TIME_ORDERING
   ```
Freshness observability is orthogonal to merge semantics. With the current coupling, tables using any merge mode other than `EVENT_TIME_ORDERING` (e.g., `COMMIT_TIME_ORDERING`) silently get no watermark even when the user opts in. That's a usability bug.
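To show exactly which configurations the decoupling changes, here is a small sketch that compares the two predicates. `MergeMode`, `currentGate` and `proposedGate` are hypothetical names. The enum mirrors the two merge modes named in this thread rather than importing Hudi's own type, and `"ts"` is just an example field name.

```java
final class WatermarkGateSketch {
  /** Hypothetical mirror of the two merge modes discussed in this thread. */
  enum MergeMode { COMMIT_TIME_ORDERING, EVENT_TIME_ORDERING }

  /** Current activation rule as described above: field set AND event-time ordering AND flag. */
  static boolean currentGate(String eventTimeFieldName, MergeMode mode, boolean flagEnabled) {
    return eventTimeFieldName != null && mode == MergeMode.EVENT_TIME_ORDERING && flagEnabled;
  }

  /** Proposed rule: the merge mode is accepted but deliberately ignored. */
  static boolean proposedGate(String eventTimeFieldName, MergeMode mode, boolean flagEnabled) {
    return eventTimeFieldName != null && flagEnabled;
  }

  public static void main(String[] args) {
    // With a field configured and the flag on, print which modes get a watermark.
    for (MergeMode mode : MergeMode.values()) {
      System.out.println(mode + ": current=" + currentGate("ts", mode, true)
          + ", proposed=" + proposedGate("ts", mode, true));
    }
  }
}
```

Running `main` shows that only `COMMIT_TIME_ORDERING` changes, going from `false` to `true`. Every other input combination behaves as before.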
   
**3. Add upstream propagation as a second source (the only genuinely new piece).** When `eventTimeFieldName` is *not* set but the writer is reading from another Hudi table (e.g., `HoodieIncrSource`, or Spark SQL on a Hudi source), inherit the upstream commit's per-partition min/max as the downstream commit's watermark for the same destination partitions. Order of precedence:

1. Per-record event-time field (existing path)
2. Upstream Hudi commit propagation (new)
3. None (do not infer)

New config: `hoodie.write.track.event.time.propagate.from.upstream=true` (default off, advanced). It wires into the source readers, not `HoodieWriteHandle`.
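The precedence order can be read as a small resolution function. This is a sketch under stated assumptions. `WatermarkSource`, `resolve` and its boolean inputs are hypothetical. In particular, `upstreamIsHudiTable` stands in for whatever signal the source readers (e.g., `HoodieIncrSource`) would provide. The proposal does not say whether propagation should also require the existing watermark flag, so this sketch treats the two flags independently.

```java
final class WatermarkSourceSketch {
  /** Where a commit's event-time watermark comes from, in precedence order. */
  enum WatermarkSource { PER_RECORD_FIELD, UPSTREAM_COMMIT, NONE }

  static WatermarkSource resolve(
      boolean trackingEnabled,        // hoodie.write.track.event.time.watermark
      String eventTimeFieldName,      // null when no event-time field is configured
      boolean propagateFromUpstream,  // the proposed ...propagate.from.upstream flag
      boolean upstreamIsHudiTable) {  // hypothetical signal from the source reader
    if (trackingEnabled && eventTimeFieldName != null) {
      return WatermarkSource.PER_RECORD_FIELD; // 1. existing per-record path wins
    }
    if (propagateFromUpstream && upstreamIsHudiTable) {
      return WatermarkSource.UPSTREAM_COMMIT;  // 2. inherit upstream per-partition min/max
    }
    return WatermarkSource.NONE;               // 3. never infer a watermark
  }
}
```

Keeping the fallback at `NONE` rather than, say, using commit time as a proxy matches the "do not infer" rule. A missing watermark is honest. A guessed one would mislead freshness dashboards.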
   
**4. Naming.** Keep `hoodie.write.track.event.time.watermark` as the canonical key. If you want a more user-facing name (`hoodie.write.track.freshness.*`), alias it and deprecate rather than fork.
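Aliasing rather than forking could be as simple as a two-key lookup. The preferred key is read first. A deprecated key is honored as a fallback and triggers a warning. `AliasedFlagSketch` and `readFlag` are hypothetical, and which key is preferred is left to the caller. In practice, Hudi's own config framework would be the natural home for this logic, not a standalone helper.

```java
import java.util.Properties;
import java.util.function.Consumer;

final class AliasedFlagSketch {
  /**
   * Reads a boolean flag from the preferred key, falling back to a deprecated key.
   * A warning is emitted only when the deprecated key is the one actually used.
   */
  static boolean readFlag(Properties props, String preferredKey, String deprecatedKey,
                          boolean defaultValue, Consumer<String> warn) {
    String value = props.getProperty(preferredKey);
    if (value == null) {
      value = props.getProperty(deprecatedKey);
      if (value != null) {
        warn.accept("Config '" + deprecatedKey + "' is deprecated; use '" + preferredKey + "'.");
      }
    }
    return value == null ? defaultValue : Boolean.parseBoolean(value.trim());
  }
}
```

Both names resolve to one value, so there is a single code path downstream, which is the point of "don't fork".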
   
**5. Drop the proposed `extraMetadata.hoodie.source.freshness` blob entirely.** It duplicates `writeStats[].{partitionPath,minEventTime,maxEventTime}`. The only reason to use `extraMetadata` would be for *upstream-propagated* values that don't correspond to any record this writer scanned. Even there, writing them into the same `HoodieWriteStat` fields keeps a single canonical location.
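Keeping a single canonical location means upstream-propagated bounds would land in the same per-stat fields. Below is a minimal sketch under several assumptions. It uses a hypothetical mutable `Stat` stand-in for `HoodieWriteStat`. It also takes upstream bounds as a map from partition path to `{min, max}`, as a rollup like the one in step 1 could produce. Finally, it assumes upstream and destination partitions share the same partition path. Per-record values, when present, keep precedence.

```java
import java.util.List;
import java.util.Map;

final class UpstreamPropagationSketch {
  /** Hypothetical mutable stand-in for HoodieWriteStat. */
  static final class Stat {
    final String partitionPath;
    Long minEventTime; // null when no per-record event time was observed
    Long maxEventTime;

    Stat(String partitionPath, Long min, Long max) {
      this.partitionPath = partitionPath;
      this.minEventTime = min;
      this.maxEventTime = max;
    }
  }

  /** Fills bounds from upstream only for stats without per-record event time. */
  static void applyUpstream(List<Stat> stats, Map<String, long[]> upstreamBounds) {
    for (Stat s : stats) {
      if (s.minEventTime != null || s.maxEventTime != null) {
        continue; // per-record values already present: they take precedence
      }
      long[] bounds = upstreamBounds.get(s.partitionPath);
      if (bounds != null) {
        s.minEventTime = bounds[0];
        s.maxEventTime = bounds[1];
      }
      // No upstream entry: leave both null rather than inferring a value.
    }
  }
}
```

Readers would then not need to know whether a bound came from scanned records or from upstream, because both live in `minEventTime`/`maxEventTime`.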
   
   ## What this changes vs. the original RFC
   
- No new commit-metadata key and no new Avro field: only additive APIs and a propagation path.
- Same end-user capability ("how fresh is partition `dt=2025-12-01`?") via `commitMetadata.getMinAndMaxEventTimePerPartition().get("dt=2025-12-01")`.
- Derived-table propagation becomes its own focused subtask rather than the centerpiece.
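Freshness for a partition is usually reported as lag: the distance between "now" and the partition's max event time. Below is a sketch of that computation. It assumes the stored event times are epoch milliseconds (the unit exposed to readers is an assumption here). It also uses a hypothetical `FreshnessLagSketch.lag` helper that takes the max bound as an `Optional`.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.Optional;

final class FreshnessLagSketch {
  /**
   * Lag between {@code now} and a partition's max event time (assumed epoch millis).
   * Empty when the partition has no tracked event time; negative if event times run ahead of now.
   */
  static Optional<Duration> lag(Optional<Long> maxEventTimeMillis, Instant now) {
    return maxEventTimeMillis.map(max -> Duration.between(Instant.ofEpochMilli(max), now));
  }
}
```

The max bound is the relevant one for "how fresh". The min bound is more useful for spotting late-arriving data in a partition.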
   
Happy to split this into (a) a PR for the rollup API and the decoupling, and (b) a separate RFC for upstream propagation, if you prefer that shape. WDYT @vinothchandar @danny0405?
   
   ---
   
   ## Notes before posting
   
- The claim that the merge-mode coupling is a usability bug is worth a quick sanity check: confirm there is no intentional reason watermark tracking is gated on `EVENT_TIME_ORDERING`.
- The propagation idea (step 3) is the substantive new piece and the riskiest, since it touches source readers rather than just write handles. If the maintainers want to scope this down, dropping (3) and shipping only (1) and (2) still solves the per-partition exposure problem they were skeptical about.
   

