This is an automated email from the ASF dual-hosted git repository.

jqin pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit a186fdc2c3bceb374a806907c94d267920e11ed2
Author: Jiangjie (Becket) Qin <[email protected]>
AuthorDate: Fri Nov 20 00:36:39 2020 +0800

    [FLINK-20194] Change SourceReaderBase.onSplitFinished() to take a map of 
SplitId -> SplitState.
---
 .../flink/connector/base/source/reader/SourceReaderBase.java      | 8 ++++----
 .../flink/connector/base/source/reader/SourceReaderBaseTest.java  | 4 ++--
 .../connector/base/source/reader/mocks/MockSourceReader.java      | 4 ++--
 3 files changed, 8 insertions(+), 8 deletions(-)

diff --git 
a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java
 
b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java
index 430f629..dfbacec 100644
--- 
a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java
+++ 
b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java
@@ -36,7 +36,6 @@ import org.slf4j.LoggerFactory;
 import javax.annotation.Nullable;
 
 import java.util.ArrayList;
-import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -172,11 +171,12 @@ public abstract class SourceReaderBase<E, T, SplitT 
extends SourceSplit, SplitSt
                final Set<String> finishedSplits = fetch.finishedSplits();
                if (!finishedSplits.isEmpty()) {
                        LOG.info("Finished reading split(s) {}", 
finishedSplits);
+                       Map<String, SplitStateT> stateOfFinishedSplits = new 
HashMap<>();
                        for (String finishedSplitId : finishedSplits) {
-                               splitStates.remove(finishedSplitId);
+                               stateOfFinishedSplits.put(finishedSplitId, 
splitStates.remove(finishedSplitId).state);
                                output.releaseOutputForSplit(finishedSplitId);
                        }
-                       onSplitFinished(finishedSplits);
+                       onSplitFinished(stateOfFinishedSplits);
                }
 
                fetch.recycle();
@@ -240,7 +240,7 @@ public abstract class SourceReaderBase<E, T, SplitT extends 
SourceSplit, SplitSt
        /**
         * Handles the finished splits to clean the state if needed.
         */
-       protected abstract void onSplitFinished(Collection<String> 
finishedSplitIds);
+       protected abstract void onSplitFinished(Map<String, SplitStateT> 
finishedSplitIds);
 
        /**
         * When new splits are added to the reader. The initialize the state of 
the new splits.
diff --git 
a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/SourceReaderBaseTest.java
 
b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/SourceReaderBaseTest.java
index 764b203..eface7b 100644
--- 
a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/SourceReaderBaseTest.java
+++ 
b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/SourceReaderBaseTest.java
@@ -43,9 +43,9 @@ import org.junit.rules.ExpectedException;
 
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.function.Supplier;
 
@@ -295,7 +295,7 @@ public class SourceReaderBaseTest extends 
SourceReaderTestBase<MockSourceSplit>
                        public void notifyCheckpointComplete(long checkpointId) 
{}
 
                        @Override
-                       protected void onSplitFinished(Collection<String> 
finishedSplitIds) {}
+                       protected void onSplitFinished(Map<String, 
TestingSourceSplit> finishedSplitIds) {}
 
                        @Override
                        protected TestingSourceSplit 
initializedState(TestingSourceSplit split) {
diff --git 
a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/MockSourceReader.java
 
b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/MockSourceReader.java
index 3793705..42fc578 100644
--- 
a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/MockSourceReader.java
+++ 
b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/MockSourceReader.java
@@ -27,7 +27,7 @@ import 
org.apache.flink.connector.base.source.reader.fetcher.SingleThreadFetcher
 import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
 import 
org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
 
-import java.util.Collection;
+import java.util.Map;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Supplier;
 
@@ -52,7 +52,7 @@ public class MockSourceReader
        }
 
        @Override
-       protected void onSplitFinished(Collection<String> finishedSplitIds) {
+       protected void onSplitFinished(Map<String, AtomicInteger> 
finishedSplitIds) {
 
        }
 

Reply via email to