wgtmac commented on code in PR #48468:
URL: https://github.com/apache/arrow/pull/48468#discussion_r2723638507


##########
cpp/src/parquet/file_writer.cc:
##########
@@ -68,6 +68,12 @@ int64_t RowGroupWriter::total_compressed_bytes_written() const {
   return contents_->total_compressed_bytes_written();
 }
 
+int64_t RowGroupWriter::EstimatedTotalCompressedBytes() const {
+  return contents_->total_compressed_bytes() +
+         contents_->total_compressed_bytes_written() +
+         contents_->EstimatedBufferedValueBytes();

Review Comment:
   `contents_->EstimatedBufferedValueBytes()` may under-estimate the size because it only accounts for the values buffered by `current_encoder_` and ignores the values buffered in `repetition_levels_rle_`, `definition_levels_rle_`, and `current_dict_encoder_`. For deeply nested types, the level values may be significantly larger than the data values.
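   A fragment-level sketch of a fuller estimate, for illustration only: the encoder calls use the existing `Encoder::EstimatedDataEncodedSize()`, while the level-size counters are assumed bookkeeping members rather than the actual column writer API.

   ```cpp
   // Sketch only: level-size counters are assumed members, not the real API.
   int64_t EstimatedBufferedValueBytes() const {
     // Values buffered by the current page encoder.
     int64_t estimated = current_encoder_->EstimatedDataEncodedSize();
     // RLE-encoded repetition/definition levels buffered for the current page;
     // for deeply nested types these can dominate the value bytes.
     estimated += buffered_repetition_level_bytes_;  // assumed counter
     estimated += buffered_definition_level_bytes_;  // assumed counter
     // Dictionary encoder contribution, assuming it is tracked separately
     // from current_encoder_ (otherwise this would double count).
     if (current_dict_encoder_ != nullptr) {
       estimated += current_dict_encoder_->EstimatedDataEncodedSize();
     }
     return estimated;
   }
   ```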



##########
cpp/src/parquet/properties.h:
##########
@@ -160,6 +160,7 @@ static constexpr bool DEFAULT_IS_DICTIONARY_ENABLED = true;
 static constexpr int64_t DEFAULT_DICTIONARY_PAGE_SIZE_LIMIT = kDefaultDataPageSize;
 static constexpr int64_t DEFAULT_WRITE_BATCH_SIZE = 1024;
 static constexpr int64_t DEFAULT_MAX_ROW_GROUP_LENGTH = 1024 * 1024;
+static constexpr int64_t DEFAULT_MAX_ROW_GROUP_BYTES = 128 * 1024 * 1024;

Review Comment:
   I think Apache Impala also produces a single row group in its Parquet writer. However, limiting row group size in bytes is still useful in some cases. For example, there is a table property in Iceberg for this: https://github.com/apache/iceberg/blob/73a26fc1f49e6749656a273b2e4d78eb9e64f19e/docs/docs/configuration.md?plain=1#L46. Since iceberg-cpp depends on the Parquet writer here, it would be nice to support this feature.
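   A hedged usage sketch of how the byte limit might be configured, assuming this PR adds a `max_row_group_bytes()` setter on `parquet::WriterProperties::Builder` mirroring the existing `max_row_group_length()`:

   ```cpp
   #include <memory>
   #include "parquet/properties.h"

   std::shared_ptr<parquet::WriterProperties> MakeWriterProperties() {
     parquet::WriterProperties::Builder builder;
     builder.max_row_group_length(1024 * 1024);         // existing row-count cap
     builder.max_row_group_bytes(128LL * 1024 * 1024);  // assumed setter from this PR
     return builder.build();
   }
   ```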



##########
cpp/src/parquet/arrow/writer.cc:
##########
@@ -397,13 +397,18 @@ class FileWriterImpl : public FileWriter {
 
     if (chunk_size <= 0 && table.num_rows() > 0) {
       return Status::Invalid("chunk size per row_group must be greater than 
0");
-    } else if (!table.schema()->Equals(*schema_, false)) {
+    } else if (!table.schema()->Equals(*schema_, /*check_metadata=*/false)) {
       return Status::Invalid("table schema does not match this writer's. 
table:'",
                              table.schema()->ToString(), "' this:'", 
schema_->ToString(),
                              "'");
     } else if (chunk_size > this->properties().max_row_group_length()) {
       chunk_size = this->properties().max_row_group_length();
     }
+    if (auto avg_row_size = EstimateCompressedBytesPerRow()) {

Review Comment:
   We need to check whether `avg_row_size` is 0 or NaN and skip the calculation below in that case. Perhaps `EstimateCompressedBytesPerRow()` itself could do this check and return `std::nullopt`.
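   A minimal sketch of that variant, using an illustrative stand-in class and member names (not the ones used in this PR):

   ```cpp
   #include <cmath>
   #include <cstdint>
   #include <optional>

   // Illustrative stand-in for the writer's bookkeeping, not the actual class.
   struct RowSizeEstimator {
     int64_t written_rows = 0;
     int64_t written_compressed_bytes = 0;

     // Return std::nullopt when no usable average exists yet, so callers can
     // skip the byte-based batch sizing entirely.
     std::optional<double> EstimateCompressedBytesPerRow() const {
       if (written_rows <= 0 || written_compressed_bytes <= 0) {
         return std::nullopt;
       }
       const double avg = static_cast<double>(written_compressed_bytes) /
                          static_cast<double>(written_rows);
       if (!std::isfinite(avg) || avg <= 0.0) {
         return std::nullopt;
       }
       return avg;
     }
   };
   ```

   With this shape, the `if (auto avg_row_size = ...)` check in the hunk above already skips the byte-based sizing whenever no usable estimate is available.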



##########
cpp/src/parquet/file_writer.h:
##########
@@ -58,6 +58,9 @@ class PARQUET_EXPORT RowGroupWriter {
     virtual int64_t total_compressed_bytes() const = 0;
     /// \brief total compressed bytes written by the page writer
     virtual int64_t total_compressed_bytes_written() const = 0;
+    /// \brief estimated bytes of values that are buffered by the page writer

Review Comment:
   ```suggestion
       /// \brief Estimated bytes of values that are buffered by the page writer
   ```



##########
cpp/src/parquet/arrow/writer.cc:
##########
@@ -480,17 +481,24 @@ class FileWriterImpl : public FileWriter {
       return Status::OK();
     };
 
+    const int64_t max_row_group_length = this->properties().max_row_group_length();
+    const int64_t max_row_group_bytes = this->properties().max_row_group_bytes();
+
     int64_t offset = 0;
     while (offset < batch.num_rows()) {
-      const int64_t batch_size =
-          std::min(max_row_group_length - row_group_writer_->num_rows(),
-                   batch.num_rows() - offset);
-      RETURN_NOT_OK(WriteBatch(offset, batch_size));
-      offset += batch_size;
-
-      // Flush current row group writer and create a new writer if it is full.
-      if (row_group_writer_->num_rows() >= max_row_group_length &&
-          offset < batch.num_rows()) {
+      int64_t batch_size = std::min(max_row_group_length - row_group_writer_->num_rows(),
+                                    batch.num_rows() - offset);
+      if (auto avg_row_size = EstimateCompressedBytesPerRow()) {
+        int64_t buffered_bytes = row_group_writer_->EstimatedTotalCompressedBytes();
+        batch_size = std::min(
+            batch_size, static_cast<int64_t>((max_row_group_bytes - buffered_bytes) /
+                                             avg_row_size.value()));
+      }
+      if (batch_size > 0) {
+        RETURN_NOT_OK(WriteBatch(offset, batch_size));
+        offset += batch_size;
+      } else if (offset < batch.num_rows()) {
+        // Current row group is full, write remaining rows in a new group.

Review Comment:
   How do you achieve step 4 above (`next loop to 1`)? This looks a bit complicated to me. How about the logic below (a rough sketch follows after the list)?
   
   For each loop iteration:
   
   1. Check the row group thresholds (both rows and bytes) and append a new row group if needed.
   2. Compute a new `batch_size` based on the different conditions, and clamp it to a minimum of 1 so we never get an empty batch.
   3. Write the batch as before.
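   A fragment-level sketch of that loop shape, reusing the names from the hunk above and the writer's existing `NewBufferedRowGroup()`; this illustrates the proposal and is not the patch itself:

   ```cpp
   int64_t offset = 0;
   while (offset < batch.num_rows()) {
     // 1. Roll over to a new row group if either threshold has been reached.
     if (row_group_writer_->num_rows() >= max_row_group_length ||
         row_group_writer_->EstimatedTotalCompressedBytes() >= max_row_group_bytes) {
       RETURN_NOT_OK(this->NewBufferedRowGroup());
     }

     // 2. Size the batch against both limits and clamp it to at least one row.
     int64_t batch_size = std::min(max_row_group_length - row_group_writer_->num_rows(),
                                   batch.num_rows() - offset);
     if (auto avg_row_size = EstimateCompressedBytesPerRow()) {
       const int64_t remaining_bytes =
           max_row_group_bytes - row_group_writer_->EstimatedTotalCompressedBytes();
       batch_size = std::min(batch_size,
                             static_cast<int64_t>(remaining_bytes / *avg_row_size));
     }
     batch_size = std::max<int64_t>(batch_size, 1);

     // 3. Write the batch as before.
     RETURN_NOT_OK(WriteBatch(offset, batch_size));
     offset += batch_size;
   }
   ```

   The clamp to 1 guarantees forward progress even when the byte estimate says the current group is full; the rollover check at the top of the next iteration then starts a new row group.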
   



##########
cpp/src/parquet/arrow/arrow_reader_writer_test.cc:
##########
@@ -378,19 +378,19 @@ const double test_traits<::arrow::DoubleType>::value(4.2);
 template <>
 struct test_traits<::arrow::StringType> {
   static constexpr ParquetType::type parquet_enum = ParquetType::BYTE_ARRAY;
-  static std::string const value;
+  static const std::string value;

Review Comment:
   Could you revert these 3 lines?


