This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch release-1.19
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 45a2e04e4140e8d4789246b0c9e874f5c850cf89
Author: Piotr Nowojski <[email protected]>
AuthorDate: Wed Aug 14 12:57:02 2024 +0200

    [FLINK-35886][source] Do not track already finished splits in watermark alignment
---
 .../connector/source/mocks/MockSourceReader.java   | 14 +++++++++
 .../streaming/api/operators/SourceOperator.java    |  5 ++++
 .../source/ProgressiveTimestampsAndWatermarks.java |  1 +
 .../operators/source/TimestampsAndWatermarks.java  |  3 ++
 .../operators/source/WatermarkToDataOutput.java    |  3 ++
 .../SourceOperatorSplitWatermarkAlignmentTest.java | 33 ++++++++++++++++++++++
 6 files changed, 59 insertions(+)

diff --git 
a/flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/MockSourceReader.java
 
b/flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/MockSourceReader.java
index facfcf346cd..b85a2e075bd 100644
--- 
a/flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/MockSourceReader.java
+++ 
b/flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/MockSourceReader.java
@@ -28,6 +28,7 @@ import javax.annotation.concurrent.GuardedBy;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
@@ -92,6 +93,7 @@ public class MockSourceReader implements 
SourceReader<Integer, MockSourceSplit>
 
     @Override
     public InputStatus pollNext(ReaderOutput<Integer> sourceOutput) throws 
Exception {
+        releaseFinishedSplits(sourceOutput);
 
         if (waitingForSplitsBehaviour == WaitingForSplits.WAIT_FOR_INITIAL
                 && splitsAssignmentState == 
SplitsAssignmentState.NO_SPLITS_ASSIGNED) {
@@ -141,6 +143,18 @@ public class MockSourceReader implements 
SourceReader<Integer, MockSourceSplit>
         }
     }
 
+    private void releaseFinishedSplits(ReaderOutput<Integer> sourceOutput) {
+        Iterator<MockSourceSplit> assignedSplitsIterator = 
assignedSplits.iterator();
+        while (assignedSplitsIterator.hasNext()) {
+            MockSourceSplit assignedSplit = assignedSplitsIterator.next();
+            if (assignedSplit.isFinished()) {
+                sourceOutput.releaseOutputForSplit(assignedSplit.splitId());
+                assignedSplitsIterator.remove();
+                pausedSplits.remove(assignedSplit.splitId());
+            }
+        }
+    }
+
     @Override
     public List<MockSourceSplit> snapshotState(long checkpointId) {
         return assignedSplits;
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
index b26edc77c63..ea6e39f4e66 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
@@ -663,6 +663,11 @@ public class SourceOperator<OUT, SplitT extends 
SourceSplit> extends AbstractStr
         }
     }
 
+    @Override
+    public void splitFinished(String splitId) {
+        splitCurrentWatermarks.remove(splitId);
+    }
+
     /**
      * Finds the splits that are beyond the current max watermark and pauses 
them. At the same time,
      * splits that have been paused and where the global watermark caught up 
are resumed.
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/source/ProgressiveTimestampsAndWatermarks.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/source/ProgressiveTimestampsAndWatermarks.java
index ca0c5f47b9b..5b96c5dc0c9 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/source/ProgressiveTimestampsAndWatermarks.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/source/ProgressiveTimestampsAndWatermarks.java
@@ -304,6 +304,7 @@ public class ProgressiveTimestampsAndWatermarks<T> 
implements TimestampsAndWater
         }
 
         void releaseOutputForSplit(String splitId) {
+            watermarkUpdateListener.splitFinished(splitId);
             localOutputs.remove(splitId);
             watermarkMultiplexer.unregisterOutput(splitId);
             PausableRelativeClock inputActivityClock =
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/source/TimestampsAndWatermarks.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/source/TimestampsAndWatermarks.java
index b6e1a95acaf..91e2cb968dd 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/source/TimestampsAndWatermarks.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/source/TimestampsAndWatermarks.java
@@ -62,6 +62,9 @@ public interface TimestampsAndWatermarks<T> {
 
         /** Notifies about changes to per split watermarks. */
         void updateCurrentSplitWatermark(String splitId, long watermark);
