[
https://issues.apache.org/jira/browse/FLINK-3669?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15253666#comment-15253666
]
Aljoscha Krettek commented on FLINK-3669:
-----------------------------------------
Hi,
it's almost done but I think we need both the {{processingTimeTimers}} set and
the {{processingTimeTimerTimestamps}} MultiSet. The former is used to not add
repeatedly to the queue while the latter is used for not registering a lot of
timers at {{StreamTask}}.
As it is now in {{registerProcessingTimeTimer}}:
{code}
public void registerProcessingTimeTimer(long time) {
Timer<K, W> timer = new Timer<>(time, key, window);
//If this is the first timer added for this timestamp (per key and window)
register a TriggerTask and add Timer to Queue
if (processingTimeTimerTimestamps.add(time,1) == 0) {
processingTimeTimersQueue.add(timer);
ScheduledFuture<?> scheduledFuture=
getRuntimeContext().registerTimer(time, WindowOperator.this);
processingTimeTimerFutures.put(time, scheduledFuture);
}
}
{code}
we correctly schedule only one timer at the {{StreamTask}} per timestamp but we
also only schedule one timer in the queue. If we register a timer for the same
timestamp from multiple keys we ignore the timers for all but the first key to
register. I think it should be:
{code}
public void registerProcessingTimeTimer(long time) {
Timer<K, W> timer = new Timer<>(time, key, window);
// make sure we only put one timer per key into the queue
if (processingTimeTimers.add(timer)) {
processingTimeTimersQueue.add(timer);
//If this is the first timer added for this timestamp register a
TriggerTask
if (processingTimeTimerTimestamps.add(time, 1) == 0) {
ScheduledFuture<?> scheduledFuture=
getRuntimeContext().registerTimer(time, WindowOperator.this);
processingTimeTimerFutures.put(time, scheduledFuture);
}
}
}
{code}
but man, this stuff is tricky to figure out... :-)
> WindowOperator registers a lot of timers at StreamTask
> ------------------------------------------------------
>
> Key: FLINK-3669
> URL: https://issues.apache.org/jira/browse/FLINK-3669
> Project: Flink
> Issue Type: Bug
> Components: Streaming
> Affects Versions: 1.0.1
> Reporter: Aljoscha Krettek
> Assignee: Konstantin Knauf
> Priority: Blocker
>
> Right now, the WindowOperator registers a timer at the StreamTask for every
> processing-time timer that a Trigger registers. We should combine several
> registered trigger timers to only register one low-level timer (timer
> coalescing).
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)