junaiddshaukat opened a new pull request, #38987:
URL: https://github.com/apache/beam/pull/38987
## Summary
Part 2 of the WatermarkManager work (part 1: #38957). Wires the in-memory
WatermarkManager into the data path so that each fused stage computes its input
watermark through it and forwards the result, replacing the provisional "flush
on every received watermark" behavior.
This is the in-JVM wiring. The stage still forwards its output watermark
downstream via `ctx.forward` (now gated by the WatermarkManager so that it
forwards only when `min()` advances), so watermarks keep propagating and remain
observable in tests. Only the durable/distributed produce side is deferred:
flushing the report atomically with the EOS offset commit, fanning it out to all
downstream partitions, and a serde so it can cross topic boundaries. That work
needs the topic-based shuffle (GroupByKey) infrastructure, which isn't built yet.
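The gating described above (hold until every source partition has reported, then forward `min()` only when it advances) can be sketched as a small in-memory tracker. This is a hypothetical illustration, not the actual `WatermarkManager` from #38957. The class name `WatermarkTrackerSketch`, the `update` signature, and the choice to ignore per-partition regressions are all assumptions.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.OptionalLong;

/**
 * Hypothetical sketch of a per-stage watermark tracker. Names and signatures
 * are illustrative assumptions, not the PR's WatermarkManager API.
 */
final class WatermarkTrackerSketch {
  // Latest watermark reported by each upstream source partition.
  private final Map<Integer, Long> perPartition = new HashMap<>();
  // Number of source partitions expected; -1 until the first report arrives.
  private int expectedPartitions = -1;
  // Last combined watermark forwarded downstream; MIN_VALUE means none yet.
  private long lastForwarded = Long.MIN_VALUE;

  /**
   * Records one report. Returns the new combined watermark only if every
   * expected partition has reported and the minimum advanced past the last
   * forwarded value; otherwise returns empty (hold, or nothing new to forward).
   */
  OptionalLong update(int sourcePartition, int totalSourcePartitions, long millis) {
    if (totalSourcePartitions < 1
        || sourcePartition < 0
        || sourcePartition >= totalSourcePartitions) {
      throw new IllegalArgumentException(
          "invalid source " + sourcePartition + " of " + totalSourcePartitions);
    }
    if (expectedPartitions == -1) {
      expectedPartitions = totalSourcePartitions;
    } else if (expectedPartitions != totalSourcePartitions) {
      throw new IllegalStateException("source partition count changed");
    }
    // Keep each partition's watermark monotonic by ignoring regressions.
    perPartition.merge(sourcePartition, millis, Long::max);
    if (perPartition.size() < expectedPartitions) {
      return OptionalLong.empty(); // hold until all partitions have reported
    }
    long min = Long.MAX_VALUE;
    for (long w : perPartition.values()) {
      min = Math.min(min, w);
    }
    if (min <= lastForwarded) {
      return OptionalLong.empty(); // min did not advance: do not re-forward
    }
    lastForwarded = min;
    return OptionalLong.of(min);
  }
}
```

Returning `OptionalLong.empty()` covers both the hold case and the no-re-forward case, so the caller forwards only when a value is present. Persistence and fan-out are intentionally absent, matching the deferred produce side.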
## Changes
- `KStreamsPayload`: watermark variant carries `(sourcePartition,
totalSourcePartitions)` in-band with the millis.
- `ExecutableStageProcessor`: feeds reports to a `WatermarkManager` and forwards
the output watermark only when it advances, stamped as the stage's own single
source (0 of 1); data is still processed while the watermark is held. The SDK
harness is created lazily on the first data element.
- `ImpulseProcessor`: stamps its terminal `TIMESTAMP_MAX_VALUE` as `(0, 1)`.
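The stage-side changes listed above can be sketched as follows: a report carrying `(sourcePartition, totalSourcePartitions, millis)` in-band, a stage that stamps its forwarded output watermark as its own single source (0 of 1), and an SDK harness created only on the first data element. All names here (`WatermarkReport`, `StageHarness`, `StageSketch`, `END_OF_TIME_MILLIS`) are hypothetical; the real `KStreamsPayload` and `ExecutableStageProcessor` may look quite different. For simplicity the sketch assumes the output watermark equals the already-gated input watermark, since watermark holds are out of scope here.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

/** Hypothetical in-band watermark report: (sourcePartition, totalSourcePartitions, millis). */
final class WatermarkReport {
  // Hypothetical stand-in for the millis of Beam's BoundedWindow.TIMESTAMP_MAX_VALUE.
  static final long END_OF_TIME_MILLIS = Long.MAX_VALUE;

  final int sourcePartition;
  final int totalSourcePartitions;
  final long millis;

  WatermarkReport(int sourcePartition, int totalSourcePartitions, long millis) {
    // Reject stamps that cannot identify one of N upstream partitions.
    if (totalSourcePartitions < 1
        || sourcePartition < 0
        || sourcePartition >= totalSourcePartitions) {
      throw new IllegalArgumentException(
          "invalid source " + sourcePartition + " of " + totalSourcePartitions);
    }
    this.sourcePartition = sourcePartition;
    this.totalSourcePartitions = totalSourcePartitions;
    this.millis = millis;
  }

  /** A producer that is its own single source (a stage or Impulse) stamps itself 0 of 1. */
  static WatermarkReport singleSource(long millis) {
    return new WatermarkReport(0, 1, millis);
  }
}

/** Hypothetical stand-in for the SDK harness that runs the fused stage. */
interface StageHarness {
  void process(String element);
}

/** Hypothetical sketch of the stage side: lazy harness, single-source output stamping. */
final class StageSketch {
  private final Supplier<StageHarness> harnessFactory;
  private StageHarness harness; // stays null until the first data element arrives
  // Stand-in for ctx.forward(...): collects the watermark reports sent downstream.
  final List<WatermarkReport> forwarded = new ArrayList<>();

  StageSketch(Supplier<StageHarness> harnessFactory) {
    this.harnessFactory = harnessFactory;
  }

  void onData(String element) {
    if (harness == null) {
      harness = harnessFactory.get(); // lazy creation on the first data element
    }
    harness.process(element); // data flows even while the watermark is held
  }

  /** Called only when the combined input watermark has advanced (gating not shown). */
  void onWatermarkAdvanced(long millis) {
    forwarded.add(WatermarkReport.singleSource(millis));
  }

  boolean harnessStarted() {
    return harness != null;
  }
}
```

Under these assumptions, Impulse would emit `WatermarkReport.singleSource(WatermarkReport.END_OF_TIME_MILLIS)` as its terminal watermark; a downstream stage expecting exactly one source can release it as soon as it arrives.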
## Out of scope (later; these depend on the topic-based shuffle)
- Producing the report atomically with the EOS offset commit.
- Fan-out to all downstream partitions + a serde for topic boundaries.
- Real-Kafka integration tests over the 5 scenarios; watermark holds /
persistence and downstream timer firing.
## Testing
- `ExecutableStageProcessorWatermarkTest` (MockProcessorContext): holds until
all sources report, forwards the min, does not re-forward a non-advancing
watermark, stamps output as a single source.
- `WatermarkPropagationTest` (TopologyTestDriver): terminal watermark
propagates Impulse -> ExecutableStage -> recording sink.
- `./gradlew :runners:kafka-streams:check` passes (30 runner tests).
Closes #38977
Refs #18479
cc @je-ik
--
This is an automated message from the Apache Git Service.