HippoBaro commented on PR #9697:
URL: https://github.com/apache/arrow-rs/pull/9697#issuecomment-4266078901

> My main concern with this PR is that it adds a specific IO buffer management policy for one usage pattern into ParquetPushDecoder
   
Yeah, I agree with that. `RetentionSet` feels hacky and breaks down when row groups are assigned dynamically.
   
The current code lets the IO layer push more data than was requested and then filters out what is not useful. As you pointed out, that creates a lot of issues, especially once there are multiple destinations, such as with dynamic work-stealing row group consumption.
   
So instead, let’s assume the IO layer is responsible for owning prefetched data until it is actually requested. That doesn't so much solve the release problem as move it into the IO layer.
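A rough sketch of what "the IO layer owns prefetched data until it is requested" could look like. Everything here is hypothetical (`PrefetchStore` and its methods are not an arrow-rs API), and it only matches exact ranges; a real IO layer would also have to serve requests that overlap or split prefetched ranges.

```rust
use std::collections::HashMap;
use std::ops::Range;

/// Hypothetical IO-side store: speculatively fetched bytes stay here, keyed
/// by their exact byte range, until the decoder asks for them.
#[derive(Default)]
struct PrefetchStore {
    pending: HashMap<Range<u64>, Vec<u8>>,
}

impl PrefetchStore {
    /// Record bytes that arrived from a speculative read.
    fn insert(&mut self, range: Range<u64>, data: Vec<u8>) {
        self.pending.insert(range, data);
    }

    /// Hand ownership of a requested range to the caller. Removing the entry
    /// means the IO layer no longer holds a copy once the decoder has it.
    fn take(&mut self, range: &Range<u64>) -> Option<Vec<u8>> {
        self.pending.remove(range)
    }

    /// Drop buffered bytes for ranges the decoder will never request.
    fn release(&mut self, ranges: &[Range<u64>]) {
        for r in ranges {
            self.pending.remove(r);
        }
    }

    /// Total bytes currently held by the IO layer.
    fn buffered_bytes(&self) -> usize {
        self.pending.values().map(|v| v.len()).sum()
    }
}
```

The important property is that `take` moves the bytes out, so once the decoder has them the IO layer no longer keeps a copy; `release` covers ranges that will never be taken.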
   
> A reliable way to know when the push decoder has consumed everything it will need from previously pushed data.
   
I think that is partially solved if we use `try_next_reader`. Once you are done with the reader, you drop it, and the buffers go away, as you suggested. I like that. It does not help with speculatively fetched data still sitting in the IO layer, though. For that, we will still need some mechanism to tell the IO layer that certain ranges will never be consumed.
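The release-on-drop part is just Rust ownership: if the reader holds the last references to the pushed buffers, dropping it frees them. A minimal illustration with a hypothetical `RowGroupReader` holding `Arc<[u8]>` buffers (I'm assuming the real reader holds reference-counted `Bytes`, which would behave the same way for this purpose):

```rust
use std::sync::{Arc, Weak};

/// Hypothetical reader for one row group; it owns the buffers that were
/// pushed for that row group's column chunks.
struct RowGroupReader {
    buffers: Vec<Arc<[u8]>>,
}

impl RowGroupReader {
    /// Total bytes kept alive by this reader.
    fn buffered_bytes(&self) -> usize {
        self.buffers.iter().map(|b| b.len()).sum()
    }
}

/// Build a reader from freshly pushed buffers and return weak handles so the
/// caller can observe when the underlying memory is freed.
fn reader_with_watchers(chunks: Vec<Vec<u8>>) -> (RowGroupReader, Vec<Weak<[u8]>>) {
    let buffers: Vec<Arc<[u8]>> = chunks.into_iter().map(Arc::from).collect();
    // Weak handles do not keep the buffers alive
    let watchers = buffers.iter().map(|b| Arc::downgrade(b)).collect();
    (RowGroupReader { buffers }, watchers)
}
```

If anything else, such as an IO-layer cache, still held a clone of one of those `Arc`s, the memory would outlive the reader, which is exactly the speculative-prefetch case that needs a separate release signal.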
   
Maybe we could add a new `DecodeResult::SkipData(Vec<Range<u64>>)` variant that tells the caller: “I will never request this range via `NeedsData`, so you may free any buffered data for it and cancel any associated in-flight IO.” We would return it when the decoder decides to skip a row group, for example.
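Roughly, the result type would grow one variant. This is a sketch only: `SkipData` does not exist in arrow-rs, and `plan_row_group` is a hypothetical stand-in for wherever the decoder decides that a row group is pruned.

```rust
use std::ops::Range;

/// Sketch of the decoder's result type with the proposed `SkipData` variant.
#[allow(dead_code)] // only some variants are exercised in this sketch
#[derive(Debug, PartialEq)]
enum DecodeResult<T> {
    /// The decoder needs these byte ranges before it can make progress.
    NeedsData(Vec<Range<u64>>),
    /// The decoder produced output (e.g. a reader for one row group).
    Data(T),
    /// Proposed: the decoder will never request these ranges, so the caller
    /// may free buffered bytes and cancel in-flight reads for them.
    SkipData(Vec<Range<u64>>),
    /// No more output.
    Finished,
}

/// Hypothetical decision point for one row group: if it is pruned (for
/// example by a predicate on its statistics), announce its column-chunk
/// ranges as skippable instead of requesting them.
fn plan_row_group(column_chunks: Vec<Range<u64>>, pruned: bool) -> DecodeResult<()> {
    if pruned {
        DecodeResult::SkipData(column_chunks)
    } else {
        DecodeResult::NeedsData(column_chunks)
    }
}
```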
   
   If I put it all together, we get something like this:
   
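```rust
let mut decoder = make_decoder();

// Use the metadata to find the byte ranges of all row groups of interest
let ranges = predicted_row_group_ranges(&metadata);
io_layer.hint(ranges); // the IO layer may start prefetching, or may not

// Drive the decoder until it reports `Finished`; a `NeedsData` step does not
// yield a reader, so this is not one iteration per row group
loop {
    match decoder.try_next_reader()? {
        DecodeResult::Data(reader) => {
            // All pushed buffers go away when the reader is dropped
            consume(reader);
        }
        DecodeResult::NeedsData(ranges) => {
            // Fetch before pushing: `push_ranges` takes `ranges` by value
            let buffers = io_layer.fetch(&ranges);
            decoder.push_ranges(ranges, buffers)?;
        }
        // Proposed variant: the decoder will never request these ranges
        DecodeResult::SkipData(ranges) => io_layer.release(ranges),
        DecodeResult::Finished => break,
    }
}
```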
   
This gives us fairly clean decoupling: the IO layer can do whatever it wants, but **can't push unsolicited buffers**, which seems like a reasonable constraint.
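On the decoder side, enforcing that constraint could be as simple as tracking outstanding requests. A hypothetical sketch (`OutstandingRequests` is not an arrow-rs type); it requires an exact match, and accepting a pushed range that covers a requested one would be a natural relaxation:

```rust
use std::ops::Range;

/// Hypothetical decoder-side bookkeeping: a push is accepted only if it
/// matches a range previously announced via `NeedsData`.
#[derive(Default)]
struct OutstandingRequests {
    requested: Vec<Range<u64>>,
}

impl OutstandingRequests {
    /// Remember ranges the decoder is about to return in `NeedsData`.
    fn request(&mut self, ranges: &[Range<u64>]) {
        self.requested.extend(ranges.iter().cloned());
    }

    /// Accept a pushed range only if it was requested; each request can be
    /// satisfied at most once.
    fn accept_push(&mut self, range: &Range<u64>) -> Result<(), String> {
        match self.requested.iter().position(|r| r == range) {
            Some(i) => {
                self.requested.swap_remove(i);
                Ok(())
            }
            None => Err(format!(
                "unsolicited push for bytes {}..{}",
                range.start, range.end
            )),
        }
    }
}
```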
   
   WDYT @alamb?

