This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit e6de2f6daea5d5c5660ade87a190737bbea8f3c7
Author: Efrat Levitan <[email protected]>
AuthorDate: Thu Feb 19 16:08:35 2026 +0200

    [FLINK-39073][runtime] Defer alignment check for idle splits
    
    If a split emits watermarks far into the future and then goes idle, 
the alignment check will incorrectly mark it paused.
    As the max allowed watermark advances, the source operator will transition the 
split back to the active state (while it's still idle).
    This change aims to fix the issue by deferring the alignment check for idle 
splits until they break out of idleness.
---
 .../streaming/api/operators/SourceOperator.java    | 31 +++++++++++--
 .../SourceOperatorSplitWatermarkAlignmentTest.java | 51 ++++++++++++++++++++++
 2 files changed, 79 insertions(+), 3 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
 
b/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
index 12723254ffc..02273b20dd5 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
@@ -178,6 +178,7 @@ public class SourceOperator<OUT, SplitT extends 
SourceSplit> extends AbstractStr
     private final List<SplitT> splitsToInitializeOutput = new ArrayList<>();
 
     private final Set<String> currentlyPausedSplits = new HashSet<>();
+    private final Set<String> currentlyIdleSplits = new HashSet<>();
 
     private boolean waitingForCheckpoint;
 
@@ -781,6 +782,13 @@ public class SourceOperator<OUT, SplitT extends 
SourceSplit> extends AbstractStr
     public void updateCurrentSplitWatermark(String splitId, long watermark) {
         WatermarkSampler splitWatermarkSampler = 
checkNotNull(sampledSplitWatermarks.get(splitId));
         splitWatermarkSampler.addLatest(watermark);
+        if (!currentlyIdleSplits.contains(splitId)) {
+            maybePauseSplit(splitId);
+        }
+    }
+
+    private void maybePauseSplit(String splitId) {
+        WatermarkSampler splitWatermarkSampler = 
checkNotNull(sampledSplitWatermarks.get(splitId));
         long oldestSampledWatermark = splitWatermarkSampler.getOldestSample();
         // oldestSampledWatermark can be only updated after adding new latest 
if sampling capacity
         // is 0, but we still need to handle that
@@ -793,10 +801,22 @@ public class SourceOperator<OUT, SplitT extends 
SourceSplit> extends AbstractStr
 
     @Override
     public void updateCurrentSplitIdle(String splitId, boolean idle) {
+        final InternalSourceSplitMetricGroup splitMetricGroup =
+                this.getOrCreateSplitMetricGroup(splitId);
+        if (idle == currentlyIdleSplits.contains(splitId)) {
+            return;
+        }
         if (idle) {
-            this.getOrCreateSplitMetricGroup(splitId).markIdle();
+            LOG.info("[{}] Marking split idle", splitId);
+            currentlyIdleSplits.add(splitId);
+            splitMetricGroup.markIdle();
         } else {
-            this.getOrCreateSplitMetricGroup(splitId).markNotIdle();
+            LOG.info("[{}] Marking split not idle", splitId);
+            currentlyIdleSplits.remove(splitId);
+            splitMetricGroup.markNotIdle();
+            // Since we skipped alignment check
+            // for this split while it was idle:
+            maybePauseSplit(splitId);
         }
     }
 
@@ -805,6 +825,7 @@ public class SourceOperator<OUT, SplitT extends 
SourceSplit> extends AbstractStr
         getOrCreateSplitMetricGroup(splitId).onSplitFinished();
         this.splitMetricGroups.remove(splitId);
         sampledSplitWatermarks.remove(splitId);
+        currentlyIdleSplits.remove(splitId);
     }
 
     /**
@@ -818,6 +839,9 @@ public class SourceOperator<OUT, SplitT extends 
SourceSplit> extends AbstractStr
         Collection<String> splitsToResume = new ArrayList<>();
         sampledSplitWatermarks.forEach(
                 (splitId, splitWatermarks) -> {
+                    if (currentlyIdleSplits.contains(splitId)) {
+                        return;
+                    }
                     if (splitWatermarks.getOldestSample() > 
currentMaxDesiredWatermark) {
                         splitsToPause.add(splitId);
                     } else if (currentlyPausedSplits.contains(splitId)) {
@@ -836,10 +860,11 @@ public class SourceOperator<OUT, SplitT extends 
SourceSplit> extends AbstractStr
             Collection<String> splitsToPause, Collection<String> 
splitsToResume) {
         try {
             LOG.info(
-                    "pauseOrResumeSplits [splitsToPause={}][splitsToResume={}]"
+                    "pauseOrResumeSplits 
[splitsToPause={}][splitsToResume={}][idleSplits={}]"
                             + 
"[currentMaxDesiredWatermark={}][latestWatermark={}][oldestWatermark={}]",
                     splitsToPause,
                     splitsToResume,
+                    currentlyIdleSplits,
                     currentMaxDesiredWatermark,
                     sampledLatestWatermark.getLatest(),
                     sampledLatestWatermark.getOldestSample());
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorSplitWatermarkAlignmentTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorSplitWatermarkAlignmentTest.java
index 742a8bc6d43..904b88e79fc 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorSplitWatermarkAlignmentTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorSplitWatermarkAlignmentTest.java
@@ -501,6 +501,57 @@ class SourceOperatorSplitWatermarkAlignmentTest {
         
assertThat(operator.getSplitMetricGroup(split0.splitId()).isActive()).isTrue();
     }
 
+    @Test
+    void testAlignmentCheckIsDeferredForIdleSplits() throws Exception {
+        final long idleTimeout = 100;
+        final MockSourceReader sourceReader =
+                new MockSourceReader(WaitingForSplits.DO_NOT_WAIT_FOR_SPLITS, 
true, true);
+        final TestProcessingTimeService processingTimeService = new 
TestProcessingTimeService();
+        final SourceOperator<Integer, MockSourceSplit> operator =
+                createAndOpenSourceOperatorWithIdleness(
+                        sourceReader, processingTimeService, idleTimeout);
+
+        final MockSourceSplit split0 = new MockSourceSplit(0, 0, 10);
+        final int allowedWatermark4 = 4;
+        final int allowedWatermark7 = 7;
+        split0.addRecord(5);
+        split0.addRecord(6);
+        split0.addRecord(7);
+        split0.addRecord(8);
+        operator.handleOperatorEvent(
+                new AddSplitEvent<>(Arrays.asList(split0), new 
MockSourceSplitSerializer()));
+        final CollectingDataOutput<Integer> actualOutput = new 
CollectingDataOutput<>();
+
+        // Emit enough records to fill the sampler buffer
+        operator.emitNext(actualOutput);
+        operator.emitNext(actualOutput);
+        operator.emitNext(actualOutput);
+        sampleAllWatermarks(processingTimeService);
+
+        // Transition the split to idle state:
+        for (int i = 0; i < 10; i++) {
+            processingTimeService.advance(idleTimeout);
+        }
+        
assertThat(operator.getSplitMetricGroup(split0.splitId()).isIdle()).isTrue();
+
+        // Alignment check fires but doesn't pause the idle split
+        operator.handleOperatorEvent(new 
WatermarkAlignmentEvent(allowedWatermark4));
+        
assertThat(operator.getSplitMetricGroup(split0.splitId()).isIdle()).isTrue();
+
+        // While the split is idle, we advance the allowed watermark to keep 
the source active
+        operator.handleOperatorEvent(new 
WatermarkAlignmentEvent(allowedWatermark7));
+        sampleAllWatermarks(processingTimeService);
+        // The split is still idle:
+        
assertThat(operator.getSplitMetricGroup(split0.splitId()).isIdle()).isTrue();
+
+        // The split emits a record to break out of idleness
+        operator.emitNext(actualOutput);
+        sampleAllWatermarks(processingTimeService);
+
+        // The split is marked not idle, then immediately paused by the 
deferred alignment check
+        
assertThat(operator.getSplitMetricGroup(split0.splitId()).isPaused()).isTrue();
+    }
+
     private void sampleAllWatermarks(TestProcessingTimeService timeService) 
throws Exception {
         sampleWatermarks(timeService, 
WATERMARK_ALIGNMENT_BUFFER_SIZE.defaultValue());
     }

Reply via email to