westonpace commented on a change in pull request #9620: URL: https://github.com/apache/arrow/pull/9620#discussion_r591875021
########## File path: cpp/src/parquet/arrow/reader.cc ########## @@ -967,6 +978,74 @@ Status FileReaderImpl::GetRecordBatchReader(const std::vector<int>& row_groups, return Status::OK(); } +/// Given a file reader and a list of row groups, this is a generator of record +/// batch vectors (where each vector is the contents of a single row group). +class RowGroupGenerator { + public: + using Item = ::arrow::util::optional<::arrow::RecordBatchVector>; + + explicit RowGroupGenerator(FileReaderImpl* self, std::vector<int> row_groups, + std::vector<int> column_indices) + : self_(self), + index_(0), + row_groups_(std::move(row_groups)), + column_indices_(std::move(column_indices)) {} + + ::arrow::Future<Item> operator()() { + if (index_ >= row_groups_.size()) { + return ::arrow::Future<Item>::MakeFinished(::arrow::util::nullopt); + } + int row_group = row_groups_[index_++]; + FileReaderImpl* self = self_; + std::vector<int> column_indices = column_indices_; + ARROW_ASSIGN_OR_RAISE(auto fut, + ::arrow::internal::GetCpuThreadPool()->Submit( + &ReadOneRowGroup, self, row_group, column_indices)); + return fut; + } + + private: + static ::arrow::Result<Item> ReadOneRowGroup(FileReaderImpl* self, const int row_group, + const std::vector<int>& column_indices) { + std::shared_ptr<::arrow::Table> table; + // Call the version that skips bound checks/pre-buffering, since we've done that + // already + RETURN_NOT_OK(self->ReadRowGroupsImpl({row_group}, column_indices, &table)); + auto table_reader = std::make_shared<::arrow::TableBatchReader>(*table); + ::arrow::RecordBatchVector batches; + while (true) { + std::shared_ptr<::arrow::RecordBatch> batch; + RETURN_NOT_OK(table_reader->ReadNext(&batch)); + if (!batch) { + break; + } + batches.push_back(batch); + } + return ::arrow::util::make_optional<::arrow::RecordBatchVector>(std::move(batches)); + } + + FileReaderImpl* self_; + size_t index_; + std::vector<int> row_groups_; + std::vector<int> column_indices_; +}; + +::arrow::Result< + 
::arrow::AsyncGenerator<::arrow::util::optional<::arrow::RecordBatchVector>>> +FileReaderImpl::GetRecordBatchGenerator(const std::vector<int>& row_groups, const std::vector<int>& column_indices) { RETURN_NOT_OK(BoundsCheck(row_groups, column_indices)); if (reader_properties_.pre_buffer()) { // PARQUET-1698/PARQUET-1820: pre-buffer row groups/column chunks if enabled BEGIN_PARQUET_CATCH_EXCEPTIONS reader_->PreBuffer(row_groups, column_indices, reader_properties_.io_context(), reader_properties_.cache_options()); END_PARQUET_CATCH_EXCEPTIONS } // N.B. we (and underlying Parquet reader) must outlive generator return RowGroupGenerator(this, row_groups, column_indices); Review comment: Hmm, historically that hasn't been the precedent for the generators. They keep an ownership stake in their resources. Is there some reason the generator can't have a shared pointer to the reader? Consider the dataset scanning example. The scan tasks will be asked for a generator and the scanner will keep track of the generator but the scanner will have no idea what the reader is. Who is keeping track of the reader there? What if the scanner simply discarded the scan task after it got a generator from it? ########## File path: cpp/src/parquet/arrow/reader.cc ########## @@ -967,6 +978,74 @@ Status FileReaderImpl::GetRecordBatchReader(const std::vector<int>& row_groups, return Status::OK(); } +/// Given a file reader and a list of row groups, this is a generator of record +/// batch vectors (where each vector is the contents of a single row group). 
+class RowGroupGenerator { + public: + using Item = ::arrow::util::optional<::arrow::RecordBatchVector>; + + explicit RowGroupGenerator(FileReaderImpl* self, std::vector<int> row_groups, + std::vector<int> column_indices) + : self_(self), + index_(0), + row_groups_(std::move(row_groups)), + column_indices_(std::move(column_indices)) {} + + ::arrow::Future<Item> operator()() { + if (index_ >= row_groups_.size()) { + return ::arrow::Future<Item>::MakeFinished(::arrow::util::nullopt); + } + int row_group = row_groups_[index_++]; + FileReaderImpl* self = self_; + std::vector<int> column_indices = column_indices_; + ARROW_ASSIGN_OR_RAISE(auto fut, + ::arrow::internal::GetCpuThreadPool()->Submit( Review comment: This will be a problem. I think this task will block (I'm assuming `ReadRowGroupsImpl` is synchronous and blocking?) Blocking tasks should not be on the CPU pool. You could put it on the I/O pool but that isn't necessarily ideal either as it complicates sizing the I/O pool. Ideally you want something like `reader_->PreBuffer(row_groups, ...).Then(NonBlockingReadRowGroupsImpl)`. I think you might kind of get away with it because the task they are waiting on is on the I/O thread pool (I assume the prebuffering tasks are on the I/O pool) so you won't have the nested deadlock problem. However, it will not have ideal performance. If you are reading a bunch of files you will have some CPU threads tied up waiting that could be doing work. ########## File path: cpp/src/parquet/arrow/reader.h ########## @@ -175,6 +177,22 @@ class PARQUET_EXPORT FileReader { const std::vector<int>& row_group_indices, const std::vector<int>& column_indices, std::unique_ptr<::arrow::RecordBatchReader>* out) = 0; + /// \brief Return a generator of record batch vectors, where each vector represents + /// the contents of a row group from row_group_indices, whose columns are selected + /// by column_indices. + /// + /// An empty optional indicates the end of the generator. 
+ /// + /// Note that the ordering in row_group_indices and column_indices matters. FileReaders + /// must outlive their generators. + /// + /// \returns error Result if either row_group_indices or column_indices contains an + /// invalid index + virtual ::arrow::Result< Review comment: Answering your question I agree it would be better to have `AsyncGenerator<RecordBatch>` for consistency with the other readers. You can use `MakeVectorGenerator` to get `AsyncGenerator<AsyncGenerator<RecordBatch>>`. Then apply `MakeConcatMapGenerator` to the result to get to `AsyncGenerator<RecordBatch>`. ########## File path: cpp/src/parquet/arrow/reader.h ########## @@ -175,6 +177,22 @@ class PARQUET_EXPORT FileReader { const std::vector<int>& row_group_indices, const std::vector<int>& column_indices, std::unique_ptr<::arrow::RecordBatchReader>* out) = 0; + /// \brief Return a generator of record batch vectors, where each vector represents + /// the contents of a row group from row_group_indices, whose columns are selected + /// by column_indices. + /// + /// An empty optional indicates the end of the generator. + /// + /// Note that the ordering in row_group_indices and column_indices matters. FileReaders + /// must outlive their generators. + /// + /// \returns error Result if either row_group_indices or column_indices contains an + /// invalid index + virtual ::arrow::Result< + ::arrow::AsyncGenerator<::arrow::util::optional<::arrow::RecordBatchVector>>> + GetRecordBatchGenerator(const std::vector<int>& row_group_indices, Review comment: Would the scan task be the thing providing the `row_group_indices`? ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org