[ 
https://issues.apache.org/jira/browse/FLINK-3669?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15253666#comment-15253666
 ] 

Aljoscha Krettek commented on FLINK-3669:
-----------------------------------------

Hi,
it's almost done, but I think we need both the {{processingTimeTimers}} set and 
the {{processingTimeTimerTimestamps}} MultiSet. The former keeps us from adding 
the same timer to the queue more than once, while the latter keeps us from 
registering a lot of timers at {{StreamTask}}.

As it is now in {{registerProcessingTimeTimer}}:
{code}
public void registerProcessingTimeTimer(long time) {
    Timer<K, W> timer = new Timer<>(time, key, window);
    // If this is the first timer added for this timestamp (per key and window),
    // register a TriggerTask and add the Timer to the queue
    if (processingTimeTimerTimestamps.add(time, 1) == 0) {
        processingTimeTimersQueue.add(timer);
        ScheduledFuture<?> scheduledFuture =
            getRuntimeContext().registerTimer(time, WindowOperator.this);
        processingTimeTimerFutures.put(time, scheduledFuture);
    }
}
{code}
we correctly schedule only one timer at the {{StreamTask}} per timestamp, but we 
also only put one timer per timestamp into the queue. If timers for the same 
timestamp are registered from multiple keys, we ignore the timers for all but 
the first key to register. I think it should be:
{code}
public void registerProcessingTimeTimer(long time) {
    Timer<K, W> timer = new Timer<>(time, key, window);

    // make sure we only put one timer per key into the queue
    if (processingTimeTimers.add(timer)) {
        processingTimeTimersQueue.add(timer);

        // If this is the first timer added for this timestamp, register a
        // TriggerTask
        if (processingTimeTimerTimestamps.add(time, 1) == 0) {
            ScheduledFuture<?> scheduledFuture =
                getRuntimeContext().registerTimer(time, WindowOperator.this);
            processingTimeTimerFutures.put(time, scheduledFuture);
        }
    }
}
{code}
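
To make the two levels of de-duplication easier to check, here is a minimal, 
self-contained sketch of the same idea outside of {{WindowOperator}}. 
Everything in it is a simplified stand-in and not the actual Flink code: the 
{{Timer}} class drops the window, {{Registry}} and its field names are made up 
for illustration, a plain {{HashMap}} replaces the MultiSet, and a counter 
replaces the call to {{registerTimer}} at the {{StreamTask}}.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.PriorityQueue;
import java.util.Set;

class TimerCoalescingSketch {

    /** Simplified stand-in for Timer<K, W>: timestamp plus key, window omitted. */
    static final class Timer implements Comparable<Timer> {
        final long time;
        final String key;

        Timer(long time, String key) {
            this.time = time;
            this.key = key;
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof Timer)) {
                return false;
            }
            Timer other = (Timer) o;
            return time == other.time && key.equals(other.key);
        }

        @Override
        public int hashCode() {
            return Objects.hash(time, key);
        }

        @Override
        public int compareTo(Timer other) {
            return Long.compare(time, other.time);
        }
    }

    /** Hypothetical holder for the timer state; names are illustrative only. */
    static final class Registry {
        final Set<Timer> timers = new HashSet<>();                    // de-duplicates (time, key)
        final PriorityQueue<Timer> queue = new PriorityQueue<>();     // timers to fire, by time
        final Map<Long, Integer> timestampCounts = new HashMap<>();   // stands in for the MultiSet
        int lowLevelTimers = 0;                                       // stands in for registerTimer calls

        void register(long time, String key) {
            Timer timer = new Timer(time, key);

            // Level 1: only one queue entry per (time, key).
            if (timers.add(timer)) {
                queue.add(timer);

                // Level 2: only one low-level timer per timestamp.
                Integer previous =
                    timestampCounts.put(time, timestampCounts.getOrDefault(time, 0) + 1);
                if (previous == null) {
                    lowLevelTimers++;
                }
            }
        }
    }

    public static void main(String[] args) {
        Registry registry = new Registry();
        registry.register(1000L, "a");
        registry.register(1000L, "b"); // same timestamp, different key: must be queued
        registry.register(1000L, "a"); // exact duplicate: must be ignored
        registry.register(2000L, "a");

        System.out.println("queued timers: " + registry.queue.size());
        System.out.println("low-level timers: " + registry.lowLevelTimers);
    }
}
```

With these four registrations the sketch ends up with three timers in the queue 
(one per distinct timestamp/key pair) but only two low-level timers (one per 
distinct timestamp), which is the behaviour the proposed version is after.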

but man, this stuff is tricky to figure out... :-)

> WindowOperator registers a lot of timers at StreamTask
> ------------------------------------------------------
>
>                 Key: FLINK-3669
>                 URL: https://issues.apache.org/jira/browse/FLINK-3669
>             Project: Flink
>          Issue Type: Bug
>          Components: Streaming
>    Affects Versions: 1.0.1
>            Reporter: Aljoscha Krettek
>            Assignee: Konstantin Knauf
>            Priority: Blocker
>
> Right now, the WindowOperator registers a timer at the StreamTask for every 
> processing-time timer that a Trigger registers. We should combine several 
> registered trigger timers to only register one low-level timer (timer 
> coalescing).



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
