HuangZhenQiu commented on code in PR #18325:
URL: https://github.com/apache/hudi/pull/18325#discussion_r2966981934
##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/reader/BatchRecords.java:
##########
@@ -117,8 +115,11 @@ public void seek(long startingRecordOffset) {
public static <T> BatchRecords<T> forRecords(
String splitId, ClosableIterator<T> recordIterator, int fileOffset, long startingRecordOffset) {
-
- return new BatchRecords<>(
- splitId, recordIterator, fileOffset, startingRecordOffset, new HashSet<>());
+ // Pre-populate finishedSplits with splitId so that FetchTask calls splitFinishedCallback
+ // immediately after enqueueing the batch. This removes the split from
+ // SplitFetcher.assignedSplits, causing the fetcher to idle and invoke
+ // elementsQueue.notifyAvailable(), which is required to drive the END_OF_INPUT signal
Review Comment:
Yes, @cshuo's summary is correct for the RecordsWithSplitIds handling. Due to
the SplitFetcher implementation, we always need to emit a RecordsWithSplitIds
with empty records and a finished split id, which in Hudi's case happens at the
end of processing a split. It is the only way for the split reader to notify
the underlying framework that a split is complete.
But onSplitFinished is a callback that lets the split reader do further
resource cleanup.
https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java#L414
It is not defined for, and can't be used for, calling
splitFetcherManager.removeSplits directly.
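
To make the "empty records plus a finished split id" idea concrete, here is a minimal sketch. `SplitBatch` is a hypothetical stand-in that only mirrors the shape of Flink's `RecordsWithSplitIds` (`nextSplit`, `nextRecordFromSplit`, `finishedSplits`) so the snippet compiles without Flink on the classpath. `FinishedSplitMarker` and `FinishedSplitDemo` are illustrative names too, not Hudi or Flink classes.

```java
import java.util.Collections;
import java.util.Set;

// Hypothetical stand-in for the shape of Flink's RecordsWithSplitIds<E>.
// Assumption: only the three methods relevant to this discussion are modeled.
interface SplitBatch<E> {
    String nextSplit();            // null when no (more) splits carry records
    E nextRecordFromSplit();       // null when the current split is exhausted
    Set<String> finishedSplits();  // ids of splits that are now complete
}

// An empty batch whose only purpose is to report a split as finished.
final class FinishedSplitMarker<E> implements SplitBatch<E> {
    private final Set<String> finished;

    FinishedSplitMarker(String splitId) {
        this.finished = Collections.singleton(splitId);
    }

    @Override
    public String nextSplit() {
        return null; // carries no records at all
    }

    @Override
    public E nextRecordFromSplit() {
        return null; // never reached in practice, since nextSplit() is null
    }

    @Override
    public Set<String> finishedSplits() {
        return finished;
    }
}

final class FinishedSplitDemo {
    public static void main(String[] args) {
        SplitBatch<String> batch = new FinishedSplitMarker<>("split-1");
        System.out.println("next split: " + batch.nextSplit());
        System.out.println("finished splits: " + batch.finishedSplits());
    }
}
```

In this sketch the consumer learns that the split is complete only from `finishedSplits()`. Any cleanup done in `onSplitFinished` would happen afterwards, as a consequence of that signal.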