This is an automated email from the ASF dual-hosted git repository. pnowojski pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 7536cfe9e47336426882df288d3a3a3bd08cccbc Author: Efrat Levitan <[email protected]> AuthorDate: Sun Feb 22 15:33:24 2026 +0200 [FLINK-39073][runtime] Test split state timers during deferred alignment check --- .../metrics/groups/InternalSourceSplitMetricGroup.java | 13 ++++++------- .../flink/streaming/api/operators/SourceOperator.java | 1 + .../SourceOperatorSplitWatermarkAlignmentTest.java | 17 +++++++++++++++++ 3 files changed, 24 insertions(+), 7 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/InternalSourceSplitMetricGroup.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/InternalSourceSplitMetricGroup.java index f7efb1b8772..bc522d228b3 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/InternalSourceSplitMetricGroup.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/InternalSourceSplitMetricGroup.java @@ -77,12 +77,6 @@ public class InternalSourceSplitMetricGroup extends ProxyMetricGroup<MetricGroup MetricNames.SPLIT_CURRENT_WATERMARK, currentWatermark); } - public static InternalSourceSplitMetricGroup wrap( - OperatorMetricGroup operatorMetricGroup, String splitId, Gauge<Long> currentWatermark) { - return new InternalSourceSplitMetricGroup( - operatorMetricGroup, SystemClock.getInstance(), splitId, currentWatermark); - } - @VisibleForTesting public static InternalSourceSplitMetricGroup mock( MetricGroup metricGroup, String splitId, Gauge<Long> currentWatermark) { @@ -90,7 +84,6 @@ public class InternalSourceSplitMetricGroup extends ProxyMetricGroup<MetricGroup metricGroup, SystemClock.getInstance(), splitId, currentWatermark); } - @VisibleForTesting public static InternalSourceSplitMetricGroup wrap( OperatorMetricGroup operatorMetricGroup, Clock clock, @@ -210,4 +203,10 @@ public class InternalSourceSplitMetricGroup extends ProxyMetricGroup<MetricGroup public MetricGroup getSplitWatermarkMetricGroup() { return splitWatermarkMetricGroup; } + + @VisibleForTesting + public void updateTimers() { + this.idleTimePerSecond.update(); + this.pausedTimePerSecond.update(); + } } diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java b/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java index 02273b20dd5..433fdfd3110 100644 --- a/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java +++ b/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java @@ -396,6 +396,7 @@ public class SourceOperator<OUT, SplitT extends SourceSplit> extends AbstractStr InternalSourceSplitMetricGroup splitMetricGroup = InternalSourceSplitMetricGroup.wrap( getMetricGroup(), + processingTimeService.getClock(), splitId, () -> sampledSplitWatermarks.get(splitId).getLatest()); splitMetricGroup.markSplitStart(); diff --git a/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorSplitWatermarkAlignmentTest.java b/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorSplitWatermarkAlignmentTest.java index 904b88e79fc..b14639a9fcc 100644 --- a/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorSplitWatermarkAlignmentTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorSplitWatermarkAlignmentTest.java @@ -73,6 +73,8 @@ import java.util.stream.Collectors; import static org.apache.flink.configuration.PipelineOptions.WATERMARK_ALIGNMENT_BUFFER_SIZE; import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.createExecutionAttemptId; import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotEquals; /** Unit test for split alignment in {@link SourceOperator}. */ class SourceOperatorSplitWatermarkAlignmentTest { @@ -507,6 +509,8 @@ class SourceOperatorSplitWatermarkAlignmentTest { final MockSourceReader sourceReader = new MockSourceReader(WaitingForSplits.DO_NOT_WAIT_FOR_SPLITS, true, true); final TestProcessingTimeService processingTimeService = new TestProcessingTimeService(); + // Split states math assumes non-negative time + processingTimeService.setCurrentTime(0); final SourceOperator<Integer, MockSourceSplit> operator = createAndOpenSourceOperatorWithIdleness( sourceReader, processingTimeService, idleTimeout); @@ -544,12 +548,25 @@ class SourceOperatorSplitWatermarkAlignmentTest { // The split is still idle: assertThat(operator.getSplitMetricGroup(split0.splitId()).isIdle()).isTrue(); + // updating timers values manually (in reality this is done by ViewUpdater) + operator.getSplitMetricGroup(split0.splitId()).updateTimers(); + // Ensure the idle timer ticked, but not pause timer + assertNotEquals( + 0L, operator.getSplitMetricGroup(split0.splitId()).getAccumulatedIdleTime()); + assertEquals(0L, operator.getSplitMetricGroup(split0.splitId()).getAccumulatedPausedTime()); + // The split emits a record to break out of idleness operator.emitNext(actualOutput); sampleAllWatermarks(processingTimeService); // The split is marked not idle, then immediately paused by the deferred alignment check assertThat(operator.getSplitMetricGroup(split0.splitId()).isPaused()).isTrue(); + + // Make pause timer tick + processingTimeService.advance(10); + operator.getSplitMetricGroup(split0.splitId()).updateTimers(); + assertNotEquals( + 0L, operator.getSplitMetricGroup(split0.splitId()).getAccumulatedPausedTime()); } private void sampleAllWatermarks(TestProcessingTimeService timeService) throws Exception {
