This is an automated email from the ASF dual-hosted git repository. pnowojski pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit e6de2f6daea5d5c5660ade87a190737bbea8f3c7 Author: Efrat Levitan <[email protected]> AuthorDate: Thu Feb 19 16:08:35 2026 +0200 [FLINK-39073][runtime] Defer alignment check for idle splits If a split emits watermarks far into the future and then goes idle, the alignment check will incorrectly mark it paused. As the max allowed watermark advances, the source operator will transition the split back to the active state (while it's still idle). This change aims to fix the issue by deferring the alignment check for idle splits until they break out of idleness. --- .../streaming/api/operators/SourceOperator.java | 31 +++++++++++-- .../SourceOperatorSplitWatermarkAlignmentTest.java | 51 ++++++++++++++++++++++ 2 files changed, 79 insertions(+), 3 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java b/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java index 12723254ffc..02273b20dd5 100644 --- a/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java +++ b/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java @@ -178,6 +178,7 @@ public class SourceOperator<OUT, SplitT extends SourceSplit> extends AbstractStr private final List<SplitT> splitsToInitializeOutput = new ArrayList<>(); private final Set<String> currentlyPausedSplits = new HashSet<>(); + private final Set<String> currentlyIdleSplits = new HashSet<>(); private boolean waitingForCheckpoint; @@ -781,6 +782,13 @@ public class SourceOperator<OUT, SplitT extends SourceSplit> extends AbstractStr public void updateCurrentSplitWatermark(String splitId, long watermark) { WatermarkSampler splitWatermarkSampler = checkNotNull(sampledSplitWatermarks.get(splitId)); splitWatermarkSampler.addLatest(watermark); + if (!currentlyIdleSplits.contains(splitId)) { + maybePauseSplit(splitId); + } + } + + private void maybePauseSplit(String splitId) { + WatermarkSampler 
splitWatermarkSampler = checkNotNull(sampledSplitWatermarks.get(splitId)); long oldestSampledWatermark = splitWatermarkSampler.getOldestSample(); // oldestSampledWatermark can be only updated after adding new latest if sampling capacity // is 0, but we still need to handle that @@ -793,10 +801,22 @@ public class SourceOperator<OUT, SplitT extends SourceSplit> extends AbstractStr @Override public void updateCurrentSplitIdle(String splitId, boolean idle) { + final InternalSourceSplitMetricGroup splitMetricGroup = + this.getOrCreateSplitMetricGroup(splitId); + if (idle == currentlyIdleSplits.contains(splitId)) { + return; + } if (idle) { - this.getOrCreateSplitMetricGroup(splitId).markIdle(); + LOG.info("[{}] Marking split idle", splitId); + currentlyIdleSplits.add(splitId); + splitMetricGroup.markIdle(); } else { - this.getOrCreateSplitMetricGroup(splitId).markNotIdle(); + LOG.info("[{}] Marking split not idle", splitId); + currentlyIdleSplits.remove(splitId); + splitMetricGroup.markNotIdle(); + // Since we skipped alignment check + // for this split while it was idle: + maybePauseSplit(splitId); } } @@ -805,6 +825,7 @@ public class SourceOperator<OUT, SplitT extends SourceSplit> extends AbstractStr getOrCreateSplitMetricGroup(splitId).onSplitFinished(); this.splitMetricGroups.remove(splitId); sampledSplitWatermarks.remove(splitId); + currentlyIdleSplits.remove(splitId); } /** @@ -818,6 +839,9 @@ public class SourceOperator<OUT, SplitT extends SourceSplit> extends AbstractStr Collection<String> splitsToResume = new ArrayList<>(); sampledSplitWatermarks.forEach( (splitId, splitWatermarks) -> { + if (currentlyIdleSplits.contains(splitId)) { + return; + } if (splitWatermarks.getOldestSample() > currentMaxDesiredWatermark) { splitsToPause.add(splitId); } else if (currentlyPausedSplits.contains(splitId)) { @@ -836,10 +860,11 @@ public class SourceOperator<OUT, SplitT extends SourceSplit> extends AbstractStr Collection<String> splitsToPause, Collection<String> 
splitsToResume) { try { LOG.info( - "pauseOrResumeSplits [splitsToPause={}][splitsToResume={}]" + "pauseOrResumeSplits [splitsToPause={}][splitsToResume={}][idleSplits={}]" + "[currentMaxDesiredWatermark={}][latestWatermark={}][oldestWatermark={}]", splitsToPause, splitsToResume, + currentlyIdleSplits, currentMaxDesiredWatermark, sampledLatestWatermark.getLatest(), sampledLatestWatermark.getOldestSample()); diff --git a/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorSplitWatermarkAlignmentTest.java b/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorSplitWatermarkAlignmentTest.java index 742a8bc6d43..904b88e79fc 100644 --- a/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorSplitWatermarkAlignmentTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorSplitWatermarkAlignmentTest.java @@ -501,6 +501,57 @@ class SourceOperatorSplitWatermarkAlignmentTest { assertThat(operator.getSplitMetricGroup(split0.splitId()).isActive()).isTrue(); } + @Test + void testAlignmentCheckIsDeferredForIdleSplits() throws Exception { + final long idleTimeout = 100; + final MockSourceReader sourceReader = + new MockSourceReader(WaitingForSplits.DO_NOT_WAIT_FOR_SPLITS, true, true); + final TestProcessingTimeService processingTimeService = new TestProcessingTimeService(); + final SourceOperator<Integer, MockSourceSplit> operator = + createAndOpenSourceOperatorWithIdleness( + sourceReader, processingTimeService, idleTimeout); + + final MockSourceSplit split0 = new MockSourceSplit(0, 0, 10); + final int allowedWatermark4 = 4; + final int allowedWatermark7 = 7; + split0.addRecord(5); + split0.addRecord(6); + split0.addRecord(7); + split0.addRecord(8); + operator.handleOperatorEvent( + new AddSplitEvent<>(Arrays.asList(split0), new MockSourceSplitSerializer())); + final CollectingDataOutput<Integer> actualOutput = new CollectingDataOutput<>(); + + // 
Emit enough record to fill the sampler buffer + operator.emitNext(actualOutput); + operator.emitNext(actualOutput); + operator.emitNext(actualOutput); + sampleAllWatermarks(processingTimeService); + + // Transition the split to idle state: + for (int i = 0; i < 10; i++) { + processingTimeService.advance(idleTimeout); + } + assertThat(operator.getSplitMetricGroup(split0.splitId()).isIdle()).isTrue(); + + // Alignment check fires but doesn't pause the idle split + operator.handleOperatorEvent(new WatermarkAlignmentEvent(allowedWatermark4)); + assertThat(operator.getSplitMetricGroup(split0.splitId()).isIdle()).isTrue(); + + // While the split is idle, we advance the allowed watermark to keep the source active + operator.handleOperatorEvent(new WatermarkAlignmentEvent(allowedWatermark7)); + sampleAllWatermarks(processingTimeService); + // The split is still idle: + assertThat(operator.getSplitMetricGroup(split0.splitId()).isIdle()).isTrue(); + + // The split emits a record to break out of idleness + operator.emitNext(actualOutput); + sampleAllWatermarks(processingTimeService); + + // The split is marked not idle, then immediately paused by the deferred alignment check + assertThat(operator.getSplitMetricGroup(split0.splitId()).isPaused()).isTrue(); + } + private void sampleAllWatermarks(TestProcessingTimeService timeService) throws Exception { sampleWatermarks(timeService, WATERMARK_ALIGNMENT_BUFFER_SIZE.defaultValue()); }
