becketqin commented on code in PR #25130:
URL: https://github.com/apache/flink/pull/25130#discussion_r1698836747


##########
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcher.java:
##########
@@ -82,6 +85,14 @@ public class SplitFetcher<E, SplitT extends SourceSplit> implements Runnable {
 
     private final Consumer<Collection<String>> splitFinishedHook;
 
+    /**
+     * A shutdown latch to help make sure the SplitReader is only closed after all the emitted
+     * records have been processed by the main reader thread. This is needed because in some cases,
+     * the records in the <tt>RecordsWithSplitIds</tt> may have not been processed when the split
+     * fetcher shuts down.
+     */
+    private final CountDownLatch recordsProcessedLatch;

Review Comment:
   That is not a bad idea. We could refactor and introduce a state machine to 
make the code more readable. That said, waiting for the records to be processed 
seems to be just one step in the shutdown sequence, so it may not need a 
separate state of its own.
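
   To illustrate the "just a step in the shutdown sequence" view, here is a minimal sketch of the latch pattern described in the Javadoc above. It is not the actual `SplitFetcher` code: the class `LatchShutdownSketch`, the method `runDemo`, the queue, and the event strings are all hypothetical names chosen for the example, and a single count stands in for one outstanding batch.

```java
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;

/** Hypothetical, simplified stand-in for the shutdown sequence; not Flink's SplitFetcher. */
public class LatchShutdownSketch {

    /** Runs one hand-off followed by shutdown and returns the observed event order. */
    static List<String> runDemo() throws InterruptedException {
        List<String> events = new CopyOnWriteArrayList<>();
        // Assumed hand-off channel between the fetcher thread and the main reader thread.
        BlockingQueue<String> elementsQueue = new ArrayBlockingQueue<>(1);
        // One count: the single batch that the main reader thread still has to process.
        CountDownLatch recordsProcessedLatch = new CountDownLatch(1);

        Thread fetcher = new Thread(() -> {
            try {
                events.add("batch emitted");
                elementsQueue.put("batch-1");
                // Shutdown step: block until the main thread has finished with the batch.
                recordsProcessedLatch.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            // Only now release resources that the emitted batch may still reference.
            events.add("reader closed");
        }, "fetcher");
        fetcher.start();

        // Main reader thread: take the batch, process it, then signal the fetcher.
        String batch = elementsQueue.take();
        events.add("processed " + batch);
        recordsProcessedLatch.countDown();

        fetcher.join();
        return events;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(runDemo());
    }
}
```

   In this sketch the wait is a single blocking call between "stop fetching" and "close the reader", which is why it may not need its own state. A real implementation would likely also want a bounded wait or an interrupt path so that shutdown cannot hang if the main thread never processes the batch.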


