cshuo commented on code in PR #18325:
URL: https://github.com/apache/hudi/pull/18325#discussion_r2939177733
##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/reader/BatchRecords.java:
##########
@@ -117,8 +115,11 @@ public void seek(long startingRecordOffset) {
public static <T> BatchRecords<T> forRecords(
String splitId, ClosableIterator<T> recordIterator, int fileOffset, long startingRecordOffset) {
-
- return new BatchRecords<>(
- splitId, recordIterator, fileOffset, startingRecordOffset, new HashSet<>());
+ // Pre-populate finishedSplits with splitId so that FetchTask calls splitFinishedCallback
+ // immediately after enqueueing the batch. This removes the split from
+ // SplitFetcher.assignedSplits, causing the fetcher to idle and invoke
+ // elementsQueue.notifyAvailable(), which is required to drive the END_OF_INPUT signal
Review Comment:
The root cause of the bug is that the SplitFetcher currently removes splits
only when the split reader's fetch result is a `RecordsWithSplitIds` with no
records and a non-empty set of finished splits (the splits to be removed).
Instead, we can use the `onSplitFinished` callback in `HoodieSourceReader`
to remove the finished splits explicitly:
```java
@Override
protected void onSplitFinished(Map<String, HoodieSourceSplit> finishedSplitIds) {
  requestSplit(new ArrayList<>(finishedSplitIds.keySet()));
  // Explicitly remove the finished splits from the split fetchers.
  splitFetcherManager.removeSplits(new ArrayList<>(finishedSplitIds.values()));
}
```
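To make the difference between the two removal paths concrete, here is a toy model. It is hypothetical and heavily simplified, and it is not Flink's `SplitFetcher` or Hudi's reader. `ModelSplit`, `ModelFetcher`, `ModelReader`, and their methods are invented for illustration. `onFetchResult` encodes the removal rule described above. `removeSplits` mirrors the explicit call in the override. The fetcher counts as idle only once it has no assigned splits.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Hypothetical stand-in for a split; only its ID matters in this model. */
final class ModelSplit {
  private final String id;

  ModelSplit(String id) { this.id = id; }

  String splitId() { return id; }
}

/** Hypothetical, simplified model of a split fetcher (not Flink's SplitFetcher). */
final class ModelFetcher {
  private final Set<String> assignedSplits = new LinkedHashSet<>();
  private int availabilityNotifications = 0;

  void addSplit(ModelSplit split) { assignedSplits.add(split.splitId()); }

  /** Removal rule as described in the review: only an empty batch with finished IDs removes splits. */
  void onFetchResult(List<String> records, Set<String> finishedSplitIds) {
    if (records.isEmpty() && !finishedSplitIds.isEmpty()) {
      removeIds(finishedSplitIds);
    }
  }

  /** Explicit removal path, analogous to splitFetcherManager.removeSplits(...). */
  void removeSplits(List<ModelSplit> splits) {
    List<String> ids = new ArrayList<>();
    for (ModelSplit split : splits) {
      ids.add(split.splitId());
    }
    removeIds(ids);
  }

  private void removeIds(Collection<String> ids) {
    assignedSplits.removeAll(ids);
    if (assignedSplits.isEmpty()) {
      // Stands in for the fetcher going idle and notifying the elements queue.
      availabilityNotifications++;
    }
  }

  boolean isIdle() { return assignedSplits.isEmpty(); }

  int availabilityNotifications() { return availabilityNotifications; }
}

/** Hypothetical reader hook modeled on the onSplitFinished override above. */
final class ModelReader {
  private final ModelFetcher fetcher;

  ModelReader(ModelFetcher fetcher) { this.fetcher = fetcher; }

  void onSplitFinished(Map<String, ModelSplit> finishedSplitIds) {
    // The real override also calls requestSplit(...); that part is omitted here.
    fetcher.removeSplits(new ArrayList<>(finishedSplitIds.values()));
  }
}
```

In this model, a batch that carries records together with a finished split ID leaves the split assigned. Once `onSplitFinished` removes the split explicitly, `isIdle()` becomes true, regardless of what the last batch looked like.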
Note: some other related changes are also needed:
* Bug: `HoodieSourceSplit#splitId()` is not stable; its value changes as the split is read
because it includes the mutable fields `consumed` and `fileOffset`. The ID should not contain them.
* Handle the `SplitsRemoval` event in `HoodieSourceSplitReader#handleSplitsChanges`;
logging it may be enough.
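Split IDs are used as keys when the source reader tracks split state, so a split's ID should stay the same for the split's whole lifetime. Below is a minimal sketch of that rule. The class `StableIdSplit` and its fields `splitNum` and `fileId` are hypothetical placeholders for whatever immutable identity `HoodieSourceSplit` actually carries. Only the read-progress fields are named after the ones in this comment.

```java
import java.util.Objects;

/** Hypothetical sketch, not Hudi's HoodieSourceSplit: the ID uses only immutable fields. */
final class StableIdSplit {
  private final int splitNum;   // hypothetical immutable identity field
  private final String fileId;  // hypothetical immutable identity field
  private long consumed;        // mutable read progress, excluded from the ID
  private int fileOffset;       // mutable read progress, excluded from the ID

  StableIdSplit(int splitNum, String fileId) {
    this.splitNum = splitNum;
    this.fileId = Objects.requireNonNull(fileId, "fileId");
  }

  /** Returns the same value on every call, however far reading has progressed. */
  String splitId() {
    return splitNum + ":" + fileId;
  }

  /** Updates read progress without affecting the split ID. */
  void updatePosition(int newFileOffset, long newConsumed) {
    this.fileOffset = newFileOffset;
    this.consumed = newConsumed;
  }

  long consumed() { return consumed; }

  int fileOffset() { return fileOffset; }
}
```

Progress can still be checkpointed through `consumed` and `fileOffset`. It just should not leak into the value that identifies the split.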