lidavidm commented on a change in pull request #9620:
URL: https://github.com/apache/arrow/pull/9620#discussion_r591953765



##########
File path: cpp/src/parquet/arrow/reader.cc
##########
@@ -967,6 +978,74 @@ Status FileReaderImpl::GetRecordBatchReader(const std::vector<int>& row_groups,
   return Status::OK();
 }
 
+/// Given a file reader and a list of row groups, this is a generator of record
+/// batch vectors (where each vector is the contents of a single row group).
+class RowGroupGenerator {
+ public:
+  using Item = ::arrow::util::optional<::arrow::RecordBatchVector>;
+
+  explicit RowGroupGenerator(FileReaderImpl* self, std::vector<int> row_groups,
+                             std::vector<int> column_indices)
+      : self_(self),
+        index_(0),
+        row_groups_(std::move(row_groups)),
+        column_indices_(std::move(column_indices)) {}
+
+  ::arrow::Future<Item> operator()() {
+    if (index_ >= row_groups_.size()) {
+      return ::arrow::Future<Item>::MakeFinished(::arrow::util::nullopt);
+    }
+    int row_group = row_groups_[index_++];
+    FileReaderImpl* self = self_;
+    std::vector<int> column_indices = column_indices_;
+    ARROW_ASSIGN_OR_RAISE(auto fut,
+                          ::arrow::internal::GetCpuThreadPool()->Submit(

Review comment:
       Maybe we can hold this until ARROW-7001 is through, and then we can see what exactly we need to be reentrant to get the pipeline we want.
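       For context, a minimal sketch of what "reentrant" would mean for this generator: a consumer (e.g. a readahead wrapper) may invoke `operator()` again before the future returned by the previous call has completed, so several row groups are in flight at once. This toy uses `std::async` and plain `std::optional` instead of Arrow's thread pool and future types, purely to illustrate the calling pattern:

```cpp
#include <future>
#include <iostream>
#include <optional>

// Toy stand-in for the async generator: each call kicks off the "read" of
// the next row group on a background thread and returns a future at once.
// A reentrant generator must tolerate being called again before that
// future is ready.
class ToyRowGroupGenerator {
 public:
  explicit ToyRowGroupGenerator(int num_row_groups)
      : index_(0), num_row_groups_(num_row_groups) {}

  std::future<std::optional<int>> operator()() {
    if (index_ >= num_row_groups_) {
      std::promise<std::optional<int>> done;
      done.set_value(std::nullopt);  // end-of-stream sentinel
      return done.get_future();
    }
    // State is advanced synchronously, so back-to-back calls hand out
    // distinct row groups even while earlier reads are still running.
    const int row_group = index_++;
    return std::async(std::launch::async,
                      [row_group] { return std::optional<int>(row_group); });
  }

 private:
  int index_;
  int num_row_groups_;
};

int main() {
  ToyRowGroupGenerator gen(4);
  // Pipelined consumption: pull two futures before waiting on either.
  auto f1 = gen();
  auto f2 = gen();  // reentrant call while f1 may still be in flight
  std::cout << *f1.get() << " " << *f2.get() << std::endl;  // prints "0 1"
}
```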

##########
File path: cpp/src/parquet/arrow/reader.cc
##########
@@ -967,6 +978,74 @@ Status FileReaderImpl::GetRecordBatchReader(const std::vector<int>& row_groups,
   return Status::OK();
 }
 
+/// Given a file reader and a list of row groups, this is a generator of record
+/// batch vectors (where each vector is the contents of a single row group).
+class RowGroupGenerator {
+ public:
+  using Item = ::arrow::util::optional<::arrow::RecordBatchVector>;
+
+  explicit RowGroupGenerator(FileReaderImpl* self, std::vector<int> row_groups,
+                             std::vector<int> column_indices)
+      : self_(self),
+        index_(0),
+        row_groups_(std::move(row_groups)),
+        column_indices_(std::move(column_indices)) {}
+
+  ::arrow::Future<Item> operator()() {
+    if (index_ >= row_groups_.size()) {
+      return ::arrow::Future<Item>::MakeFinished(::arrow::util::nullopt);
+    }
+    int row_group = row_groups_[index_++];
+    FileReaderImpl* self = self_;
+    std::vector<int> column_indices = column_indices_;
+    ARROW_ASSIGN_OR_RAISE(auto fut,
+                          ::arrow::internal::GetCpuThreadPool()->Submit(
+                              &ReadOneRowGroup, self, row_group, column_indices));
+    return fut;
+  }
+
+ private:
+  static ::arrow::Result<Item> ReadOneRowGroup(FileReaderImpl* self, const int row_group,
+                                               const std::vector<int>& column_indices) {
+    std::shared_ptr<::arrow::Table> table;
+    // Call the version that skips bounds checks/pre-buffering, since we've done that
+    // already
+    RETURN_NOT_OK(self->ReadRowGroupsImpl({row_group}, column_indices, &table));
+    auto table_reader = std::make_shared<::arrow::TableBatchReader>(*table);
+    ::arrow::RecordBatchVector batches;
+    while (true) {
+      std::shared_ptr<::arrow::RecordBatch> batch;
+      RETURN_NOT_OK(table_reader->ReadNext(&batch));
+      if (!batch) {
+        break;
+      }
+      batches.push_back(batch);
+    }
+    return ::arrow::util::make_optional<::arrow::RecordBatchVector>(std::move(batches));
+  }
+
+  FileReaderImpl* self_;
+  size_t index_;
+  std::vector<int> row_groups_;
+  std::vector<int> column_indices_;
+};
+
+::arrow::Result<
+    ::arrow::AsyncGenerator<::arrow::util::optional<::arrow::RecordBatchVector>>>
+FileReaderImpl::GetRecordBatchGenerator(const std::vector<int>& row_groups,
+                                        const std::vector<int>& column_indices) {
+  RETURN_NOT_OK(BoundsCheck(row_groups, column_indices));
+  if (reader_properties_.pre_buffer()) {
+    // PARQUET-1698/PARQUET-1820: pre-buffer row groups/column chunks if enabled
+    BEGIN_PARQUET_CATCH_EXCEPTIONS
+    reader_->PreBuffer(row_groups, column_indices, reader_properties_.io_context(),
+                       reader_properties_.cache_options());
+    END_PARQUET_CATCH_EXCEPTIONS
+  }
+  // N.B. we (and underlying Parquet reader) must outlive generator
+  return RowGroupGenerator(this, row_groups, column_indices);

Review comment:
       I think _that's_ supposed to be OK: iterating the scan task should implicitly keep the scan task alive; what I meant is that the scan task is explicitly keeping the Parquet reader alive.
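       To make the lifetime question concrete, here is a minimal sketch of the ownership shape under discussion (the `ScanTask` and `MakeGenerator` names are hypothetical, not Arrow's API). Since `RowGroupGenerator` stores a raw `FileReaderImpl*`, whatever hands out the generator must keep the reader alive for as long as the generator is polled, e.g. by capturing a `shared_ptr` to the reader in the generator's closure:

```cpp
#include <functional>
#include <memory>
#include <vector>

// Toy reader; stands in for parquet::arrow::FileReader.
struct FileReader {
  std::vector<int> ReadRowGroup(int i) { return {i}; }
};

// Hypothetical scan task that owns the reader and hands out a generator.
class ScanTask {
 public:
  explicit ScanTask(std::shared_ptr<FileReader> reader)
      : reader_(std::move(reader)) {}

  // The returned callable copies the shared_ptr into its closure, so
  // iterating the generator implicitly keeps the reader alive even if
  // the ScanTask itself is destroyed first.
  std::function<std::vector<int>(int)> MakeGenerator() const {
    auto reader = reader_;
    return [reader](int row_group) { return reader->ReadRowGroup(row_group); };
  }

 private:
  std::shared_ptr<FileReader> reader_;
};

int main() {
  std::function<std::vector<int>(int)> gen;
  {
    ScanTask task(std::make_shared<FileReader>());
    gen = task.MakeGenerator();
  }  // `task` is gone, but `gen` still owns the reader via its capture
  return gen(0).front();  // safe: reads "row group" 0
}
```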




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

