HippoBaro commented on PR #9697:
URL: https://github.com/apache/arrow-rs/pull/9697#issuecomment-4266078901
> My main concern with this PR is that it adds a specific IO buffer management policy for one usage pattern into `ParquetPushDecoder`.
Yeah, I agree with that. `RetentionSet` feels hacky and breaks down with dynamic
row group assignment.
The current code lets the IO layer push more data than was requested and
then filters out what is not useful. As you pointed out, that creates a lot of
issues, especially once there are multiple destinations, such as with dynamic
work-stealing row group consumption.
So instead, let’s assume the IO layer is responsible for owning prefetched
data until it is actually requested. That does not solve the release problem so
much as move it around.
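To make the ownership shift concrete, here is a minimal sketch of an IO layer that holds speculatively prefetched bytes and hands them over only when a range is actually requested. `PrefetchCache` and its methods are hypothetical names, not an existing arrow-rs API; it assumes exact-range matching and plain `Vec<u8>` buffers to stay self-contained, whereas a real implementation would likely use `bytes::Bytes` and handle overlapping or coalesced ranges.

```rust
use std::collections::HashMap;
use std::ops::Range;

/// Hypothetical IO-layer cache: it owns speculatively prefetched byte ranges
/// until the decoder explicitly asks for them or they are released.
#[derive(Default)]
pub struct PrefetchCache {
    buffered: HashMap<Range<u64>, Vec<u8>>,
}

impl PrefetchCache {
    /// Store data fetched ahead of time for `range`.
    pub fn insert(&mut self, range: Range<u64>, data: Vec<u8>) {
        self.buffered.insert(range, data);
    }

    /// Hand over ownership of a range only when it is requested.
    /// Returns `None` if nothing was prefetched for exactly this range,
    /// in which case the caller would issue a real read instead.
    pub fn take(&mut self, range: &Range<u64>) -> Option<Vec<u8>> {
        self.buffered.remove(range)
    }

    /// Drop buffered data for ranges the decoder will never request.
    pub fn release(&mut self, ranges: &[Range<u64>]) {
        for r in ranges {
            self.buffered.remove(r);
        }
    }

    /// Total bytes currently held by the IO layer.
    pub fn buffered_bytes(&self) -> usize {
        self.buffered.values().map(Vec::len).sum()
    }
}
```

Because `take` moves the buffer out, the decoder side never sees unrequested data, and anything left behind stays the IO layer's responsibility until `release` is called.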
> A reliable way to know when the push decoder has consumed everything it will need from previously pushed data.
I think that is partially solved if we use `try_next_reader`. Once you are
done with the reader, you drop it, and the buffers go away, as you suggested. I
like that. It does not help with speculatively fetched data still sitting in
the IO layer, though. For that, we will still need some mechanism to tell the
IO layer that certain ranges will never be consumed.
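The "drop the reader and the buffers go away" part relies on plain reference-counted ownership. A minimal sketch, assuming the reader keeps the pushed buffers as `Arc<[u8]>` (the real decoder would hold `bytes::Bytes`, which is also reference counted); `RowGroupReader` and `weak_handles` are hypothetical stand-ins used only to observe when memory is freed.

```rust
use std::sync::{Arc, Weak};

/// Hypothetical stand-in for a row-group reader that keeps the pushed
/// buffers alive for as long as it exists.
pub struct RowGroupReader {
    buffers: Vec<Arc<[u8]>>,
}

impl RowGroupReader {
    /// Take ownership of the buffers the decoder accumulated for this row group.
    pub fn new(buffers: Vec<Arc<[u8]>>) -> Self {
        Self { buffers }
    }

    /// Total bytes pinned by this reader.
    pub fn total_len(&self) -> usize {
        self.buffers.iter().map(|b| b.len()).sum()
    }
}

/// Weak handles let a caller check whether the buffers are still allocated
/// without keeping them alive.
pub fn weak_handles(buffers: &[Arc<[u8]>]) -> Vec<Weak<[u8]>> {
    buffers.iter().map(Arc::downgrade).collect()
}
```

Once the last strong reference (the reader) is dropped, every weak handle fails to upgrade, which is exactly the release signal the decoder side needs. Note that this says nothing about bytes still sitting in the IO layer, which is the gap described above.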
Maybe we could add a new `DecodeResult::SkipData(Vec<Range<u64>>)` that
tells the caller: “I will never `NeedsData` this range, so you may free any
buffered data for it and cancel associated in-flight IO.” We would return that
when the decoder decides to skip a row group, for example.
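A rough sketch of what the extended result type could look like. `SkipData` is the proposed variant and does not exist today; the other variants are modeled on the push decoder's existing states, and `plan_row_group` is a hypothetical helper showing where a skip decision (for example, a predicate ruling out a row group from its statistics) would surface.

```rust
use std::ops::Range;

/// Sketch of the proposed result type. `SkipData` is the new, hypothetical
/// variant; the others mirror the existing push-decoder states.
#[derive(Debug, PartialEq)]
pub enum DecodeResult<T> {
    /// The decoder needs these byte ranges pushed before it can continue.
    NeedsData(Vec<Range<u64>>),
    /// Decoded output is ready.
    Data(T),
    /// The decoder will never request these ranges; the caller may free
    /// buffered bytes and cancel in-flight IO for them.
    SkipData(Vec<Range<u64>>),
    /// Nothing left to decode.
    Finished,
}

/// Hypothetical decision step: a row group that matches the predicate is
/// requested, while one that is pruned is announced as skipped.
pub fn plan_row_group(byte_range: Range<u64>, matches_predicate: bool) -> DecodeResult<()> {
    if matches_predicate {
        DecodeResult::NeedsData(vec![byte_range])
    } else {
        DecodeResult::SkipData(vec![byte_range])
    }
}
```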
If I put it all together, we get something like this:
```rust
let mut decoder = make_decoder();
// Retrieve all row groups of interest from the metadata
let ranges = predicted_row_group(&metadata);
io_layer.hint(ranges); // may start prefetching, or may not
loop {
    match decoder.try_next_reader()? {
        DecodeResult::Data(reader) => {
            // All pushed buffers go away when the reader is dropped
            consume(reader);
        }
        DecodeResult::NeedsData(ranges) => {
            // Fetch first: `push_ranges` takes `ranges` by value
            let buffers = io_layer.fetch(&ranges);
            decoder.push_ranges(ranges, buffers)?;
        }
        // Proposed variant: the decoder will never request these ranges
        DecodeResult::SkipData(ranges) => io_layer.release(ranges),
        DecodeResult::Finished => break,
    }
}
```
This gives us fairly clean decoupling: the IO layer can do whatever it
wants, but **can't push unsolicited buffers**, which seems like a reasonable
constraint.
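One way the "can't push unsolicited buffers" constraint could be enforced on the decoder side is to remember which ranges were handed out via `NeedsData` and reject anything else. This is a minimal sketch under that assumption; `RequestTracker` and `UnsolicitedRange` are hypothetical names, and exact-range matching is a simplification.

```rust
use std::collections::HashSet;
use std::ops::Range;

/// Hypothetical bookkeeping that enforces "no unsolicited pushes":
/// only ranges previously returned via `NeedsData` may be pushed.
#[derive(Default)]
pub struct RequestTracker {
    outstanding: HashSet<Range<u64>>,
}

/// Error carrying the first range that was pushed without being requested.
#[derive(Debug, PartialEq)]
pub struct UnsolicitedRange(pub Range<u64>);

impl RequestTracker {
    /// Record the ranges the decoder is about to ask for.
    pub fn request(&mut self, ranges: &[Range<u64>]) {
        self.outstanding.extend(ranges.iter().cloned());
    }

    /// Accept a push only if every range was requested; otherwise reject it
    /// without consuming any of the outstanding requests.
    pub fn accept_push(&mut self, ranges: &[Range<u64>]) -> Result<(), UnsolicitedRange> {
        if let Some(bad) = ranges.iter().find(|r| !self.outstanding.contains(*r)) {
            return Err(UnsolicitedRange(bad.clone()));
        }
        for r in ranges {
            self.outstanding.remove(r);
        }
        Ok(())
    }
}
```

Each accepted push consumes its request, so pushing the same range twice is also rejected; the IO layer remains free to prefetch, but must hold that data itself until asked.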
WDYT @alamb?