junaiddshaukat opened a new issue, #38977:
URL: https://github.com/apache/beam/issues/38977

   Tracking issue: #18479
   Follows: #38957 (WatermarkManager part 1 in-memory core, merged)
   
   ## Summary
   Wire the part-1 WatermarkManager into the data path so that each fused stage
   computes its input watermark through it and forwards the result downstream,
   replacing the provisional "flush the bundle on every received watermark"
   behavior in ExecutableStageProcessor.
   
   This is the consume side. The produce side of the agreed design (flushing 
the watermark report atomically with the offset commit, fanning it out to all 
downstream partitions, with reports crossing real topic boundaries) is 
deliberately out of scope here: the topology is still in-JVM single-instance, 
so there are no real source partitions to track or fan out across yet. That 
half lands once the topic-based shuffle (GroupByKey / redistribute-by-key) 
exists — see "Out of scope" below.
   
   ## Scope (this PR)
   - Extend the KStreamsPayload watermark variant to carry the in-band report
     fields from the agreed design: (sourcePartition, totalSourcePartitions)
     alongside the watermark millis.
   - ExecutableStageProcessor: hold a WatermarkManager; on a watermark payload,
     observe(sourcePartition, watermark, totalSourcePartitions), flush the open
     bundle, and forward the stage's output watermark downstream only when
     WatermarkManager.advance() moves it forward (monotonic min across the
     source partitions), stamped with this stage's own (partition, total).
     Removes the provisional flush-on-every-watermark.
   - ImpulseProcessor (a source): stamp its terminal TIMESTAMP_MAX_VALUE
     watermark with (sourcePartition=0, totalSourcePartitions=1).
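
The consume-side behavior in the list above can be sketched in plain Java. This is a minimal, self-contained illustration under stated assumptions, not the runner's code: `WatermarkReport`, `SketchWatermarkManager`, `StageWatermarkHandler`, `ImpulseWatermark` and `WatermarkPropagationSketch` are hypothetical names; the manager only mimics the observe/advance semantics described in this issue (hold until every source partition reports, forward the monotonic minimum, re-hold on a partition-count change); and it assumes the open bundle is flushed only when the output watermark actually advances.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.OptionalLong;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical shape of the extended watermark payload: millis plus the in-band report fields.
record WatermarkReport(long watermarkMillis, int sourcePartition, int totalSourcePartitions) {}

// Stand-in for the part-1 WatermarkManager, mimicking only the semantics described in this issue.
final class SketchWatermarkManager {
  private final Map<Integer, Long> perPartition = new HashMap<>();
  private int totalSourcePartitions = 0;
  private long outputWatermark = Long.MIN_VALUE;

  void observe(int sourcePartition, long watermarkMillis, int totalSourcePartitions) {
    if (sourcePartition < 0 || sourcePartition >= totalSourcePartitions) {
      throw new IllegalArgumentException(sourcePartition + " of " + totalSourcePartitions);
    }
    if (totalSourcePartitions != this.totalSourcePartitions) {
      // Partition count changed: drop old reports and hold until every partition reports again.
      perPartition.clear();
      this.totalSourcePartitions = totalSourcePartitions;
    }
    // Keep the highest watermark reported by each source partition.
    perPartition.merge(sourcePartition, watermarkMillis, Long::max);
  }

  // Returns the new output watermark only if it moved forward; empty while holding or stalled.
  OptionalLong advance() {
    if (totalSourcePartitions == 0 || perPartition.size() < totalSourcePartitions) {
      return OptionalLong.empty(); // some source partition has not reported yet
    }
    long min = perPartition.values().stream().mapToLong(Long::longValue).min().getAsLong();
    if (min <= outputWatermark) {
      return OptionalLong.empty(); // never move the output watermark backwards
    }
    outputWatermark = min;
    return OptionalLong.of(min);
  }
}

// Hypothetical sketch of the watermark branch of ExecutableStageProcessor.
final class StageWatermarkHandler {
  private final SketchWatermarkManager manager = new SketchWatermarkManager();
  private final int ownPartition;
  private final int ownTotalPartitions;
  private final Runnable flushBundle;
  private final Consumer<WatermarkReport> forward;

  StageWatermarkHandler(int ownPartition, int ownTotalPartitions,
      Runnable flushBundle, Consumer<WatermarkReport> forward) {
    this.ownPartition = ownPartition;
    this.ownTotalPartitions = ownTotalPartitions;
    this.flushBundle = flushBundle;
    this.forward = forward;
  }

  void onWatermark(WatermarkReport report) {
    manager.observe(report.sourcePartition(), report.watermarkMillis(), report.totalSourcePartitions());
    manager.advance().ifPresent(wm -> {
      // Assumption: flush only when the watermark advances, so buffered output precedes it.
      flushBundle.run();
      // Stamp the forwarded watermark with this stage's own (partition, total).
      forward.accept(new WatermarkReport(wm, ownPartition, ownTotalPartitions));
    });
  }
}

// Impulse is a single-partition source: its terminal watermark is stamped (0, 1).
final class ImpulseWatermark {
  // Mirrors how Beam derives BoundedWindow.TIMESTAMP_MAX_VALUE (max micros, expressed in millis).
  static final long TIMESTAMP_MAX_MILLIS = TimeUnit.MICROSECONDS.toMillis(Long.MAX_VALUE);

  static WatermarkReport terminal() {
    return new WatermarkReport(TIMESTAMP_MAX_MILLIS, 0, 1);
  }
}

public class WatermarkPropagationSketch {
  public static void main(String[] args) {
    List<Long> forwarded = new ArrayList<>();
    StageWatermarkHandler stage =
        new StageWatermarkHandler(0, 1, () -> {}, r -> forwarded.add(r.watermarkMillis()));
    stage.onWatermark(new WatermarkReport(100, 0, 2)); // held: partition 1 has not reported
    stage.onWatermark(new WatermarkReport(50, 1, 2)); // forwards min(100, 50) = 50
    stage.onWatermark(new WatermarkReport(80, 1, 2)); // forwards min(100, 80) = 80
    System.out.println("two-partition input forwarded " + forwarded);

    List<Long> afterImpulse = new ArrayList<>();
    StageWatermarkHandler next =
        new StageWatermarkHandler(0, 1, () -> {}, r -> afterImpulse.add(r.watermarkMillis()));
    next.onWatermark(ImpulseWatermark.terminal()); // one source partition: forwarded at once
    System.out.println("after Impulse forwarded " + afterImpulse);
  }
}
```

Keeping the per-partition maximum and dropping every stored report on a partition-count change is one plausible reading of "re-holds"; the merged part-1 WatermarkManager may treat stale or late reports differently.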
   
   ## Out of scope (later parts, depend on topic-based shuffle)
   - Producing the watermark report atomically with the EOS offset commit.
   - Fanning the report out to all downstream partitions, and a Serde for it to
     cross topic boundaries.
   - Real-Kafka-cluster integration tests over the 5 scenarios (steady,
     scale-out, clean scale-in, SIGKILL, partition reassignment).
   - Watermark holds / persistence and downstream timer firing (needs state +
     timers).
   
   ## Testing
   - Processor-level test (TopologyTestDriver / MockProcessorContext) injecting
     reports from multiple source partitions: holds until every partition
     reports, forwards min(), stays monotonic, re-holds on a partition-count
     change.
   - End-to-end TopologyTestDriver test: a watermark propagates
     Impulse -> ExecutableStage -> downstream, and the terminal
     TIMESTAMP_MAX_VALUE is observed downstream.
   - ./gradlew :runners:kafka-streams:check green.
   
   cc: @je-ik 

