junaiddshaukat commented on code in PR #38987:
URL: https://github.com/apache/beam/pull/38987#discussion_r3433640655


##########
runners/kafka-streams/src/main/java/org/apache/beam/runners/kafka/streams/translation/KStreamsPayload.java:
##########
@@ -101,6 +118,29 @@ public long getWatermarkMillis() {
     return watermarkMillis;
   }
 
+  /**
+   * Returns the source partition this watermark report is for. Caller must 
check {@link
+   * #isWatermark()} first; calling this on a data payload throws.
+   */
+  public int getSourcePartition() {
+    if (kind != Kind.WATERMARK) {

Review Comment:
   Added Preconditions.checkArgument in the watermark() factory to validate 
sourcePartition / totalSourcePartitions.
   
   Done — pulled the watermark accessors onto a WatermarkPayload interface; you 
reach them via payload.asWatermark() (which checkStates isWatermark()), and I 
updated the callers.
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to