This is an automated email from the ASF dual-hosted git repository. jqin pushed a commit to branch release-1.11 in repository https://gitbox.apache.org/repos/asf/flink.git
commit fe2b269b8bcab8a2344841c09d643cf2ebdcb5be Author: Kezhu Wang <[email protected]> AuthorDate: Mon Nov 9 01:59:06 2020 +0800 [FLINK-19448][connector/common] Fix handling of finished splits and closing split fetchers in SourceReaderBase This closes #13989 --- .../base/source/reader/SourceReaderBase.java | 4 +- .../source/reader/fetcher/SplitFetcherManager.java | 8 ++- .../base/source/reader/SourceReaderBaseTest.java | 69 ++++++++++++++++++++++ .../base/source/reader/mocks/MockSplitReader.java | 57 +++++++++++++++++- 4 files changed, 134 insertions(+), 4 deletions(-) diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java index 5b4d6f7..503e6c9 100644 --- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java +++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java @@ -270,7 +270,9 @@ public abstract class SourceReaderBase<E, T, SplitT extends SourceSplit, SplitSt splitFetcherManager.checkErrors(); return InputStatus.END_OF_INPUT; } else { - throw new IllegalStateException("Called 'finishedOrAvailableLater()' with shut-down fetchers but non-empty queue"); + // We can reach this case if we just processed all data from the queue and finished a split, + // and concurrently the fetcher finished another split, whose data is then in the queue. 
+ return InputStatus.MORE_AVAILABLE; } } diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherManager.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherManager.java index 2b5b331..0d4ddfb 100644 --- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherManager.java +++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherManager.java @@ -133,7 +133,13 @@ public abstract class SplitFetcherManager<E, SplitT extends SourceSplit> { elementsQueue, splitReader, errorHandler, - () -> fetchers.remove(fetcherId)); + () -> { + fetchers.remove(fetcherId); + // We need this to synchronize status of fetchers to concurrent partners as + // ConcurrentHashMap's aggregate status methods including size, isEmpty, and + // containsValue are not designed for program control. 
+ elementsQueue.notifyAvailable(); + }); fetchers.put(fetcherId, splitFetcher); return splitFetcher; } diff --git a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/SourceReaderBaseTest.java b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/SourceReaderBaseTest.java index dc8342f..646d88f 100644 --- a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/SourceReaderBaseTest.java +++ b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/SourceReaderBaseTest.java @@ -41,6 +41,7 @@ import org.junit.Test; import org.junit.rules.ExpectedException; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.List; @@ -123,6 +124,74 @@ public class SourceReaderBaseTest extends SourceReaderTestBase<MockSourceSplit> assertTrue(records.isRecycled()); } + @Test + public void testMultipleSplitsWithDifferentFinishingMoments() throws Exception { + FutureCompletingBlockingQueue<RecordsWithSplitIds<int[]>> elementsQueue = + new FutureCompletingBlockingQueue<>(); + MockSplitReader mockSplitReader = MockSplitReader.newBuilder() + .setNumRecordsPerSplitPerFetch(2) + .setSeparatedFinishedRecord(false) + .setBlockingFetch(false) + .build(); + MockSourceReader reader = new MockSourceReader( + elementsQueue, + () -> mockSplitReader, + getConfig(), + null); + + reader.start(); + + List<MockSourceSplit> splits = Arrays.asList( + getSplit(0, 10, Boundedness.BOUNDED), + getSplit(1, 12, Boundedness.BOUNDED) + ); + reader.addSplits(splits); + reader.handleSourceEvents(new NoMoreSplitsEvent()); + + while (true) { + InputStatus status = reader.pollNext(new TestingReaderOutput<>()); + if (status == InputStatus.END_OF_INPUT) { + break; + } if (status == InputStatus.NOTHING_AVAILABLE) { + reader.isAvailable().get(); + } + } + } + + @Test + 
public void testMultipleSplitsWithSeparatedFinishedRecord() throws Exception { + FutureCompletingBlockingQueue<RecordsWithSplitIds<int[]>> elementsQueue = + new FutureCompletingBlockingQueue<>(); + MockSplitReader mockSplitReader = MockSplitReader.newBuilder() + .setNumRecordsPerSplitPerFetch(2) + .setSeparatedFinishedRecord(true) + .setBlockingFetch(false) + .build(); + MockSourceReader reader = new MockSourceReader( + elementsQueue, + () -> mockSplitReader, + getConfig(), + null); + + reader.start(); + + List<MockSourceSplit> splits = Arrays.asList( + getSplit(0, 10, Boundedness.BOUNDED), + getSplit(1, 10, Boundedness.BOUNDED) + ); + reader.addSplits(splits); + reader.handleSourceEvents(new NoMoreSplitsEvent()); + + while (true) { + InputStatus status = reader.pollNext(new TestingReaderOutput<>()); + if (status == InputStatus.END_OF_INPUT) { + break; + } if (status == InputStatus.NOTHING_AVAILABLE) { + reader.isAvailable().get(); + } + } + } + // ---------------- helper methods ----------------- @Override diff --git a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/MockSplitReader.java b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/MockSplitReader.java index e991b9c..f4d0a2f 100644 --- a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/MockSplitReader.java +++ b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/MockSplitReader.java @@ -25,6 +25,7 @@ import org.apache.flink.connector.base.source.reader.splitreader.SplitReader; import org.apache.flink.connector.base.source.reader.splitreader.SplitsAddition; import org.apache.flink.connector.base.source.reader.splitreader.SplitsChange; +import java.util.Iterator; import java.util.LinkedHashMap; import java.util.Map; @@ -40,6 +41,7 @@ public class MockSplitReader implements SplitReader<int[], 
MockSourceSplit> { // Use LinkedHashMap for determinism. private final Map<String, MockSourceSplit> splits = new LinkedHashMap<>(); private final int numRecordsPerSplitPerFetch; + private final boolean separatedFinishedRecord; private final boolean blockingFetch; private final Object wakeupLock = new Object(); @@ -49,7 +51,15 @@ public class MockSplitReader implements SplitReader<int[], MockSourceSplit> { public MockSplitReader( int numRecordsPerSplitPerFetch, boolean blockingFetch) { + this(numRecordsPerSplitPerFetch, false, blockingFetch); + } + + private MockSplitReader( + int numRecordsPerSplitPerFetch, + boolean separatedFinishedRecord, + boolean blockingFetch) { this.numRecordsPerSplitPerFetch = numRecordsPerSplitPerFetch; + this.separatedFinishedRecord = separatedFinishedRecord; this.blockingFetch = blockingFetch; } @@ -93,17 +103,28 @@ public class MockSplitReader implements SplitReader<int[], MockSourceSplit> { } try { - for (Map.Entry<String, MockSourceSplit> entry : splits.entrySet()) { + Iterator<Map.Entry<String, MockSourceSplit>> iterator = splits.entrySet().iterator(); + while (iterator.hasNext()) { + Map.Entry<String, MockSourceSplit> entry = iterator.next(); MockSourceSplit split = entry.getValue(); + boolean hasRecords = false; for (int i = 0; i < numRecordsPerSplitPerFetch && !split.isFinished(); i++) { // This call may throw InterruptedException. 
int[] record = split.getNext(blockingFetch); if (record != null) { records.add(entry.getKey(), record); + hasRecords = true; } } if (split.isFinished()) { - records.addFinishedSplit(entry.getKey()); + if (!separatedFinishedRecord) { + records.addFinishedSplit(entry.getKey()); + iterator.remove(); + } else if (!hasRecords) { + records.addFinishedSplit(entry.getKey()); + iterator.remove(); + break; + } } } } catch (InterruptedException ie) { @@ -123,4 +144,36 @@ public class MockSplitReader implements SplitReader<int[], MockSourceSplit> { return records.build(); } + + /** + * Builder for {@link MockSplitReader}. + */ + public static class Builder { + private int numRecordsPerSplitPerFetch = 2; + private boolean separatedFinishedRecord = false; + private boolean blockingFetch = false; + + public Builder setNumRecordsPerSplitPerFetch(int numRecordsPerSplitPerFetch) { + this.numRecordsPerSplitPerFetch = numRecordsPerSplitPerFetch; + return this; + } + + public Builder setSeparatedFinishedRecord(boolean separatedFinishedRecord) { + this.separatedFinishedRecord = separatedFinishedRecord; + return this; + } + + public Builder setBlockingFetch(boolean blockingFetch) { + this.blockingFetch = blockingFetch; + return this; + } + + public MockSplitReader build() { + return new MockSplitReader(numRecordsPerSplitPerFetch, separatedFinishedRecord, blockingFetch); + } + } + + public static Builder newBuilder() { + return new Builder(); + } }
