This is an automated email from the ASF dual-hosted git repository.
panyuepeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new d30cf4d9ede [FLINK-38932][runtime] Fix incorrect scheduled timestamp
in ProcessingTimeCallback with scheduleWithFixedDelay (#27429)
d30cf4d9ede is described below
commit d30cf4d9ede52b39df5dff39c79b9517228d18f1
Author: Zhanghao Chen <[email protected]>
AuthorDate: Thu Jan 29 09:53:28 2026 +0800
[FLINK-38932][runtime] Fix incorrect scheduled timestamp in
ProcessingTimeCallback with scheduleWithFixedDelay (#27429)
---
.../runtime/tasks/SystemProcessingTimeService.java | 17 +++--
.../tasks/SystemProcessingTimeServiceTest.java | 80 +++++++++++++++++++++-
2 files changed, 89 insertions(+), 8 deletions(-)
diff --git
a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeService.java
b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeService.java
index 73722249340..4d9490b0e0d 100644
---
a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeService.java
+++
b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeService.java
@@ -141,7 +141,7 @@ public class SystemProcessingTimeService implements
TimerService {
private ScheduledFuture<?> scheduleRepeatedly(
ProcessingTimeCallback callback, long initialDelay, long period,
boolean fixedDelay) {
final long nextTimestamp = getCurrentProcessingTime() + initialDelay;
- final Runnable task = wrapOnTimerCallback(callback, nextTimestamp,
period);
+ final Runnable task = wrapOnTimerCallback(callback, nextTimestamp,
period, fixedDelay);
// we directly try to register the timer and only react to the status
on exception
// that way we save unnecessary volatile accesses for each timer
@@ -281,12 +281,13 @@ public class SystemProcessingTimeService implements
TimerService {
}
private Runnable wrapOnTimerCallback(ProcessingTimeCallback callback, long
timestamp) {
- return new ScheduledTask(status, exceptionHandler, callback,
timestamp, 0);
+ return new ScheduledTask(status, exceptionHandler, callback,
timestamp, 0, false);
}
private Runnable wrapOnTimerCallback(
- ProcessingTimeCallback callback, long nextTimestamp, long period) {
- return new ScheduledTask(status, exceptionHandler, callback,
nextTimestamp, period);
+ ProcessingTimeCallback callback, long nextTimestamp, long period,
boolean fixedDelay) {
+ return new ScheduledTask(
+ status, exceptionHandler, callback, nextTimestamp, period,
fixedDelay);
}
private static final class ScheduledTask implements Runnable {
@@ -296,18 +297,21 @@ public class SystemProcessingTimeService implements
TimerService {
private long nextTimestamp;
private final long period;
+ private final boolean fixedDelay;
ScheduledTask(
AtomicInteger serviceStatus,
ExceptionHandler exceptionHandler,
ProcessingTimeCallback callback,
long timestamp,
- long period) {
+ long period,
+ boolean fixedDelay) {
this.serviceStatus = serviceStatus;
this.exceptionHandler = exceptionHandler;
this.callback = callback;
this.nextTimestamp = timestamp;
this.period = period;
+ this.fixedDelay = fixedDelay;
}
@Override
@@ -320,7 +324,8 @@ public class SystemProcessingTimeService implements
TimerService {
} catch (Exception ex) {
exceptionHandler.handleException(ex);
}
- nextTimestamp += period;
+ nextTimestamp =
+ fixedDelay ? System.currentTimeMillis() + period :
nextTimestamp + period;
}
}
}
diff --git
a/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeServiceTest.java
b/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeServiceTest.java
index 097b6863dba..f11ffa65bc9 100644
---
a/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeServiceTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeServiceTest.java
@@ -21,6 +21,7 @@ package org.apache.flink.streaming.runtime.tasks;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.util.Preconditions;
+import org.assertj.core.data.Offset;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
@@ -43,13 +44,15 @@ class SystemProcessingTimeServiceTest {
/**
* Tests that SystemProcessingTimeService#scheduleAtFixedRate is actually
triggered multiple
- * times.
+ * times with the expected scheduled timestamps.
*/
@Timeout(value = 10000, unit = TimeUnit.MILLISECONDS)
@Test
void testScheduleAtFixedRate() throws Exception {
final AtomicReference<Throwable> errorRef = new AtomicReference<>();
+ final long initialDelay = 10L;
final long period = 10L;
+ final long executionDelay = 10L;
final int countDown = 3;
final SystemProcessingTimeService timer =
createSystemProcessingTimeService(errorRef);
@@ -57,7 +60,76 @@ class SystemProcessingTimeServiceTest {
final CountDownLatch countDownLatch = new CountDownLatch(countDown);
try {
- timer.scheduleAtFixedRate(timestamp -> countDownLatch.countDown(),
0L, period);
+ final long initialTimestamp = timer.getCurrentProcessingTime() +
initialDelay;
+ timer.scheduleAtFixedRate(
+ timestamp -> {
+ try {
+ long executionTimes = countDown -
countDownLatch.getCount();
+ assertThat(timestamp)
+ .isCloseTo(
+ initialTimestamp + executionTimes
* period,
+ Offset.offset(period));
+ Thread.sleep(executionDelay);
+ } catch (Error e) {
+ System.out.println(e.getMessage());
+ throw new Error(e);
+ }
+ countDownLatch.countDown();
+ },
+ initialDelay,
+ period);
+
+ countDownLatch.await();
+
+ assertThat(errorRef.get()).isNull();
+ } finally {
+ timer.shutdownService();
+ }
+ }
+
+ /**
+ * Tests that SystemProcessingTimeService#testScheduleAtFixedDelay is
actually triggered
+ * multiple times with the expected scheduled timestamps.
+ */
+ @Timeout(value = 10000, unit = TimeUnit.MILLISECONDS)
+ @Test
+ void testScheduleAtFixedDelay() throws Exception {
+ final AtomicReference<Throwable> errorRef = new AtomicReference<>();
+ final long initialDelay = 10L;
+ final long period = 10L;
+ final long executionDelay = 10L;
+ final int countDown = 3;
+
+ final SystemProcessingTimeService timer =
createSystemProcessingTimeService(errorRef);
+
+ final CountDownLatch countDownLatch = new CountDownLatch(countDown);
+
+ final LastExecutionTimeWrapper lastExecutionTimeWrapper = new
LastExecutionTimeWrapper();
+
+ try {
+ final long initialTimestamp = timer.getCurrentProcessingTime() +
initialDelay;
+ timer.scheduleWithFixedDelay(
+ timestamp -> {
+ try {
+ if (countDownLatch.getCount() == countDown) {
+ assertThat(timestamp)
+ .isCloseTo(initialTimestamp,
Offset.offset(period));
+ } else {
+ assertThat(timestamp)
+ .isCloseTo(
+ lastExecutionTimeWrapper.ts +
period,
+ Offset.offset(period));
+ }
+ Thread.sleep(executionDelay);
+ lastExecutionTimeWrapper.ts =
timer.getCurrentProcessingTime();
+ } catch (Error e) {
+ System.out.println(e.getMessage());
+ throw new Error(e);
+ }
+ countDownLatch.countDown();
+ },
+ initialDelay,
+ period);
countDownLatch.await();
@@ -67,6 +139,10 @@ class SystemProcessingTimeServiceTest {
}
}
+ private static class LastExecutionTimeWrapper {
+ private long ts;
+ }
+
/**
* Tests that shutting down the SystemProcessingTimeService will also
cancel the scheduled at
* fix rate future.