[ https://issues.apache.org/jira/browse/FLINK-3669?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15253666#comment-15253666 ]
Aljoscha Krettek commented on FLINK-3669:
-----------------------------------------

Hi,
it's almost done but I think we need both the {{processingTimeTimers}} set and the {{processingTimeTimerTimestamps}} MultiSet. The former is used to not add repeatedly to the queue while the latter is used for not registering a lot of timers at {{StreamTask}}.

As it is now in {{registerProcessingTimeTimer}}:
{code}
public void registerProcessingTimeTimer(long time) {
    Timer<K, W> timer = new Timer<>(time, key, window);
    // If this is the first timer added for this timestamp (per key and window)
    // register a TriggerTask and add Timer to Queue
    if (processingTimeTimerTimestamps.add(time, 1) == 0) {
        processingTimeTimersQueue.add(timer);
        ScheduledFuture<?> scheduledFuture = getRuntimeContext().registerTimer(time, WindowOperator.this);
        processingTimeTimerFutures.put(time, scheduledFuture);
    }
}
{code}
we correctly schedule only one timer at the {{StreamTask}} per timestamp but we also only schedule one timer in the queue. If we register a timer for the same timestamp from multiple keys we ignore the timers for all but the first key to register.

I think it should be:
{code}
public void registerProcessingTimeTimer(long time) {
    Timer<K, W> timer = new Timer<>(time, key, window);
    // make sure we only put one timer per key into the queue
    if (processingTimeTimers.add(timer)) {
        processingTimeTimersQueue.add(timer);
        // If this is the first timer added for this timestamp register a TriggerTask
        if (processingTimeTimerTimestamps.add(time, 1) == 0) {
            ScheduledFuture<?> scheduledFuture = getRuntimeContext().registerTimer(time, WindowOperator.this);
            processingTimeTimerFutures.put(time, scheduledFuture);
        }
    }
}
{code}
but man, this stuff is tricky to figure out...
:-)

> WindowOperator registers a lot of timers at StreamTask
> ------------------------------------------------------
>
>                 Key: FLINK-3669
>                 URL: https://issues.apache.org/jira/browse/FLINK-3669
>             Project: Flink
>          Issue Type: Bug
>          Components: Streaming
>    Affects Versions: 1.0.1
>            Reporter: Aljoscha Krettek
>            Assignee: Konstantin Knauf
>            Priority: Blocker
>
> Right now, the WindowOperator registers a timer at the StreamTask for every
> processing-time timer that a Trigger registers. We should combine several
> registered trigger timers to only register one low-level timer (timer
> coalescing).
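
For illustration, the two-structure bookkeeping proposed in the comment (a set that deduplicates timers per key, plus a per-timestamp count that coalesces low-level registrations) can be sketched outside of Flink roughly as below. This is only a minimal sketch under assumptions, not the actual WindowOperator code: the class name TimerCoalescingSketch, the String key, the omitted window, and the plain counter standing in for the registerTimer call at the StreamTask are all hypothetical, and a HashMap of counts stands in for the MultiSet.

```java
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.PriorityQueue;
import java.util.Set;

// Hypothetical stand-in for the operator's timer bookkeeping; not Flink code.
class TimerCoalescingSketch {

    // A timer is identified by timestamp and key (the window is left out for brevity).
    static final class Timer {
        final long time;
        final String key;

        Timer(long time, String key) {
            this.time = time;
            this.key = key;
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof Timer)) {
                return false;
            }
            Timer other = (Timer) o;
            return time == other.time && key.equals(other.key);
        }

        @Override
        public int hashCode() {
            return Objects.hash(time, key);
        }
    }

    // Plays the role of processingTimeTimers: one entry per (timestamp, key).
    private final Set<Timer> timers = new HashSet<>();
    // Plays the role of processingTimeTimersQueue: timers ordered by timestamp.
    private final PriorityQueue<Timer> queue =
            new PriorityQueue<>(Comparator.comparingLong((Timer t) -> t.time));
    // Plays the role of the processingTimeTimerTimestamps MultiSet.
    private final Map<Long, Integer> timestampCounts = new HashMap<>();
    // Assumption: counts what would be registerTimer calls at the StreamTask.
    private int lowLevelRegistrations = 0;

    void register(long time, String key) {
        Timer timer = new Timer(time, key);
        // Only queue a timer once per (timestamp, key).
        if (timers.add(timer)) {
            queue.add(timer);
            // merge returns the new count; 1 means this timestamp was not seen before.
            if (timestampCounts.merge(time, 1, Integer::sum) == 1) {
                lowLevelRegistrations++;
            }
        }
    }

    int queuedTimers() {
        return queue.size();
    }

    int lowLevelRegistrations() {
        return lowLevelRegistrations;
    }

    public static void main(String[] args) {
        TimerCoalescingSketch sketch = new TimerCoalescingSketch();
        sketch.register(100L, "a");
        sketch.register(100L, "b"); // same timestamp, different key: queued, no new low-level timer
        sketch.register(100L, "a"); // exact duplicate: ignored
        sketch.register(200L, "a"); // new timestamp: queued and registered
        System.out.println("queued=" + sketch.queuedTimers()
                + " lowLevel=" + sketch.lowLevelRegistrations());
    }
}
```

With the four registrations in main, three timers end up in the queue while only two low-level registrations happen, which is the behaviour the second snippet in the comment aims for. With the per-timestamp check alone, the timer for key "b" would never reach the queue.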