leekeiabstraction commented on issue #2041: URL: https://github.com/apache/fluss/issues/2041#issuecomment-3592642088
@wuchong I would appreciate your input or suggestions on the following, which may affect [LogScanner.poll(int timeout)](https://github.com/apache/fluss/blob/a339158220a13b6df236b410a70b1ee6dc5fce23/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/LogScannerImpl.java#L130) and whether we expose lazily initialised `ScanRecords` to direct users of `LogScanner`.

Currently [`ScanRecords.records(TableBucket scanBucket)`](https://github.com/apache/fluss/blob/a339158220a13b6df236b410a70b1ee6dc5fce23/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/ScanRecords.java#L53) returns `List<ScanRecord>`. To realise the benefits of lazy initialisation, I'd need to make the method return an `Iterator<ScanRecord>` instead. I can add `ScanRecords.close()` for users to call (the iterator will also close automatically when it reaches its end). The open question is what behaviour I should aim for in `LogScanner.poll()`. The options are:

1. If previously returned `ScanRecords` are not fully consumed, `poll()` always returns `ScanRecords.EMPTY`. This is straightforward to implement, but it reduces parallelism when consuming records from multiple `TableBucket`s.
2. If previously returned `ScanRecords` are not fully consumed, `poll()` only returns `ScanRecords` that contain no records from the `TableBucket`s that still have unconsumed records. This gives better parallelism than option 1, because consumption of records from other `TableBucket`s is not blocked.

Of these two, I am inclined towards option 2, as I imagine that the current [FlinkSourceSplitReader](https://github.com/apache/fluss/blob/a339158220a13b6df236b410a70b1ee6dc5fce23/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/reader/FlinkSourceSplitReader.java#L410) and [TieringSplitReader](https://github.com/apache/fluss/blob/main/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSplitReader.java#L251) should work fine with it.
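To make the option 2 semantics concrete, here is a minimal, self-contained sketch. It does not use the Fluss API: `Option2PollSketch`, `LazyBatch`, `SketchScanner`, and `enqueue` are hypothetical names invented for illustration, buckets are plain `int`s, and records are plain `String`s. It only models the rule that `poll()` skips any bucket whose previously returned batch has been neither fully consumed nor closed.

```java
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Deque;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.TreeMap;

public class Option2PollSketch {

    /** Hypothetical stand-in for a lazily consumed per-bucket batch of records. */
    static final class LazyBatch implements Iterator<String>, AutoCloseable {
        private final Iterator<String> delegate;
        private boolean closed;

        LazyBatch(List<String> records) {
            this.delegate = records.iterator();
        }

        @Override
        public boolean hasNext() {
            if (!closed && !delegate.hasNext()) {
                closed = true; // automatic close once the end is reached
            }
            return !closed;
        }

        @Override
        public String next() {
            if (!hasNext()) {
                throw new NoSuchElementException();
            }
            return delegate.next();
        }

        @Override
        public void close() {
            closed = true;
        }

        /** True once the batch was closed explicitly or has no records left. */
        boolean isDone() {
            return closed || !delegate.hasNext();
        }
    }

    /** Hypothetical scanner that applies the option 2 rule in poll(). */
    static final class SketchScanner {
        // Batches already "fetched" per bucket, waiting to be handed out.
        private final Map<Integer, Deque<List<String>>> fetched = new TreeMap<>();
        // The most recently returned batch per bucket.
        private final Map<Integer, LazyBatch> outstanding = new HashMap<>();

        void enqueue(int bucket, List<String> records) {
            fetched.computeIfAbsent(bucket, b -> new ArrayDeque<>()).add(records);
        }

        /** Returns one new batch per bucket, skipping buckets with an unfinished batch. */
        Map<Integer, LazyBatch> poll() {
            Map<Integer, LazyBatch> result = new TreeMap<>();
            for (Map.Entry<Integer, Deque<List<String>>> entry : fetched.entrySet()) {
                int bucket = entry.getKey();
                LazyBatch previous = outstanding.get(bucket);
                if (previous != null && !previous.isDone()) {
                    continue; // option 2: this bucket is blocked, others are not
                }
                List<String> next = entry.getValue().poll();
                if (next == null) {
                    continue; // nothing fetched for this bucket yet
                }
                LazyBatch batch = new LazyBatch(next);
                outstanding.put(bucket, batch);
                result.put(bucket, batch);
            }
            return result;
        }
    }

    public static void main(String[] args) {
        SketchScanner scanner = new SketchScanner();
        scanner.enqueue(0, Arrays.asList("a1", "a2"));
        scanner.enqueue(0, Arrays.asList("a3"));
        scanner.enqueue(1, Arrays.asList("b1"));
        scanner.enqueue(1, Arrays.asList("b2"));

        Map<Integer, LazyBatch> first = scanner.poll();
        System.out.println(first.keySet()); // [0, 1]

        // Fully consume bucket 1, leave bucket 0 untouched.
        Iterator<String> bucketOne = first.get(1);
        while (bucketOne.hasNext()) {
            bucketOne.next();
        }
        System.out.println(scanner.poll().keySet()); // [1]

        // Closing bucket 0's batch unblocks it; bucket 1's new batch is still pending.
        first.get(0).close();
        System.out.println(scanner.poll().keySet()); // [0]
    }
}
```

In this toy model, a caller that never consumes or closes a batch only starves the affected bucket, whereas under option 1 the same caller would receive an empty result for every bucket.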
A question arises, however, for users who use `LogScanner` directly. Unless they finish consuming `ScanRecords` (or call `close()` on it), they will end up with `LogScanner` returning `ScanRecords.EMPTY`. This deviates from the existing behaviour, where a user can ignore previous results entirely and still get the latest results from `poll()`.

Finally, there is a third option: only expose and use the lazily initialised iterator internally, for components like `FlinkSourceSplitReader` or `TieringSplitReader`.

Of these three options, I prefer option 3. Because it does not expose lazily initialised iterators and uses lazy initialisation only within internal components, it protects `LogScanner` users from resource leaks or deadlocks when they neither call `close()` nor fully consume `ScanRecords`.

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at: [email protected]
