This is an automated email from the ASF dual-hosted git repository. jqin pushed a commit to branch release-1.11 in repository https://gitbox.apache.org/repos/asf/flink.git
commit 69e9d3c2913fb1f923d71ab0f9d0eed43af11ff3 Author: sxnan <[email protected]> AuthorDate: Thu Nov 12 13:55:07 2020 +0800 [FLINK-19253][connector/common][test] Add test case to test when all split fetchers are closed with leftover element in queue --- .../base/source/reader/SourceReaderBaseTest.java | 81 ++++++++++++++++++++-- .../base/source/reader/mocks/MockSourceReader.java | 8 +++ 2 files changed, 85 insertions(+), 4 deletions(-) diff --git a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/SourceReaderBaseTest.java b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/SourceReaderBaseTest.java index 646d88f..9f7742017 100644 --- a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/SourceReaderBaseTest.java +++ b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/SourceReaderBaseTest.java @@ -20,11 +20,13 @@ package org.apache.flink.connector.base.source.reader; import org.apache.flink.api.connector.source.Boundedness; import org.apache.flink.api.connector.source.SourceReader; +import org.apache.flink.api.connector.source.SourceSplit; import org.apache.flink.api.connector.source.event.NoMoreSplitsEvent; import org.apache.flink.api.connector.source.mocks.MockSourceSplit; import org.apache.flink.api.connector.source.mocks.TestingReaderContext; import org.apache.flink.api.connector.source.mocks.TestingReaderOutput; import org.apache.flink.configuration.Configuration; +import org.apache.flink.connector.base.source.reader.fetcher.SingleThreadFetcherManager; import org.apache.flink.connector.base.source.reader.mocks.MockSourceReader; import org.apache.flink.connector.base.source.reader.mocks.MockSplitReader; import org.apache.flink.connector.base.source.reader.mocks.PassThroughRecordEmitter; @@ -45,7 +47,10 @@ import java.util.Arrays; import java.util.Collection; import 
java.util.Collections; import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.function.Supplier; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertTrue; @@ -82,7 +87,7 @@ public class SourceReaderBaseTest extends SourceReaderTestBase<MockSourceSplit> public void wakeUp() {} @Override - public void close() throws Exception {} + public void close() {} }, getConfig(), null)) { @@ -192,6 +197,33 @@ public class SourceReaderBaseTest extends SourceReaderTestBase<MockSourceSplit> } } + @Test + public void pollNextReturnMoreAvailableWhenAllSplitFetcherCloseWithLeftoverElementInQueue() + throws Exception { + + FutureCompletingBlockingQueue<RecordsWithSplitIds<int[]>> elementsQueue = + new FutureCompletingBlockingQueue<>(); + MockSplitReader mockSplitReader = + new MockSplitReader(1, true); + BlockingShutdownSplitFetcherManager<int[], MockSourceSplit> splitFetcherManager = + new BlockingShutdownSplitFetcherManager<>(elementsQueue, () -> mockSplitReader); + final MockSourceReader sourceReader = new MockSourceReader( + elementsQueue, + splitFetcherManager, + getConfig(), + null); + + // Create and add a split that only contains one record + final MockSourceSplit split = new MockSourceSplit(0, 0, 1); + sourceReader.addSplits(Collections.singletonList(split)); + sourceReader.handleSourceEvents(new NoMoreSplitsEvent()); + + // Add the last record to the split when the splitFetcherManager is shutting down SplitFetchers + splitFetcherManager.getInShutdownSplitFetcherFuture().thenRun(() -> split.addRecord(1)); + assertEquals(InputStatus.MORE_AVAILABLE, + sourceReader.pollNext(new TestingReaderOutput<>())); + } + // ---------------- helper methods ----------------- @Override @@ -255,13 +287,13 @@ public class SourceReaderBaseTest extends SourceReaderTestBase<MockSourceSplit> final SourceReader<E, TestingSourceSplit> reader = new 
SingleThreadMultiplexSourceReaderBase<E, E, TestingSourceSplit, TestingSourceSplit>( elementsQueue, - () -> new TestingSplitReader<E, TestingSourceSplit>(records), - new PassThroughRecordEmitter<E, TestingSourceSplit>(), + () -> new TestingSplitReader<>(records), + new PassThroughRecordEmitter<>(), new Configuration(), new TestingReaderContext()) { @Override - public void notifyCheckpointComplete(long checkpointId) throws Exception {} + public void notifyCheckpointComplete(long checkpointId) {} @Override protected void onSplitFinished(Collection<String> finishedSplitIds) {} @@ -286,4 +318,45 @@ public class SourceReaderBaseTest extends SourceReaderTestBase<MockSourceSplit> return reader; } + + // ------------------ Test helper classes ------------------- + /** + * When maybeShutdownFinishedFetchers is invoked, BlockingShutdownSplitFetcherManager + * will complete the inShutdownSplitFetcherFuture and ensure that all the split fetchers + * are shut down. + */ + private static class BlockingShutdownSplitFetcherManager<E, SplitT extends SourceSplit> + extends SingleThreadFetcherManager<E, SplitT> { + + private final CompletableFuture<Void> inShutdownSplitFetcherFuture; + + public BlockingShutdownSplitFetcherManager( + FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> elementsQueue, + Supplier<SplitReader<E, SplitT>> splitReaderSupplier) { + super(elementsQueue, splitReaderSupplier); + this.inShutdownSplitFetcherFuture = new CompletableFuture<>(); + } + + @Override + public boolean maybeShutdownFinishedFetchers() { + shutdownAllSplitFetcher(); + return true; + } + + public CompletableFuture<Void> getInShutdownSplitFetcherFuture() { + return inShutdownSplitFetcherFuture; + } + + private void shutdownAllSplitFetcher() { + inShutdownSplitFetcherFuture.complete(null); + while (!super.maybeShutdownFinishedFetchers()) { + try { + // avoid tight loop + Thread.sleep(1); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + } + } } diff --git 
a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/MockSourceReader.java b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/MockSourceReader.java index 9de22aa..3793705 100644 --- a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/MockSourceReader.java +++ b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/MockSourceReader.java @@ -23,6 +23,7 @@ import org.apache.flink.api.connector.source.mocks.MockSourceSplit; import org.apache.flink.configuration.Configuration; import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds; import org.apache.flink.connector.base.source.reader.SingleThreadMultiplexSourceReaderBase; +import org.apache.flink.connector.base.source.reader.fetcher.SingleThreadFetcherManager; import org.apache.flink.connector.base.source.reader.splitreader.SplitReader; import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue; @@ -43,6 +44,13 @@ public class MockSourceReader super(elementsQueue, splitFetcherSupplier, new MockRecordEmitter(), config, context); } + public MockSourceReader(FutureCompletingBlockingQueue<RecordsWithSplitIds<int[]>> elementsQueue, + SingleThreadFetcherManager<int[], MockSourceSplit> splitSplitFetcherManager, + Configuration config, + SourceReaderContext context) { + super(elementsQueue, splitSplitFetcherManager, new MockRecordEmitter(), config, context); + } + @Override protected void onSplitFinished(Collection<String> finishedSplitIds) {
