This is an automated email from the ASF dual-hosted git repository.
wjones127 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push:
new 05c8b222c4 GH-34086: [C++][Parquet] Fix writing num_rows to data page v2 (#34096)
05c8b222c4 is described below
commit 05c8b222c4d409d538cc5f92c802d4d683407ad5
Author: Gang Wu <[email protected]>
AuthorDate: Sat Feb 11 01:41:15 2023 +0800
GH-34086: [C++][Parquet] Fix writing num_rows to data page v2 (#34096)
### Rationale for this change
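The C++ Parquet writer does not correctly fill the num_rows field of the DataPageV2
header: it writes the number of values (levels) there instead of the number of rows,
which differs for repeated columns.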
### What changes are included in this PR?
ColumnWriter now tracks the number of rows buffered in the current data page and
writes that count into the DataPageV2 header.
### Are these changes tested?
A test case has been added to verify that the DataPageV2 header is set correctly
for required, optional, and repeated columns.
### Are there any user-facing changes?
No.
* Closes: #34086
Authored-by: Gang Wu <[email protected]>
Signed-off-by: Will Jones <[email protected]>
---
cpp/src/parquet/column_writer.cc | 35 ++++++++++-----
cpp/src/parquet/column_writer_test.cc | 85 +++++++++++++++++++++++++++++++++++
2 files changed, 108 insertions(+), 12 deletions(-)
diff --git a/cpp/src/parquet/column_writer.cc b/cpp/src/parquet/column_writer.cc
index f549331faf..26d25ba17f 100644
--- a/cpp/src/parquet/column_writer.cc
+++ b/cpp/src/parquet/column_writer.cc
@@ -657,6 +657,7 @@ class ColumnWriterImpl {
allocator_(properties->memory_pool()),
num_buffered_values_(0),
num_buffered_encoded_values_(0),
+ num_buffered_rows_(0),
rows_written_(0),
total_bytes_written_(0),
total_compressed_bytes_(0),
@@ -757,10 +758,13 @@ class ColumnWriterImpl {
// case.
int64_t num_buffered_values_;
- // The total number of stored values. For repeated or optional values, this
- // number may be lower than num_buffered_values_.
+ // The total number of stored values in the data page. For repeated or
optional
+ // values, this number may be lower than num_buffered_values_.
int64_t num_buffered_encoded_values_;
+ // Total number of rows buffered in the data page.
+ int64_t num_buffered_rows_;
+
// Total number of rows written with this ColumnWriter
int64_t rows_written_;
@@ -869,6 +873,7 @@ void ColumnWriterImpl::AddDataPage() {
InitSinks();
num_buffered_values_ = 0;
num_buffered_encoded_values_ = 0;
+ num_buffered_rows_ = 0;
}
void ColumnWriterImpl::BuildDataPageV1(int64_t definition_levels_rle_size,
@@ -894,6 +899,8 @@ void ColumnWriterImpl::BuildDataPageV1(int64_t
definition_levels_rle_size,
compressed_data = uncompressed_data_;
}
+ int32_t num_values = static_cast<int32_t>(num_buffered_values_);
+
// Write the page to OutputStream eagerly if there is no dictionary or
// if dictionary encoding has fallen back to PLAIN
if (has_dictionary_ && !fallback_) { // Save pages until end of dictionary
encoding
@@ -901,15 +908,14 @@ void ColumnWriterImpl::BuildDataPageV1(int64_t
definition_levels_rle_size,
auto compressed_data_copy,
compressed_data->CopySlice(0, compressed_data->size(), allocator_));
std::unique_ptr<DataPage> page_ptr = std::make_unique<DataPageV1>(
- compressed_data_copy, static_cast<int32_t>(num_buffered_values_),
encoding_,
- Encoding::RLE, Encoding::RLE, uncompressed_size, page_stats);
+ compressed_data_copy, num_values, encoding_, Encoding::RLE,
Encoding::RLE,
+ uncompressed_size, page_stats);
total_compressed_bytes_ += page_ptr->size() + sizeof(format::PageHeader);
data_pages_.push_back(std::move(page_ptr));
} else { // Eagerly write pages
- DataPageV1 page(compressed_data,
static_cast<int32_t>(num_buffered_values_),
- encoding_, Encoding::RLE, Encoding::RLE, uncompressed_size,
- page_stats);
+ DataPageV1 page(compressed_data, num_values, encoding_, Encoding::RLE,
Encoding::RLE,
+ uncompressed_size, page_stats);
WriteDataPage(page);
}
}
@@ -943,6 +949,7 @@ void ColumnWriterImpl::BuildDataPageV2(int64_t
definition_levels_rle_size,
int32_t num_values = static_cast<int32_t>(num_buffered_values_);
int32_t null_count = static_cast<int32_t>(page_stats.null_count);
+ int32_t num_rows = static_cast<int32_t>(num_buffered_rows_);
int32_t def_levels_byte_length =
static_cast<int32_t>(definition_levels_rle_size);
int32_t rep_levels_byte_length =
static_cast<int32_t>(repetition_levels_rle_size);
@@ -952,12 +959,12 @@ void ColumnWriterImpl::BuildDataPageV2(int64_t
definition_levels_rle_size,
PARQUET_ASSIGN_OR_THROW(auto data_copy,
combined->CopySlice(0, combined->size(),
allocator_));
std::unique_ptr<DataPage> page_ptr = std::make_unique<DataPageV2>(
- combined, num_values, null_count, num_values, encoding_,
def_levels_byte_length,
+ combined, num_values, null_count, num_rows, encoding_,
def_levels_byte_length,
rep_levels_byte_length, uncompressed_size, pager_->has_compressor(),
page_stats);
total_compressed_bytes_ += page_ptr->size() + sizeof(format::PageHeader);
data_pages_.push_back(std::move(page_ptr));
} else {
- DataPageV2 page(combined, num_values, null_count, num_values, encoding_,
+ DataPageV2 page(combined, num_values, null_count, num_rows, encoding_,
def_levels_byte_length, rep_levels_byte_length,
uncompressed_size,
pager_->has_compressor(), page_stats);
WriteDataPage(page);
@@ -1263,6 +1270,7 @@ class TypedColumnWriterImpl : public ColumnWriterImpl,
public TypedColumnWriter<
for (int64_t i = 0; i < num_values; ++i) {
if (rep_levels[i] == 0) {
rows_written_++;
+ num_buffered_rows_++;
}
}
@@ -1270,6 +1278,7 @@ class TypedColumnWriterImpl : public ColumnWriterImpl,
public TypedColumnWriter<
} else {
// Each value is exactly one row
rows_written_ += num_values;
+ num_buffered_rows_ += num_values;
}
return values_to_write;
}
@@ -1353,12 +1362,14 @@ class TypedColumnWriterImpl : public ColumnWriterImpl,
public TypedColumnWriter<
for (int64_t i = 0; i < num_levels; ++i) {
if (rep_levels[i] == 0) {
rows_written_++;
+ num_buffered_rows_++;
}
}
WriteRepetitionLevels(num_levels, rep_levels);
} else {
// Each value is exactly one row
rows_written_ += num_levels;
+ num_buffered_rows_ += num_levels;
}
}
@@ -1477,9 +1488,9 @@ Status TypedColumnWriterImpl<DType>::WriteArrowDictionary(
int64_t batch_num_values = 0;
int64_t batch_num_spaced_values = 0;
int64_t null_count = ::arrow::kUnknownNullCount;
- // Bits is not null for nullable values. At this point in the code we
can't determine
- // if the leaf array has the same null values as any parents it might have
had so we
- // need to recompute it from def levels.
+ // Bits is not null for nullable values. At this point in the code we
can't
+ // determine if the leaf array has the same null values as any parents it
might have
+ // had so we need to recompute it from def levels.
MaybeCalculateValidityBits(AddIfNotNull(def_levels, offset), batch_size,
&batch_num_values, &batch_num_spaced_values,
&null_count);
WriteLevelsSpaced(batch_size, AddIfNotNull(def_levels, offset),
diff --git a/cpp/src/parquet/column_writer_test.cc
b/cpp/src/parquet/column_writer_test.cc
index 78d627c382..20a6799c57 100644
--- a/cpp/src/parquet/column_writer_test.cc
+++ b/cpp/src/parquet/column_writer_test.cc
@@ -27,6 +27,7 @@
#include "parquet/column_reader.h"
#include "parquet/column_writer.h"
+#include "parquet/file_reader.h"
#include "parquet/file_writer.h"
#include "parquet/metadata.h"
#include "parquet/platform.h"
@@ -1058,5 +1059,89 @@ TEST(TestLevelEncoder, MinimumBufferSize2) {
}
}
+TEST(TestColumnWriter, WriteDataPageV2Header) {
+  auto sink = CreateOutputStream();
+  auto schema = std::static_pointer_cast<GroupNode>(
+      GroupNode::Make("schema", Repetition::REQUIRED,
+                      {
+                          schema::Int32("required", Repetition::REQUIRED),
+                          schema::Int32("optional", Repetition::OPTIONAL),
+                          schema::Int32("repeated", Repetition::REPEATED),
+                      }));
+  auto properties = WriterProperties::Builder()
+                        .disable_dictionary()
+                        ->data_page_version(ParquetDataPageVersion::V2)
+                        ->build();
+  auto file_writer = ParquetFileWriter::Open(sink, schema, properties);
+  auto rg_writer = file_writer->AppendRowGroup();
+
+  constexpr int32_t num_rows = 100;
+
+  auto required_writer = static_cast<parquet::Int32Writer*>(rg_writer->NextColumn());
+  for (int32_t i = 0; i < num_rows; i++) {
+    required_writer->WriteBatch(1, nullptr, nullptr, &i);
+  }
+
+  // Write a null value (definition level 0) at every other row.
+  auto optional_writer = static_cast<parquet::Int32Writer*>(rg_writer->NextColumn());
+  for (int32_t i = 0; i < num_rows; i++) {
+    int16_t definition_level = i % 2 == 0 ? 1 : 0;
+    optional_writer->WriteBatch(1, &definition_level, nullptr, &i);
+  }
+
+  // Each row holds two values: rep level 0 starts a new row, rep level 1 continues it.
+  auto repeated_writer = static_cast<parquet::Int32Writer*>(rg_writer->NextColumn());
+  for (int i = 0; i < 2 * num_rows; i++) {
+    int32_t value = i * 1000;
+    int16_t definition_level = 1;
+    int16_t repetition_level = i % 2 == 0 ? 0 : 1;
+    repeated_writer->WriteBatch(1, &definition_level, &repetition_level, &value);
+  }
+
+  ASSERT_NO_THROW(file_writer->Close());
+  ASSERT_OK_AND_ASSIGN(auto buffer, sink->Finish());
+  auto file_reader = ParquetFileReader::Open(
+      std::make_shared<::arrow::io::BufferReader>(buffer), default_reader_properties());
+  auto metadata = file_reader->metadata();
+  ASSERT_EQ(1, metadata->num_row_groups());
+  auto row_group_reader = file_reader->RowGroup(0);
+
+  // Verify required column: one value per row, no nulls.
+  {
+    auto page_reader = row_group_reader->GetColumnPageReader(0);
+    auto page = page_reader->NextPage();
+    ASSERT_NE(page, nullptr);
+    auto data_page = std::static_pointer_cast<DataPageV2>(page);
+    EXPECT_EQ(num_rows, data_page->num_rows());
+    EXPECT_EQ(num_rows, data_page->num_values());
+    EXPECT_EQ(0, data_page->num_nulls());
+    EXPECT_EQ(page_reader->NextPage(), nullptr);
+  }
+
+  // Verify optional column: one level per row, half of them null.
+  {
+    auto page_reader = row_group_reader->GetColumnPageReader(1);
+    auto page = page_reader->NextPage();
+    ASSERT_NE(page, nullptr);
+    auto data_page = std::static_pointer_cast<DataPageV2>(page);
+    EXPECT_EQ(num_rows, data_page->num_rows());
+    EXPECT_EQ(num_rows, data_page->num_values());
+    EXPECT_EQ(num_rows / 2, data_page->num_nulls());
+    EXPECT_EQ(page_reader->NextPage(), nullptr);
+  }
+
+  // Verify repeated column: two values per row, so num_values != num_rows.
+  {
+    auto page_reader = row_group_reader->GetColumnPageReader(2);
+    auto page = page_reader->NextPage();
+    ASSERT_NE(page, nullptr);
+    auto data_page = std::static_pointer_cast<DataPageV2>(page);
+    EXPECT_EQ(num_rows, data_page->num_rows());
+    EXPECT_EQ(num_rows * 2, data_page->num_values());
+    EXPECT_EQ(0, data_page->num_nulls());
+    EXPECT_EQ(page_reader->NextPage(), nullptr);
+  }
+}
+
} // namespace test
} // namespace parquet