boyuanzz commented on a change in pull request #13105:
URL: https://github.com/apache/beam/pull/13105#discussion_r513066466
##########
File path:
runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
##########
@@ -1330,6 +1331,14 @@ private void onNewEventTimer(TimerData newTimer) {
}
}
+ /** Holds the watermark when there is an sdf timer. */
+ private void onNewSdfTimer(TimerData newTimer) {
+ Preconditions.checkState(
+
StateAndTimerBundleCheckpointHandler.isSdfTimer(newTimer.getTimerId()));
+ Preconditions.checkState(timerUsesOutputTimestamp(newTimer));
+ keyedStateInternals.addWatermarkHoldUsage(newTimer.getOutputTimestamp());
+ }
Review comment:
Sorry I want to revisit the idea of having `onFiredTimer` here. I think
it's a good idea to have `onFiredTimer` for firing timers. But the function
`onNewSdfTimer` and `onNewEventTimer` is about to set watermark hold when
registering timers. Different from event timer, an SDF timer must have the
output timestamp for controlling watermark hold. It's important for SDF
execution. That's why we have a check instead of an if block here.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]