danny0405 commented on code in PR #18325:
URL: https://github.com/apache/hudi/pull/18325#discussion_r2937933025


##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/reader/BatchRecords.java:
##########
@@ -117,8 +115,11 @@ public void seek(long startingRecordOffset) {
 
   public static <T> BatchRecords<T> forRecords(
       String splitId, ClosableIterator<T> recordIterator, int fileOffset, long startingRecordOffset) {
-
-    return new BatchRecords<>(
-        splitId, recordIterator, fileOffset, startingRecordOffset, new HashSet<>());
+    // Pre-populate finishedSplits with splitId so that FetchTask calls splitFinishedCallback
+    // immediately after enqueueing the batch. This removes the split from
+    // SplitFetcher.assignedSplits, causing the fetcher to idle and invoke
+    // elementsQueue.notifyAvailable(), which is required to drive the END_OF_INPUT signal

Review Comment:
   > which is required to drive the END_OF_INPUT signal
   > // in SourceReaderBase for bounded (batch) reads.
   
   The original logic seems more reasonable? Shouldn't `END_OF_INPUT` be signaled only after all the records in the split have been handled (i.e., iterated over)?
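
   For illustration only, here is a minimal sketch of the ordering described above. It assumes a hypothetical `DrainTrackingBatch` class, which is neither Hudi's `BatchRecords` nor a Flink API. The split is reported as finished only once its iterator has been drained, rather than being pre-populated at construction time. Whether this ordering fits the point at which `FetchTask` reads the finished splits is not verified here.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;

// Hypothetical, simplified stand-in for a record batch. It is NOT Hudi's
// BatchRecords or Flink's RecordsWithSplitIds; all names are illustrative.
class DrainTrackingBatch<T> {
  private final String splitId;
  private final Iterator<T> records;
  private final Set<String> finishedSplits = new HashSet<>();

  DrainTrackingBatch(String splitId, Iterator<T> records) {
    this.splitId = splitId;
    this.records = records;
  }

  // Returns the next record, or null once the iterator is exhausted.
  // The split is recorded as finished only at the point of exhaustion.
  T nextRecord() {
    if (records.hasNext()) {
      return records.next();
    }
    finishedSplits.add(splitId);
    return null;
  }

  // Read-only view of the splits this batch has fully iterated.
  Set<String> finishedSplits() {
    return Collections.unmodifiableSet(finishedSplits);
  }
}
```

   With this shape, a caller that polls `finishedSplits()` before the last record has been consumed sees an empty set, so any end-of-input handling keyed on it cannot fire early.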



