This is an automated email from the ASF dual-hosted git repository.
wjones127 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/main by this push:
new 589a847545 GH-33652: [C++][Parquet] Add interface
total_compressed_bytes_written (#33897)
589a847545 is described below
commit 589a847545da9c7164e99a58343bd44f187bcbee
Author: mwish <[email protected]>
AuthorDate: Mon Feb 27 23:57:06 2023 +0800
GH-33652: [C++][Parquet] Add interface total_compressed_bytes_written
(#33897)
### Rationale for this change
### What changes are included in this PR?
Discussed in https://github.com/apache/arrow/issues/33652. The main issue is that
`total_bytes_written` is confusing, because it only reports the number of
"uncompressed" bytes written. For the buffered page writer, the actual
written size cannot be known until the whole column chunk has been written
and `sink_.Tell()` is called.
I'd like to:
* [x] Add interface for PageWriter
* [x] Add interface for ColumnWriter
* [x] Add interface for RowGroupWriter
* [x] Testing them
### Are these changes tested?
Yes. Tests are added in `column_writer_test.cc` and `file_serialize_test.cc`.
### Are there any user-facing changes?
Users can get the exact size written by a ColumnWriter before it finishes
writing via `total_compressed_bytes_written() + total_compressed_bytes()`.
* Closes: #33652
Lead-authored-by: mwish <[email protected]>
Co-authored-by: mwish <[email protected]>
Co-authored-by: Gang Wu <[email protected]>
Signed-off-by: Will Jones <[email protected]>
---
cpp/src/parquet/column_writer.cc | 19 ++++
cpp/src/parquet/column_writer.h | 18 +++-
cpp/src/parquet/column_writer_test.cc | 164 +++++++++++++++++++++++++++++++++
cpp/src/parquet/file_serialize_test.cc | 14 ++-
cpp/src/parquet/file_writer.cc | 27 ++++++
cpp/src/parquet/file_writer.h | 11 ++-
cpp/src/parquet/stream_writer.cc | 1 -
7 files changed, 247 insertions(+), 7 deletions(-)
diff --git a/cpp/src/parquet/column_writer.cc b/cpp/src/parquet/column_writer.cc
index 4a5a6819b3..829beb5325 100644
--- a/cpp/src/parquet/column_writer.cc
+++ b/cpp/src/parquet/column_writer.cc
@@ -466,6 +466,10 @@ class SerializedPageWriter : public PageWriter {
int64_t total_uncompressed_size() { return total_uncompressed_size_; }
+ // Compressed page bytes this page writer has already written
+ // (the running total kept in total_compressed_size_).
+ int64_t total_compressed_bytes_written() const override {
+ return total_compressed_size_;
+ }
+
bool page_checksum_verification() { return page_checksum_verification_; }
private:
@@ -527,7 +531,13 @@ class SerializedPageWriter : public PageWriter {
int64_t num_values_;
int64_t dictionary_page_offset_;
int64_t data_page_offset_;
+ // The uncompressed page size the page writer has already
+ // written.
int64_t total_uncompressed_size_;
+ // The compressed page size the page writer has already
+ // written.
+ // If the column is UNCOMPRESSED, the size would be
+ // equal to `total_uncompressed_size_`.
int64_t total_compressed_size_;
int32_t page_ordinal_;
int16_t row_group_ordinal_;
@@ -607,6 +617,10 @@ class BufferedPageWriter : public PageWriter {
bool has_compressor() override { return pager_->has_compressor(); }
+ // Delegates to the wrapped pager_. NOTE(review): for the buffered writer
+ // these bytes presumably have not reached final_sink_ yet; they are only
+ // copied there once the column chunk is closed — confirm.
+ int64_t total_compressed_bytes_written() const override {
+ return pager_->total_compressed_bytes_written();
+ }
+
private:
std::shared_ptr<ArrowOutputStream> final_sink_;
ColumnChunkMetaDataBuilder* metadata_;
@@ -773,6 +787,7 @@ class ColumnWriterImpl {
int64_t total_bytes_written_;
// Records the current number of compressed bytes in a column
+ // These bytes have not been written to `pager_` yet.
int64_t total_compressed_bytes_;
// Flag to check if the Writer has been closed
@@ -1276,6 +1291,10 @@ class TypedColumnWriterImpl : public ColumnWriterImpl,
public TypedColumnWriter<
int64_t total_bytes_written() const override { return total_bytes_written_; }
+ // Compressed bytes already handed to the page writer. Pages that are still
+ // buffered in this column writer are counted by total_compressed_bytes().
+ int64_t total_compressed_bytes_written() const override {
+ return pager_->total_compressed_bytes_written();
+ }
+
const WriterProperties* properties() override { return properties_; }
bool pages_change_on_record_boundaries() const {
diff --git a/cpp/src/parquet/column_writer.h b/cpp/src/parquet/column_writer.h
index 243927449e..4dd4b10ccc 100644
--- a/cpp/src/parquet/column_writer.h
+++ b/cpp/src/parquet/column_writer.h
@@ -104,12 +104,15 @@ class PARQUET_EXPORT PageWriter {
// Return the number of uncompressed bytes written (including header size)
virtual int64_t WriteDictionaryPage(const DictionaryPage& page) = 0;
+ /// \brief The total number of compressed bytes written as serialized data
+ /// and dictionary pages by this page writer so far.
+ ///
+ /// For a buffered page writer these bytes may not have reached the final
+ /// sink yet.
+ virtual int64_t total_compressed_bytes_written() const = 0;
+
virtual bool has_compressor() = 0;
virtual void Compress(const Buffer& src_buffer, ResizableBuffer*
dest_buffer) = 0;
};
-static constexpr int WRITE_BATCH_SIZE = 1000;
class PARQUET_EXPORT ColumnWriter {
public:
virtual ~ColumnWriter() = default;
@@ -131,14 +134,23 @@ class PARQUET_EXPORT ColumnWriter {
/// \brief The number of rows written so far
virtual int64_t rows_written() const = 0;
- /// \brief The total size of the compressed pages + page headers. Some values
- /// might be still buffered and not written to a page yet
+ /// \brief The total size of the compressed pages + page headers that are
+ /// still buffered in the column writer and have not been written to the
+ /// page writer yet.
+ ///
+ /// This is independent of buffered row-group mode: with dictionary
+ /// encoding enabled, data pages can be held here until the dictionary page
+ /// is written, so the value may be non-zero in un-buffered mode as well.
virtual int64_t total_compressed_bytes() const = 0;
/// \brief The total number of bytes written as serialized data and
/// dictionary pages to the ColumnChunk so far
+ /// These bytes are uncompressed bytes.
virtual int64_t total_bytes_written() const = 0;
+ /// \brief The total number of bytes written as serialized data and
+ /// dictionary pages to the ColumnChunk so far.
+ /// If the column is uncompressed, the value would be equal to
+ /// total_bytes_written().
+ virtual int64_t total_compressed_bytes_written() const = 0;
+
/// \brief The file-level writer properties
virtual const WriterProperties* properties() = 0;
diff --git a/cpp/src/parquet/column_writer_test.cc
b/cpp/src/parquet/column_writer_test.cc
index f97108153b..aa05f4e791 100644
--- a/cpp/src/parquet/column_writer_test.cc
+++ b/cpp/src/parquet/column_writer_test.cc
@@ -1318,5 +1318,169 @@ TEST(TestColumnWriter,
WriteDataPagesChangeOnRecordBoundariesWithSmallBatches) {
}
}
+class ColumnWriterTestSizeEstimated : public ::testing::Test {
+ public:
+  void SetUp() override {
+    sink_ = CreateOutputStream();
+    node_ = std::static_pointer_cast<GroupNode>(
+        GroupNode::Make("schema", Repetition::REQUIRED,
+                        {
+                            schema::Int32("required", Repetition::REQUIRED),
+                        }));
+    schema_descriptor_ = std::make_unique<SchemaDescriptor>();
+    schema_descriptor_->Init(node_);
+  }
+
+  /// Builds an INT32 writer for the single REQUIRED column of the schema.
+  ///
+  /// The data page size is 100 * sizeof(int) bytes, so roughly 100 values
+  /// fill one data page. `buffered` is forwarded to PageWriter::Open as
+  /// `buffered_row_group`; `enable_dictionary` toggles dictionary encoding.
+  std::shared_ptr<parquet::Int32Writer> BuildWriter(Compression::type compression,
+                                                    bool buffered,
+                                                    bool enable_dictionary = false) {
+    auto builder = WriterProperties::Builder();
+    builder.compression(compression)->data_pagesize(100 * sizeof(int));
+    if (enable_dictionary) {
+      builder.enable_dictionary();
+    } else {
+      builder.disable_dictionary();
+    }
+    writer_properties_ = builder.build();
+    metadata_ = ColumnChunkMetaDataBuilder::Make(writer_properties_,
+                                                 schema_descriptor_->Column(0));
+
+    std::unique_ptr<PageWriter> pager = PageWriter::Open(
+        sink_, compression, Codec::UseDefaultCompressionLevel(), metadata_.get(),
+        /* row_group_ordinal */ -1, /* column_chunk_ordinal*/ -1,
+        ::arrow::default_memory_pool(), /* buffered_row_group */ buffered,
+        /* header_encryptor */ NULLPTR, /* data_encryptor */ NULLPTR,
+        /* enable_checksum */ false);
+    return std::static_pointer_cast<parquet::Int32Writer>(
+        ColumnWriter::Make(metadata_.get(), std::move(pager), writer_properties_.get()));
+  }
+
+  std::shared_ptr<::arrow::io::BufferOutputStream> sink_;
+  std::shared_ptr<GroupNode> node_;
+  std::unique_ptr<SchemaDescriptor> schema_descriptor_;
+
+  std::shared_ptr<WriterProperties> writer_properties_;
+  std::unique_ptr<ColumnChunkMetaDataBuilder> metadata_;
+};
+
+// Un-buffered, UNCOMPRESSED writer: sizes become visible once a full data
+// page (about 100 INT32 values with the fixture's page size) is flushed.
+TEST_F(ColumnWriterTestSizeEstimated, NonBuffered) {
+ auto required_writer =
+ this->BuildWriter(Compression::UNCOMPRESSED, /* buffered*/ false);
+ // Write half a page; the page is not flushed by the end of this loop.
+ for (int32_t i = 0; i < 50; i++) {
+ required_writer->WriteBatch(1, nullptr, nullptr, &i);
+ }
+ // Page not flushed, check size
+ EXPECT_EQ(0, required_writer->total_bytes_written());
+ EXPECT_EQ(0, required_writer->total_compressed_bytes()); // unbuffered
+ EXPECT_EQ(0, required_writer->total_compressed_bytes_written());
+ // Write the second half; the page is flushed by the end of this loop.
+ for (int32_t i = 0; i < 50; i++) {
+ required_writer->WriteBatch(1, nullptr, nullptr, &i);
+ }
+ // Page flushed, check size: 100 values * 4 bytes plus page overhead
+ // should exceed 400 bytes.
+ EXPECT_LT(400, required_writer->total_bytes_written());
+ EXPECT_EQ(0, required_writer->total_compressed_bytes());
+ EXPECT_LT(400, required_writer->total_compressed_bytes_written());
+
+ // Test after closed
+ int64_t written_size = required_writer->Close();
+ EXPECT_EQ(0, required_writer->total_compressed_bytes());
+ EXPECT_EQ(written_size, required_writer->total_bytes_written());
+ // For an UNCOMPRESSED column, compressed and uncompressed sizes are equal.
+ EXPECT_EQ(written_size, required_writer->total_compressed_bytes_written());
+}
+
+// Buffered, UNCOMPRESSED writer: same expectations as the un-buffered case,
+// since the counters track bytes handed to the (buffered) page writer.
+TEST_F(ColumnWriterTestSizeEstimated, Buffered) {
+  auto required_writer =
+      this->BuildWriter(Compression::UNCOMPRESSED, /* buffered*/ true);
+  // Write half a page; the page is not flushed by the end of this loop.
+  for (int32_t i = 0; i < 50; i++) {
+    required_writer->WriteBatch(1, nullptr, nullptr, &i);
+  }
+  // Page not flushed, check size
+  EXPECT_EQ(0, required_writer->total_bytes_written());
+  EXPECT_EQ(0, required_writer->total_compressed_bytes());  // buffered
+  EXPECT_EQ(0, required_writer->total_compressed_bytes_written());
+  // Write the second half; the page is flushed by the end of this loop.
+  for (int32_t i = 0; i < 50; i++) {
+    required_writer->WriteBatch(1, nullptr, nullptr, &i);
+  }
+  // Page flushed to the page writer, check size
+  EXPECT_LT(400, required_writer->total_bytes_written());
+  EXPECT_EQ(0, required_writer->total_compressed_bytes());
+  EXPECT_LT(400, required_writer->total_compressed_bytes_written());
+
+  // Test after closed
+  int64_t written_size = required_writer->Close();
+  EXPECT_EQ(0, required_writer->total_compressed_bytes());
+  EXPECT_EQ(written_size, required_writer->total_bytes_written());
+  // For an UNCOMPRESSED column, compressed and uncompressed sizes are equal.
+  EXPECT_EQ(written_size, required_writer->total_compressed_bytes_written());
+}
+
+// Un-buffered writer with dictionary encoding: completed data pages are held
+// by the column writer (total_compressed_bytes) until Close().
+TEST_F(ColumnWriterTestSizeEstimated, NonBufferedDictionary) {
+  auto required_writer =
+      this->BuildWriter(Compression::UNCOMPRESSED, /* buffered*/ false, true);
+  // For dictionary encoding, keep all values equal.
+  int32_t dict_value = 1;
+  for (int32_t i = 0; i < 50; i++) {
+    required_writer->WriteBatch(1, nullptr, nullptr, &dict_value);
+  }
+  // Page not flushed, check size
+  EXPECT_EQ(0, required_writer->total_bytes_written());
+  EXPECT_EQ(0, required_writer->total_compressed_bytes());
+  EXPECT_EQ(0, required_writer->total_compressed_bytes_written());
+  // Write many more values so that data pages are completed.
+  for (int32_t i = 0; i < 50000; i++) {
+    required_writer->WriteBatch(1, nullptr, nullptr, &dict_value);
+  }
+  // The completed data pages stay buffered in the column writer (counted by
+  // total_compressed_bytes()); nothing has reached the page writer yet,
+  // even though the writer is un-buffered.
+  EXPECT_EQ(0, required_writer->total_bytes_written());
+  EXPECT_LT(400, required_writer->total_compressed_bytes());
+  EXPECT_EQ(0, required_writer->total_compressed_bytes_written());
+
+  // Close() writes the buffered pages to the page writer. It is called once;
+  // the previous version called it twice.
+  int64_t written_size = required_writer->Close();
+  EXPECT_EQ(0, required_writer->total_compressed_bytes());
+  EXPECT_EQ(written_size, required_writer->total_bytes_written());
+  // For an UNCOMPRESSED column, compressed and uncompressed sizes are equal.
+  EXPECT_EQ(written_size, required_writer->total_compressed_bytes_written());
+}
+
+// Buffered, SNAPPY-compressed writer: once a page is flushed the compressed
+// byte count must be smaller than the uncompressed one.
+TEST_F(ColumnWriterTestSizeEstimated, BufferedCompression) {
+#ifndef ARROW_WITH_SNAPPY
+ GTEST_SKIP() << "Test requires snappy compression";
+#endif
+ auto required_writer = this->BuildWriter(Compression::SNAPPY, true);
+
+ // Write half a page; the page is not flushed by the end of this loop.
+ for (int32_t i = 0; i < 50; i++) {
+ required_writer->WriteBatch(1, nullptr, nullptr, &i);
+ }
+ // Page not flushed, check size
+ EXPECT_EQ(0, required_writer->total_bytes_written());
+ EXPECT_EQ(0, required_writer->total_compressed_bytes()); // buffered
+ EXPECT_EQ(0, required_writer->total_compressed_bytes_written());
+ // Write the second half; the page is flushed by the end of this loop.
+ for (int32_t i = 0; i < 50; i++) {
+ required_writer->WriteBatch(1, nullptr, nullptr, &i);
+ }
+ // Page flushed, check size
+ EXPECT_LT(400, required_writer->total_bytes_written());
+ EXPECT_EQ(0, required_writer->total_compressed_bytes());
+ EXPECT_LT(required_writer->total_compressed_bytes_written(),
+ required_writer->total_bytes_written());
+
+ // Test after closed: Close() returns the uncompressed size, which should
+ // exceed the compressed size.
+ int64_t written_size = required_writer->Close();
+ EXPECT_EQ(0, required_writer->total_compressed_bytes());
+ EXPECT_EQ(written_size, required_writer->total_bytes_written());
+ EXPECT_GT(written_size, required_writer->total_compressed_bytes_written());
+}
+
} // namespace test
} // namespace parquet
diff --git a/cpp/src/parquet/file_serialize_test.cc
b/cpp/src/parquet/file_serialize_test.cc
index eb1133d8a9..85bfd1c514 100644
--- a/cpp/src/parquet/file_serialize_test.cc
+++ b/cpp/src/parquet/file_serialize_test.cc
@@ -84,8 +84,13 @@ class TestSerialize : public PrimitiveTypedTest<TestType> {
// Ensure column() API which is specific to BufferedRowGroup cannot be
called
ASSERT_THROW(row_group_writer->column(col), ParquetException);
}
-
+ EXPECT_EQ(0, row_group_writer->total_compressed_bytes());
+ EXPECT_NE(0, row_group_writer->total_bytes_written());
+ EXPECT_NE(0, row_group_writer->total_compressed_bytes_written());
row_group_writer->Close();
+ EXPECT_EQ(0, row_group_writer->total_compressed_bytes());
+ EXPECT_NE(0, row_group_writer->total_bytes_written());
+ EXPECT_NE(0, row_group_writer->total_compressed_bytes_written());
}
// Write half BufferedRowGroups
for (int rg = 0; rg < num_rowgroups_ / 2; ++rg) {
@@ -102,12 +107,19 @@ class TestSerialize : public PrimitiveTypedTest<TestType>
{
ASSERT_THROW(row_group_writer->NextColumn(), ParquetException);
}
}
+ // total_compressed_bytes() is not checked here: it may be 0 if dictionary
+ // encoding is disabled and no values are buffered.
+ EXPECT_EQ(0, row_group_writer->total_bytes_written());
+ EXPECT_EQ(0, row_group_writer->total_compressed_bytes_written());
for (int col = 0; col < num_columns_; ++col) {
auto column_writer =
static_cast<TypedColumnWriter<TestType>*>(row_group_writer->column(col));
column_writer->Close();
}
row_group_writer->Close();
+ EXPECT_EQ(0, row_group_writer->total_compressed_bytes());
+ EXPECT_NE(0, row_group_writer->total_bytes_written());
+ EXPECT_NE(0, row_group_writer->total_compressed_bytes_written());
}
file_writer->Close();
diff --git a/cpp/src/parquet/file_writer.cc b/cpp/src/parquet/file_writer.cc
index b30a2633c9..ec2e0e8a8a 100644
--- a/cpp/src/parquet/file_writer.cc
+++ b/cpp/src/parquet/file_writer.cc
@@ -62,6 +62,10 @@ int64_t RowGroupWriter::total_bytes_written() const {
return contents_->total_bytes_written();
}
+// Compressed bytes written by this row group's page writers; forwarded to the
+// Contents implementation.
+int64_t RowGroupWriter::total_compressed_bytes_written() const {
+ return contents_->total_compressed_bytes_written();
+}
+
bool RowGroupWriter::buffered() const { return contents_->buffered(); }
int RowGroupWriter::current_column() { return contents_->current_column(); }
@@ -90,6 +94,7 @@ class RowGroupSerializer : public RowGroupWriter::Contents {
metadata_(metadata),
properties_(properties),
total_bytes_written_(0),
+ total_compressed_bytes_written_(0),
closed_(false),
row_group_ordinal_(row_group_ordinal),
next_column_index_(0),
@@ -126,6 +131,8 @@ class RowGroupSerializer : public RowGroupWriter::Contents {
if (column_writers_[0]) {
total_bytes_written_ += column_writers_[0]->Close();
+ total_compressed_bytes_written_ +=
+ column_writers_[0]->total_compressed_bytes_written();
}
++next_column_index_;
@@ -171,6 +178,9 @@ class RowGroupSerializer : public RowGroupWriter::Contents {
}
int64_t total_bytes_written() const override {
+ if (closed_) {
+ return total_bytes_written_;
+ }
int64_t total_bytes_written = 0;
for (size_t i = 0; i < column_writers_.size(); i++) {
if (column_writers_[i]) {
@@ -180,6 +190,20 @@ class RowGroupSerializer : public RowGroupWriter::Contents
{
return total_bytes_written;
}
+  int64_t total_compressed_bytes_written() const override {
+    if (closed_) {
+      return total_compressed_bytes_written_;
+    }
+    // Start from the bytes of columns that were already closed: in
+    // non-buffered mode NextColumn() closes column_writers_[0] and adds its
+    // bytes to total_compressed_bytes_written_ before moving on, so those
+    // bytes would otherwise be lost while the row group is still open.
+    int64_t total_compressed_bytes_written = total_compressed_bytes_written_;
+    // Add the bytes already flushed by the writers that are still open.
+    for (const auto& column_writer : column_writers_) {
+      if (column_writer) {
+        total_compressed_bytes_written +=
+            column_writer->total_compressed_bytes_written();
+      }
+    }
+    return total_compressed_bytes_written;
+  }
+
bool buffered() const override { return buffered_row_group_; }
void Close() override {
@@ -190,6 +214,8 @@ class RowGroupSerializer : public RowGroupWriter::Contents {
for (size_t i = 0; i < column_writers_.size(); i++) {
if (column_writers_[i]) {
total_bytes_written_ += column_writers_[i]->Close();
+ total_compressed_bytes_written_ +=
+ column_writers_[i]->total_compressed_bytes_written();
column_writers_[i].reset();
}
}
@@ -207,6 +233,7 @@ class RowGroupSerializer : public RowGroupWriter::Contents {
mutable RowGroupMetaDataBuilder* metadata_;
const WriterProperties* properties_;
int64_t total_bytes_written_;
+ // Compressed bytes of column writers already closed by NextColumn()/Close().
+ int64_t total_compressed_bytes_written_;
bool closed_;
int16_t row_group_ordinal_;
int next_column_index_;
diff --git a/cpp/src/parquet/file_writer.h b/cpp/src/parquet/file_writer.h
index e6d8608a75..f2888599fb 100644
--- a/cpp/src/parquet/file_writer.h
+++ b/cpp/src/parquet/file_writer.h
@@ -52,10 +52,12 @@ class PARQUET_EXPORT RowGroupWriter {
virtual int current_column() const = 0;
virtual void Close() = 0;
- // total bytes written by the page writer
+ /// \brief total uncompressed bytes written by the page writer
virtual int64_t total_bytes_written() const = 0;
- // total bytes still compressed but not written
+ /// \brief total compressed bytes still buffered and not yet written by the
+ /// page writer
virtual int64_t total_compressed_bytes() const = 0;
+ /// \brief total compressed bytes written by the page writer
+ virtual int64_t total_compressed_bytes_written() const = 0;
virtual bool buffered() const = 0;
};
@@ -90,8 +92,13 @@ class PARQUET_EXPORT RowGroupWriter {
*/
int64_t num_rows() const;
+ /// \brief total uncompressed bytes written by the page writer
int64_t total_bytes_written() const;
+ /// \brief total compressed bytes still buffered and not yet written by the
+ /// page writer.
+ /// This can be non-zero even for a non-buffered row group, e.g. when
+ /// dictionary encoding holds data pages until the dictionary page is written.
int64_t total_compressed_bytes() const;
+ /// \brief total compressed bytes written by the page writer
+ int64_t total_compressed_bytes_written() const;
/// Returns whether the current RowGroupWriter is in the buffered mode and
is created
/// by calling ParquetFileWriter::AppendBufferedRowGroup.
diff --git a/cpp/src/parquet/stream_writer.cc b/cpp/src/parquet/stream_writer.cc
index dc76c2935d..856436d701 100644
--- a/cpp/src/parquet/stream_writer.cc
+++ b/cpp/src/parquet/stream_writer.cc
@@ -294,7 +294,6 @@ void StreamWriter::EndRow() {
}
// Initialize for each row with size already written
// (compressed + uncompressed).
- //
row_group_size_ = row_group_writer_->total_bytes_written() +
row_group_writer_->total_compressed_bytes();
}