becketqin commented on a change in pull request #14011:
URL: https://github.com/apache/flink/pull/14011#discussion_r521997526



##########
File path: 
flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/SourceReaderBaseTest.java
##########
@@ -286,4 +291,71 @@ protected TestingSourceSplit toSplitType(String splitId, 
TestingSourceSplit spli
 
                return reader;
        }
+
+       @Test
+       public void 
pollNextReturnMoreAvailableWhenAllSplitFetcherCloseWithLeftoverElementInQueue()

Review comment:
       Usually the test method names starts with `testXXX`. I'll fix this upon 
checking in the patch.

##########
File path: 
flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/SourceReaderBaseTest.java
##########
@@ -286,4 +291,71 @@ protected TestingSourceSplit toSplitType(String splitId, 
TestingSourceSplit spli
 
                return reader;
        }
+
+       @Test
+       public void 
pollNextReturnMoreAvailableWhenAllSplitFetcherCloseWithLeftoverElementInQueue()
+               throws Exception {
+
+               FutureCompletingBlockingQueue<RecordsWithSplitIds<int[]>> 
elementsQueue =
+                       new FutureCompletingBlockingQueue<>();
+               MockSplitReader mockSplitReader =
+                       new MockSplitReader(1, true);
+               BlockingShutdownSplitFetcherManager<int[], MockSourceSplit> 
splitFetcherManager =
+                       new 
BlockingShutdownSplitFetcherManager<>(elementsQueue, () -> mockSplitReader);
+               final MockSourceReader sourceReader = new MockSourceReader(
+                       elementsQueue,
+                       splitFetcherManager,
+                       getConfig(),
+                       null);
+
+               // Create and add a split that only contains one record
+               final MockSourceSplit split = new MockSourceSplit(0, 0, 1);
+               sourceReader.addSplits(Collections.singletonList(split));
+               sourceReader.notifyNoMoreSplits();
+
+               // Add the last record to the split when the 
splitFetcherManager shutting down SplitFetchers
+               
splitFetcherManager.getInShutdownSplitFetcherFuture().thenRun(() -> 
split.addRecord(1));
+               assertEquals(InputStatus.MORE_AVAILABLE,
+                       sourceReader.pollNext(new TestingReaderOutput<>()));
+       }
+
+       /**
+        * When maybeShutdownFinishedFetchers is invoke, 
BlockingShutdownSplitFetcherManager
+        * will complete the inShutdownSplitFetcherFuture and ensures that all 
the split fetchers
+        * are shutdown.
+        */
+       static class BlockingShutdownSplitFetcherManager<E, SplitT extends 
SourceSplit>

Review comment:
       Should this class be private?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to