This is an automated email from the ASF dual-hosted git repository.

panyuepeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new d30cf4d9ede [FLINK-38932][runtime] Fix incorrect scheduled timestamp 
in ProcessingTimeCallback with scheduleWithFixedDelay (#27429)
d30cf4d9ede is described below

commit d30cf4d9ede52b39df5dff39c79b9517228d18f1
Author: Zhanghao Chen <[email protected]>
AuthorDate: Thu Jan 29 09:53:28 2026 +0800

    [FLINK-38932][runtime] Fix incorrect scheduled timestamp in 
ProcessingTimeCallback with scheduleWithFixedDelay (#27429)
---
 .../runtime/tasks/SystemProcessingTimeService.java | 17 +++--
 .../tasks/SystemProcessingTimeServiceTest.java     | 80 +++++++++++++++++++++-
 2 files changed, 89 insertions(+), 8 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeService.java
 
b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeService.java
index 73722249340..4d9490b0e0d 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeService.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeService.java
@@ -141,7 +141,7 @@ public class SystemProcessingTimeService implements 
TimerService {
     private ScheduledFuture<?> scheduleRepeatedly(
             ProcessingTimeCallback callback, long initialDelay, long period, 
boolean fixedDelay) {
         final long nextTimestamp = getCurrentProcessingTime() + initialDelay;
-        final Runnable task = wrapOnTimerCallback(callback, nextTimestamp, 
period);
+        final Runnable task = wrapOnTimerCallback(callback, nextTimestamp, 
period, fixedDelay);
 
         // we directly try to register the timer and only react to the status 
on exception
         // that way we save unnecessary volatile accesses for each timer
@@ -281,12 +281,13 @@ public class SystemProcessingTimeService implements 
TimerService {
     }
 
     private Runnable wrapOnTimerCallback(ProcessingTimeCallback callback, long 
timestamp) {
-        return new ScheduledTask(status, exceptionHandler, callback, 
timestamp, 0);
+        return new ScheduledTask(status, exceptionHandler, callback, 
timestamp, 0, false);
     }
 
     private Runnable wrapOnTimerCallback(
-            ProcessingTimeCallback callback, long nextTimestamp, long period) {
-        return new ScheduledTask(status, exceptionHandler, callback, 
nextTimestamp, period);
+            ProcessingTimeCallback callback, long nextTimestamp, long period, 
boolean fixedDelay) {
+        return new ScheduledTask(
+                status, exceptionHandler, callback, nextTimestamp, period, 
fixedDelay);
     }
 
     private static final class ScheduledTask implements Runnable {
@@ -296,18 +297,21 @@ public class SystemProcessingTimeService implements 
TimerService {
 
         private long nextTimestamp;
         private final long period;
+        private final boolean fixedDelay;
 
         ScheduledTask(
                 AtomicInteger serviceStatus,
                 ExceptionHandler exceptionHandler,
                 ProcessingTimeCallback callback,
                 long timestamp,
-                long period) {
+                long period,
+                boolean fixedDelay) {
             this.serviceStatus = serviceStatus;
             this.exceptionHandler = exceptionHandler;
             this.callback = callback;
             this.nextTimestamp = timestamp;
             this.period = period;
+            this.fixedDelay = fixedDelay;
         }
 
         @Override
@@ -320,7 +324,8 @@ public class SystemProcessingTimeService implements 
TimerService {
             } catch (Exception ex) {
                 exceptionHandler.handleException(ex);
             }
-            nextTimestamp += period;
+            nextTimestamp =
+                    fixedDelay ? System.currentTimeMillis() + period : 
nextTimestamp + period;
         }
     }
 }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeServiceTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeServiceTest.java
index 097b6863dba..f11ffa65bc9 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeServiceTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeServiceTest.java
@@ -21,6 +21,7 @@ package org.apache.flink.streaming.runtime.tasks;
 import org.apache.flink.core.testutils.OneShotLatch;
 import org.apache.flink.util.Preconditions;
 
+import org.assertj.core.data.Offset;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.Timeout;
 
@@ -43,13 +44,15 @@ class SystemProcessingTimeServiceTest {
 
     /**
      * Tests that SystemProcessingTimeService#scheduleAtFixedRate is actually 
triggered multiple
-     * times.
+     * times with the expected scheduled timestamps.
      */
     @Timeout(value = 10000, unit = TimeUnit.MILLISECONDS)
     @Test
     void testScheduleAtFixedRate() throws Exception {
         final AtomicReference<Throwable> errorRef = new AtomicReference<>();
+        final long initialDelay = 10L;
         final long period = 10L;
+        final long executionDelay = 10L;
         final int countDown = 3;
 
         final SystemProcessingTimeService timer = 
createSystemProcessingTimeService(errorRef);
@@ -57,7 +60,76 @@ class SystemProcessingTimeServiceTest {
         final CountDownLatch countDownLatch = new CountDownLatch(countDown);
 
         try {
-            timer.scheduleAtFixedRate(timestamp -> countDownLatch.countDown(), 
0L, period);
+            final long initialTimestamp = timer.getCurrentProcessingTime() + 
initialDelay;
+            timer.scheduleAtFixedRate(
+                    timestamp -> {
+                        try {
+                            long executionTimes = countDown - 
countDownLatch.getCount();
+                            assertThat(timestamp)
+                                    .isCloseTo(
+                                            initialTimestamp + executionTimes 
* period,
+                                            Offset.offset(period));
+                            Thread.sleep(executionDelay);
+                        } catch (Error e) {
+                            System.out.println(e.getMessage());
+                            throw new Error(e);
+                        }
+                        countDownLatch.countDown();
+                    },
+                    initialDelay,
+                    period);
+
+            countDownLatch.await();
+
+            assertThat(errorRef.get()).isNull();
+        } finally {
+            timer.shutdownService();
+        }
+    }
+
+    /**
+     * Tests that SystemProcessingTimeService#testScheduleAtFixedDelay is 
actually triggered
+     * multiple times with the expected scheduled timestamps.
+     */
+    @Timeout(value = 10000, unit = TimeUnit.MILLISECONDS)
+    @Test
+    void testScheduleAtFixedDelay() throws Exception {
+        final AtomicReference<Throwable> errorRef = new AtomicReference<>();
+        final long initialDelay = 10L;
+        final long period = 10L;
+        final long executionDelay = 10L;
+        final int countDown = 3;
+
+        final SystemProcessingTimeService timer = 
createSystemProcessingTimeService(errorRef);
+
+        final CountDownLatch countDownLatch = new CountDownLatch(countDown);
+
+        final LastExecutionTimeWrapper lastExecutionTimeWrapper = new 
LastExecutionTimeWrapper();
+
+        try {
+            final long initialTimestamp = timer.getCurrentProcessingTime() + 
initialDelay;
+            timer.scheduleWithFixedDelay(
+                    timestamp -> {
+                        try {
+                            if (countDownLatch.getCount() == countDown) {
+                                assertThat(timestamp)
+                                        .isCloseTo(initialTimestamp, 
Offset.offset(period));
+                            } else {
+                                assertThat(timestamp)
+                                        .isCloseTo(
+                                                lastExecutionTimeWrapper.ts + 
period,
+                                                Offset.offset(period));
+                            }
+                            Thread.sleep(executionDelay);
+                            lastExecutionTimeWrapper.ts = 
timer.getCurrentProcessingTime();
+                        } catch (Error e) {
+                            System.out.println(e.getMessage());
+                            throw new Error(e);
+                        }
+                        countDownLatch.countDown();
+                    },
+                    initialDelay,
+                    period);
 
             countDownLatch.await();
 
@@ -67,6 +139,10 @@ class SystemProcessingTimeServiceTest {
         }
     }
 
+    private static class LastExecutionTimeWrapper {
+        private long ts;
+    }
+
     /**
      * Tests that shutting down the SystemProcessingTimeService will also 
cancel the scheduled at
      * fix rate future.

Reply via email to