This is an automated email from the ASF dual-hosted git repository.

jqin pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit fe2b269b8bcab8a2344841c09d643cf2ebdcb5be
Author: Kezhu Wang <[email protected]>
AuthorDate: Mon Nov 9 01:59:06 2020 +0800

    [FLINK-19448][connector/common] Fix handling of finished splits and closing 
split fetchers in SourceReaderBase
    
    This closes #13989
---
 .../base/source/reader/SourceReaderBase.java       |  4 +-
 .../source/reader/fetcher/SplitFetcherManager.java |  8 ++-
 .../base/source/reader/SourceReaderBaseTest.java   | 69 ++++++++++++++++++++++
 .../base/source/reader/mocks/MockSplitReader.java  | 57 +++++++++++++++++-
 4 files changed, 134 insertions(+), 4 deletions(-)

diff --git 
a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java
 
b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java
index 5b4d6f7..503e6c9 100644
--- 
a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java
+++ 
b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java
@@ -270,7 +270,9 @@ public abstract class SourceReaderBase<E, T, SplitT extends 
SourceSplit, SplitSt
                        splitFetcherManager.checkErrors();
                        return InputStatus.END_OF_INPUT;
                } else {
-                       throw new IllegalStateException("Called 
'finishedOrAvailableLater()' with shut-down fetchers but non-empty queue");
+                       // We can reach this case if we just processed all data 
from the queue and finished a split,
+                       // and concurrently the fetcher finished another split, 
whose data is then in the queue.
+                       return InputStatus.MORE_AVAILABLE;
                }
        }
 
diff --git 
a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherManager.java
 
b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherManager.java
index 2b5b331..0d4ddfb 100644
--- 
a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherManager.java
+++ 
b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherManager.java
@@ -133,7 +133,13 @@ public abstract class SplitFetcherManager<E, SplitT 
extends SourceSplit> {
                        elementsQueue,
                        splitReader,
                        errorHandler,
-                       () -> fetchers.remove(fetcherId));
+                       () -> {
+                               fetchers.remove(fetcherId);
+                               // We need this to synchronize status of 
fetchers to concurrent partners as
+                               // ConcurrentHashMap's aggregate status methods 
including size, isEmpty, and
+                               // containsValue are not designed for program 
control.
+                               elementsQueue.notifyAvailable();
+                       });
                fetchers.put(fetcherId, splitFetcher);
                return splitFetcher;
        }
diff --git 
a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/SourceReaderBaseTest.java
 
b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/SourceReaderBaseTest.java
index dc8342f..646d88f 100644
--- 
a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/SourceReaderBaseTest.java
+++ 
b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/SourceReaderBaseTest.java
@@ -41,6 +41,7 @@ import org.junit.Test;
 import org.junit.rules.ExpectedException;
 
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
@@ -123,6 +124,74 @@ public class SourceReaderBaseTest extends 
SourceReaderTestBase<MockSourceSplit>
                assertTrue(records.isRecycled());
        }
 
+       @Test
+       public void testMultipleSplitsWithDifferentFinishingMoments() throws 
Exception {
+               FutureCompletingBlockingQueue<RecordsWithSplitIds<int[]>> 
elementsQueue =
+                       new FutureCompletingBlockingQueue<>();
+               MockSplitReader mockSplitReader = MockSplitReader.newBuilder()
+                       .setNumRecordsPerSplitPerFetch(2)
+                       .setSeparatedFinishedRecord(false)
+                       .setBlockingFetch(false)
+                       .build();
+               MockSourceReader reader = new MockSourceReader(
+                       elementsQueue,
+                       () -> mockSplitReader,
+                       getConfig(),
+                       null);
+
+               reader.start();
+
+               List<MockSourceSplit> splits = Arrays.asList(
+                       getSplit(0, 10, Boundedness.BOUNDED),
+                       getSplit(1, 12, Boundedness.BOUNDED)
+               );
+               reader.addSplits(splits);
+               reader.handleSourceEvents(new NoMoreSplitsEvent());
+
+               while (true) {
+                       InputStatus status = reader.pollNext(new 
TestingReaderOutput<>());
+                       if (status == InputStatus.END_OF_INPUT) {
+                               break;
+                       } if (status == InputStatus.NOTHING_AVAILABLE) {
+                               reader.isAvailable().get();
+                       }
+               }
+       }
+
+       @Test
+       public void testMultipleSplitsWithSeparatedFinishedRecord() throws 
Exception {
+               FutureCompletingBlockingQueue<RecordsWithSplitIds<int[]>> 
elementsQueue =
+                       new FutureCompletingBlockingQueue<>();
+               MockSplitReader mockSplitReader = MockSplitReader.newBuilder()
+                       .setNumRecordsPerSplitPerFetch(2)
+                       .setSeparatedFinishedRecord(true)
+                       .setBlockingFetch(false)
+                       .build();
+               MockSourceReader reader = new MockSourceReader(
+                       elementsQueue,
+                       () -> mockSplitReader,
+                       getConfig(),
+                       null);
+
+               reader.start();
+
+               List<MockSourceSplit> splits = Arrays.asList(
+                       getSplit(0, 10, Boundedness.BOUNDED),
+                       getSplit(1, 10, Boundedness.BOUNDED)
+               );
+               reader.addSplits(splits);
+               reader.handleSourceEvents(new NoMoreSplitsEvent());
+
+               while (true) {
+                       InputStatus status = reader.pollNext(new 
TestingReaderOutput<>());
+                       if (status == InputStatus.END_OF_INPUT) {
+                               break;
+                       } if (status == InputStatus.NOTHING_AVAILABLE) {
+                               reader.isAvailable().get();
+                       }
+               }
+       }
+
        // ---------------- helper methods -----------------
 
        @Override
