This is an automated email from the ASF dual-hosted git repository.

jqin pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 69e9d3c2913fb1f923d71ab0f9d0eed43af11ff3
Author: sxnan <[email protected]>
AuthorDate: Thu Nov 12 13:55:07 2020 +0800

    [FLINK-19253][connector/common][test] Add test case to test when all split 
fetchers are closed with leftover element in queue
---
 .../base/source/reader/SourceReaderBaseTest.java   | 81 ++++++++++++++++++++--
 .../base/source/reader/mocks/MockSourceReader.java |  8 +++
 2 files changed, 85 insertions(+), 4 deletions(-)

diff --git 
a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/SourceReaderBaseTest.java
 
b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/SourceReaderBaseTest.java
index 646d88f..9f7742017 100644
--- 
a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/SourceReaderBaseTest.java
+++ 
b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/SourceReaderBaseTest.java
@@ -20,11 +20,13 @@ package org.apache.flink.connector.base.source.reader;
 
 import org.apache.flink.api.connector.source.Boundedness;
 import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceSplit;
 import org.apache.flink.api.connector.source.event.NoMoreSplitsEvent;
 import org.apache.flink.api.connector.source.mocks.MockSourceSplit;
 import org.apache.flink.api.connector.source.mocks.TestingReaderContext;
 import org.apache.flink.api.connector.source.mocks.TestingReaderOutput;
 import org.apache.flink.configuration.Configuration;
+import 
org.apache.flink.connector.base.source.reader.fetcher.SingleThreadFetcherManager;
 import org.apache.flink.connector.base.source.reader.mocks.MockSourceReader;
 import org.apache.flink.connector.base.source.reader.mocks.MockSplitReader;
 import 
org.apache.flink.connector.base.source.reader.mocks.PassThroughRecordEmitter;
@@ -45,7 +47,10 @@ import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Supplier;
 
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertTrue;
@@ -82,7 +87,7 @@ public class SourceReaderBaseTest extends 
SourceReaderTestBase<MockSourceSplit>
                                public void wakeUp() {}
 
                                @Override
-                               public void close() throws Exception {}
+                               public void close() {}
                        },
                        getConfig(),
                        null)) {
@@ -192,6 +197,33 @@ public class SourceReaderBaseTest extends 
SourceReaderTestBase<MockSourceSplit>
                }
        }
 
+       @Test
+       public void 
pollNextReturnMoreAvailableWhenAllSplitFetcherCloseWithLeftoverElementInQueue()
+               throws Exception {
+
+               FutureCompletingBlockingQueue<RecordsWithSplitIds<int[]>> 
elementsQueue =
+                       new FutureCompletingBlockingQueue<>();
+               MockSplitReader mockSplitReader =
+                       new MockSplitReader(1, true);
+               BlockingShutdownSplitFetcherManager<int[], MockSourceSplit> 
splitFetcherManager =
+                       new 
BlockingShutdownSplitFetcherManager<>(elementsQueue, () -> mockSplitReader);
+               final MockSourceReader sourceReader = new MockSourceReader(
+                       elementsQueue,
+                       splitFetcherManager,
+                       getConfig(),
+                       null);
+
+               // Create and add a split that only contains one record
+               final MockSourceSplit split = new MockSourceSplit(0, 0, 1);
+               sourceReader.addSplits(Collections.singletonList(split));
+               sourceReader.handleSourceEvents(new NoMoreSplitsEvent());
+
+               // Add the last record to the split when the 
splitFetcherManager shutting down SplitFetchers
+               
splitFetcherManager.getInShutdownSplitFetcherFuture().thenRun(() -> 
split.addRecord(1));
+               assertEquals(InputStatus.MORE_AVAILABLE,
+                       sourceReader.pollNext(new TestingReaderOutput<>()));
+       }
+
        // ---------------- helper methods -----------------
 
        @Override
@@ -255,13 +287,13 @@ public class SourceReaderBaseTest extends 
SourceReaderTestBase<MockSourceSplit>
 
                final SourceReader<E, TestingSourceSplit> reader = new 
