This is an automated email from the ASF dual-hosted git repository. jqin pushed a commit to branch release-1.11 in repository https://gitbox.apache.org/repos/asf/flink.git
commit a186fdc2c3bceb374a806907c94d267920e11ed2 Author: Jiangjie (Becket) Qin <[email protected]> AuthorDate: Fri Nov 20 00:36:39 2020 +0800 [FLINK-20194] Change SourceReaderBase.onSplitFinished() to take a map of SplitId -> SplitState. --- .../flink/connector/base/source/reader/SourceReaderBase.java | 8 ++++---- .../flink/connector/base/source/reader/SourceReaderBaseTest.java | 4 ++-- .../connector/base/source/reader/mocks/MockSourceReader.java | 4 ++-- 3 files changed, 8 insertions(+), 8 deletions(-) diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java index 430f629..dfbacec 100644 --- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java +++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java @@ -36,7 +36,6 @@ import org.slf4j.LoggerFactory; import javax.annotation.Nullable; import java.util.ArrayList; -import java.util.Collection; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -172,11 +171,12 @@ public abstract class SourceReaderBase<E, T, SplitT extends SourceSplit, SplitSt final Set<String> finishedSplits = fetch.finishedSplits(); if (!finishedSplits.isEmpty()) { LOG.info("Finished reading split(s) {}", finishedSplits); + Map<String, SplitStateT> stateOfFinishedSplits = new HashMap<>(); for (String finishedSplitId : finishedSplits) { - splitStates.remove(finishedSplitId); + stateOfFinishedSplits.put(finishedSplitId, splitStates.remove(finishedSplitId).state); output.releaseOutputForSplit(finishedSplitId); } - onSplitFinished(finishedSplits); + onSplitFinished(stateOfFinishedSplits); } fetch.recycle(); @@ -240,7 +240,7 @@ public abstract class SourceReaderBase<E, T, SplitT extends SourceSplit, SplitSt /** * Handles the finished splits to clean the state if needed. */ - protected abstract void onSplitFinished(Collection<String> finishedSplitIds); + protected abstract void onSplitFinished(Map<String, SplitStateT> finishedSplitIds); /** * When new splits are added to the reader. The initialize the state of the new splits. diff --git a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/SourceReaderBaseTest.java b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/SourceReaderBaseTest.java index 764b203..eface7b 100644 --- a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/SourceReaderBaseTest.java +++ b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/SourceReaderBaseTest.java @@ -43,9 +43,9 @@ import org.junit.rules.ExpectedException; import java.util.ArrayList; import java.util.Arrays; -import java.util.Collection; import java.util.Collections; import java.util.List; +import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.function.Supplier; @@ -295,7 +295,7 @@ public class SourceReaderBaseTest extends SourceReaderTestBase<MockSourceSplit> public void notifyCheckpointComplete(long checkpointId) {} @Override - protected void onSplitFinished(Collection<String> finishedSplitIds) {} + protected void onSplitFinished(Map<String, TestingSourceSplit> finishedSplitIds) {} @Override protected TestingSourceSplit initializedState(TestingSourceSplit split) { diff --git a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/MockSourceReader.java b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/MockSourceReader.java index 3793705..42fc578 100644 --- a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/MockSourceReader.java +++ b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/MockSourceReader.java @@ -27,7 +27,7 @@ import org.apache.flink.connector.base.source.reader.fetcher.SingleThreadFetcher import org.apache.flink.connector.base.source.reader.splitreader.SplitReader; import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue; -import java.util.Collection; +import java.util.Map; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Supplier; @@ -52,7 +52,7 @@ public class MockSourceReader } @Override - protected void onSplitFinished(Collection<String> finishedSplitIds) { + protected void onSplitFinished(Map<String, AtomicInteger> finishedSplitIds) { }
