mxm commented on a change in pull request #12551:
URL: https://github.com/apache/beam/pull/12551#discussion_r469777963



##########
File path: 
runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
##########
@@ -1277,7 +1280,13 @@ private void onNewEventTimer(TimerData newTimer) {
           "Timer with id %s is not an event time timer!",
           newTimer.getTimerId());
       if (timerUsesOutputTimestamp(newTimer)) {
-        outputTimestampQueue.add(newTimer.getOutputTimestamp().getMillis());
+        outputTimestamps.compute(
+            newTimer.getOutputTimestamp().getMillis(),
+            (k, v) -> {
+              Set<String> timerIds = v == null ? new HashSet<>() : v;
+              timerIds.add(getContextTimerId(newTimer.getTimerId(), 
newTimer.getNamespace()));

Review comment:
       Duplicates are ok. If we have multiple timers set with different ids or 
even for the same keys, duplicates are expected. We could use a TreeMap instead 
of a PriorityQueue like we do in FlinkTimerInterals for the WatermarkHoldState, 
but I don't see anything wrong with the implementation here. 
   
   You likely have a problem in your application code, i.e. you set a timer 
output timestamp which holds back the watermark but your fire timestamp has not 
been reached by the watermark yet. Please investigate that first. Also note 
#12531 which was a bug in the Python SDK.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to