[ 
https://issues.apache.org/jira/browse/FLINK-23208?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17379901#comment-17379901
 ] 

Jiayi Liao edited comment on FLINK-23208 at 7/13/21, 1:58 PM:
--------------------------------------------------------------

[~pnowojski]

>    Why registering and firing timers is connected?

Here is the problem. A user can register timers at any time, but 
{{InternalTimerServiceImpl.registerProcessingTimeTimer}} only adds the new 
timer to {{processingTimeTimersQueue}}. 

Timers in {{processingTimeTimersQueue}} are fired one by one in order of 
increasing timestamp, and *timers at timestamp N+1 have to wait at least 1ms 
after the timers at timestamp N are fired (see 
{{InternalTimerServiceImpl.onProcessingTime(long)}}).*
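
To make the chaining concrete, here is a minimal, self-contained sketch in plain Java. It is not Flink code: the class {{TimerChainSketch}}, the method {{fireTimes}} and the fixed per-timer cost are hypothetical, and the only part taken from Flink is the delay formula quoted in the issue description. It assumes one timer per millisecond (timestamps 1..n) and that the next timer is scheduled only after the previous one has been processed.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model of the behaviour described above. NOT Flink code.
class TimerChainSketch {

    // Same formula as the getProcessingTimeDelay method quoted in the issue.
    static long delay(long timerTimestamp, long now) {
        return Math.max(timerTimestamp - now, 0) + 1;
    }

    // Returns the time (in ms) at which each of n timers fires, assuming
    // timers with timestamps 1..n and a fixed processing cost per timer.
    static List<Long> fireTimes(int n, long costMs) {
        List<Long> fired = new ArrayList<>();
        long now = 1; // timer 1 is registered at the 1st ms
        for (long ts = 1; ts <= n; ts++) {
            long fireAt = now + delay(ts, now); // next timer is scheduled only now
            fired.add(fireAt);
            now = fireAt + costMs;              // processing the timer takes costMs
        }
        return fired;
    }

    public static void main(String[] args) {
        List<Long> fired = fireTimes(5, 1);
        for (int i = 0; i < fired.size(); i++) {
            long ts = i + 1;
            System.out.println("timer " + ts + " fires at ms " + fired.get(i)
                    + ", lag " + (fired.get(i) - ts) + " ms");
        }
    }
}
```

Under these assumptions, timer k fires at ms 2k, so the lag grows by 1ms per timer even though each timer takes only 1ms to process.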

 

 



> Late processing timers need to wait 1ms at least to be fired
> ------------------------------------------------------------
>
>                 Key: FLINK-23208
>                 URL: https://issues.apache.org/jira/browse/FLINK-23208
>             Project: Flink
>          Issue Type: Bug
>          Components: API / DataStream, Runtime / Task
>    Affects Versions: 1.11.0, 1.11.3, 1.13.0, 1.14.0, 1.12.4
>            Reporter: Jiayi Liao
>            Priority: Critical
>              Labels: critical
>         Attachments: screenshot-1.png
>
>
> The problem comes from the code below:
> {code:java}
> public static long getProcessingTimeDelay(long processingTimestamp, long currentTimestamp) {
>       // delay the firing of the timer by 1 ms to align the semantics with watermark. A watermark
>       // T says we won't see elements in the future with a timestamp smaller or equal to T.
>       // With processing time, we therefore need to delay firing the timer by one ms.
>       return Math.max(processingTimestamp - currentTimestamp, 0) + 1;
> }
> {code}
> Assume a Flink job registers 1 timer per millisecond and is able to 
> consume 1 timer/ms. Here is what happens: 
> * Timestamp1 (1st ms): timer1 is registered and will be triggered at 
> Timestamp2. 
> * Timestamp2 (2nd ms): timer2 is registered and timer1 is triggered.
> * Timestamp3 (3rd ms): timer3 is registered and timer1 has been consumed. 
> After this, {{InternalTimerServiceImpl}} registers the next timer, timer2, 
> which will be triggered at Timestamp4 (it waits at least 1ms).
> * Timestamp4 (4th ms): timer4 is registered and timer2 is triggered.
> * Timestamp5 (5th ms): timer5 is registered and timer2 has been consumed. 
> After this, {{InternalTimerServiceImpl}} registers the next timer, timer3, 
> which will be triggered at Timestamp6 (it waits at least 1ms).
> As we can see, the job is capable of consuming 1 timer/ms, but it actually 
> consumes only 0.5 timer/ms. Another problem is that this delay cannot be 
> observed from the lag metrics of the source (Kafka). All we can tell is that 
> the output is emitted much later than expected. 
> I've added a metric in our internal version, and it shows that the lag of 
> timer triggering keeps increasing: 
>  !screenshot-1.png! 
> *In other words, a late processing timer should never have to wait 1ms. I 
> think a simple change would be as below:*
> {code:java}
> return Math.max(processingTimestamp - currentTimestamp, -1) + 1;
> {code}
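
For illustration, the current and the proposed formula can be compared on a few sample inputs. This is a standalone sketch: the class and method names ({{DelayFormulaComparison}}, {{currentDelay}}, {{proposedDelay}}) and the sample timestamps are hypothetical, and only the two {{Math.max}} expressions come from the issue description.

```java
// Standalone comparison of the two delay formulas. NOT Flink code.
class DelayFormulaComparison {

    // Formula currently quoted in the issue description.
    static long currentDelay(long processingTimestamp, long currentTimestamp) {
        return Math.max(processingTimestamp - currentTimestamp, 0) + 1;
    }

    // Formula proposed in the issue description.
    static long proposedDelay(long processingTimestamp, long currentTimestamp) {
        return Math.max(processingTimestamp - currentTimestamp, -1) + 1;
    }

    public static void main(String[] args) {
        long now = 100; // arbitrary sample value for the current time
        long[] timerTimestamps = {95, 99, 100, 101, 105};
        for (long ts : timerTimestamps) {
            System.out.println("timer ts=" + ts + ", now=" + now
                    + ": current delay=" + currentDelay(ts, now)
                    + " ms, proposed delay=" + proposedDelay(ts, now) + " ms");
        }
    }
}
```

With these inputs the two formulas differ only for timers that are already late (timestamps 95 and 99): the current formula delays them by 1ms, the proposed one by 0ms. Timers that are on time or in the future get the same delay from both.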



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