SingleThreadMultiplexSourceReaderBase<E, E, TestingSourceSplit, 
TestingSourceSplit>(
                        elementsQueue,
-                       () -> new TestingSplitReader<E, 
TestingSourceSplit>(records),
-                       new PassThroughRecordEmitter<E, TestingSourceSplit>(),
+                       () -> new TestingSplitReader<>(records),
+                       new PassThroughRecordEmitter<>(),
                        new Configuration(),
                        new TestingReaderContext()) {
 
                        @Override
-                       public void notifyCheckpointComplete(long checkpointId) 
throws Exception {}
+                       public void notifyCheckpointComplete(long checkpointId) 
{}
 
                        @Override
                        protected void onSplitFinished(Collection<String> 
finishedSplitIds) {}
@@ -286,4 +318,45 @@ public class SourceReaderBaseTest extends 
SourceReaderTestBase<MockSourceSplit>
 
                return reader;
        }
+
+       // ------------------ Test helper classes -------------------
+       /**
+        * When maybeShutdownFinishedFetchers is invoke, 
BlockingShutdownSplitFetcherManager
+        * will complete the inShutdownSplitFetcherFuture and ensures that all 
the split fetchers
+        * are shutdown.
+        */
+       private static class BlockingShutdownSplitFetcherManager<E, SplitT 
extends SourceSplit>
+               extends SingleThreadFetcherManager<E, SplitT> {
+
+               private final CompletableFuture<Void> 
inShutdownSplitFetcherFuture;
+
+               public BlockingShutdownSplitFetcherManager(
+                       FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> 
elementsQueue,
+                       Supplier<SplitReader<E, SplitT>> splitReaderSupplier) {
+                       super(elementsQueue, splitReaderSupplier);
+                       this.inShutdownSplitFetcherFuture = new 
CompletableFuture<>();
+               }
+
+               @Override
+               public boolean maybeShutdownFinishedFetchers() {
+                       shutdownAllSplitFetcher();
+                       return true;
+               }
+
+               public CompletableFuture<Void> 
getInShutdownSplitFetcherFuture() {
+                       return inShutdownSplitFetcherFuture;
+               }
+
+               private void shutdownAllSplitFetcher() {
+                       inShutdownSplitFetcherFuture.complete(null);
+                       while (!super.maybeShutdownFinishedFetchers()) {
+                               try {
+                                       // avoid tight loop
+                                       Thread.sleep(1);
+                               } catch (InterruptedException e) {
+                                       e.printStackTrace();
+                               }
+                       }
+               }
+       }
 }
diff --git 
a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/MockSourceReader.java
 
b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/MockSourceReader.java
index 9de22aa..3793705 100644
--- 
a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/MockSourceReader.java
+++ 
b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/MockSourceReader.java
@@ -23,6 +23,7 @@ import 
org.apache.flink.api.connector.source.mocks.MockSourceSplit;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
 import 
org.apache.flink.connector.base.source.reader.SingleThreadMultiplexSourceReaderBase;
+import 
org.apache.flink.connector.base.source.reader.fetcher.SingleThreadFetcherManager;
 import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
 import 
org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
 
@@ -43,6 +44,13 @@ public class MockSourceReader
                super(elementsQueue, splitFetcherSupplier, new 
MockRecordEmitter(), config, context);
        }
 
+       public 
MockSourceReader(FutureCompletingBlockingQueue<RecordsWithSplitIds<int[]>> 
elementsQueue,
+                                                       
SingleThreadFetcherManager<int[], MockSourceSplit> splitSplitFetcherManager,
+                                                       Configuration config,
+                                                       SourceReaderContext 
context) {
+               super(elementsQueue, splitSplitFetcherManager, new 
MockRecordEmitter(), config, context);
+       }
+
        @Override
        protected void onSplitFinished(Collection<String> finishedSplitIds) {
 

Reply via email to