This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 7536cfe9e47336426882df288d3a3a3bd08cccbc
Author: Efrat Levitan <[email protected]>
AuthorDate: Sun Feb 22 15:33:24 2026 +0200

    [FLINK-39073][runtime] Test split state timers during deferred alignment 
check
---
 .../metrics/groups/InternalSourceSplitMetricGroup.java  | 13 ++++++-------
 .../flink/streaming/api/operators/SourceOperator.java   |  1 +
 .../SourceOperatorSplitWatermarkAlignmentTest.java      | 17 +++++++++++++++++
 3 files changed, 24 insertions(+), 7 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/InternalSourceSplitMetricGroup.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/InternalSourceSplitMetricGroup.java
index f7efb1b8772..bc522d228b3 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/InternalSourceSplitMetricGroup.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/InternalSourceSplitMetricGroup.java
@@ -77,12 +77,6 @@ public class InternalSourceSplitMetricGroup extends 
ProxyMetricGroup<MetricGroup
                         MetricNames.SPLIT_CURRENT_WATERMARK, currentWatermark);
     }
 
-    public static InternalSourceSplitMetricGroup wrap(
-            OperatorMetricGroup operatorMetricGroup, String splitId, 
Gauge<Long> currentWatermark) {
-        return new InternalSourceSplitMetricGroup(
-                operatorMetricGroup, SystemClock.getInstance(), splitId, 
currentWatermark);
-    }
-
     @VisibleForTesting
     public static InternalSourceSplitMetricGroup mock(
             MetricGroup metricGroup, String splitId, Gauge<Long> 
currentWatermark) {
@@ -90,7 +84,6 @@ public class InternalSourceSplitMetricGroup extends 
ProxyMetricGroup<MetricGroup
                 metricGroup, SystemClock.getInstance(), splitId, 
currentWatermark);
     }
 
-    @VisibleForTesting
     public static InternalSourceSplitMetricGroup wrap(
             OperatorMetricGroup operatorMetricGroup,
             Clock clock,
@@ -210,4 +203,10 @@ public class InternalSourceSplitMetricGroup extends 
ProxyMetricGroup<MetricGroup
     public MetricGroup getSplitWatermarkMetricGroup() {
         return splitWatermarkMetricGroup;
     }
+
+    @VisibleForTesting
+    public void updateTimers() {
+        this.idleTimePerSecond.update();
+        this.pausedTimePerSecond.update();
+    }
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
 
b/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
index 02273b20dd5..433fdfd3110 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
@@ -396,6 +396,7 @@ public class SourceOperator<OUT, SplitT extends 
SourceSplit> extends AbstractStr
             InternalSourceSplitMetricGroup splitMetricGroup =
                     InternalSourceSplitMetricGroup.wrap(
                             getMetricGroup(),
+                            processingTimeService.getClock(),
                             splitId,
                             () -> 
sampledSplitWatermarks.get(splitId).getLatest());
             splitMetricGroup.markSplitStart();
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorSplitWatermarkAlignmentTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorSplitWatermarkAlignmentTest.java
index 904b88e79fc..b14639a9fcc 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorSplitWatermarkAlignmentTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorSplitWatermarkAlignmentTest.java
@@ -73,6 +73,8 @@ import java.util.stream.Collectors;
 import static 
org.apache.flink.configuration.PipelineOptions.WATERMARK_ALIGNMENT_BUFFER_SIZE;
 import static 
org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.createExecutionAttemptId;
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
 
 /** Unit test for split alignment in {@link SourceOperator}. */
 class SourceOperatorSplitWatermarkAlignmentTest {
@@ -507,6 +509,8 @@ class SourceOperatorSplitWatermarkAlignmentTest {
         final MockSourceReader sourceReader =
                 new MockSourceReader(WaitingForSplits.DO_NOT_WAIT_FOR_SPLITS, 
true, true);
         final TestProcessingTimeService processingTimeService = new 
TestProcessingTimeService();
+        // Split states math assumes non-negative time
+        processingTimeService.setCurrentTime(0);
         final SourceOperator<Integer, MockSourceSplit> operator =
                 createAndOpenSourceOperatorWithIdleness(
                         sourceReader, processingTimeService, idleTimeout);
@@ -544,12 +548,25 @@ class SourceOperatorSplitWatermarkAlignmentTest {
         // The split is still idle:
         
assertThat(operator.getSplitMetricGroup(split0.splitId()).isIdle()).isTrue();
 
+        // updating timers values manually (in reality this is done by 
ViewUpdater)
+        operator.getSplitMetricGroup(split0.splitId()).updateTimers();
+        // Ensure the idle timer ticked, but not pause timer
+        assertNotEquals(
+                0L, 
operator.getSplitMetricGroup(split0.splitId()).getAccumulatedIdleTime());
+        assertEquals(0L, 
operator.getSplitMetricGroup(split0.splitId()).getAccumulatedPausedTime());
+
         // The split emits a record to break out of idleness
         operator.emitNext(actualOutput);
         sampleAllWatermarks(processingTimeService);
 
         // The split is marked not idle, then immediately paused by the 
deferred alignment check
         
assertThat(operator.getSplitMetricGroup(split0.splitId()).isPaused()).isTrue();
+
+        // Make pause timer tick
+        processingTimeService.advance(10);
+        operator.getSplitMetricGroup(split0.splitId()).updateTimers();
+        assertNotEquals(
+                0L, 
operator.getSplitMetricGroup(split0.splitId()).getAccumulatedPausedTime());
     }
 
     private void sampleAllWatermarks(TestProcessingTimeService timeService) 
throws Exception {

Reply via email to