wgtmac commented on code in PR #36377: URL: https://github.com/apache/arrow/pull/36377#discussion_r1251740661
########## cpp/src/parquet/arrow/writer.cc: ########## @@ -274,6 +274,16 @@ class ArrowColumnWriterV2 { RowGroupWriter* row_group_writer_; }; +std::shared_ptr<ChunkedArray> getColumnChunkedArray(const ::arrow::RecordBatch& value, + int column_id) { + return std::make_shared<::arrow::ChunkedArray>(value.column(column_id)); +} + +std::shared_ptr<ChunkedArray> getColumnChunkedArray(const ::arrow::Table& value, Review Comment: ```suggestion std::shared_ptr<ChunkedArray> GetColumnChunkedArray(const ::arrow::Table& value, ``` ########## cpp/src/parquet/arrow/writer.h: ########## @@ -91,10 +91,21 @@ class PARQUET_EXPORT FileWriter { /// \brief Write a Table to Parquet. /// + /// If `use_buffering` is false, then any pending row group is closed + /// at the beginning and at the end of this call. + /// If `use_buffering` is true, this function reuses an existing + /// buffered row group until the chunk size is met, and leaves + /// the last row group open for further writes. + /// It is recommended to set `use_buffering` to true to minimize + /// the number of row groups, especially when calling `WriteTable` + /// with small tables. + /// /// \param table Arrow table to write. /// \param chunk_size maximum number of rows to write per row group. - virtual ::arrow::Status WriteTable( - const ::arrow::Table& table, int64_t chunk_size = DEFAULT_MAX_ROW_GROUP_LENGTH) = 0; + /// \param use_buffering Whether to potentially buffer data. + virtual ::arrow::Status WriteTable(const ::arrow::Table& table, Review Comment: How about adding a separate `WriteBufferedTable(const ::arrow::Table& table)` function? My rationale is: - `chunk_size` would be confusing since `WriterProperties::max_row_group_length()` serves the same purpose. - Adding a new `use_buffering` parameter complicates `WriteTable`, which would have to deal with previous state. - In most cases, we don't expect users to mix calls with `use_buffering=true` and `use_buffering=false`. 
########## cpp/src/parquet/arrow/writer.cc: ########## @@ -356,7 +366,10 @@ class FileWriterImpl : public FileWriter { std::shared_ptr<::arrow::Schema> schema() const override { return schema_; } - Status WriteTable(const Table& table, int64_t chunk_size) override { + template <typename T> + Status WriteBuffered(const T& batch, int64_t max_row_group_length); Review Comment: ```suggestion Status WriteBufferedRowGroup(const T& batch, int64_t max_row_group_length); ``` ########## cpp/src/parquet/arrow/writer.h: ########## @@ -91,10 +91,21 @@ class PARQUET_EXPORT FileWriter { /// \brief Write a Table to Parquet. /// + /// If `use_buffering` is false, then any pending row group is closed + /// at the beginning and at the end of this call. + /// If `use_buffering` is true, this function reuses an existing + /// buffered row group until the chunk size is met, and leaves + /// the last row group open for further writes. + /// It is recommended to set `use_buffering` to true to minimize + /// the number of row groups, especially when calling `WriteTable` + /// with small tables. + /// /// \param table Arrow table to write. /// \param chunk_size maximum number of rows to write per row group. - virtual ::arrow::Status WriteTable( - const ::arrow::Table& table, int64_t chunk_size = DEFAULT_MAX_ROW_GROUP_LENGTH) = 0; + /// \param use_buffering Whether to potentially buffer data. + virtual ::arrow::Status WriteTable(const ::arrow::Table& table, Review Comment: IIUC, the use cases of `WriteTable()` and `WriteRecordBatch()` should be: - A single RecordBatch object should not be large, so `WriteRecordBatch()` is implemented to write in the buffered mode to avoid small row groups. - A single Table usually holds a large number of records, so it makes sense that `WriteTable()` breaks the table into row groups and does not use the buffered mode. The current PR is addressing the case where a Table object is small, which should mirror the behavior of `WriteRecordBatch`. 
########## cpp/src/parquet/arrow/writer.cc: ########## @@ -469,6 +415,30 @@ class FileWriterImpl : public FileWriter { return writer_->metadata(); } + Status WriteTableUnbuffered(const Table& table, int64_t chunk_size) { Review Comment: ```suggestion Status WriteNonBufferedRowGroup(const Table& table, int64_t chunk_size) { ``` ########## cpp/src/parquet/arrow/writer.cc: ########## @@ -274,6 +274,16 @@ class ArrowColumnWriterV2 { RowGroupWriter* row_group_writer_; }; +std::shared_ptr<ChunkedArray> getColumnChunkedArray(const ::arrow::RecordBatch& value, Review Comment: ```suggestion std::shared_ptr<ChunkedArray> GetColumnChunkedArray(const ::arrow::RecordBatch& value, ``` ########## cpp/src/parquet/arrow/writer.cc: ########## @@ -406,57 +402,7 @@ class FileWriterImpl : public FileWriter { } // Max number of rows allowed in a row group. - const int64_t max_row_group_length = this->properties().max_row_group_length(); - - if (row_group_writer_ == nullptr || !row_group_writer_->buffered() || - row_group_writer_->num_rows() >= max_row_group_length) { - RETURN_NOT_OK(NewBufferedRowGroup()); - } - - auto WriteBatch = [&](int64_t offset, int64_t size) { - std::vector<std::unique_ptr<ArrowColumnWriterV2>> writers; - int column_index_start = 0; - - for (int i = 0; i < batch.num_columns(); i++) { - ChunkedArray chunked_array{batch.column(i)}; - ARROW_ASSIGN_OR_RAISE( - std::unique_ptr<ArrowColumnWriterV2> writer, - ArrowColumnWriterV2::Make(chunked_array, offset, size, schema_manifest_, - row_group_writer_, column_index_start)); - column_index_start += writer->leaf_count(); - if (arrow_properties_->use_threads()) { - writers.emplace_back(std::move(writer)); - } else { - RETURN_NOT_OK(writer->Write(&column_write_context_)); - } - } - - if (arrow_properties_->use_threads()) { Review Comment: Moving the async logic into `WriteTable` may cause a potential concurrency issue mentioned by @westonpace in this 
[comment](https://github.com/apache/arrow/pull/33656#pullrequestreview-1250353638). -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@arrow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org