This is an automated email from the ASF dual-hosted git repository. brycemecum pushed a commit to branch maint-19.0.1 in repository https://gitbox.apache.org/repos/asf/arrow.git
commit f1f860d99497de370ea2439a41b0bfff6b588ea5 Author: Gang Wu <[email protected]> AuthorDate: Tue Jan 21 17:28:43 2025 +0800 GH-45283: [C++][Parquet] Omit level histogram when max level is 0 (#45285) Rationale for this change: The level histogram of size statistics can be omitted if its max level is 0. We haven't implemented this yet and instead enforce the histogram size to be equal to `max_level + 1`. However, when reading a Parquet file with an omitted level histogram, an exception will be thrown. What changes are included in this PR? Omit the level histogram when the max level is 0. Are these changes tested? Yes, a test case has been added to reflect the change. Are there any user-facing changes? No. * GitHub Issue: #45283 Lead-authored-by: Gang Wu <[email protected]> Co-authored-by: Antoine Pitrou <[email protected]> Signed-off-by: Gang Wu <[email protected]> --- cpp/src/parquet/column_writer.cc | 9 +-- cpp/src/parquet/size_statistics.cc | 40 +++++++----- cpp/src/parquet/size_statistics_test.cc | 105 +++++++++++++++++++++++++++++--- 3 files changed, 126 insertions(+), 28 deletions(-) diff --git a/cpp/src/parquet/column_writer.cc b/cpp/src/parquet/column_writer.cc index 12cbcf20af..5525c91316 100644 --- a/cpp/src/parquet/column_writer.cc +++ b/cpp/src/parquet/column_writer.cc @@ -1606,11 +1606,12 @@ class TypedColumnWriterImpl : public ColumnWriterImpl, public TypedColumnWriter< } auto add_levels = [](std::vector<int64_t>& level_histogram, - ::arrow::util::span<const int16_t> levels) { - for (int16_t level : levels) { - ARROW_DCHECK_LT(level, static_cast<int16_t>(level_histogram.size())); - ++level_histogram[level]; + ::arrow::util::span<const int16_t> levels, int16_t max_level) { + if (max_level == 0) { + return; } + ARROW_DCHECK_EQ(static_cast<size_t>(max_level) + 1, level_histogram.size()); + ::parquet::UpdateLevelHistogram(levels, level_histogram); }; if (descr_->max_definition_level() > 0) { diff --git a/cpp/src/parquet/size_statistics.cc b/cpp/src/parquet/size_statistics.cc index a02cef7aba..7efc2777b5 100644 --- a/cpp/src/parquet/size_statistics.cc +++ b/cpp/src/parquet/size_statistics.cc @@ -54,23 
+54,28 @@ void SizeStatistics::IncrementUnencodedByteArrayDataBytes(int64_t value) { } void SizeStatistics::Validate(const ColumnDescriptor* descr) const { - if (repetition_level_histogram.size() != - static_cast<size_t>(descr->max_repetition_level() + 1)) { - throw ParquetException("Repetition level histogram size mismatch"); - } - if (definition_level_histogram.size() != - static_cast<size_t>(descr->max_definition_level() + 1)) { - throw ParquetException("Definition level histogram size mismatch"); - } + auto validate_histogram = [](const std::vector<int64_t>& histogram, int16_t max_level, + const std::string& name) { + if (histogram.empty()) { + // A levels histogram is always allowed to be missing. + return; + } + if (histogram.size() != static_cast<size_t>(max_level + 1)) { + std::stringstream ss; + ss << name << " level histogram size mismatch, size: " << histogram.size() + << ", expected: " << (max_level + 1); + throw ParquetException(ss.str()); + } + }; + validate_histogram(repetition_level_histogram, descr->max_repetition_level(), + "Repetition"); + validate_histogram(definition_level_histogram, descr->max_definition_level(), + "Definition"); if (unencoded_byte_array_data_bytes.has_value() && descr->physical_type() != Type::BYTE_ARRAY) { throw ParquetException("Unencoded byte array data bytes does not support " + TypeToString(descr->physical_type())); } - if (!unencoded_byte_array_data_bytes.has_value() && - descr->physical_type() == Type::BYTE_ARRAY) { - throw ParquetException("Missing unencoded byte array data bytes"); - } } void SizeStatistics::Reset() { @@ -83,8 +88,15 @@ void SizeStatistics::Reset() { std::unique_ptr<SizeStatistics> SizeStatistics::Make(const ColumnDescriptor* descr) { auto size_stats = std::make_unique<SizeStatistics>(); - size_stats->repetition_level_histogram.resize(descr->max_repetition_level() + 1, 0); - size_stats->definition_level_histogram.resize(descr->max_definition_level() + 1, 0); + // If the max level is 0, the level 
histogram can be omitted because it contains + // only single level (a.k.a. 0) and its count is equivalent to `num_values` of the + // column chunk or data page. + if (descr->max_repetition_level() != 0) { + size_stats->repetition_level_histogram.resize(descr->max_repetition_level() + 1, 0); + } + if (descr->max_definition_level() != 0) { + size_stats->definition_level_histogram.resize(descr->max_definition_level() + 1, 0); + } if (descr->physical_type() == Type::BYTE_ARRAY) { size_stats->unencoded_byte_array_data_bytes = 0; } diff --git a/cpp/src/parquet/size_statistics_test.cc b/cpp/src/parquet/size_statistics_test.cc index cefd31dce2..ff79e5c35a 100644 --- a/cpp/src/parquet/size_statistics_test.cc +++ b/cpp/src/parquet/size_statistics_test.cc @@ -168,12 +168,22 @@ class SizeStatisticsRoundTripTest : public ::testing::Test { } } - void Reset() { - buffer_.reset(); - row_group_stats_.clear(); - page_stats_.clear(); + void ReadData() { + auto reader = + ParquetFileReader::Open(std::make_shared<::arrow::io::BufferReader>(buffer_)); + auto metadata = reader->metadata(); + for (int i = 0; i < metadata->num_row_groups(); ++i) { + int64_t num_rows = metadata->RowGroup(i)->num_rows(); + auto row_group_reader = reader->RowGroup(i); + for (int j = 0; j < metadata->num_columns(); ++j) { + auto column_reader = row_group_reader->RecordReader(j); + ASSERT_EQ(column_reader->ReadRecords(num_rows + 1), num_rows); + } + } } + void Reset() { buffer_.reset(); } + protected: std::shared_ptr<Buffer> buffer_; std::vector<SizeStatistics> row_group_stats_; @@ -256,24 +266,99 @@ TEST_F(SizeStatisticsRoundTripTest, WriteDictionaryArray) { ReadSizeStatistics(); EXPECT_THAT(row_group_stats_, ::testing::ElementsAre(SizeStatistics{/*def_levels=*/{0, 2}, - /*rep_levels=*/{2}, + /*rep_levels=*/{}, /*byte_array_bytes=*/5}, SizeStatistics{/*def_levels=*/{1, 1}, - /*rep_levels=*/{2}, + /*rep_levels=*/{}, /*byte_array_bytes=*/1}, SizeStatistics{/*def_levels=*/{0, 2}, - /*rep_levels=*/{2}, + 
/*rep_levels=*/{}, /*byte_array_bytes=*/4})); EXPECT_THAT(page_stats_, ::testing::ElementsAre(PageSizeStatistics{/*def_levels=*/{0, 2}, - /*rep_levels=*/{2}, + /*rep_levels=*/{}, /*byte_array_bytes=*/{5}}, PageSizeStatistics{/*def_levels=*/{1, 1}, - /*rep_levels=*/{2}, + /*rep_levels=*/{}, /*byte_array_bytes=*/{1}}, PageSizeStatistics{/*def_levels=*/{0, 2}, - /*rep_levels=*/{2}, + /*rep_levels=*/{}, /*byte_array_bytes=*/{4}})); } +TEST_F(SizeStatisticsRoundTripTest, WritePageInBatches) { + // Rep/def level histograms are updates in batches of `write_batch_size` levels + // inside a single page. Exercise the logic with more than one batch per page. + auto schema = ::arrow::schema({::arrow::field("a", ::arrow::list(::arrow::utf8()))}); + auto table = ::arrow::TableFromJSON(schema, {R"([ + [ [null,"a","ab"] ], + [ null ], + [ [] ], + [ [null,"d","de"] ], + [ ["g","gh",null] ], + [ ["j","jk",null] ] + ])"}); + for (int write_batch_size : {100, 5, 4, 3, 2, 1}) { + ARROW_SCOPED_TRACE("write_batch_size = ", write_batch_size); + WriteFile(SizeStatisticsLevel::PageAndColumnChunk, table, + /*max_row_group_length=*/1000, /*page_size=*/1000, write_batch_size); + ReadSizeStatistics(); + EXPECT_THAT(row_group_stats_, + ::testing::ElementsAre(SizeStatistics{/*def_levels=*/{1, 1, 4, 8}, + /*rep_levels=*/{6, 8}, + /*byte_array_bytes=*/12})); + EXPECT_THAT(page_stats_, + ::testing::ElementsAre(PageSizeStatistics{/*def_levels=*/{1, 1, 4, 8}, + /*rep_levels=*/{6, 8}, + /*byte_array_bytes=*/{12}})); + } +} + +TEST_F(SizeStatisticsRoundTripTest, LargePage) { + // When max_level is 1, the levels are summed in 2**30 chunks, exercise this + // by testing with a 90000 rows table; + auto schema = ::arrow::schema({::arrow::field("a", ::arrow::utf8())}); + auto seed_batch = ::arrow::RecordBatchFromJSON(schema, R"([ + [ "a" ], + [ "bc" ], + [ null ] + ])"); + ASSERT_OK_AND_ASSIGN(auto table, ::arrow::Table::FromRecordBatches( + ::arrow::RecordBatchVector(30000, seed_batch))); + 
ASSERT_OK_AND_ASSIGN(table, table->CombineChunks()); + ASSERT_EQ(table->num_rows(), 90000); + + WriteFile(SizeStatisticsLevel::PageAndColumnChunk, table, + /*max_row_group_length=*/1 << 30, /*page_size=*/1 << 30, + /*write_batch_size=*/50000); + ReadSizeStatistics(); + EXPECT_THAT(row_group_stats_, + ::testing::ElementsAre(SizeStatistics{/*def_levels=*/{30000, 60000}, + /*rep_levels=*/{}, + /*byte_array_bytes=*/90000})); + EXPECT_THAT(page_stats_, + ::testing::ElementsAre(PageSizeStatistics{/*def_levels=*/{30000, 60000}, + /*rep_levels=*/{}, + /*byte_array_bytes=*/{90000}})); +} + +TEST_F(SizeStatisticsRoundTripTest, MaxLevelZero) { + auto schema = + ::arrow::schema({::arrow::field("a", ::arrow::utf8(), /*nullable=*/false)}); + WriteFile(SizeStatisticsLevel::PageAndColumnChunk, + ::arrow::TableFromJSON(schema, {R"([["foo"],["bar"]])"}), + /*max_row_group_length=*/2, + /*page_size=*/1024); + ASSERT_NO_FATAL_FAILURE(ReadSizeStatistics()); + ASSERT_NO_FATAL_FAILURE(ReadData()); + EXPECT_THAT(row_group_stats_, + ::testing::ElementsAre(SizeStatistics{/*def_levels=*/{}, + /*rep_levels=*/{}, + /*byte_array_bytes=*/6})); + EXPECT_THAT(page_stats_, + ::testing::ElementsAre(PageSizeStatistics{/*def_levels=*/{}, + /*rep_levels=*/{}, + /*byte_array_bytes=*/{6}})); +} + } // namespace parquet
