This is an automated email from the ASF dual-hosted git repository.
lihaosky pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 4acf53ae18e [FLINK-34921][tests] Stop SystemProcessingTimeServiceTest
hanging on timing jitter (#28405)
4acf53ae18e is described below
commit 4acf53ae18e43055029864072f220087b32ac0e5
Author: Martijn Visser <[email protected]>
AuthorDate: Sat Jun 13 20:56:34 2026 +0200
[FLINK-34921][tests] Stop SystemProcessingTimeServiceTest hanging on timing
jitter (#28405)
testScheduleAtFixedRate compared scheduled timestamps against a separate
clock read that races the service's internal scheduling base. When the
assertion failed, it threw an Error out of the callback, which silently
stops a periodic task; the latch therefore never counted down, and the
test hung until it failed with a bare TimeoutException. Anchor the check
on the first observed timestamp, record callback throwables and still
count down so that drift fails fast, and drop the fixed-delay
first-execution check, which has no non-racy reference point.
Generated-by: Claude Opus 4.8 (1M context)
---
.../tasks/SystemProcessingTimeServiceTest.java | 28 ++++++++++++----------
1 file changed, 15 insertions(+), 13 deletions(-)
diff --git
a/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeServiceTest.java
b/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeServiceTest.java
index f11ffa65bc9..f3833d1cbe0 100644
---
a/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeServiceTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeServiceTest.java
@@ -59,20 +59,24 @@ class SystemProcessingTimeServiceTest {
final CountDownLatch countDownLatch = new CountDownLatch(countDown);
+ // The service captures its scheduling base internally; a separate
clock read in the test
+ // races it, so the expected timestamps are anchored on the first
observed one.
+ final AtomicReference<Long> firstTimestamp = new AtomicReference<>();
+
try {
- final long initialTimestamp = timer.getCurrentProcessingTime() +
initialDelay;
timer.scheduleAtFixedRate(
timestamp -> {
try {
+ firstTimestamp.compareAndSet(null, timestamp);
long executionTimes = countDown -
countDownLatch.getCount();
assertThat(timestamp)
.isCloseTo(
- initialTimestamp + executionTimes
* period,
+ firstTimestamp.get() +
executionTimes * period,
Offset.offset(period));
Thread.sleep(executionDelay);
- } catch (Error e) {
- System.out.println(e.getMessage());
- throw new Error(e);
+ } catch (Throwable e) {
+ // errorRef is shared with the service's async
exception handler.
+ errorRef.compareAndSet(null, e);
}
countDownLatch.countDown();
},
@@ -107,14 +111,12 @@ class SystemProcessingTimeServiceTest {
final LastExecutionTimeWrapper lastExecutionTimeWrapper = new
LastExecutionTimeWrapper();
try {
- final long initialTimestamp = timer.getCurrentProcessingTime() +
initialDelay;
timer.scheduleWithFixedDelay(
timestamp -> {
try {
- if (countDownLatch.getCount() == countDown) {
- assertThat(timestamp)
- .isCloseTo(initialTimestamp,
Offset.offset(period));
- } else {
+ // The first execution has no non-racy reference
point; only later
+ // executions can verify the fixed-delay spacing.
+ if (countDownLatch.getCount() != countDown) {
assertThat(timestamp)
.isCloseTo(
lastExecutionTimeWrapper.ts +
period,
@@ -122,9 +124,9 @@ class SystemProcessingTimeServiceTest {
}
Thread.sleep(executionDelay);
lastExecutionTimeWrapper.ts =
timer.getCurrentProcessingTime();
- } catch (Error e) {
- System.out.println(e.getMessage());
- throw new Error(e);
+ } catch (Throwable e) {
+ // errorRef is shared with the service's async
exception handler.
+ errorRef.compareAndSet(null, e);
}
countDownLatch.countDown();
},