This is an automated email from the ASF dual-hosted git repository.

lihaosky pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 4acf53ae18e [FLINK-34921][tests] Stop SystemProcessingTimeServiceTest hanging on timing jitter (#28405)
4acf53ae18e is described below

commit 4acf53ae18e43055029864072f220087b32ac0e5
Author: Martijn Visser <[email protected]>
AuthorDate: Sat Jun 13 20:56:34 2026 +0200

    [FLINK-34921][tests] Stop SystemProcessingTimeServiceTest hanging on timing jitter (#28405)
    
    testScheduleAtFixedRate compared scheduled timestamps against a clock
    read that races the service's internal scheduling base. When that
    assertion failed, it threw an Error out of the callback, which silently
    stops a periodic task, so the latch never counted down and the test
    hung until it ended in a bare TimeoutException. Anchor the check on the
    first observed timestamp, record callback throwables while still
    counting down so that drift fails fast, and drop the fixed-delay
    first-execution check, which has no non-racy reference point.
    
    Generated-by: Claude Opus 4.8 (1M context)
---
 .../tasks/SystemProcessingTimeServiceTest.java     | 28 ++++++++++++----------
 1 file changed, 15 insertions(+), 13 deletions(-)

diff --git a/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeServiceTest.java b/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeServiceTest.java
index f11ffa65bc9..f3833d1cbe0 100644
--- a/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeServiceTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeServiceTest.java
@@ -59,20 +59,24 @@ class SystemProcessingTimeServiceTest {
 
         final CountDownLatch countDownLatch = new CountDownLatch(countDown);
 
+        // The service captures its scheduling base internally; a separate 
clock read in the test
+        // races it, so the expected timestamps are anchored on the first 
observed one.
+        final AtomicReference<Long> firstTimestamp = new AtomicReference<>();
+
         try {
-            final long initialTimestamp = timer.getCurrentProcessingTime() + 
initialDelay;
             timer.scheduleAtFixedRate(
                     timestamp -> {
                         try {
+                            firstTimestamp.compareAndSet(null, timestamp);
                             long executionTimes = countDown - 
countDownLatch.getCount();
                             assertThat(timestamp)
                                     .isCloseTo(
-                                            initialTimestamp + executionTimes 
* period,
+                                            firstTimestamp.get() + 
executionTimes * period,
                                             Offset.offset(period));
                             Thread.sleep(executionDelay);
-                        } catch (Error e) {
-                            System.out.println(e.getMessage());
-                            throw new Error(e);
+                        } catch (Throwable e) {
+                            // errorRef is shared with the service's async 
exception handler.
+                            errorRef.compareAndSet(null, e);
                         }
                         countDownLatch.countDown();
                     },
@@ -107,14 +111,12 @@ class SystemProcessingTimeServiceTest {
         final LastExecutionTimeWrapper lastExecutionTimeWrapper = new 
LastExecutionTimeWrapper();
 
         try {
-            final long initialTimestamp = timer.getCurrentProcessingTime() + 
initialDelay;
             timer.scheduleWithFixedDelay(
                     timestamp -> {
                         try {
-                            if (countDownLatch.getCount() == countDown) {
-                                assertThat(timestamp)
-                                        .isCloseTo(initialTimestamp, 
Offset.offset(period));
-                            } else {
+                            // The first execution has no non-racy reference 
point; only later
+                            // executions can verify the fixed-delay spacing.
+                            if (countDownLatch.getCount() != countDown) {
                                 assertThat(timestamp)
                                         .isCloseTo(
                                                 lastExecutionTimeWrapper.ts + 
period,
@@ -122,9 +124,9 @@ class SystemProcessingTimeServiceTest {
                             }
                             Thread.sleep(executionDelay);
                             lastExecutionTimeWrapper.ts = 
timer.getCurrentProcessingTime();
-                        } catch (Error e) {
-                            System.out.println(e.getMessage());
-                            throw new Error(e);
+                        } catch (Throwable e) {
+                            // errorRef is shared with the service's async 
exception handler.
+                            errorRef.compareAndSet(null, e);
                         }
                         countDownLatch.countDown();
                     },

Reply via email to