This is an automated email from the ASF dual-hosted git repository.
westonpace pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/main by this push:
new 7c764c3ef5 GH-34326: [C++][Parquet] Page null_count is incorrect if
stats is disabled (#34327)
7c764c3ef5 is described below
commit 7c764c3ef55db034239dd9244874168587232edd
Author: Gang Wu <[email protected]>
AuthorDate: Sat Mar 4 01:54:00 2023 +0800
GH-34326: [C++][Parquet] Page null_count is incorrect if stats is disabled
(#34327)
### Rationale for this change
Parquet ColumnWriter obtains null_count of a page from page stats as below
([link](https://github.com/apache/arrow/blob/main/cpp/src/parquet/column_writer.cc#L952))
```cpp
EncodedStatistics page_stats = GetPageStatistics();
int32_t null_count = static_cast<int32_t>(page_stats.null_count);
DataPageV2 page(combined, num_values, null_count, num_rows, encoding_,
def_levels_byte_length, rep_levels_byte_length,
uncompressed_size,
pager_->has_compressor(), page_stats);
```
However, the null_count is uninitialized if page statistics are not enabled:
```cpp
EncodedStatistics GetPageStatistics() override {
EncodedStatistics result;
if (page_statistics_) result = page_statistics_->Encode();
return result;
}
```
### What changes are included in this PR?
ColumnWriter now collects null_count by itself. To be safe, it also cross-checks the
value against the page stats when they are available.
### Are these changes tested?
Added a test case to verify that null counts of optional and repeated fields are
properly set.
### Are there any user-facing changes?
No.
* Closes: #34326
Authored-by: Gang Wu <[email protected]>
Signed-off-by: Weston Pace <[email protected]>
---
cpp/src/parquet/column_writer.cc | 42 +++++++++++++--------
cpp/src/parquet/column_writer_test.cc | 69 +++++++++++++++++++++++++++++++++++
2 files changed, 96 insertions(+), 15 deletions(-)
diff --git a/cpp/src/parquet/column_writer.cc b/cpp/src/parquet/column_writer.cc
index 829beb5325..f8072662a1 100644
--- a/cpp/src/parquet/column_writer.cc
+++ b/cpp/src/parquet/column_writer.cc
@@ -672,6 +672,7 @@ class ColumnWriterImpl {
allocator_(properties->memory_pool()),
num_buffered_values_(0),
num_buffered_encoded_values_(0),
+ num_buffered_nulls_(0),
num_buffered_rows_(0),
rows_written_(0),
total_bytes_written_(0),
@@ -777,6 +778,9 @@ class ColumnWriterImpl {
// values, this number may be lower than num_buffered_values_.
int64_t num_buffered_encoded_values_;
+ // The total number of nulls stored in the data page.
+ int64_t num_buffered_nulls_;
+
// Total number of rows buffered in the data page.
int64_t num_buffered_rows_;
@@ -890,6 +894,7 @@ void ColumnWriterImpl::AddDataPage() {
num_buffered_values_ = 0;
num_buffered_encoded_values_ = 0;
num_buffered_rows_ = 0;
+ num_buffered_nulls_ = 0;
}
void ColumnWriterImpl::BuildDataPageV1(int64_t definition_levels_rle_size,
@@ -964,11 +969,15 @@ void ColumnWriterImpl::BuildDataPageV2(int64_t
definition_levels_rle_size,
ResetPageStatistics();
int32_t num_values = static_cast<int32_t>(num_buffered_values_);
- int32_t null_count = static_cast<int32_t>(page_stats.null_count);
+ int32_t null_count = static_cast<int32_t>(num_buffered_nulls_);
int32_t num_rows = static_cast<int32_t>(num_buffered_rows_);
int32_t def_levels_byte_length =
static_cast<int32_t>(definition_levels_rle_size);
int32_t rep_levels_byte_length =
static_cast<int32_t>(repetition_levels_rle_size);
+  // page_stats.null_count is not set when page_statistics_ is nullptr. It is
+  // only used here for a safety check.
+ DCHECK(!page_stats.has_null_count || page_stats.null_count == null_count);
+
// Write the page to OutputStream eagerly if there is no dictionary or
// if dictionary encoding has fallen back to PLAIN
if (has_dictionary_ && !fallback_) { // Save pages until end of dictionary
encoding
@@ -1154,9 +1163,9 @@ class TypedColumnWriterImpl : public ColumnWriterImpl,
public TypedColumnWriter<
if (values_to_write > 0) {
DCHECK_NE(nullptr, values);
}
- WriteValues(AddIfNotNull(values, value_offset), values_to_write,
- batch_size - values_to_write);
- CommitWriteAndCheckPageLimit(batch_size, values_to_write, check_page);
+ const int64_t num_nulls = batch_size - values_to_write;
+ WriteValues(AddIfNotNull(values, value_offset), values_to_write,
num_nulls);
+ CommitWriteAndCheckPageLimit(batch_size, values_to_write, num_nulls,
check_page);
value_offset += values_to_write;
// Dictionary size checked separately from data page size since we
@@ -1186,13 +1195,15 @@ class TypedColumnWriterImpl : public ColumnWriterImpl,
public TypedColumnWriter<
if (bits_buffer_ != nullptr) {
WriteValuesSpaced(AddIfNotNull(values, value_offset), batch_num_values,
batch_num_spaced_values, bits_buffer_->data(),
/*offset=*/0,
- /*num_levels=*/batch_size);
+ /*num_levels=*/batch_size, null_count);
} else {
WriteValuesSpaced(AddIfNotNull(values, value_offset), batch_num_values,
batch_num_spaced_values, valid_bits,
- valid_bits_offset + value_offset,
/*num_levels=*/batch_size);
+ valid_bits_offset + value_offset,
/*num_levels=*/batch_size,
+ null_count);
}
- CommitWriteAndCheckPageLimit(batch_size, batch_num_spaced_values,
check_page);
+ CommitWriteAndCheckPageLimit(batch_size, batch_num_spaced_values,
null_count,
+ check_page);
value_offset += batch_num_spaced_values;
// Dictionary size checked separately from data page size since we
@@ -1381,7 +1392,7 @@ class TypedColumnWriterImpl : public ColumnWriterImpl,
public TypedColumnWriter<
*out_spaced_values_to_write +=
def_levels[x] >= level_info_.repeated_ancestor_def_level ? 1 : 0;
}
- *null_count = *out_values_to_write - *out_spaced_values_to_write;
+ *null_count = batch_size - *out_values_to_write;
}
return;
}
@@ -1448,9 +1459,10 @@ class TypedColumnWriterImpl : public ColumnWriterImpl,
public TypedColumnWriter<
}
void CommitWriteAndCheckPageLimit(int64_t num_levels, int64_t num_values,
- bool check_page_size) {
+ int64_t num_nulls, bool check_page_size) {
num_buffered_values_ += num_levels;
num_buffered_encoded_values_ += num_values;
+ num_buffered_nulls_ += num_nulls;
if (check_page_size &&
current_encoder_->EstimatedDataEncodedSize() >=
properties_->data_pagesize()) {
@@ -1501,7 +1513,7 @@ class TypedColumnWriterImpl : public ColumnWriterImpl,
public TypedColumnWriter<
void WriteValuesSpaced(const T* values, int64_t num_values, int64_t
num_spaced_values,
const uint8_t* valid_bits, int64_t valid_bits_offset,
- int64_t num_levels) {
+ int64_t num_levels, int64_t num_nulls) {
if (num_values != num_spaced_values) {
current_value_encoder_->PutSpaced(values,
static_cast<int>(num_spaced_values),
valid_bits, valid_bits_offset);
@@ -1509,7 +1521,6 @@ class TypedColumnWriterImpl : public ColumnWriterImpl,
public TypedColumnWriter<
current_value_encoder_->Put(values, static_cast<int>(num_values));
}
if (page_statistics_ != nullptr) {
- const int64_t num_nulls = num_levels - num_values;
page_statistics_->UpdateSpaced(values, valid_bits, valid_bits_offset,
num_spaced_values, num_values, num_nulls);
}
@@ -1613,7 +1624,7 @@ Status TypedColumnWriterImpl<DType>::WriteArrowDictionary(
writeable_indices,
MaybeReplaceValidity(writeable_indices, null_count, ctx->memory_pool));
dict_encoder->PutIndices(*writeable_indices);
- CommitWriteAndCheckPageLimit(batch_size, batch_num_values, check_page);
+ CommitWriteAndCheckPageLimit(batch_size, batch_num_values, null_count,
check_page);
value_offset += batch_num_spaced_values;
};
@@ -2086,14 +2097,15 @@ Status
TypedColumnWriterImpl<ByteArrayType>::WriteArrowDense(
data_slice, MaybeReplaceValidity(data_slice, null_count,
ctx->memory_pool));
current_encoder_->Put(*data_slice);
+ // Null values in ancestors count as nulls.
+ const int64_t non_null = data_slice->length() - data_slice->null_count();
if (page_statistics_ != nullptr) {
page_statistics_->Update(*data_slice, /*update_counts=*/false);
- // Null values in ancestors count as nulls.
- int64_t non_null = data_slice->length() - data_slice->null_count();
page_statistics_->IncrementNullCount(batch_size - non_null);
page_statistics_->IncrementNumValues(non_null);
}
- CommitWriteAndCheckPageLimit(batch_size, batch_num_values, check_page);
+ CommitWriteAndCheckPageLimit(batch_size, batch_num_values, batch_size -
non_null,
+ check_page);
CheckDictionarySizeLimit();
value_offset += batch_num_spaced_values;
};
diff --git a/cpp/src/parquet/column_writer_test.cc
b/cpp/src/parquet/column_writer_test.cc
index aa05f4e791..65c1ccbdfd 100644
--- a/cpp/src/parquet/column_writer_test.cc
+++ b/cpp/src/parquet/column_writer_test.cc
@@ -1482,5 +1482,74 @@ TEST_F(ColumnWriterTestSizeEstimated,
BufferedCompression) {
EXPECT_GT(written_size, required_writer->total_compressed_bytes_written());
}
+TEST(TestColumnWriter, WriteDataPageV2HeaderNullCount) {
+ auto sink = CreateOutputStream();
+ auto list_type = GroupNode::Make("list", Repetition::REPEATED,
+ {schema::Int32("elem",
Repetition::OPTIONAL)});
+ auto schema = std::static_pointer_cast<GroupNode>(GroupNode::Make(
+ "schema", Repetition::REQUIRED,
+ {
+ schema::Int32("non_null", Repetition::OPTIONAL),
+ schema::Int32("half_null", Repetition::OPTIONAL),
+ schema::Int32("all_null", Repetition::OPTIONAL),
+ GroupNode::Make("half_null_list", Repetition::OPTIONAL, {list_type}),
+ GroupNode::Make("half_empty_list", Repetition::OPTIONAL,
{list_type}),
+ GroupNode::Make("half_list_of_null", Repetition::OPTIONAL,
{list_type}),
+ GroupNode::Make("all_single_list", Repetition::OPTIONAL,
{list_type}),
+ }));
+ auto properties = WriterProperties::Builder()
+ /* Use V2 data page to read null_count from header */
+ .data_page_version(ParquetDataPageVersion::V2)
+ /* Disable stats to test null_count is properly set */
+ ->disable_statistics()
+ ->disable_dictionary()
+ ->build();
+ auto file_writer = ParquetFileWriter::Open(sink, schema, properties);
+ auto rg_writer = file_writer->AppendRowGroup();
+
+ constexpr int32_t num_rows = 10;
+ constexpr int32_t num_cols = 7;
+ const std::vector<std::vector<int16_t>> def_levels_by_col = {
+ {1, 1, 1, 1, 1, 1, 1, 1, 1, 1}, {1, 0, 1, 0, 1, 0, 1, 0, 1, 0},
+ {0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, {0, 3, 0, 3, 0, 3, 0, 3, 0, 3},
+ {1, 3, 1, 3, 1, 3, 1, 3, 1, 3}, {2, 3, 2, 3, 2, 3, 2, 3, 2, 3},
+ {3, 3, 3, 3, 3, 3, 3, 3, 3, 3},
+ };
+ const std::vector<int16_t> ref_levels(num_rows, 0);
+ const std::vector<int32_t> values(num_rows, 123);
+ const std::vector<int64_t> expect_null_count_by_col = {0, 5, 10, 5, 5, 5, 0};
+
+ for (int32_t i = 0; i < num_cols; ++i) {
+ auto writer = static_cast<parquet::Int32Writer*>(rg_writer->NextColumn());
+ writer->WriteBatch(num_rows, def_levels_by_col[i].data(),
+ i >= 3 ? ref_levels.data() : nullptr, values.data());
+ }
+
+ ASSERT_NO_THROW(file_writer->Close());
+ ASSERT_OK_AND_ASSIGN(auto buffer, sink->Finish());
+ auto file_reader = ParquetFileReader::Open(
+ std::make_shared<::arrow::io::BufferReader>(buffer),
default_reader_properties());
+ auto metadata = file_reader->metadata();
+ ASSERT_EQ(1, metadata->num_row_groups());
+ auto row_group_reader = file_reader->RowGroup(0);
+
+ std::shared_ptr<Page> page;
+ for (int32_t i = 0; i < num_cols; ++i) {
+ auto page_reader = row_group_reader->GetColumnPageReader(i);
+ int64_t num_nulls_read = 0;
+ int64_t num_rows_read = 0;
+ int64_t num_values_read = 0;
+ while ((page = page_reader->NextPage()) != nullptr) {
+ auto data_page = std::static_pointer_cast<DataPageV2>(page);
+ num_nulls_read += data_page->num_nulls();
+ num_rows_read += data_page->num_rows();
+ num_values_read += data_page->num_values();
+ }
+ EXPECT_EQ(expect_null_count_by_col[i], num_nulls_read);
+ EXPECT_EQ(num_rows, num_rows_read);
+ EXPECT_EQ(num_rows, num_values_read);
+ }
+}
+
} // namespace test
} // namespace parquet