This is an automated email from the ASF dual-hosted git repository.

wjones127 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/main by this push:
     new 589a847545 GH-33652: [C++][Parquet] Add interface 
total_compressed_bytes_written (#33897)
589a847545 is described below

commit 589a847545da9c7164e99a58343bd44f187bcbee
Author: mwish <[email protected]>
AuthorDate: Mon Feb 27 23:57:06 2023 +0800

    GH-33652: [C++][Parquet] Add interface total_compressed_bytes_written 
(#33897)
    
    
    
    ### Rationale for this change
    
    ### What changes are included in this PR?
    
    Discussed in https://github.com/apache/arrow/issues/33652 . The main issue is that 
`total_bytes_written` is confusing, because it only reports the size of the 
"uncompressed" bytes written. For the buffered page writer, the actual 
written size cannot be known until the whole column chunk has been written and 
`sink_.Tell()` is called.
    
    I'd like to:
    * [x] Add interface for PageWriter
    * [x] Add interface for ColumnWriter
    * [x] Add interface for RowGroupWriter
    * [x] Testing them
    
    ### Are these changes tested?
    
    I'd like to test it later.
    
    ### Are there any user-facing changes?
    
    Users can get the exact size written via `total_compressed_bytes_written() + 
total_compressed_bytes()` for a ColumnWriter before writing has finished.
    
    * Closes: #33652
    
    Lead-authored-by: mwish <[email protected]>
    Co-authored-by: mwish <[email protected]>
    Co-authored-by: Gang Wu <[email protected]>
    Signed-off-by: Will Jones <[email protected]>
---
 cpp/src/parquet/column_writer.cc       |  19 ++++
 cpp/src/parquet/column_writer.h        |  18 +++-
 cpp/src/parquet/column_writer_test.cc  | 164 +++++++++++++++++++++++++++++++++
 cpp/src/parquet/file_serialize_test.cc |  14 ++-
 cpp/src/parquet/file_writer.cc         |  27 ++++++
 cpp/src/parquet/file_writer.h          |  11 ++-
 cpp/src/parquet/stream_writer.cc       |   1 -
 7 files changed, 247 insertions(+), 7 deletions(-)

diff --git a/cpp/src/parquet/column_writer.cc b/cpp/src/parquet/column_writer.cc
index 4a5a6819b3..829beb5325 100644
--- a/cpp/src/parquet/column_writer.cc
+++ b/cpp/src/parquet/column_writer.cc
@@ -466,6 +466,10 @@ class SerializedPageWriter : public PageWriter {
 
   int64_t total_uncompressed_size() { return total_uncompressed_size_; }
 
+  int64_t total_compressed_bytes_written() const override {
+    return total_compressed_size_;
+  }
+
   bool page_checksum_verification() { return page_checksum_verification_; }
 
  private:
@@ -527,7 +531,13 @@ class SerializedPageWriter : public PageWriter {
   int64_t num_values_;
   int64_t dictionary_page_offset_;
   int64_t data_page_offset_;
+  // The uncompressed page size the page writer has already
+  //  written.
   int64_t total_uncompressed_size_;
+  // The compressed page size the page writer has already
+  //  written.
+  // If the column is UNCOMPRESSED, the size would be
+  //  equal to `total_uncompressed_size_`.
   int64_t total_compressed_size_;
   int32_t page_ordinal_;
   int16_t row_group_ordinal_;
@@ -607,6 +617,10 @@ class BufferedPageWriter : public PageWriter {
 
   bool has_compressor() override { return pager_->has_compressor(); }
 
+  int64_t total_compressed_bytes_written() const override {
+    return pager_->total_compressed_bytes_written();
+  }
+
  private:
   std::shared_ptr<ArrowOutputStream> final_sink_;
   ColumnChunkMetaDataBuilder* metadata_;
@@ -773,6 +787,7 @@ class ColumnWriterImpl {
   int64_t total_bytes_written_;
 
   // Records the current number of compressed bytes in a column
+  // These bytes are unwritten to `pager_` yet
   int64_t total_compressed_bytes_;
 
   // Flag to check if the Writer has been closed
@@ -1276,6 +1291,10 @@ class TypedColumnWriterImpl : public ColumnWriterImpl, 
public TypedColumnWriter<
 
   int64_t total_bytes_written() const override { return total_bytes_written_; }
 
+  int64_t total_compressed_bytes_written() const override {
+    return pager_->total_compressed_bytes_written();
+  }
+
   const WriterProperties* properties() override { return properties_; }
 
   bool pages_change_on_record_boundaries() const {
diff --git a/cpp/src/parquet/column_writer.h b/cpp/src/parquet/column_writer.h
index 243927449e..4dd4b10ccc 100644
--- a/cpp/src/parquet/column_writer.h
+++ b/cpp/src/parquet/column_writer.h
@@ -104,12 +104,15 @@ class PARQUET_EXPORT PageWriter {
   // Return the number of uncompressed bytes written (including header size)
   virtual int64_t WriteDictionaryPage(const DictionaryPage& page) = 0;
 
+  /// \brief The total number of bytes written as serialized data and
+  /// dictionary pages to the sink so far.
+  virtual int64_t total_compressed_bytes_written() const = 0;
+
   virtual bool has_compressor() = 0;
 
   virtual void Compress(const Buffer& src_buffer, ResizableBuffer* 
dest_buffer) = 0;
 };
 
-static constexpr int WRITE_BATCH_SIZE = 1000;
 class PARQUET_EXPORT ColumnWriter {
  public:
   virtual ~ColumnWriter() = default;
@@ -131,14 +134,23 @@ class PARQUET_EXPORT ColumnWriter {
   /// \brief The number of rows written so far
   virtual int64_t rows_written() const = 0;
 
-  /// \brief The total size of the compressed pages + page headers. Some values
-  /// might be still buffered and not written to a page yet
+  /// \brief The total size of the compressed pages + page headers. Values
+  /// are still buffered and not written to a pager yet
+  ///
+  /// So in un-buffered mode, it always returns 0
   virtual int64_t total_compressed_bytes() const = 0;
 
   /// \brief The total number of bytes written as serialized data and
   /// dictionary pages to the ColumnChunk so far
+  /// These bytes are uncompressed bytes.
   virtual int64_t total_bytes_written() const = 0;
 
+  /// \brief The total number of bytes written as serialized data and
+  /// dictionary pages to the ColumnChunk so far.
+  /// If the column is uncompressed, the value would be equal to
+  /// total_bytes_written().
+  virtual int64_t total_compressed_bytes_written() const = 0;
+
   /// \brief The file-level writer properties
   virtual const WriterProperties* properties() = 0;
 
diff --git a/cpp/src/parquet/column_writer_test.cc 
b/cpp/src/parquet/column_writer_test.cc
index f97108153b..aa05f4e791 100644
--- a/cpp/src/parquet/column_writer_test.cc
+++ b/cpp/src/parquet/column_writer_test.cc
@@ -1318,5 +1318,169 @@ TEST(TestColumnWriter, 
WriteDataPagesChangeOnRecordBoundariesWithSmallBatches) {
   }
 }
 
+class ColumnWriterTestSizeEstimated : public ::testing::Test {
+ public:
+  void SetUp() {
+    sink_ = CreateOutputStream();
+    node_ = std::static_pointer_cast<GroupNode>(
+        GroupNode::Make("schema", Repetition::REQUIRED,
+                        {
+                            schema::Int32("required", Repetition::REQUIRED),
+                        }));
+    std::vector<schema::NodePtr> fields;
+    schema_descriptor_ = std::make_unique<SchemaDescriptor>();
+    schema_descriptor_->Init(node_);
+  }
+
+  std::shared_ptr<parquet::Int32Writer> BuildWriter(Compression::type 
compression,
+                                                    bool buffered,
+                                                    bool enable_dictionary = 
false) {
+    auto builder = WriterProperties::Builder();
+    builder.disable_dictionary()
+        ->compression(compression)
+        ->data_pagesize(100 * sizeof(int));
+    if (enable_dictionary) {
+      builder.enable_dictionary();
+    } else {
+      builder.disable_dictionary();
+    }
+    writer_properties_ = builder.build();
+    metadata_ = ColumnChunkMetaDataBuilder::Make(writer_properties_,
+                                                 
schema_descriptor_->Column(0));
+
+    std::unique_ptr<PageWriter> pager = PageWriter::Open(
+        sink_, compression, Codec::UseDefaultCompressionLevel(), 
metadata_.get(),
+        /* row_group_ordinal */ -1, /* column_chunk_ordinal*/ -1,
+        ::arrow::default_memory_pool(), /* buffered_row_group */ buffered,
+        /* header_encryptor */ NULLPTR, /* data_encryptor */ NULLPTR,
+        /* enable_checksum */ false);
+    return std::static_pointer_cast<parquet::Int32Writer>(
+        ColumnWriter::Make(metadata_.get(), std::move(pager), 
writer_properties_.get()));
+  }
+
+  std::shared_ptr<::arrow::io::BufferOutputStream> sink_;
+  std::shared_ptr<GroupNode> node_;
+  std::unique_ptr<SchemaDescriptor> schema_descriptor_;
+
+  std::shared_ptr<WriterProperties> writer_properties_;
+  std::unique_ptr<ColumnChunkMetaDataBuilder> metadata_;
+};
+
+TEST_F(ColumnWriterTestSizeEstimated, NonBuffered) {
+  auto required_writer =
+      this->BuildWriter(Compression::UNCOMPRESSED, /* buffered*/ false);
+  // Write half page, page will not be flushed after loop
+  for (int32_t i = 0; i < 50; i++) {
+    required_writer->WriteBatch(1, nullptr, nullptr, &i);
+  }
+  // Page not flushed, check size
+  EXPECT_EQ(0, required_writer->total_bytes_written());
+  EXPECT_EQ(0, required_writer->total_compressed_bytes());  // unbuffered
+  EXPECT_EQ(0, required_writer->total_compressed_bytes_written());
+  // Write half page, page be flushed after loop
+  for (int32_t i = 0; i < 50; i++) {
+    required_writer->WriteBatch(1, nullptr, nullptr, &i);
+  }
+  // Page flushed, check size
+  EXPECT_LT(400, required_writer->total_bytes_written());
+  EXPECT_EQ(0, required_writer->total_compressed_bytes());
+  EXPECT_LT(400, required_writer->total_compressed_bytes_written());
+
+  // Test after closed
+  int64_t written_size = required_writer->Close();
+  EXPECT_EQ(0, required_writer->total_compressed_bytes());
+  EXPECT_EQ(written_size, required_writer->total_bytes_written());
+  // uncompressed writer should be equal
+  EXPECT_EQ(written_size, required_writer->total_compressed_bytes_written());
+}
+
+TEST_F(ColumnWriterTestSizeEstimated, Buffered) {
+  auto required_writer = this->BuildWriter(Compression::UNCOMPRESSED, /* 
buffered*/ true);
+  // Write half page, page will not be flushed after loop
+  for (int32_t i = 0; i < 50; i++) {
+    required_writer->WriteBatch(1, nullptr, nullptr, &i);
+  }
+  // Page not flushed, check size
+  EXPECT_EQ(0, required_writer->total_bytes_written());
+  EXPECT_EQ(0, required_writer->total_compressed_bytes());  // buffered
+  EXPECT_EQ(0, required_writer->total_compressed_bytes_written());
+  // Write half page, page be flushed after loop
+  for (int32_t i = 0; i < 50; i++) {
+    required_writer->WriteBatch(1, nullptr, nullptr, &i);
+  }
+  // Page flushed, check size
+  EXPECT_LT(400, required_writer->total_bytes_written());
+  EXPECT_EQ(0, required_writer->total_compressed_bytes());
+  EXPECT_LT(400, required_writer->total_compressed_bytes_written());
+
+  // Test after closed
+  int64_t written_size = required_writer->Close();
+  EXPECT_EQ(0, required_writer->total_compressed_bytes());
+  EXPECT_EQ(written_size, required_writer->total_bytes_written());
+  // uncompressed writer should be equal
+  EXPECT_EQ(written_size, required_writer->total_compressed_bytes_written());
+}
+
+TEST_F(ColumnWriterTestSizeEstimated, NonBufferedDictionary) {
+  auto required_writer =
+      this->BuildWriter(Compression::UNCOMPRESSED, /* buffered*/ false, true);
+  // for dict, keep all values equal
+  int32_t dict_value = 1;
+  for (int32_t i = 0; i < 50; i++) {
+    required_writer->WriteBatch(1, nullptr, nullptr, &dict_value);
+  }
+  // Page not flushed, check size
+  EXPECT_EQ(0, required_writer->total_bytes_written());
+  EXPECT_EQ(0, required_writer->total_compressed_bytes());
+  EXPECT_EQ(0, required_writer->total_compressed_bytes_written());
+  // write a huge batch to trigger page flush
+  for (int32_t i = 0; i < 50000; i++) {
+    required_writer->WriteBatch(1, nullptr, nullptr, &dict_value);
+  }
+  // Page flushed, check size
+  EXPECT_EQ(0, required_writer->total_bytes_written());
+  EXPECT_LT(400, required_writer->total_compressed_bytes());
+  EXPECT_EQ(0, required_writer->total_compressed_bytes_written());
+
+  required_writer->Close();
+
+  // Test after closed
+  int64_t written_size = required_writer->Close();
+  EXPECT_EQ(0, required_writer->total_compressed_bytes());
+  EXPECT_EQ(written_size, required_writer->total_bytes_written());
+  // uncompressed writer should be equal
+  EXPECT_EQ(written_size, required_writer->total_compressed_bytes_written());
+}
+
+TEST_F(ColumnWriterTestSizeEstimated, BufferedCompression) {
+#ifndef ARROW_WITH_SNAPPY
+  GTEST_SKIP() << "Test requires snappy compression";
+#endif
+  auto required_writer = this->BuildWriter(Compression::SNAPPY, true);
+
+  // Write half page
+  for (int32_t i = 0; i < 50; i++) {
+    required_writer->WriteBatch(1, nullptr, nullptr, &i);
+  }
+  // Page not flushed, check size
+  EXPECT_EQ(0, required_writer->total_bytes_written());
+  EXPECT_EQ(0, required_writer->total_compressed_bytes());  // buffered
+  EXPECT_EQ(0, required_writer->total_compressed_bytes_written());
+  for (int32_t i = 0; i < 50; i++) {
+    required_writer->WriteBatch(1, nullptr, nullptr, &i);
+  }
+  // Page flushed, check size
+  EXPECT_LT(400, required_writer->total_bytes_written());
+  EXPECT_EQ(0, required_writer->total_compressed_bytes());
+  EXPECT_LT(required_writer->total_compressed_bytes_written(),
+            required_writer->total_bytes_written());
+
+  // Test after closed
+  int64_t written_size = required_writer->Close();
+  EXPECT_EQ(0, required_writer->total_compressed_bytes());
+  EXPECT_EQ(written_size, required_writer->total_bytes_written());
+  EXPECT_GT(written_size, required_writer->total_compressed_bytes_written());
+}
+
 }  // namespace test
 }  // namespace parquet
diff --git a/cpp/src/parquet/file_serialize_test.cc 
b/cpp/src/parquet/file_serialize_test.cc
index eb1133d8a9..85bfd1c514 100644
--- a/cpp/src/parquet/file_serialize_test.cc
+++ b/cpp/src/parquet/file_serialize_test.cc
@@ -84,8 +84,13 @@ class TestSerialize : public PrimitiveTypedTest<TestType> {
         // Ensure column() API which is specific to BufferedRowGroup cannot be 
called
         ASSERT_THROW(row_group_writer->column(col), ParquetException);
       }
-
+      EXPECT_EQ(0, row_group_writer->total_compressed_bytes());
+      EXPECT_NE(0, row_group_writer->total_bytes_written());
+      EXPECT_NE(0, row_group_writer->total_compressed_bytes_written());
       row_group_writer->Close();
+      EXPECT_EQ(0, row_group_writer->total_compressed_bytes());
+      EXPECT_NE(0, row_group_writer->total_bytes_written());
+      EXPECT_NE(0, row_group_writer->total_compressed_bytes_written());
     }
     // Write half BufferedRowGroups
     for (int rg = 0; rg < num_rowgroups_ / 2; ++rg) {
@@ -102,12 +107,19 @@ class TestSerialize : public PrimitiveTypedTest<TestType> 
{
           ASSERT_THROW(row_group_writer->NextColumn(), ParquetException);
         }
       }
+      // total_compressed_bytes() may equal to 0 if no dictionary enabled and 
no buffered
+      // values.
+      EXPECT_EQ(0, row_group_writer->total_bytes_written());
+      EXPECT_EQ(0, row_group_writer->total_compressed_bytes_written());
       for (int col = 0; col < num_columns_; ++col) {
         auto column_writer =
             
static_cast<TypedColumnWriter<TestType>*>(row_group_writer->column(col));
         column_writer->Close();
       }
       row_group_writer->Close();
+      EXPECT_EQ(0, row_group_writer->total_compressed_bytes());
+      EXPECT_NE(0, row_group_writer->total_bytes_written());
+      EXPECT_NE(0, row_group_writer->total_compressed_bytes_written());
     }
     file_writer->Close();
 
diff --git a/cpp/src/parquet/file_writer.cc b/cpp/src/parquet/file_writer.cc
index b30a2633c9..ec2e0e8a8a 100644
--- a/cpp/src/parquet/file_writer.cc
+++ b/cpp/src/parquet/file_writer.cc
@@ -62,6 +62,10 @@ int64_t RowGroupWriter::total_bytes_written() const {
   return contents_->total_bytes_written();
 }
 
+int64_t RowGroupWriter::total_compressed_bytes_written() const {
+  return contents_->total_compressed_bytes_written();
+}
+
 bool RowGroupWriter::buffered() const { return contents_->buffered(); }
 
 int RowGroupWriter::current_column() { return contents_->current_column(); }
@@ -90,6 +94,7 @@ class RowGroupSerializer : public RowGroupWriter::Contents {
         metadata_(metadata),
         properties_(properties),
         total_bytes_written_(0),
+        total_compressed_bytes_written_(0),
         closed_(false),
         row_group_ordinal_(row_group_ordinal),
         next_column_index_(0),
@@ -126,6 +131,8 @@ class RowGroupSerializer : public RowGroupWriter::Contents {
 
     if (column_writers_[0]) {
       total_bytes_written_ += column_writers_[0]->Close();
+      total_compressed_bytes_written_ +=
+          column_writers_[0]->total_compressed_bytes_written();
     }
 
     ++next_column_index_;
@@ -171,6 +178,9 @@ class RowGroupSerializer : public RowGroupWriter::Contents {
   }
 
   int64_t total_bytes_written() const override {
+    if (closed_) {
+      return total_bytes_written_;
+    }
     int64_t total_bytes_written = 0;
     for (size_t i = 0; i < column_writers_.size(); i++) {
       if (column_writers_[i]) {
@@ -180,6 +190,20 @@ class RowGroupSerializer : public RowGroupWriter::Contents 
{
     return total_bytes_written;
   }
 
+  int64_t total_compressed_bytes_written() const override {
+    if (closed_) {
+      return total_compressed_bytes_written_;
+    }
+    int64_t total_compressed_bytes_written = 0;
+    for (size_t i = 0; i < column_writers_.size(); i++) {
+      if (column_writers_[i]) {
+        total_compressed_bytes_written +=
+            column_writers_[i]->total_compressed_bytes_written();
+      }
+    }
+    return total_compressed_bytes_written;
+  }
+
   bool buffered() const override { return buffered_row_group_; }
 
   void Close() override {
@@ -190,6 +214,8 @@ class RowGroupSerializer : public RowGroupWriter::Contents {
       for (size_t i = 0; i < column_writers_.size(); i++) {
         if (column_writers_[i]) {
           total_bytes_written_ += column_writers_[i]->Close();
+          total_compressed_bytes_written_ +=
+              column_writers_[i]->total_compressed_bytes_written();
           column_writers_[i].reset();
         }
       }
@@ -207,6 +233,7 @@ class RowGroupSerializer : public RowGroupWriter::Contents {
   mutable RowGroupMetaDataBuilder* metadata_;
   const WriterProperties* properties_;
   int64_t total_bytes_written_;
+  int64_t total_compressed_bytes_written_;
   bool closed_;
   int16_t row_group_ordinal_;
   int next_column_index_;
diff --git a/cpp/src/parquet/file_writer.h b/cpp/src/parquet/file_writer.h
index e6d8608a75..f2888599fb 100644
--- a/cpp/src/parquet/file_writer.h
+++ b/cpp/src/parquet/file_writer.h
@@ -52,10 +52,12 @@ class PARQUET_EXPORT RowGroupWriter {
     virtual int current_column() const = 0;
     virtual void Close() = 0;
 
-    // total bytes written by the page writer
+    /// \brief total uncompressed bytes written by the page writer
     virtual int64_t total_bytes_written() const = 0;
-    // total bytes still compressed but not written
+    /// \brief total bytes still compressed but not written by the page writer
     virtual int64_t total_compressed_bytes() const = 0;
+    /// \brief total compressed bytes written by the page writer
+    virtual int64_t total_compressed_bytes_written() const = 0;
 
     virtual bool buffered() const = 0;
   };
@@ -90,8 +92,13 @@ class PARQUET_EXPORT RowGroupWriter {
    */
   int64_t num_rows() const;
 
+  /// \brief total uncompressed bytes written by the page writer
   int64_t total_bytes_written() const;
+  /// \brief total bytes still compressed but not written by the page writer.
+  /// It will always return 0 from the SerializedPageWriter.
   int64_t total_compressed_bytes() const;
+  /// \brief total compressed bytes written by the page writer
+  int64_t total_compressed_bytes_written() const;
 
   /// Returns whether the current RowGroupWriter is in the buffered mode and 
is created
   /// by calling ParquetFileWriter::AppendBufferedRowGroup.
diff --git a/cpp/src/parquet/stream_writer.cc b/cpp/src/parquet/stream_writer.cc
index dc76c2935d..856436d701 100644
--- a/cpp/src/parquet/stream_writer.cc
+++ b/cpp/src/parquet/stream_writer.cc
@@ -294,7 +294,6 @@ void StreamWriter::EndRow() {
     }
     // Initialize for each row with size already written
     // (compressed + uncompressed).
-    //
     row_group_size_ = row_group_writer_->total_bytes_written() +
                       row_group_writer_->total_compressed_bytes();
   }

Reply via email to