[ https://issues.apache.org/jira/browse/FLINK-23208?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17379997#comment-17379997 ]

Piotr Nowojski edited comment on FLINK-23208 at 7/13/21, 3:54 PM:
------------------------------------------------------------------

Ok, thanks for your patient explanations [~wind_ljy] :) I think I get it now. 
I've changed the ticket from a bug to an improvement. 

Besides that, it sounds to me like we should actually hot loop over all of the
timers that could be fired immediately, instead of scheduling an execution for
each of them and going through {{timerService.schedule}} with a 0ms delay.

What about the following proposal? In this case:
{quote}
Let's assume System.currentTimeMillis() == N. If we got backpressured, or for
whatever reason we have a huge backlog of timers to process, for example
one timer for every 1ms in [N - 1_000_000, N), then despite
System.currentTimeMillis() == N, the first/next trigger call will be
InternalTimerServiceImpl#onProcessingTime(N - 1_000_000). We will be able to
process only this single N - 1_000_000 timer, and we will need to sleep 1ms
before firing N - 999_999?
{quote}
Why can't we just call
{{InternalTimerServiceImpl#onProcessingTime(System.currentTimeMillis())}} and
hot loop in
{code:java}
        // fire every timer whose timestamp is <= time (here: the current wall-clock time)
        InternalTimer<K, N> timer;
        while ((timer = processingTimeTimersQueue.peek()) != null
                && timer.getTimestamp() <= time) {
            processingTimeTimersQueue.poll();
            keyContext.setCurrentKey(timer.getKey());
            triggerTarget.onProcessingTime(timer);
        }
{code}
to fire all the timers that should have fired by now in one batch? That should
be many times faster than going through the {{ScheduledThreadPoolExecutor}} once
per timer.
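A minimal, self-contained sketch of the difference, under simplifying assumptions: timers are modeled as bare {{long}} timestamps in a {{java.util.PriorityQueue}}, time is simulated and the cost of the callbacks themselves is ignored. The class and method names ({{BatchTimerFiring}}, {{fireDueTimers}}, {{backlog}}, {{drainPerTimer}}) are hypothetical, not Flink APIs. {{drainPerTimer}} mimics the behaviour described in the quote (each callback receives the head timer's own timestamp, fires only what is due at that timestamp, and the next timer is re-registered with the {{max(delta, 0) + 1}} delay), while {{fireDueTimers}} called with the wall-clock time corresponds to the hot loop above.

```java
import java.util.PriorityQueue;

// Hypothetical, simplified model (not Flink code): timers are plain long
// timestamps in ms, time is simulated, and callback execution cost is ignored.
final class BatchTimerFiring {

    // The hot loop: fires every timer whose timestamp is <= time, returns the count.
    static int fireDueTimers(PriorityQueue<Long> queue, long time) {
        int fired = 0;
        Long ts;
        while ((ts = queue.peek()) != null && ts <= time) {
            queue.poll(); // in Flink: setCurrentKey(...) + triggerTarget.onProcessingTime(timer)
            fired++;
        }
        return fired;
    }

    // A backlog of one overdue timer for every 1 ms in [now - size, now).
    static PriorityQueue<Long> backlog(long now, int size) {
        PriorityQueue<Long> queue = new PriorityQueue<>();
        for (long t = now - size; t < now; t++) {
            queue.add(t);
        }
        return queue;
    }

    // Simulated ms needed to drain the queue when every callback is invoked with
    // the head timer's own timestamp and the next timer is re-registered with
    // delay = max(nextTimestamp - clock, 0) + 1.
    static long drainPerTimer(PriorityQueue<Long> queue, long now) {
        long clock = now;
        while (!queue.isEmpty()) {
            fireDueTimers(queue, queue.peek()); // only timers <= the head's timestamp fire
            if (!queue.isEmpty()) {
                clock += Math.max(queue.peek() - clock, 0) + 1;
            }
        }
        return clock - now;
    }

    public static void main(String[] args) {
        long now = 10_000L;
        // Hot loop with the wall-clock time: the whole backlog fires in one call.
        System.out.println(fireDueTimers(backlog(now, 1_000), now));
        // One timer per callback: about 1 ms of simulated time per overdue timer.
        System.out.println(drainPerTimer(backlog(now, 1_000), now));
    }
}
```

Running it prints 1000 and then 999: in this model, draining N overdue timers one callback at a time costs roughly N ms even though none of them actually needs to wait, while the batch loop handles all of them in a single call.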


> Late processing timers need to wait 1ms at least to be fired
> ------------------------------------------------------------
>
>                 Key: FLINK-23208
>                 URL: https://issues.apache.org/jira/browse/FLINK-23208
>             Project: Flink
>          Issue Type: Improvement
>          Components: API / DataStream, Runtime / Task
>    Affects Versions: 1.11.0, 1.11.3, 1.13.0, 1.14.0, 1.12.4
>            Reporter: Jiayi Liao
>            Priority: Critical
>              Labels: critical
>         Attachments: screenshot-1.png
>
>
> The problem comes from the code below:
> {code:java}
> public static long getProcessingTimeDelay(long processingTimestamp, long currentTimestamp) {
>       // delay the firing of the timer by 1 ms to align the semantics with watermark. A watermark
>       // T says we won't see elements in the future with a timestamp smaller or equal to T.
>       // With processing time, we therefore need to delay firing the timer by one ms.
>       return Math.max(processingTimestamp - currentTimestamp, 0) + 1;
> }
> {code}
> Assume a Flink job creates 1 timer per millisecond and is able to consume
> 1 timer/ms. Here is what happens:
> * Timestamp1 (1st ms): timer1 is registered and will be triggered at
> Timestamp2.
> * Timestamp2 (2nd ms): timer2 is registered and timer1 is triggered.
> * Timestamp3 (3rd ms): timer3 is registered and timer1 is consumed. After
> this, {{InternalTimerServiceImpl}} registers the next timer, timer2, which
> will be triggered at Timestamp4 (after waiting at least 1ms).
> * Timestamp4 (4th ms): timer4 is registered and timer2 is triggered.
> * Timestamp5 (5th ms): timer5 is registered and timer2 is consumed. After
> this, {{InternalTimerServiceImpl}} registers the next timer, timer3, which
> will be triggered at Timestamp6 (after waiting at least 1ms).
>
> As we can see here, the job should be able to consume 1 timer/ms, but it
> actually consumes only 0.5 timer/ms. Another problem is that we cannot
> observe this delay in the lag metrics of the source (Kafka); all we can tell
> is that the output is emitted much later than expected. I've added a metric
> in our internal version, and we can see that the timer triggering lag keeps
> increasing:
>  !screenshot-1.png! 
> *In other words, we should never make a late processing timer wait 1ms. I think a simple change would be:*
> {code:java}
> return Math.max(processingTimestamp - currentTimestamp, -1) + 1;
> {code}
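To make the arithmetic in the description concrete, here is a rough simulation sketch under the description's own assumptions: one timer is due every ms, consuming a triggered timer occupies the task for exactly 1 ms, the next timer is only registered after that, and a delay of 0 means the callback runs immediately (in reality even a 0ms delay still goes through the executor). {{oldDelay}} mirrors {{getProcessingTimeDelay}} as quoted and {{proposedDelay}} is the one-line change suggested above; the class {{LateTimerThroughput}} and the helper {{consumedWithin}} are hypothetical, not Flink code.

```java
import java.util.function.LongBinaryOperator;

// Hypothetical simulation (not Flink code) of the timeline in the description.
final class LateTimerThroughput {

    // Current formula: every timer waits at least 1 ms, even if it is already late.
    static long oldDelay(long processingTimestamp, long currentTimestamp) {
        return Math.max(processingTimestamp - currentTimestamp, 0) + 1;
    }

    // Proposed formula: late timers (timestamp < now) get a 0 ms delay; timers
    // due now or in the future keep exactly the same delay as before.
    static long proposedDelay(long processingTimestamp, long currentTimestamp) {
        return Math.max(processingTimestamp - currentTimestamp, -1) + 1;
    }

    // Number of timers fully consumed within windowMs. Timer i is due at ms i,
    // consuming a triggered timer takes 1 ms, and only after that is the next
    // timer registered using the given delay function.
    static int consumedWithin(long windowMs, LongBinaryOperator delay) {
        long nextTimer = 0;                              // timestamp of the head timer
        long trigger = delay.applyAsLong(nextTimer, 0L); // registered at ms 0
        int consumed = 0;
        while (trigger + 1 <= windowMs) {
            long done = trigger + 1;                     // 1 ms to consume the timer
            consumed++;
            nextTimer++;
            trigger = done + delay.applyAsLong(nextTimer, done);
        }
        return consumed;
    }

    public static void main(String[] args) {
        System.out.println(consumedWithin(1_000, LateTimerThroughput::oldDelay));
        System.out.println(consumedWithin(1_000, LateTimerThroughput::proposedDelay));
    }
}
```

With a 1_000 ms window this prints 500 for the current formula and 999 for the proposed one, i.e. roughly the 0.5 timer/ms versus 1 timer/ms described above. The two formulas agree whenever processingTimestamp >= currentTimestamp, so the watermark-aligned 1 ms delay for on-time timers is preserved; only strictly late timers change.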



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
