PatrickRen commented on code in PR #25130:
URL: https://github.com/apache/flink/pull/25130#discussion_r1699813636


##########
flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherManagerTest.java:
##########
@@ -68,6 +71,78 @@ public void testCloseFetcherWithException() throws Exception 
{
                 .hasRootCauseMessage("Artificial exception on closing the 
split reader.");
     }
 
+    @Test
+    public void testCloseCleansUpPreviouslyClosedFetcher() throws Exception {
+        final String splitId = "testSplit";
+        // Set the queue capacity to 1 to make sure in this case the
+        // fetcher shutdown won't block on putting the batches into the queue.
+        Configuration config = new Configuration();
+        config.set(SourceReaderOptions.ELEMENT_QUEUE_CAPACITY, 1);
+        final AwaitingReader<Integer, TestingSourceSplit> reader =
+                new AwaitingReader<>(
+                        new IOException("Should not happen"),
+                        new RecordsBySplits<>(
+                                Collections.emptyMap(), 
Collections.singleton(splitId)));
+        final SplitFetcherManager<Integer, TestingSourceSplit> fetcherManager =
+                createFetcher(splitId, reader, config);
+        // Ensure the fetcher has emitted an element into the queue.
+        fetcherManager.getQueue().getAvailabilityFuture().get();
+        waitUntil(
+                () -> {
+                    fetcherManager.maybeShutdownFinishedFetchers();
+                    return fetcherManager.fetchers.isEmpty();
+                },
+                "The idle fetcher should have been removed.");
+        // Now close the fetcher manager. The fetcher manager closing should 
not block.
+        fetcherManager.close(30_000);
+    }
+
+    @Test
+    public void testIdleShutdownSplitFetcherWaitsUntilRecordProcessed() throws 
Exception {
+        final String splitId = "testSplit";
+        final AwaitingReader<Integer, TestingSourceSplit> reader =
+                new AwaitingReader<>(
+                        new IOException("Should not happen"),
+                        new RecordsBySplits<>(
+                                Collections.emptyMap(), 
Collections.singleton(splitId)));
+        final SplitFetcherManager<Integer, TestingSourceSplit> fetcherManager =
+                createFetcher(splitId, reader, new Configuration());
+        try {
+            FutureCompletingBlockingQueue<RecordsWithSplitIds<Integer>> queue =
+                    fetcherManager.getQueue();
+            // Wait util the data batch is emitted.
+            queue.getAvailabilityFuture().get();
+            waitUntil(
+                    () -> {
+                        fetcherManager.maybeShutdownFinishedFetchers();
+                        return fetcherManager.getNumAliveFetchers() == 0;
+                    },
+                    Duration.ofSeconds(1),
+                    "The fetcher should have already been removed from the 
alive fetchers.");
+
+            // There should be two fetches available, one for data (although 
empty), one for the
+            // shutdown synchronization (also empty).
+            waitUntil(
+                    () -> queue.size() == 2,
+                    Duration.ofSeconds(1),
+                    "The element queue should have 2 batches when the fetcher 
is closed.");
+
+            // Finish the first batch (data batch).
+            queue.poll().recycle();
+            assertThat(reader.isClosed)
+                    .isFalse()
+                    .as("The reader should have not been closed.");

Review Comment:
   According to the JavaDoc of AssertJ, as() should be before isFalse():
   ```suggestion
               assertThat(reader.isClosed)
                       .as("The reader should have not been closed.")
                       .isFalse();
   ```



##########
flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherTest.java:
##########
@@ -275,6 +279,39 @@ public void testCloseAfterPause() throws 
InterruptedException {
         assertThat(fetcher.runOnce()).isFalse();
     }
 
+    @Test
+    public void testShutdownWaitingForRecordsProcessing() throws Exception {
+        TestingSplitReader<Object, TestingSourceSplit> splitReader = new 
TestingSplitReader<>();
+        FutureCompletingBlockingQueue<RecordsWithSplitIds<Object>> queue =
+                new FutureCompletingBlockingQueue<>();
+        final SplitFetcher<Object, TestingSourceSplit> fetcher = 
createFetcher(splitReader, queue);
+        fetcher.shutdown(true);
+
+        // Spawn a new fetcher thread to go through the shutdown sequence.
+        CheckedThread fetcherThread =
+                new CheckedThread() {
+                    @Override
+                    public void go() throws Exception {
+                        fetcher.run();
+                        assertThat(splitReader.isClosed()).isTrue();
+                    }
+                };
+        fetcherThread.start();
+
+        // Wait until the fetcher thread to block on the shutdown latch.
+        waitUntil(
+                () -> fetcherThread.getState() == WAITING,
+                Duration.ofSeconds(1),
+                "The fetcher thread should be waiting for the shutdown latch");
+        assertThat(splitReader.isClosed())
+                .isFalse()
+                .as("The split reader should have not been closed.");

Review Comment:
   ```suggestion
           assertThat(splitReader.isClosed())
                   .as("The split reader should have not been closed.")
                   .isFalse();
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to