mapleFU commented on code in PR #36779:
URL: https://github.com/apache/arrow/pull/36779#discussion_r1304748443


##########
cpp/src/parquet/arrow/reader.cc:
##########
@@ -1233,6 +1285,127 @@ Status FileReaderImpl::ReadRowGroups(const 
std::vector<int>& row_groups,
   return Status::OK();
 }
 
+struct AsyncBatchGeneratorState {
+  ::arrow::internal::Executor* io_executor;
+  ::arrow::internal::Executor* cpu_executor;
+  std::vector<std::shared_ptr<ColumnReaderImpl>> column_readers;
+  std::vector<std::shared_ptr<ChunkedArray>> current_cols;
+  std::queue<std::shared_ptr<RecordBatch>> overflow;
+  std::shared_ptr<::arrow::Schema> schema;
+  int64_t batch_size;
+  int64_t rows_remaining;
+  bool use_threads;
+  bool allow_sliced_batches;
+};
+
+class AsyncBatchGeneratorImpl {
+ public:
+  explicit AsyncBatchGeneratorImpl(std::shared_ptr<AsyncBatchGeneratorState> 
state)
+      : state_(std::move(state)) {}
+  Future<std::shared_ptr<RecordBatch>> operator()() {
+    if (!state_->overflow.empty()) {
+      std::shared_ptr<RecordBatch> next = std::move(state_->overflow.front());
+      state_->overflow.pop();
+      return next;
+    }
+
+    if (state_->rows_remaining == 0) {
+      // Exhausted
+      return Future<std::shared_ptr<RecordBatch>>::MakeFinished(
+          ::arrow::IterationEnd<std::shared_ptr<RecordBatch>>());
+    }
+
+    int64_t rows_in_batch = std::min(state_->rows_remaining, 
state_->batch_size);
+    state_->rows_remaining -= rows_in_batch;
+
+    // We read the columns in parallel.  Each reader returns a chunked array.  
This is
+    // because we might need to chunk a column if that column is too large.  We
+    // do provide a batch size but even for a small batch size it is possible 
that a
+    // column has extremely large strings which don't fit in a single batch.
+    std::vector<Future<>> column_futs;
+    column_futs.reserve(state_->column_readers.size());
+    for (std::size_t column_index = 0; column_index < 
state_->column_readers.size();
+         column_index++) {
+      const auto& column_reader = state_->column_readers[column_index];
+      column_futs.push_back(
+          column_reader
+              ->NextBatchAsync(rows_in_batch, state_->io_executor, 
state_->cpu_executor)
+              .Then([state = state_,
+                     column_index](const std::shared_ptr<ChunkedArray>& 
chunked_array) {
+                state->current_cols[column_index] = chunked_array;
+              }));
+    }
+    Future<> all_columns_finished_fut = 
::arrow::AllFinished(std::move(column_futs));
+
+    // Grab the first batch of data and return it.  If there is more than one 
batch then
+    // throw the remaining batches into overflow and they will be fetched on 
the next call
+    return all_columns_finished_fut.Then(
+        [state = state_, rows_in_batch]() -> 
Result<std::shared_ptr<RecordBatch>> {
+          std::shared_ptr<Table> table =
+              Table::Make(state->schema, state->current_cols, rows_in_batch);
+          ::arrow::TableBatchReader batch_reader(*table);
+          std::shared_ptr<RecordBatch> first;
+          while (true) {
+            ARROW_ASSIGN_OR_RAISE(std::shared_ptr<RecordBatch> next_batch,
+                                  batch_reader.Next());
+            if (!next_batch) {
+              break;
+            }
+            if (first) {
+              if (!state->allow_sliced_batches) {
+                return Status::Invalid(
+                    "The setting allow_sliced_batches is set to false and data 
was "
+                    "encountered that was too large to fit in a single 
batch.");
+              }
+              state->overflow.push(std::move(next_batch));
+            } else {
+              first = std::move(next_batch);
+            }
+          }
+          // If there were no rows of data we should have been exhausted.  
Otherwise we
+          // should have gotten at least one batch.
+          DCHECK(!!first);
+          return first;
+        });
+  }
+
+ private:
+  std::shared_ptr<AsyncBatchGeneratorState> state_;
+};
+
+Result<FileReaderImpl::AsyncBatchGenerator> 
FileReaderImpl::DoReadRowGroupsAsync(
+    const std::vector<int>& row_groups, const std::vector<int>& column_indices,
+    ::arrow::internal::Executor* cpu_executor, bool allow_sliced_batches) {
+  RETURN_NOT_OK(BoundsCheck(row_groups, column_indices));
+
+  if (reader_properties_.pre_buffer()) {
+    BEGIN_PARQUET_CATCH_EXCEPTIONS
+    parquet_reader()->PreBuffer(row_groups, column_indices,
+                                reader_properties_.io_context(),
+                                reader_properties_.cache_options());
+    END_PARQUET_CATCH_EXCEPTIONS
+  }
+
+  auto generator_state = std::make_shared<AsyncBatchGeneratorState>();
+  generator_state->io_executor = reader_properties_.io_context().executor();
+  generator_state->cpu_executor = cpu_executor;
+  generator_state->use_threads = reader_properties_.use_threads();

Review Comment:
   Final question: this isn't related to correctness, but it's a bit confusing. The API doc comment says:
   
   ```
     /// This method ignores the use_threads property of the ArrowReaderProperties.  It will
     /// always create a task for each column.  To run without threads you should use a
     /// serial executor as the CPU executor.
   ```
   
   So why is `use_threads` copied into the generator state here, if the method is documented to ignore it?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to