lidavidm commented on code in PR #14226:
URL: https://github.com/apache/arrow/pull/14226#discussion_r999334979
##########
cpp/src/parquet/arrow/reader.cc:
##########
@@ -1107,11 +1110,23 @@ class RowGroupGenerator {
     if (!reader->properties().pre_buffer()) {
       row_group_read = SubmitRead(cpu_executor_, reader, row_group, column_indices);
     } else {
-      auto ready = reader->parquet_reader()->WhenBuffered({row_group}, column_indices);
-      if (cpu_executor_) ready = cpu_executor_->TransferAlways(ready);
+      auto ready = ::arrow::Future<>::MakeFinished();
       row_group_read = ready.Then([=]() -> ::arrow::Future<RecordBatchGenerator> {
-        return ReadOneRowGroup(cpu_executor_, reader, row_group, column_indices);
+        BEGIN_PARQUET_CATCH_EXCEPTIONS
+        reader->parquet_reader()->PreBuffer({row_group}, column_indices_,
+                                            reader_properties_.io_context(),
+                                            reader_properties_.cache_options());
+        END_PARQUET_CATCH_EXCEPTIONS
+        auto wait_buffer = reader->parquet_reader()->WhenBuffered({row_group}, column_indices);
+        wait_buffer.Wait();
Review Comment:
Hmm…I think anything calling Wait() in a callback/async context is not going to be right.

I think the underlying issue is that the pre-buffer code doesn't handle concurrent use. The Wait() effectively works around that by blocking the thread so that there's no sharing. However, if you attach a reentrant readahead generator to it, I'd guess it would still fail. So I think either the internals should be refactored so that they do handle concurrent use, or we should just create a separate ReadRangeCache per row group. (The advantage of the latter is that you'd have a harder bound on memory usage.)
However, either way this loses the 'nice' properties of the original buffer-entire-file approach (e.g. small row groups can get combined together for I/O). IMO, the longer-term solution would be to disentangle the 'cache' and 'coalesce' behaviors (and possibly even remove the 'cache' behavior, which may make more sense as a wrapper over RandomAccessFile?) and try the approach proposed in the original JIRA: coalesce ranges, then track when ranges are actually read, and remove a buffer from the coalescer once all ranges mapping to that buffer have been read. (The buffer may still be kept alive downstream due to shared usage, though.) Or maybe that's still overly fancy.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]