+
+        /** Notifies that a split has finished. */
+        void splitFinished(String splitId);
     }
 
     /**
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/source/WatermarkToDataOutput.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/source/WatermarkToDataOutput.java
index d4d81b64f45..4fcf46ca9f9 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/source/WatermarkToDataOutput.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/source/WatermarkToDataOutput.java
@@ -53,6 +53,9 @@ public final class WatermarkToDataOutput implements 
WatermarkOutput {
 
                     @Override
                     public void updateCurrentSplitWatermark(String splitId, 
long watermark) {}
+
+                    @Override
+                    public void splitFinished(String splitId) {}
                 });
     }
 
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorSplitWatermarkAlignmentTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorSplitWatermarkAlignmentTest.java
index 1a199b23123..93c7072842e 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorSplitWatermarkAlignmentTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorSplitWatermarkAlignmentTest.java
@@ -38,6 +38,7 @@ import org.apache.flink.runtime.state.TestTaskStateManager;
 import org.apache.flink.runtime.state.memory.MemoryStateBackend;
 import org.apache.flink.streaming.api.operators.source.CollectingDataOutput;
 import org.apache.flink.streaming.api.operators.source.TestingSourceOperator;
+import org.apache.flink.streaming.runtime.io.DataInputStatus;
 import org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask;
 import org.apache.flink.streaming.runtime.tasks.StreamMockEnvironment;
 import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
@@ -222,6 +223,38 @@ class SourceOperatorSplitWatermarkAlignmentTest {
         assertThat(dataOutput.getEvents()).doNotHave(new 
WatermarkAbove(maxEmittedWatermark));
     }
 
+    @Test
+    void testSplitWatermarkAlignmentWithFinishedSplit() throws Exception {
+        long idleTimeout = 100;
+        MockSourceReader sourceReader =
+                new MockSourceReader(WaitingForSplits.DO_NOT_WAIT_FOR_SPLITS, 
false, true);
+        TestProcessingTimeService processingTimeService = new 
TestProcessingTimeService();
+        SourceOperator<Integer, MockSourceSplit> operator =
+                createAndOpenSourceOperatorWithIdleness(
+                        sourceReader, processingTimeService, idleTimeout);
+
+        MockSourceSplit split0 = new MockSourceSplit(0, 0, 1);
+        MockSourceSplit split1 = new MockSourceSplit(1, 10, 20);
+        int maxAllowedWatermark = 4;
+        int maxEmittedWatermark = maxAllowedWatermark + 1;
+        // split0 emits a single record above maxAllowedWatermark and then finishes; a finished
+        // split must no longer be tracked, so the alignment event below must not pause it
+        split0.addRecord(maxEmittedWatermark);
+        split1.addRecord(3);
+
+        operator.handleOperatorEvent(
+                new AddSplitEvent<>(
+                        Arrays.asList(split0, split1), new 
MockSourceSplitSerializer()));
+        CollectingDataOutput<Integer> dataOutput = new 
CollectingDataOutput<>();
+
+        while (operator.emitNext(dataOutput) == 
DataInputStatus.MORE_AVAILABLE) {
+            // split0 emits its only record and is finished/released
+        }
+        operator.handleOperatorEvent(
+                new WatermarkAlignmentEvent(maxAllowedWatermark)); // must not pause finished split0
+        assertThat(sourceReader.getPausedSplits()).isEmpty();
+    }
+
     private SourceOperator<Integer, MockSourceSplit> 
createAndOpenSourceOperatorWithIdleness(
             MockSourceReader sourceReader,
             TestProcessingTimeService processingTimeService,

Reply via email to