mapleFU commented on code in PR #36779:
URL: https://github.com/apache/arrow/pull/36779#discussion_r1268951056


##########
cpp/src/parquet/arrow/reader.h:
##########
@@ -249,6 +249,90 @@ class PARQUET_EXPORT FileReader {
   virtual ::arrow::Status ReadRowGroups(const std::vector<int>& row_groups,
                                         std::shared_ptr<::arrow::Table>* out) 
= 0;
 
+  using AsyncBatchGenerator =
+      std::function<::arrow::Future<std::shared_ptr<::arrow::RecordBatch>>()>;
+
+  /// \brief Read a single row group from the file
+  ///
+  /// \see ReadRowGroupsAsync for operation details
+  ///
+  /// \param i the index of the row group to read
+  /// \param cpu_executor an executor to use to run CPU tasks
+  /// \param allow_sliced_batches if false, an error is raised if a batch has 
too much
+  ///                             data for the given batch size.  If true, 
smaller
+  ///                             batches will be returned instead.
+  virtual AsyncBatchGenerator ReadRowGroupAsync(int i,
+                                                ::arrow::internal::Executor* 
cpu_executor,
+                                                bool allow_sliced_batches = 
false) = 0;
+  /// \brief Read some columns from a single row group from the file
+  ///
+  /// \see ReadRowGroupsAsync for operation details
+  /// \see ReadTable for details on how column indices are resolved
+  ///
+  /// \param i the index of the row group to read
+  /// \param column_indices leaf-indices of the columns to read
+  /// \param cpu_executor an executor to use to run CPU tasks
+  /// \param allow_sliced_batches if false, an error is raised if a batch has 
too much
+  ///                             data for the given batch size.  If true, 
smaller
+  ///                             batches will be returned instead.
+  virtual AsyncBatchGenerator ReadRowGroupAsync(int i,
+                                                const std::vector<int>& 
column_indices,
+                                                ::arrow::internal::Executor* 
cpu_executor,
+                                                bool allow_sliced_batches = 
false) = 0;
+
+  /// \brief Read row groups from the file
+  ///
+  /// \see ReadRowGroupsAsync for operation details
+  ///
+  /// \param row_groups indices of the row groups to read
+  /// \param cpu_executor an executor to use to run CPU tasks
+  /// \param allow_sliced_batches if false, an error is raised if a batch has 
too much
+  ///                             data for the given batch size.  If true, 
smaller
+  ///                             batches will be returned instead.
+  virtual AsyncBatchGenerator ReadRowGroupsAsync(
+      const std::vector<int>& row_groups, ::arrow::internal::Executor* 
cpu_executor,
+      bool allow_sliced_batches = false) = 0;
+
+  /// \brief Read some columns from the given rows groups from the file
+  ///
+  /// If pre-buffering is enabled then all of the data will be read using the 
pre-buffer
+  /// cache. See ParquetFileReader::PreBuffer for details on how this affects 
memory and
+  /// performance.
+  ///
+  /// This operation is not perfectly async.  The read from disk will be done 
on an I/O
+  /// thread, which is correct.  However, compression and  column decoding is 
also done on

Review Comment:
   ```suggestion
     /// thread, which is correct.  However, compression and column decoding is also done on
   ```



##########
cpp/src/parquet/arrow/reader.h:
##########
@@ -249,6 +249,90 @@ class PARQUET_EXPORT FileReader {
   virtual ::arrow::Status ReadRowGroups(const std::vector<int>& row_groups,
                                         std::shared_ptr<::arrow::Table>* out) 
= 0;
 
+  using AsyncBatchGenerator =
+      std::function<::arrow::Future<std::shared_ptr<::arrow::RecordBatch>>()>;
+
+  /// \brief Read a single row group from the file
+  ///
+  /// \see ReadRowGroupsAsync for operation details
+  ///
+  /// \param i the index of the row group to read
+  /// \param cpu_executor an executor to use to run CPU tasks
+  /// \param allow_sliced_batches if false, an error is raised if a batch has 
too much
+  ///                             data for the given batch size.  If true, 
smaller
+  ///                             batches will be returned instead.
+  virtual AsyncBatchGenerator ReadRowGroupAsync(int i,
+                                                ::arrow::internal::Executor* 
cpu_executor,
+                                                bool allow_sliced_batches = 
false) = 0;
+  /// \brief Read some columns from a single row group from the file
+  ///
+  /// \see ReadRowGroupsAsync for operation details
+  /// \see ReadTable for details on how column indices are resolved
+  ///
+  /// \param i the index of the row group to read
+  /// \param column_indices leaf-indices of the columns to read
+  /// \param cpu_executor an executor to use to run CPU tasks
+  /// \param allow_sliced_batches if false, an error is raised if a batch has 
too much
+  ///                             data for the given batch size.  If true, 
smaller
+  ///                             batches will be returned instead.
+  virtual AsyncBatchGenerator ReadRowGroupAsync(int i,
+                                                const std::vector<int>& 
column_indices,
+                                                ::arrow::internal::Executor* 
cpu_executor,
+                                                bool allow_sliced_batches = 
false) = 0;
+
+  /// \brief Read row groups from the file
+  ///
+  /// \see ReadRowGroupsAsync for operation details
+  ///
+  /// \param row_groups indices of the row groups to read
+  /// \param cpu_executor an executor to use to run CPU tasks
+  /// \param allow_sliced_batches if false, an error is raised if a batch has 
too much
+  ///                             data for the given batch size.  If true, 
smaller
+  ///                             batches will be returned instead.
+  virtual AsyncBatchGenerator ReadRowGroupsAsync(
+      const std::vector<int>& row_groups, ::arrow::internal::Executor* 
cpu_executor,
+      bool allow_sliced_batches = false) = 0;
+
+  /// \brief Read some columns from the given rows groups from the file
+  ///
+  /// If pre-buffering is enabled then all of the data will be read using the 
pre-buffer
+  /// cache. See ParquetFileReader::PreBuffer for details on how this affects 
memory and
+  /// performance.
+  ///
+  /// This operation is not perfectly async.  The read from disk will be done 
on an I/O
+  /// thread, which is correct.  However, compression and  column decoding is 
also done on
+  /// the I/O thread which may not be ideal.  The stage after that 
(transferring the
+  /// decoded data into Arrow structures and fulfilling the future) should be 
done as a
+  /// new task on the cpu_executor.
+  ///
+  /// The returned generator will respect the batch size set in the reader 
properties.

Review Comment:
   ```suggestion
     /// The returned generator will respect the batch size set in the ArrowReaderProperties.
   ```
   
   Would this be better?



##########
cpp/src/parquet/arrow/reader.cc:
##########
@@ -316,6 +318,23 @@ class FileReaderImpl : public FileReader {
     return ReadRowGroups(row_groups, Iota(reader_->metadata()->num_columns()), 
table);
   }
 
+  Result<AsyncBatchGenerator> DoReadRowGroupsAsync(
+      const std::vector<int>& row_groups, const std::vector<int>& indices,

Review Comment:
   ```suggestion
         const std::vector<int>& row_groups, const std::vector<int>& column_indices,
   ```



##########
cpp/src/parquet/arrow/reader.h:
##########
@@ -249,6 +249,90 @@ class PARQUET_EXPORT FileReader {
   virtual ::arrow::Status ReadRowGroups(const std::vector<int>& row_groups,
                                         std::shared_ptr<::arrow::Table>* out) 
= 0;
 
+  using AsyncBatchGenerator =
+      std::function<::arrow::Future<std::shared_ptr<::arrow::RecordBatch>>()>;
+
+  /// \brief Read a single row group from the file
+  ///
+  /// \see ReadRowGroupsAsync for operation details
+  ///
+  /// \param i the index of the row group to read
+  /// \param cpu_executor an executor to use to run CPU tasks
+  /// \param allow_sliced_batches if false, an error is raised if a batch has 
too much
+  ///                             data for the given batch size.  If true, 
smaller
+  ///                             batches will be returned instead.
+  virtual AsyncBatchGenerator ReadRowGroupAsync(int i,
+                                                ::arrow::internal::Executor* 
cpu_executor,
+                                                bool allow_sliced_batches = 
false) = 0;
+  /// \brief Read some columns from a single row group from the file
+  ///
+  /// \see ReadRowGroupsAsync for operation details
+  /// \see ReadTable for details on how column indices are resolved
+  ///
+  /// \param i the index of the row group to read
+  /// \param column_indices leaf-indices of the columns to read
+  /// \param cpu_executor an executor to use to run CPU tasks
+  /// \param allow_sliced_batches if false, an error is raised if a batch has 
too much
+  ///                             data for the given batch size.  If true, 
smaller
+  ///                             batches will be returned instead.
+  virtual AsyncBatchGenerator ReadRowGroupAsync(int i,
+                                                const std::vector<int>& 
column_indices,
+                                                ::arrow::internal::Executor* 
cpu_executor,
+                                                bool allow_sliced_batches = 
false) = 0;
+
+  /// \brief Read row groups from the file
+  ///
+  /// \see ReadRowGroupsAsync for operation details
+  ///
+  /// \param row_groups indices of the row groups to read
+  /// \param cpu_executor an executor to use to run CPU tasks
+  /// \param allow_sliced_batches if false, an error is raised if a batch has 
too much
+  ///                             data for the given batch size.  If true, 
smaller
+  ///                             batches will be returned instead.
+  virtual AsyncBatchGenerator ReadRowGroupsAsync(
+      const std::vector<int>& row_groups, ::arrow::internal::Executor* 
cpu_executor,
+      bool allow_sliced_batches = false) = 0;
+
+  /// \brief Read some columns from the given rows groups from the file
+  ///
+  /// If pre-buffering is enabled then all of the data will be read using the 
pre-buffer
+  /// cache. See ParquetFileReader::PreBuffer for details on how this affects 
memory and
+  /// performance.
+  ///
+  /// This operation is not perfectly async.  The read from disk will be done 
on an I/O
+  /// thread, which is correct.  However, compression and  column decoding is 
also done on
+  /// the I/O thread which may not be ideal.  The stage after that 
(transferring the
+  /// decoded data into Arrow structures and fulfilling the future) should be 
done as a
+  /// new task on the cpu_executor.
+  ///
+  /// The returned generator will respect the batch size set in the reader 
properties.
+  /// Batches will not be larger than the given batch size.  However, batches 
may be
+  /// smaller.  This can happen, for example, when there is not enough data or 
when a

Review Comment:
   Could the batch size be "0"?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to