leekeiabstraction commented on issue #2041:
URL: https://github.com/apache/fluss/issues/2041#issuecomment-3592642088

   @wuchong Would appreciate some input or suggestions from you on the 
following that may impact [LogScanner.poll(int 
timeout)](https://github.com/apache/fluss/blob/a339158220a13b6df236b410a70b1ee6dc5fce23/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/LogScannerImpl.java#L130)
 and whether we expose lazily initialised ScanRecords to direct users of 
LogScanner.
   
   Currently [`ScanRecords.records(TableBucket 
scanBucket)`](https://github.com/apache/fluss/blob/a339158220a13b6df236b410a70b1ee6dc5fce23/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/ScanRecords.java#L53)
 returns `List<ScanRecord>`. To realise the benefits of lazy initialisation, 
I'd need to make the method return an `Iterator<ScanRecord>` instead. I can add 
`ScanRecords.close()` which user can call (automatic closing when iterator 
reaches end will also be added). 
   
   But question remains on the behaviour that I should be aiming for in 
`LogScanner.poll()`. Options are:
   
   1. If previously returned ScanRecords are not fully consumed, `poll()` will 
always return `ScanRecords.EMPTY`. This is straightforward to implement but 
introduces issue where parallelisation of record consumption from multiple 
TableBucket is diminished.
   2. If previously returned ScanRecords are not fully consumed, `poll` will 
only return `ScanRecords` which do not have records from similar TableBucket. 
Better parallelisation than Option 1 as consumption of records from other 
TableBucket is not blocked.
   
   Of these two, I am inclined towards option 2 as I imagine that current 
[FlinkSourceSplitReader](https://github.com/apache/fluss/blob/a339158220a13b6df236b410a70b1ee6dc5fce23/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/reader/FlinkSourceSplitReader.java#L410)
 and 
[TieringSourceReader](https://github.com/apache/fluss/blob/main/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSplitReader.java#L251)
 should work fine with it. Question arises however for users who use LogScanner 
directly. Unless they finish consuming (or call `close()`) on ScanRecords, they 
will end up with LogScanner returning `ScanRecords.EMPTY`. This deviates from 
existing behaviour where user can ignore previous results entirely and still 
get latest results using poll().
   
   Finally there's a third option, which is to only expose and use iterator 
with lazy initialisation internally for components like 
`FlinkSourceSplitReader` or `TieringSplitReader`. Out of these three options, I 
prefer option 3 the most to protect LogScanner user from resource leak or 
deadlock when they do not close() or fully consume ScanRecords.
   
   I suggest opt 3 where we do not expose lazy initialised iterators and only 
use lazy initialisation within internal components.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to