venkata91 commented on code in PR #25130:
URL: https://github.com/apache/flink/pull/25130#discussion_r1705956844


##########
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherManager.java:
##########
@@ -315,14 +326,26 @@ public FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> getQueue() {
      * @throws Exception when failed to close the split fetcher manager.
      */
     public synchronized void close(long timeoutMs) throws Exception {
+        final long startTime = System.currentTimeMillis();
         closed = true;
         fetchers.values().forEach(SplitFetcher::shutdown);
+        // Actively drain the element queue in case there are previously shutting down
+        // fetcher threads blocking on putting batches into the element queue.
+        executors.submit(
+                () -> {
+                    while (fetchersToShutDown.get() > 0
+                            && System.currentTimeMillis() - startTime < timeoutMs) {

Review Comment:
   Sorry for commenting after the patch is merged.
   
   It would be better to use `System.nanoTime()` instead of
   `System.currentTimeMillis()` here. `currentTimeMillis()` reads the wall
   clock, which can be adjusted backwards (for example by clock
   synchronization), so `System.currentTimeMillis() - startTime` can come out
   negative. I don't think this causes any correctness issues or unexpected
   behavior in this case, though. I just wanted to point it out.
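
For illustration, here is a minimal sketch of the same kind of timeout check written against `System.nanoTime()`. The class `MonotonicTimeout` and its methods `withinTimeout` and `awaitZero` are hypothetical names made up for this example and are not part of `SplitFetcherManager`; the `IntSupplier` merely stands in for something like `fetchersToShutDown.get()`.

```java
import java.util.concurrent.TimeUnit;
import java.util.function.IntSupplier;

/** Hypothetical helper for illustration only; not Flink code. */
final class MonotonicTimeout {

    /** Returns true while less than timeoutMs milliseconds separate startNanos and nowNanos. */
    static boolean withinTimeout(long startNanos, long nowNanos, long timeoutMs) {
        // Subtract first, then compare. The difference of two nanoTime() readings
        // stays correct even if the underlying counter wraps around.
        return nowNanos - startNanos < TimeUnit.MILLISECONDS.toNanos(timeoutMs);
    }

    /**
     * Waits until pending reports zero or the timeout elapses.
     * Returns true if pending reached zero before the timeout.
     */
    static boolean awaitZero(IntSupplier pending, long timeoutMs) {
        // nanoTime() is only meaningful for measuring elapsed time, which is what we need here.
        final long startNanos = System.nanoTime();
        while (pending.getAsInt() > 0) {
            if (!withinTimeout(startNanos, System.nanoTime(), timeoutMs)) {
                return false;
            }
            Thread.yield();
        }
        return true;
    }
}
```

Note that the comparison is written as `now - start < timeout` rather than `now < start + timeout`, which is the form the `System.nanoTime()` Javadoc recommends to stay safe under numerical overflow.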



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
