becketqin commented on code in PR #25130:
URL: https://github.com/apache/flink/pull/25130#discussion_r1698836747
##########
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcher.java:
##########
@@ -82,6 +85,14 @@ public class SplitFetcher<E, SplitT extends SourceSplit>
implements Runnable {
private final Consumer<Collection<String>> splitFinishedHook;
+ /**
+ * A shutdown latch to help make sure the SplitReader is only closed after
all the emitted
+ * records have been processed by the main reader thread. This is needed
because in some cases,
+ * the records in the <tt>RecordsWithSplitIds</tt> may have not been
processed when the split
+ * fetcher shuts down.
+ */
+ private final CountDownLatch recordsProcessedLatch;
Review Comment:
That is not a bad idea. We can do a refactor and introduce a state machine
to make the code more readable. That said, the state of waiting for the records
to be processed seems just a step in the shutdown sequence, and may not
necessarily need a state to distinguish.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]