lukecwik commented on code in PR #22889:
URL: https://github.com/apache/beam/pull/22889#discussion_r1036494997
##########
runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/stableinput/BufferingDoFnRunner.java:
##########
@@ -85,6 +86,8 @@ public static <InputT, OutputT> BufferingDoFnRunner<InputT, OutputT> create(
int currentStateIndex;
/** The current handler used for buffering. */
private BufferingElementsHandler currentBufferingElementsHandler;
+ /** Minimum timestamp of all buffered elements. */
+ private volatile long currentOutputWatermarkHold;
Review Comment:
You're right, my confusion was about the purpose here. It would make more
sense to call this `minOutputWatermarkHoldSeen`.
##########
runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/stableinput/BufferingDoFnRunner.java:
##########
@@ -164,6 +171,8 @@ public <KeyT> void onTimer(
Instant timestamp,
Instant outputTimestamp,
TimeDomain timeDomain) {
+
+ currentOutputWatermarkHold = Math.min(timestamp.getMillis(), currentOutputWatermarkHold);
Review Comment:
Shouldn't this be `outputTimestamp` since `outputTimestamp` is the minimum
timestamp that is used for producing output for a timer?
```suggestion
currentOutputWatermarkHold = Math.min(outputTimestamp.getMillis(), currentOutputWatermarkHold);
```
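To illustrate why the choice of argument matters, here is a minimal, dependency-free sketch. It is not the code in `BufferingDoFnRunner`: the class `MinHoldTracker` and its accessor are hypothetical, timestamps are plain `long` milliseconds instead of `Instant`, and it assumes a single writer thread and that a timer's output timestamp may be earlier than its firing timestamp.

```java
/** Hypothetical stand-in for the hold-tracking field discussed in this review. */
public class MinHoldTracker {
  // Starts at the maximum so that the first recorded timestamp always lowers it.
  // volatile gives visibility to readers; the read-then-write below is not atomic,
  // so this sketch assumes only one thread ever calls onTimer.
  private volatile long minOutputWatermarkHoldSeen = Long.MAX_VALUE;

  /**
   * Records a timer. The hold follows the output timestamp, because that is the
   * earliest timestamp the timer callback may emit at; the firing timestamp is
   * accepted only to mirror the shape of the reviewed method and is ignored.
   */
  public void onTimer(long timestampMillis, long outputTimestampMillis) {
    minOutputWatermarkHoldSeen = Math.min(outputTimestampMillis, minOutputWatermarkHoldSeen);
  }

  public long getMinOutputWatermarkHoldSeen() {
    return minOutputWatermarkHoldSeen;
  }
}
```

With a timer that fires at 1000 ms but has an output timestamp of 400 ms, tracking the firing timestamp would leave the hold at 1000 ms and let the watermark pass output that is still to be emitted at 400 ms; tracking the output timestamp keeps the hold at 400 ms.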