wgtmac commented on code in PR #36377:
URL: https://github.com/apache/arrow/pull/36377#discussion_r1251740661


##########
cpp/src/parquet/arrow/writer.cc:
##########
@@ -274,6 +274,16 @@ class ArrowColumnWriterV2 {
   RowGroupWriter* row_group_writer_;
 };
 
+std::shared_ptr<ChunkedArray> getColumnChunkedArray(const ::arrow::RecordBatch& value,
+                                                    int column_id) {
+  return std::make_shared<::arrow::ChunkedArray>(value.column(column_id));
+}
+
+std::shared_ptr<ChunkedArray> getColumnChunkedArray(const ::arrow::Table& value,

Review Comment:
   ```suggestion
   std::shared_ptr<ChunkedArray> GetColumnChunkedArray(const ::arrow::Table& value,
   ```



##########
cpp/src/parquet/arrow/writer.h:
##########
@@ -91,10 +91,21 @@ class PARQUET_EXPORT FileWriter {
 
   /// \brief Write a Table to Parquet.
   ///
+  /// If `use_buffering` is false, then any pending row group is closed
+  /// at the beginning and at the end of this call.
+  /// If `use_buffering` is true, this function reuses an existing
+  /// buffered row group until the chunk size is met, and leaves
+  /// the last row group open for further writes.
+  /// It is recommended to set `use_buffering` to true to minimize
+  /// the number of row groups, especially when calling `WriteTable`
+  /// with small tables.
+  ///
   /// \param table Arrow table to write.
   /// \param chunk_size maximum number of rows to write per row group.
-  virtual ::arrow::Status WriteTable(
-      const ::arrow::Table& table, int64_t chunk_size = DEFAULT_MAX_ROW_GROUP_LENGTH) = 0;
+  /// \param use_buffering Whether to potentially buffer data.
+  virtual ::arrow::Status WriteTable(const ::arrow::Table& table,

Review Comment:
   How about adding a separate `WriteBufferedTable(const ::arrow::Table& table)` function?
   
   My rationale:
   - The `chunk_size` parameter would be confusing, since `WriterProperties::max_row_group_length()` serves the same purpose.
   - Adding a new `use_buffering` parameter complicates `WriteTable`, which would have to deal with the previous buffering state.
   - In most cases, we don't expect users to mix calls with `use_buffering=true` and `use_buffering=false`.



##########
cpp/src/parquet/arrow/writer.cc:
##########
@@ -356,7 +366,10 @@ class FileWriterImpl : public FileWriter {
 
   std::shared_ptr<::arrow::Schema> schema() const override { return schema_; }
 
-  Status WriteTable(const Table& table, int64_t chunk_size) override {
+  template <typename T>
+  Status WriteBuffered(const T& batch, int64_t max_row_group_length);

Review Comment:
   ```suggestion
     Status WriteBufferedRowGroup(const T& batch, int64_t max_row_group_length);
   ```



##########
cpp/src/parquet/arrow/writer.h:
##########
@@ -91,10 +91,21 @@ class PARQUET_EXPORT FileWriter {
 
   /// \brief Write a Table to Parquet.
   ///
+  /// If `use_buffering` is false, then any pending row group is closed
+  /// at the beginning and at the end of this call.
+  /// If `use_buffering` is true, this function reuses an existing
+  /// buffered row group until the chunk size is met, and leaves
+  /// the last row group open for further writes.
+  /// It is recommended to set `use_buffering` to true to minimize
+  /// the number of row groups, especially when calling `WriteTable`
+  /// with small tables.
+  ///
   /// \param table Arrow table to write.
   /// \param chunk_size maximum number of rows to write per row group.
-  virtual ::arrow::Status WriteTable(
-      const ::arrow::Table& table, int64_t chunk_size = DEFAULT_MAX_ROW_GROUP_LENGTH) = 0;
+  /// \param use_buffering Whether to potentially buffer data.
+  virtual ::arrow::Status WriteTable(const ::arrow::Table& table,

Review Comment:
   IIUC, the intended use of `WriteTable()` and `WriteRecordBatch()` is:
   - A single RecordBatch object is usually not large, so `WriteRecordBatch()` is implemented to write in buffered mode to avoid creating small row groups.
   - A single Table usually holds a large number of records, so it makes sense for `WriteTable()` to break the table into row groups without using buffered mode.
   
   The current PR addresses the case where a Table object is small, which should follow the same behavior as `WriteRecordBatch()`.



##########
cpp/src/parquet/arrow/writer.cc:
##########
@@ -469,6 +415,30 @@ class FileWriterImpl : public FileWriter {
     return writer_->metadata();
   }
 
+  Status WriteTableUnbuffered(const Table& table, int64_t chunk_size) {

Review Comment:
   ```suggestion
     Status WriteNonBufferedRowGroup(const Table& table, int64_t chunk_size) {
   ```



##########
cpp/src/parquet/arrow/writer.cc:
##########
@@ -274,6 +274,16 @@ class ArrowColumnWriterV2 {
   RowGroupWriter* row_group_writer_;
 };
 
+std::shared_ptr<ChunkedArray> getColumnChunkedArray(const ::arrow::RecordBatch& value,

Review Comment:
   ```suggestion
   std::shared_ptr<ChunkedArray> GetColumnChunkedArray(const ::arrow::RecordBatch& value,
   ```



##########
cpp/src/parquet/arrow/writer.cc:
##########
@@ -406,57 +402,7 @@ class FileWriterImpl : public FileWriter {
     }
 
     // Max number of rows allowed in a row group.
-    const int64_t max_row_group_length = this->properties().max_row_group_length();
-
-    if (row_group_writer_ == nullptr || !row_group_writer_->buffered() ||
-        row_group_writer_->num_rows() >= max_row_group_length) {
-      RETURN_NOT_OK(NewBufferedRowGroup());
-    }
-
-    auto WriteBatch = [&](int64_t offset, int64_t size) {
-      std::vector<std::unique_ptr<ArrowColumnWriterV2>> writers;
-      int column_index_start = 0;
-
-      for (int i = 0; i < batch.num_columns(); i++) {
-        ChunkedArray chunked_array{batch.column(i)};
-        ARROW_ASSIGN_OR_RAISE(
-            std::unique_ptr<ArrowColumnWriterV2> writer,
-            ArrowColumnWriterV2::Make(chunked_array, offset, size, schema_manifest_,
-                                      row_group_writer_, column_index_start));
-        column_index_start += writer->leaf_count();
-        if (arrow_properties_->use_threads()) {
-          writers.emplace_back(std::move(writer));
-        } else {
-          RETURN_NOT_OK(writer->Write(&column_write_context_));
-        }
-      }
-
-      if (arrow_properties_->use_threads()) {

Review Comment:
   Moving the async logic into `WriteTable` may introduce the potential concurrency issue mentioned by @westonpace in this [comment](https://github.com/apache/arrow/pull/33656#pullrequestreview-1250353638).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@arrow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
