[
https://issues.apache.org/jira/browse/FLINK-38932?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
ASF GitHub Bot updated FLINK-38932:
-----------------------------------
Labels: pull-request-available (was: )
> Incorrect scheduled timestamp in ProcessingTimeCallback with
> scheduleWithFixedDelay
> -----------------------------------------------------------------------------------
>
> Key: FLINK-38932
> URL: https://issues.apache.org/jira/browse/FLINK-38932
> Project: Flink
> Issue Type: Bug
> Components: Runtime / Task
> Reporter: Zhanghao Chen
> Priority: Minor
> Labels: pull-request-available
>
> h2. Problem
> {{ProcessingTimeCallback#onProcessingTime}} takes a parameter
> {{time}}, which is the scheduled time of the callback. We found that the
> received time keeps lagging behind the actual scheduled time when
> {{scheduleWithFixedDelay}} is used to register the timer.
> h2. Root Cause Analysis
> The time is computed by {{SystemProcessingTimeService#ScheduledTask}},
> which adds a fixed period to the timestamp every time the scheduled task
> runs. This is correct for {{scheduleAtFixedRate}}, but incorrect for
> {{scheduleWithFixedDelay}}, because the processing/queuing delay is not
> accounted for.
> {code:java}
> public void run() {
>     if (serviceStatus.get() != STATUS_ALIVE) {
>         return;
>     }
>     try {
>         callback.onProcessingTime(nextTimestamp);
>     } catch (Exception ex) {
>         exceptionHandler.handleException(ex);
>     }
>     nextTimestamp += period;
> }
> {code}
> h2. Fix
> {code:java}
> public void run() {
>     if (serviceStatus.get() != STATUS_ALIVE) {
>         return;
>     }
>     try {
>         callback.onProcessingTime(nextTimestamp);
>     } catch (Exception ex) {
>         exceptionHandler.handleException(ex);
>     }
>     if (fixedDelay) {
>         nextTimestamp = System.currentTimeMillis() + period;
>     } else {
>         nextTimestamp += period;
>     }
> }
> {code}
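
The drift described in the issue can be reproduced without Flink. The following is a minimal sketch, not Flink code: the class name {{FixedDelayTimestampSketch}}, the method {{lagAfterRuns}}, and the simulated clock (standing in for {{System.currentTimeMillis()}}) are all hypothetical, and it assumes the initial delay equals the period and that every callback takes the same amount of time. It compares the two ways of updating {{nextTimestamp}} under fixed-delay scheduling.

```java
public class FixedDelayTimestampSketch {

    /**
     * Simulates a fixed-delay timer and returns how far the timestamp passed to
     * the callback lags behind the simulated clock on the last run.
     *
     * Assumptions (hypothetical, for illustration only): the initial delay equals
     * the period, and each callback invocation costs exactly callbackCostMs.
     */
    static long lagAfterRuns(boolean useFix, long periodMs, long callbackCostMs, int runs) {
        long clock = periodMs;         // simulated wall clock at the first firing
        long nextTimestamp = periodMs; // value that would be passed to onProcessingTime
        long lag = 0;
        for (int i = 0; i < runs; i++) {
            lag = clock - nextTimestamp; // what the callback observes on this run
            clock += callbackCostMs;     // the callback executes
            if (useFix) {
                // Proposed fix: base the next timestamp on the current time.
                nextTimestamp = clock + periodMs;
            } else {
                // Original behavior: accumulate the period, ignoring callback cost.
                nextTimestamp += periodMs;
            }
            clock += periodMs;           // fixed delay: next run starts one period after completion
        }
        return lag;
    }

    public static void main(String[] args) {
        System.out.println(lagAfterRuns(false, 1000, 50, 5)); // prints 200
        System.out.println(lagAfterRuns(true, 1000, 50, 5));  // prints 0
    }
}
```

Under these assumptions the accumulated timestamp falls behind by one callback duration per run (4 x 50 ms after five runs), while the timestamp derived from the current time stays aligned with the actual firing time.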
--
This message was sent by Atlassian Jira
(v8.20.10#820010)