lidavidm commented on code in PR #12967:
URL: https://github.com/apache/arrow/pull/12967#discussion_r860134358
##########
cpp/src/parquet/arrow/reader.cc:
##########
@@ -1051,33 +1051,71 @@ class RowGroupGenerator {
using RecordBatchGenerator =
::arrow::AsyncGenerator<std::shared_ptr<::arrow::RecordBatch>>;
+ struct ReadRequest {
+ ::arrow::Future<RecordBatchGenerator> read;
+ int64_t num_rows;
+ };
+
explicit RowGroupGenerator(std::shared_ptr<FileReaderImpl> arrow_reader,
::arrow::internal::Executor* cpu_executor,
- std::vector<int> row_groups, std::vector<int>
column_indices)
+ std::vector<int> row_groups, std::vector<int>
column_indices,
+ int64_t min_rows_in_flight)
: arrow_reader_(std::move(arrow_reader)),
cpu_executor_(cpu_executor),
row_groups_(std::move(row_groups)),
column_indices_(std::move(column_indices)),
- index_(0) {}
+ min_rows_in_flight_(min_rows_in_flight),
+ rows_in_flight_(0),
+ index_(0),
+ readahead_index_(0) {}
::arrow::Future<RecordBatchGenerator> operator()() {
if (index_ >= row_groups_.size()) {
return ::arrow::AsyncGeneratorEnd<RecordBatchGenerator>();
}
- int row_group = row_groups_[index_++];
+ index_++;
+ FillReadahead();
+ ReadRequest next = std::move(in_flight_reads_.front());
+ DCHECK(!in_flight_reads_.empty());
+ in_flight_reads_.pop();
+ rows_in_flight_ -= next.num_rows;
+ return next.read;
+ }
+
+ private:
+ void FillReadahead() {
+ if (min_rows_in_flight_ == 0) {
+ // No readahead, fetch the batch when it is asked for
+ FetchNext();
+ } else {
+ while (readahead_index_ < row_groups_.size() &&
+ rows_in_flight_ < min_rows_in_flight_) {
+ FetchNext();
+ }
+ }
+ }
+
+ void FetchNext() {
+ int row_group_index = readahead_index_++;
Review Comment:
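It seems MSVC is still unhappy here. The `int row_group_index = readahead_index_++;` initialization above appears to be what triggers the `size_t`-to-`int` narrowing warning below (an explicit `static_cast<int>(...)` should silence it):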
```
C:/projects/arrow/cpp/src/parquet/arrow/reader.cc(1099): error C2220: warning treated as error - no 'object' file generated
C:/projects/arrow/cpp/src/parquet/arrow/reader.cc(1099): warning C4267: 'initializing': conversion from 'size_t' to 'int', possible loss of data
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]