diff --git 
a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/MockSplitReader.java
 
b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/MockSplitReader.java
index e991b9c..f4d0a2f 100644
--- 
a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/MockSplitReader.java
+++ 
b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/MockSplitReader.java
@@ -25,6 +25,7 @@ import 
org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
 import 
org.apache.flink.connector.base.source.reader.splitreader.SplitsAddition;
 import org.apache.flink.connector.base.source.reader.splitreader.SplitsChange;
 
+import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.Map;
 
@@ -40,6 +41,7 @@ public class MockSplitReader implements SplitReader<int[], 
MockSourceSplit> {
        // Use LinkedHashMap for determinism.
        private final Map<String, MockSourceSplit> splits = new 
LinkedHashMap<>();
        private final int numRecordsPerSplitPerFetch;
+       private final boolean separatedFinishedRecord;
        private final boolean blockingFetch;
 
        private final Object wakeupLock = new Object();
@@ -49,7 +51,15 @@ public class MockSplitReader implements SplitReader<int[], 
MockSourceSplit> {
        public MockSplitReader(
                        int numRecordsPerSplitPerFetch,
                        boolean blockingFetch) {
+               this(numRecordsPerSplitPerFetch, false, blockingFetch);
+       }
+
+       private MockSplitReader(
+               int numRecordsPerSplitPerFetch,
+               boolean separatedFinishedRecord,
+               boolean blockingFetch) {
                this.numRecordsPerSplitPerFetch = numRecordsPerSplitPerFetch;
+               this.separatedFinishedRecord = separatedFinishedRecord;
                this.blockingFetch = blockingFetch;
        }
 
@@ -93,17 +103,28 @@ public class MockSplitReader implements SplitReader<int[], 
MockSourceSplit> {
                }
 
                try {
-                       for (Map.Entry<String, MockSourceSplit> entry : 
splits.entrySet()) {
+                       Iterator<Map.Entry<String, MockSourceSplit>> iterator = 
splits.entrySet().iterator();
+                       while (iterator.hasNext()) {
+                               Map.Entry<String, MockSourceSplit> entry = 
iterator.next();
                                MockSourceSplit split = entry.getValue();
+                               boolean hasRecords = false;
                                for (int i = 0; i < numRecordsPerSplitPerFetch 
&& !split.isFinished(); i++) {
                                        // This call may throw 
InterruptedException.
                                        int[] record = 
split.getNext(blockingFetch);
                                        if (record != null) {
                                                records.add(entry.getKey(), 
record);
+                                               hasRecords = true;
                                        }
                                }
                                if (split.isFinished()) {
-                                       
records.addFinishedSplit(entry.getKey());
+                                       if (!separatedFinishedRecord) {
+                                               
records.addFinishedSplit(entry.getKey());
+                                               iterator.remove();
+                                       } else if (!hasRecords) {
+                                               
records.addFinishedSplit(entry.getKey());
+                                               iterator.remove();
+                                               break;
+                                       }
                                }
                        }
                } catch (InterruptedException ie) {
@@ -123,4 +144,36 @@ public class MockSplitReader implements SplitReader<int[], 
MockSourceSplit> {
 
                return records.build();
        }
+
+       /**
+        * Builder for {@link MockSplitReader}.
+        */
+       public static class Builder {
+               private int numRecordsPerSplitPerFetch = 2;
+               private boolean separatedFinishedRecord = false;
+               private boolean blockingFetch = false;
+
+               public Builder setNumRecordsPerSplitPerFetch(int 
numRecordsPerSplitPerFetch) {
+                       this.numRecordsPerSplitPerFetch = 
numRecordsPerSplitPerFetch;
+                       return this;
+               }
+
+               public Builder setSeparatedFinishedRecord(boolean 
separatedFinishedRecord) {
+                       this.separatedFinishedRecord = separatedFinishedRecord;
+                       return this;
+               }
+
+               public Builder setBlockingFetch(boolean blockingFetch) {
+                       this.blockingFetch = blockingFetch;
+                       return this;
+               }
+
+               public MockSplitReader build() {
+                       return new MockSplitReader(numRecordsPerSplitPerFetch, 
blockingFetch, separatedFinishedRecord);
+               }
+       }
+
+       public static Builder newBuilder() {
+               return new Builder();
+       }
 }

Reply via email to