junaiddshaukat opened a new issue, #38977:
URL: https://github.com/apache/beam/issues/38977
Tracking issue: #18479
Follows: #38957 (WatermarkManager part 1 in-memory core, merged)
## Summary
Wire the part-1 WatermarkManager into the data path so each fused stage
computes and forwards its input watermark through it, replacing the provisional
"flush the bundle on every received watermark" behavior in
ExecutableStageProcessor.
This is the consume side. The produce side of the agreed design (flushing
the watermark report atomically with the offset commit, fanning it out to all
downstream partitions, with reports crossing real topic boundaries) is
deliberately out of scope here: the topology is still in-JVM single-instance,
so there are no real source partitions to track or fan out across yet. That
half lands once the topic-based shuffle (GroupByKey / redistribute-by-key)
exists — see "Out of scope" below.
## Scope (this PR)
- Extend the KStreamsPayload watermark variant to carry the in-band report
fields from the agreed design: (sourcePartition, totalSourcePartitions)
alongside the watermark millis.
- ExecutableStageProcessor: hold a WatermarkManager; on a watermark payload,
observe(sourcePartition, watermark, totalSourcePartitions), flush the open
bundle, and forward the stage's output watermark downstream only when
WatermarkManager.advance() moves it forward (monotonic min across the source
partitions), stamped with this stage's own (partition, total). Removes the
provisional flush-on-every-watermark.
- ImpulseProcessor (a source): stamp its terminal TIMESTAMP_MAX_VALUE
watermark with (sourcePartition=0, totalSourcePartitions=1).
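
The hold / min / monotonic rule above can be sketched in isolation. This is a minimal illustration only, not the merged part-1 class: the name SketchWatermarkManager, the OptionalLong return type of advance(), and the choices to ignore per-partition regressions and to clear all reports when totalSourcePartitions changes are assumptions made for the example.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.OptionalLong;

/** Hypothetical stand-in for the WatermarkManager; names and details are assumed. */
final class SketchWatermarkManager {
  // Latest watermark (millis) reported by each source partition.
  private final Map<Integer, Long> reported = new HashMap<>();
  // Partition count announced by the most recent report; -1 until the first report.
  private int expectedPartitions = -1;
  // Last watermark handed out by advance(); never decreases.
  private long output = Long.MIN_VALUE;

  void observe(int sourcePartition, long watermarkMillis, int totalSourcePartitions) {
    if (totalSourcePartitions != expectedPartitions) {
      // Assumption: a partition-count change drops old reports, so the
      // manager holds again until every partition of the new layout reports.
      reported.clear();
      expectedPartitions = totalSourcePartitions;
    }
    // Assumption: a report lower than the partition's previous one is ignored.
    reported.merge(sourcePartition, watermarkMillis, Long::max);
  }

  /** Returns the new output watermark only if it moved forward, else empty. */
  OptionalLong advance() {
    if (expectedPartitions <= 0 || reported.size() < expectedPartitions) {
      return OptionalLong.empty(); // still holding: not every partition has reported
    }
    long min = Long.MAX_VALUE;
    for (long w : reported.values()) {
      min = Math.min(min, w);
    }
    if (min <= output) {
      return OptionalLong.empty(); // would not move forward: nothing to emit
    }
    output = min;
    return OptionalLong.of(min);
  }
}
```

In this sketch a processor would call observe(...) for each incoming watermark payload and forward downstream only when advance() returns a value; how the real ExecutableStageProcessor sequences the bundle flush around that call is not modeled here.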
## Out of scope (later parts, depend on topic-based shuffle)
- Producing the watermark report atomically with the EOS offset commit.
- Fanning the report out to all downstream partitions, and a Serde for it to
cross topic boundaries.
- Real-Kafka-cluster integration tests over the 5 scenarios (steady,
scale-out, clean scale-in, SIGKILL, partition reassignment).
- Watermark holds / persistence and downstream timer firing (needs state +
timers).
## Testing
- Processor-level test (TopologyTestDriver / MockProcessorContext) injecting
reports from multiple source partitions: holds until every partition
reports, forwards min(), stays monotonic, re-holds on a partition-count
change.
- End-to-end TopologyTestDriver test: a watermark propagates
Impulse -> ExecutableStage -> downstream and the terminal TIMESTAMP_MAX_VALUE
is observed downstream.
- ./gradlew :runners:kafka-streams:check green.
cc: @je-ik