je-ik commented on a change in pull request #12551:
URL: https://github.com/apache/beam/pull/12551#discussion_r469774735



##########
File path: runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
##########
@@ -1277,7 +1280,13 @@ private void onNewEventTimer(TimerData newTimer) {
           "Timer with id %s is not an event time timer!",
           newTimer.getTimerId());
       if (timerUsesOutputTimestamp(newTimer)) {
-        outputTimestampQueue.add(newTimer.getOutputTimestamp().getMillis());
+        outputTimestamps.compute(
+            newTimer.getOutputTimestamp().getMillis(),
+            (k, v) -> {
+              Set<String> timerIds = v == null ? new HashSet<>() : v;
+              timerIds.add(getContextTimerId(newTimer.getTimerId(), newTimer.getNamespace()));

Review comment:
       It should not. But in some cases, a timer's output timestamp seems to 
appear in the queue _twice_ (for the same timer) and is removed only _once_, 
which causes the pipeline to get stuck.
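
To illustrate the failure mode described above, here is a minimal, self-contained sketch. It is not the actual `DoFnOperator` code: the class name, the helper methods, the `SortedMap` type, and the use of `Long.MAX_VALUE` to mean "no hold" are all assumptions made for the example. It only shows why a duplicate-tolerant queue can leave a stale timestamp behind when the same timer is added twice and removed once, while a map from timestamp to a set of timer ids does not.

```java
import java.util.HashSet;
import java.util.PriorityQueue;
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;

// Hypothetical sketch, not Beam code: compares two ways of tracking
// the minimum output timestamp that is still held by pending timers.
class OutputTimestampHoldSketch {

  // Queue-based bookkeeping: the same timestamp can be stored more than once.
  static long holdWithQueue() {
    PriorityQueue<Long> queue = new PriorityQueue<>();
    queue.add(100L);    // timer "t1" registered
    queue.add(100L);    // the same timer registered a second time
    queue.remove(100L); // remove(Object) drops a single occurrence only
    // One stale entry remains, so the hold stays at 100.
    return queue.isEmpty() ? Long.MAX_VALUE : queue.peek();
  }

  // Map-based bookkeeping: each timestamp maps to the set of timer ids using it.
  static long holdWithMap() {
    SortedMap<Long, Set<String>> outputTimestamps = new TreeMap<>();
    register(outputTimestamps, 100L, "t1");
    register(outputTimestamps, 100L, "t1"); // idempotent: the set already has "t1"
    unregister(outputTimestamps, 100L, "t1");
    return outputTimestamps.isEmpty() ? Long.MAX_VALUE : outputTimestamps.firstKey();
  }

  static void register(SortedMap<Long, Set<String>> map, long timestamp, String timerId) {
    map.compute(
        timestamp,
        (k, v) -> {
          Set<String> timerIds = v == null ? new HashSet<>() : v;
          timerIds.add(timerId);
          return timerIds;
        });
  }

  static void unregister(SortedMap<Long, Set<String>> map, long timestamp, String timerId) {
    // Returning null from the remapping function removes the key entirely.
    map.computeIfPresent(
        timestamp,
        (k, v) -> {
          v.remove(timerId);
          return v.isEmpty() ? null : v;
        });
  }

  public static void main(String[] args) {
    System.out.println("queue hold released: " + (holdWithQueue() == Long.MAX_VALUE));
    System.out.println("map hold released:   " + (holdWithMap() == Long.MAX_VALUE));
  }
}
```

In this sketch the queue variant still reports a hold at 100 after the single removal, which would correspond to the stuck pipeline, whereas the map variant releases it. Why the same timer is added twice in the first place is not established here.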





