[ 
https://issues.apache.org/jira/browse/FLINK-38932?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-38932:
-----------------------------------
    Labels: pull-request-available  (was: )

> Incorrect scheduled timestamp in ProcessingTimeCallback with 
> scheduleWithFixedDelay
> -----------------------------------------------------------------------------------
>
>                 Key: FLINK-38932
>                 URL: https://issues.apache.org/jira/browse/FLINK-38932
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Task
>            Reporter: Zhanghao Chen
>            Priority: Minor
>              Labels: pull-request-available
>
> h2. {{Problem}}
> {{{}ProcessingTimeCallback#{}}}{{{}onProcessingTime{}}}tasks a parameter 
> {{time}} which is the scheduled time of the callback. We found that the 
> received time keeps lagging behind the actual scheduled time when using 
> {{scheduleWithFixedDelay}} to register the timer.
> h2. Root Cause Analysis
> The time is computed by {{{}SystemProcessingTimeService#{}}}{{ScheduledTask}} 
> by cumulating a fixed period evety time when the scheduled task runs. This is 
> correct for {{{}scheduleWithFixedPeriod{}}}, but incorrect for 
> {{scheduleWithFixedDelay}} as the processing/queuing delay is not accounted 
> here.
> {code:java}
> public void run() {
>     if (serviceStatus.get() != STATUS_ALIVE) {
>         return;
>     }
>     try {
>         callback.onProcessingTime(nextTimestamp);
>     } catch (Exception ex) {
>         exceptionHandler.handleException(ex);
>     }
>     nextTimestamp += period;
> } {code}
> h2. Fix
> {code:java}
> public void run() {
>     if (serviceStatus.get() != STATUS_ALIVE) {
>         return;
>     }
>     try {
>         callback.onProcessingTime(nextTimestamp);
>     } catch (Exception ex) {
>         exceptionHandler.handleException(ex);
>     }
>     if (fixedDelay) {
>         nextTimestamp = System.currentTimeMillis() + period;
>     } else {
>         nextTimestamp += period;
>     }
> } {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to