lidavidm commented on a change in pull request #9620:
URL: https://github.com/apache/arrow/pull/9620#discussion_r591890348
##########
File path: cpp/src/parquet/arrow/reader.cc
##########
@@ -967,6 +978,74 @@ Status FileReaderImpl::GetRecordBatchReader(const std::vector<int>& row_groups,
   return Status::OK();
 }
 
+/// Given a file reader and a list of row groups, this is a generator of record
+/// batch vectors (where each vector is the contents of a single row group).
+class RowGroupGenerator {
+ public:
+  using Item = ::arrow::util::optional<::arrow::RecordBatchVector>;
+
+  explicit RowGroupGenerator(FileReaderImpl* self, std::vector<int> row_groups,
+                             std::vector<int> column_indices)
+      : self_(self),
+        index_(0),
+        row_groups_(std::move(row_groups)),
+        column_indices_(std::move(column_indices)) {}
+
+  ::arrow::Future<Item> operator()() {
+    if (index_ >= row_groups_.size()) {
+      return ::arrow::Future<Item>::MakeFinished(::arrow::util::nullopt);
+    }
+    int row_group = row_groups_[index_++];
+    FileReaderImpl* self = self_;
+    std::vector<int> column_indices = column_indices_;
+    ARROW_ASSIGN_OR_RAISE(auto fut,
+                          ::arrow::internal::GetCpuThreadPool()->Submit(

Review comment:
   Ah, ok - I was envisioning that the caller would explicitly buffer beforehand (since we had been talking about splitting up ScanTask in that way), but we can have this manage buffering internally as well. (Either way, either the IPC reader or the Parquet reader will need some refactoring to meet Datasets' needs.)
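   (For reference, the caller-side "explicitly buffer beforehand" option could look roughly like the sketch below. It assumes the MakeReadaheadGenerator helper in arrow/util/async_generator.h; the wrapper name `BufferRowGroups` and the readahead count are hypothetical and not part of this PR.)

    // Minimal sketch: caller-side buffering of the row-group generator.
    // `BufferRowGroups` is a hypothetical helper, not part of this PR.
    #include <utility>

    #include "arrow/record_batch.h"
    #include "arrow/util/async_generator.h"
    #include "arrow/util/optional.h"

    using Item = ::arrow::util::optional<::arrow::RecordBatchVector>;

    ::arrow::AsyncGenerator<Item> BufferRowGroups(::arrow::AsyncGenerator<Item> source,
                                                  int max_readahead) {
      // MakeReadaheadGenerator eagerly pulls from `source`, keeping up to
      // `max_readahead` row groups decoding in flight while the consumer
      // processes earlier batches.
      return ::arrow::MakeReadaheadGenerator(std::move(source), max_readahead);
    }

   Either placement gives the same overlap of decode and consumption; the difference is only whether the caller or the reader owns the readahead policy.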
##########
File path: cpp/src/parquet/arrow/reader.cc
##########
@@ -967,6 +978,74 @@ Status FileReaderImpl::GetRecordBatchReader(const std::vector<int>& row_groups,
   return Status::OK();
 }
 
+/// Given a file reader and a list of row groups, this is a generator of record
+/// batch vectors (where each vector is the contents of a single row group).
+class RowGroupGenerator {
+ public:
+  using Item = ::arrow::util::optional<::arrow::RecordBatchVector>;
+
+  explicit RowGroupGenerator(FileReaderImpl* self, std::vector<int> row_groups,
+                             std::vector<int> column_indices)
+      : self_(self),
+        index_(0),
+        row_groups_(std::move(row_groups)),
+        column_indices_(std::move(column_indices)) {}
+
+  ::arrow::Future<Item> operator()() {
+    if (index_ >= row_groups_.size()) {
+      return ::arrow::Future<Item>::MakeFinished(::arrow::util::nullopt);
+    }
+    int row_group = row_groups_[index_++];
+    FileReaderImpl* self = self_;
+    std::vector<int> column_indices = column_indices_;
+    ARROW_ASSIGN_OR_RAISE(auto fut,
+                          ::arrow::internal::GetCpuThreadPool()->Submit(
+                              &ReadOneRowGroup, self, row_group, column_indices));
+    return fut;
+  }
+
+ private:
+  static ::arrow::Result<Item> ReadOneRowGroup(FileReaderImpl* self, const int row_group,
+                                               const std::vector<int>& column_indices) {
+    std::shared_ptr<::arrow::Table> table;
+    // Call the version that skips bound checks/pre-buffering, since we've done that
+    // already
+    RETURN_NOT_OK(self->ReadRowGroupsImpl({row_group}, column_indices, &table));
+    auto table_reader = std::make_shared<::arrow::TableBatchReader>(*table);
+    ::arrow::RecordBatchVector batches;
+    while (true) {
+      std::shared_ptr<::arrow::RecordBatch> batch;
+      RETURN_NOT_OK(table_reader->ReadNext(&batch));
+      if (!batch) {
+        break;
+      }
+      batches.push_back(batch);
+    }
+    return ::arrow::util::make_optional<::arrow::RecordBatchVector>(std::move(batches));
+  }
+
+  FileReaderImpl* self_;
+  size_t index_;
+  std::vector<int> row_groups_;
+  std::vector<int> column_indices_;
+};
+
+::arrow::Result<
+    ::arrow::AsyncGenerator<::arrow::util::optional<::arrow::RecordBatchVector>>>
+FileReaderImpl::GetRecordBatchGenerator(const std::vector<int>& row_groups,
+                                        const std::vector<int>& column_indices) {
+  RETURN_NOT_OK(BoundsCheck(row_groups, column_indices));
+  if (reader_properties_.pre_buffer()) {
+    // PARQUET-1698/PARQUET-1820: pre-buffer row groups/column chunks if enabled
+    BEGIN_PARQUET_CATCH_EXCEPTIONS
+    reader_->PreBuffer(row_groups, column_indices, reader_properties_.io_context(),
+                       reader_properties_.cache_options());
+    END_PARQUET_CATCH_EXCEPTIONS
+  }
+  // N.B. we (and underlying Parquet reader) must outlive generator
+  return RowGroupGenerator(this, row_groups, column_indices);

Review comment:
   Note that this lifetime requirement holds for the Parquet RecordBatchReader as well; if you look at ParquetScanTask in dataset/file_parquet.cc, there's a similar note there. I think it's solely because we don't have enable_shared_from_this for the Parquet readers; I'm not sure if there's a reason why we omit that.
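   (For context, the enable_shared_from_this option mentioned above could look roughly like the sketch below. It assumes the reader is always held in a shared_ptr, which enable_shared_from_this requires; the names mirror the diff, but the code is illustrative, not the actual patch.)

    #include <memory>
    #include <utility>

    // Minimal sketch: an owning generator. If FileReaderImpl derived from
    // enable_shared_from_this, the generator could hold a shared_ptr and the
    // "we must outlive generator" caveat in the diff would go away.
    class FileReaderImpl : public std::enable_shared_from_this<FileReaderImpl> {
      // ... reader members elided ...
    };

    class RowGroupGenerator {
     public:
      explicit RowGroupGenerator(std::shared_ptr<FileReaderImpl> self)
          : self_(std::move(self)) {}

     private:
      // Owning pointer: keeps the reader (and the underlying Parquet reader)
      // alive for as long as any copy of the generator is alive.
      std::shared_ptr<FileReaderImpl> self_;
    };

    // GetRecordBatchGenerator would then return
    //   RowGroupGenerator(shared_from_this());
    // instead of capturing the raw `this` pointer.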