This is an automated email from the ASF dual-hosted git repository. pnowojski pushed a commit to branch release-1.20 in repository https://gitbox.apache.org/repos/asf/flink.git
commit 62750753ad40b5bdb8afa6cddd5612054f806a62 Author: Piotr Nowojski <[email protected]> AuthorDate: Wed Aug 14 12:57:02 2024 +0200 [FLINK-35886][source] Do not track already finished splits in watermark alignment --- .../connector/source/mocks/MockSourceReader.java | 14 +++++++++ .../streaming/api/operators/SourceOperator.java | 5 ++++ .../source/ProgressiveTimestampsAndWatermarks.java | 1 + .../operators/source/TimestampsAndWatermarks.java | 3 ++ .../operators/source/WatermarkToDataOutput.java | 3 ++ .../SourceOperatorSplitWatermarkAlignmentTest.java | 33 ++++++++++++++++++++++ 6 files changed, 59 insertions(+) diff --git a/flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/MockSourceReader.java b/flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/MockSourceReader.java index facfcf346cd..b85a2e075bd 100644 --- a/flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/MockSourceReader.java +++ b/flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/MockSourceReader.java @@ -28,6 +28,7 @@ import javax.annotation.concurrent.GuardedBy; import java.util.ArrayList; import java.util.Collection; import java.util.HashSet; +import java.util.Iterator; import java.util.List; import java.util.Set; import java.util.concurrent.CompletableFuture; @@ -92,6 +93,7 @@ public class MockSourceReader implements SourceReader<Integer, MockSourceSplit> @Override public InputStatus pollNext(ReaderOutput<Integer> sourceOutput) throws Exception { + releaseFinishedSplits(sourceOutput); if (waitingForSplitsBehaviour == WaitingForSplits.WAIT_FOR_INITIAL && splitsAssignmentState == SplitsAssignmentState.NO_SPLITS_ASSIGNED) { @@ -141,6 +143,18 @@ public class MockSourceReader implements SourceReader<Integer, MockSourceSplit> } } + private void releaseFinishedSplits(ReaderOutput<Integer> sourceOutput) { + Iterator<MockSourceSplit> assignedSplitsIterator = assignedSplits.iterator(); + while (assignedSplitsIterator.hasNext()) { + 
MockSourceSplit assignedSplit = assignedSplitsIterator.next(); + if (assignedSplit.isFinished()) { + sourceOutput.releaseOutputForSplit(assignedSplit.splitId()); + assignedSplitsIterator.remove(); + pausedSplits.remove(assignedSplit.splitId()); + } + } + } + @Override public List<MockSourceSplit> snapshotState(long checkpointId) { return assignedSplits; diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java index d1257e2caf5..b915ab5ba88 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java @@ -664,6 +664,11 @@ public class SourceOperator<OUT, SplitT extends SourceSplit> extends AbstractStr } } + @Override + public void splitFinished(String splitId) { + splitCurrentWatermarks.remove(splitId); + } + /** * Finds the splits that are beyond the current max watermark and pauses them. At the same time, * splits that have been paused and where the global watermark caught up are resumed. 
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/source/ProgressiveTimestampsAndWatermarks.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/source/ProgressiveTimestampsAndWatermarks.java index ca0c5f47b9b..5b96c5dc0c9 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/source/ProgressiveTimestampsAndWatermarks.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/source/ProgressiveTimestampsAndWatermarks.java @@ -304,6 +304,7 @@ public class ProgressiveTimestampsAndWatermarks<T> implements TimestampsAndWater } void releaseOutputForSplit(String splitId) { + watermarkUpdateListener.splitFinished(splitId); localOutputs.remove(splitId); watermarkMultiplexer.unregisterOutput(splitId); PausableRelativeClock inputActivityClock = diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/source/TimestampsAndWatermarks.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/source/TimestampsAndWatermarks.java index bdbd3479da5..4d96e53e2e4 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/source/TimestampsAndWatermarks.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/source/TimestampsAndWatermarks.java @@ -62,6 +62,9 @@ public interface TimestampsAndWatermarks<T> { /** Notifies about changes to per split watermarks. */ void updateCurrentSplitWatermark(String splitId, long watermark); + + /** Notifies that the split with the given ID has finished. 
*/ + void splitFinished(String splitId); } /** diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/source/WatermarkToDataOutput.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/source/WatermarkToDataOutput.java index d4d81b64f45..4fcf46ca9f9 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/source/WatermarkToDataOutput.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/source/WatermarkToDataOutput.java @@ -53,6 +53,9 @@ public final class WatermarkToDataOutput implements WatermarkOutput { @Override public void updateCurrentSplitWatermark(String splitId, long watermark) {} + + @Override + public void splitFinished(String splitId) {} }); } diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorSplitWatermarkAlignmentTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorSplitWatermarkAlignmentTest.java index e776395d31a..3031801b07c 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorSplitWatermarkAlignmentTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorSplitWatermarkAlignmentTest.java @@ -38,6 +38,7 @@ import org.apache.flink.runtime.state.TestTaskStateManager; import org.apache.flink.runtime.state.memory.MemoryStateBackend; import org.apache.flink.streaming.api.operators.source.CollectingDataOutput; import org.apache.flink.streaming.api.operators.source.TestingSourceOperator; +import org.apache.flink.streaming.runtime.io.DataInputStatus; import org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask; import org.apache.flink.streaming.runtime.tasks.StreamMockEnvironment; import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService; @@ -222,6 +223,38 @@ class SourceOperatorSplitWatermarkAlignmentTest 
{ assertThat(dataOutput.getEvents()).doNotHave(new WatermarkAbove(maxEmittedWatermark)); } + @Test + void testSplitWatermarkAlignmentWithFinishedSplit() throws Exception { + long idleTimeout = 100; + MockSourceReader sourceReader = + new MockSourceReader(WaitingForSplits.DO_NOT_WAIT_FOR_SPLITS, false, true); + TestProcessingTimeService processingTimeService = new TestProcessingTimeService(); + SourceOperator<Integer, MockSourceSplit> operator = + createAndOpenSourceOperatorWithIdleness( + sourceReader, processingTimeService, idleTimeout); + + MockSourceSplit split0 = new MockSourceSplit(0, 0, 1); + MockSourceSplit split1 = new MockSourceSplit(1, 10, 20); + int maxAllowedWatermark = 4; + int maxEmittedWatermark = maxAllowedWatermark + 1; + // split0 gets a single record (maxEmittedWatermark), above maxAllowedWatermark; once it is + // emitted, split0 is finished and released, so the alignment event below must not pause it + split0.addRecord(maxEmittedWatermark); + split1.addRecord(3); + + operator.handleOperatorEvent( + new AddSplitEvent<>( + Arrays.asList(split0, split1), new MockSourceSplitSerializer())); + CollectingDataOutput<Integer> dataOutput = new CollectingDataOutput<>(); + + while (operator.emitNext(dataOutput) == DataInputStatus.MORE_AVAILABLE) { + // split0 emits its only record and is finished/released + } + operator.handleOperatorEvent( + new WatermarkAlignmentEvent(maxAllowedWatermark)); // must not pause the finished split0 + assertThat(sourceReader.getPausedSplits()).isEmpty(); + } + private SourceOperator<Integer, MockSourceSplit> createAndOpenSourceOperatorWithIdleness( MockSourceReader sourceReader, TestProcessingTimeService processingTimeService,
