PatrickRen commented on code in PR #25130:
URL: https://github.com/apache/flink/pull/25130#discussion_r1698253138


##########
flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherManagerTest.java:
##########
@@ -68,6 +71,76 @@ public void testCloseFetcherWithException() throws Exception {
                 .hasRootCauseMessage("Artificial exception on closing the split reader.");
     }
 
+    @Test
+    public void testCloseCleansUpPreviouslyClosedFetcher() throws Exception {
+        final String splitId = "testSplit";
+        // Set the queue capacity to 1 to make sure in this case the
+        // fetcher shutdown won't block on putting the batches into the queue.
+        Configuration config = new Configuration();
+        config.set(SourceReaderOptions.ELEMENT_QUEUE_CAPACITY, 1);
+        final AwaitingReader<Integer, TestingSourceSplit> reader =
+                new AwaitingReader<>(
+                        new IOException("Should not happen"),
+                        new RecordsBySplits<>(
+                                Collections.emptyMap(), Collections.singleton(splitId)));
+        final SplitFetcherManager<Integer, TestingSourceSplit> fetcherManager =
+                createFetcher(splitId, reader, config);
+        // Ensure the fetcher has emitted an element into the queue.
+        fetcherManager.getQueue().getAvailabilityFuture().get();
+        waitUntil(
+                () -> {
+                    fetcherManager.maybeShutdownFinishedFetchers();
+                    return fetcherManager.fetchers.isEmpty();
+                },
+                "The idle fetcher should have been removed.");
+        // Now close the fetcher manager. The fetcher manager closing should not block.
+        fetcherManager.close(30_000);
+    }
+
+    @Test
+    public void testIdleShutdownSplitFetcherWaitsUntilRecordProcessed() throws Exception {
+        final String splitId = "testSplit";
+        final AwaitingReader<Integer, TestingSourceSplit> reader =
+                new AwaitingReader<>(
+                        new IOException("Should not happen"),
+                        new RecordsBySplits<>(
+                                Collections.emptyMap(), Collections.singleton(splitId)));
+        final SplitFetcherManager<Integer, TestingSourceSplit> fetcherManager =
+                createFetcher(splitId, reader, new Configuration());
+        final SplitFetcher<?, ?> fetcher = fetcherManager.fetchers.get(0);

Review Comment:
   The `fetcher` variable is not used.



##########
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherManager.java:
##########
@@ -315,8 +326,20 @@ public FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> getQueue() {
      * @throws Exception when failed to close the split fetcher manager.
      */
     public synchronized void close(long timeoutMs) throws Exception {
+        final long startTime = System.currentTimeMillis();
         closed = true;
         fetchers.values().forEach(SplitFetcher::shutdown);
+        // Actively drain the element queue in case there are previously shutting down
+        // fetcher threads blocking on putting batches into the element queue.
+        executors.submit(
+                () -> {
+                    while (fetchersToShutDown.get() > 0
+                            && System.currentTimeMillis() - startTime < timeoutMs) {
+                        elementsQueue
+                                .getAvailabilityFuture()
+                                .thenRun(() -> elementsQueue.poll().recycle());
+                    }
+                });
         executors.shutdown();
         if (!executors.awaitTermination(timeoutMs, TimeUnit.MILLISECONDS)) {
             LOG.warn(

Review Comment:
   (I can't attach this comment to the exact line because of a limitation of
   the GitHub web UI.) I think the warning log here should print
   `fetchersToShutDown` instead of `fetchers.size()`.



##########
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcher.java:
##########
@@ -82,6 +85,14 @@ public class SplitFetcher<E, SplitT extends SourceSplit> implements Runnable {
 
     private final Consumer<Collection<String>> splitFinishedHook;
 
+    /**
+     * A shutdown latch to help make sure the SplitReader is only closed after all the emitted
+     * records have been processed by the main reader thread. This is needed because in some cases,
+     * the records in the <tt>RecordsWithSplitIds</tt> may have not been processed when the split
+     * fetcher shuts down.
+     */
+    private final CountDownLatch recordsProcessedLatch;

Review Comment:
   nit: I think this patch introduces one more state for SplitFetcher: closed
   but not fully shut down. SplitFetcher now seems to have a lot of states,
   such as running, paused, closed, and shut down. Maybe we can clean up the
   SplitFetcher code in the future to make it more maintainable.



##########
flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherManagerTest.java:
##########
@@ -68,6 +71,76 @@ public void testCloseFetcherWithException() throws Exception {
                 .hasRootCauseMessage("Artificial exception on closing the split reader.");
     }
 
+    @Test
+    public void testCloseCleansUpPreviouslyClosedFetcher() throws Exception {
+        final String splitId = "testSplit";
+        // Set the queue capacity to 1 to make sure in this case the
+        // fetcher shutdown won't block on putting the batches into the queue.
+        Configuration config = new Configuration();
+        config.set(SourceReaderOptions.ELEMENT_QUEUE_CAPACITY, 1);
+        final AwaitingReader<Integer, TestingSourceSplit> reader =
+                new AwaitingReader<>(
+                        new IOException("Should not happen"),
+                        new RecordsBySplits<>(
+                                Collections.emptyMap(), Collections.singleton(splitId)));
+        final SplitFetcherManager<Integer, TestingSourceSplit> fetcherManager =
+                createFetcher(splitId, reader, config);
+        // Ensure the fetcher has emitted an element into the queue.
+        fetcherManager.getQueue().getAvailabilityFuture().get();
+        waitUntil(
+                () -> {
+                    fetcherManager.maybeShutdownFinishedFetchers();
+                    return fetcherManager.fetchers.isEmpty();
+                },
+                "The idle fetcher should have been removed.");
+        // Now close the fetcher manager. The fetcher manager closing should not block.
+        fetcherManager.close(30_000);
+    }
+
+    @Test
+    public void testIdleShutdownSplitFetcherWaitsUntilRecordProcessed() throws Exception {
+        final String splitId = "testSplit";
+        final AwaitingReader<Integer, TestingSourceSplit> reader =
+                new AwaitingReader<>(
+                        new IOException("Should not happen"),
+                        new RecordsBySplits<>(
+                                Collections.emptyMap(), Collections.singleton(splitId)));
+        final SplitFetcherManager<Integer, TestingSourceSplit> fetcherManager =
+                createFetcher(splitId, reader, new Configuration());
+        final SplitFetcher<?, ?> fetcher = fetcherManager.fetchers.get(0);
+        try {
+            FutureCompletingBlockingQueue<RecordsWithSplitIds<Integer>> queue =
+                    fetcherManager.getQueue();
+            // Wait util the data batch is emitted.
+            queue.getAvailabilityFuture().get();
+            waitUntil(
+                    () -> {
+                        fetcherManager.maybeShutdownFinishedFetchers();
+                        return fetcherManager.getNumAliveFetchers() == 0;
+                    },
+                    Duration.ofSeconds(1),
+                    "The fetcher should have already been removed from the alive fetchers.");
+
+            // There should be two fetches available, one for data (although empty), one for the
+            // shutdown synchronization (also empty).
+            waitUntil(
+                    () -> queue.size() == 2,
+                    Duration.ofSeconds(1),
+                    "The element queue should have 2 batches when the fetcher is closed.");
+
+            // Finish the first batch (data batch).
+            queue.poll().recycle();
+            // Finish the second batch (synchronization batch).
+            queue.poll().recycle();

Review Comment:
   What about adding an assertion between these two lines, that the reader is 
not closed before the synchronization batch is recycled?
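
The ordering that such an assertion would pin down can be sketched in isolation. This is a minimal stand-alone sketch, not the Flink test: `ShutdownLatchSketch`, `FakeReader`, and `run()` are hypothetical names, and whether `AwaitingReader` exposes an `isClosed()`-style accessor is an assumption.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical stand-alone model of "close the reader only after the last batch is recycled". */
final class ShutdownLatchSketch {

    /** Stand-in for a split reader that only records whether close() was called. */
    static final class FakeReader {
        private final AtomicBoolean closed = new AtomicBoolean(false);

        void close() {
            closed.set(true);
        }

        boolean isClosed() {
            return closed.get();
        }
    }

    /** Returns {closedBeforeSyncBatchRecycled, closedAfterSyncBatchRecycled}. */
    static boolean[] run() {
        FakeReader reader = new FakeReader();
        CountDownLatch recordsProcessed = new CountDownLatch(1);

        // Models the fetcher thread: it may close the reader only once the latch is released.
        Thread fetcherThread = new Thread(() -> {
            try {
                recordsProcessed.await();
                reader.close();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        fetcherThread.start();

        // Models the point between the two recycle() calls: the data batch is done,
        // but the synchronization batch has not been recycled yet.
        boolean closedBefore = reader.isClosed();

        // Models recycling the synchronization batch, which releases the latch.
        recordsProcessed.countDown();
        try {
            fetcherThread.join(5_000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for the fetcher thread", e);
        }
        return new boolean[] {closedBefore, reader.isClosed()};
    }
}
```

In the real test, the check corresponding to `closedBefore` would sit between the two `queue.poll().recycle()` calls.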



##########
flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherTest.java:
##########
@@ -275,6 +279,37 @@ public void testCloseAfterPause() throws InterruptedException {
         assertThat(fetcher.runOnce()).isFalse();
     }
 
+    @Test
+    public void testShutdownWaitingForRecordsProcessing() throws Exception {
+        TestingSplitReader<Object, TestingSourceSplit> splitReader = new TestingSplitReader<>();
+        FutureCompletingBlockingQueue<RecordsWithSplitIds<Object>> queue =
+                new FutureCompletingBlockingQueue<>();
+        final SplitFetcher<Object, TestingSourceSplit> fetcher = createFetcher(splitReader, queue);
+        fetcher.shutdown(true);
+
+        // Spawn a new thread af fetcher thread to go through the shutdown sequence.
+        CheckedThread fetcherThread =
+                new CheckedThread() {
+                    @Override
+                    public void go() throws Exception {
+                        fetcher.run();
+                        assertThat(splitReader.isClosed()).isTrue();
+                    }
+                };
+        fetcherThread.start();
+
+        // Wait until the fetcher thread to block on the shutdown latch.
+        waitUntil(
+                () -> fetcherThread.getState() == WAITING,
+                Duration.ofSeconds(1),
+                "The fetcher thread should be waiting for the shutdown latch");
+        assertFalse("The split reader should have not been closed.", splitReader.isClosed());

Review Comment:
   The latest code style requires AssertJ for tests:
   
   ```suggestion
           assertThat(splitReader.isClosed()).as("The split reader should have not been closed.").isFalse();
   ```



##########
flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherTest.java:
##########
@@ -275,6 +279,37 @@ public void testCloseAfterPause() throws InterruptedException {
         assertThat(fetcher.runOnce()).isFalse();
     }
 
+    @Test
+    public void testShutdownWaitingForRecordsProcessing() throws Exception {
+        TestingSplitReader<Object, TestingSourceSplit> splitReader = new TestingSplitReader<>();
+        FutureCompletingBlockingQueue<RecordsWithSplitIds<Object>> queue =
+                new FutureCompletingBlockingQueue<>();
+        final SplitFetcher<Object, TestingSourceSplit> fetcher = createFetcher(splitReader, queue);
+        fetcher.shutdown(true);
+
+        // Spawn a new thread af fetcher thread to go through the shutdown sequence.

Review Comment:
   `af` -> `as`?


