pitrou commented on code in PR #40594: URL: https://github.com/apache/arrow/pull/40594#discussion_r1856227930
########## cpp/src/parquet/size_statistics.h: ########## @@ -0,0 +1,85 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include <optional> Review Comment: Need additional includes for vector and unique_ptr ```suggestion #include <memory> #include <optional> #include <vector> ``` ########## cpp/src/parquet/size_statistics.h: ########## @@ -0,0 +1,85 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include <optional> + +#include "arrow/util/span.h" Review Comment: This one is actually not used? ########## cpp/src/parquet/size_statistics.cc: ########## @@ -0,0 +1,75 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "parquet/size_statistics.h" + +#include <algorithm> + +#include "arrow/type_traits.h" +#include "arrow/util/bit_run_reader.h" +#include "arrow/util/int_util_overflow.h" +#include "arrow/visit_data_inline.h" +#include "parquet/exception.h" +#include "parquet/schema.h" +#include "parquet/thrift_internal.h" +#include "parquet/types.h" + Review Comment: Not all these includes are being used, can we trim them to keep compilation times under control?
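For instance, if the implementation ends up referencing only the exception and schema helpers, the block might shrink to something like this (an unverified guess — the actual minimal set depends on what the .cc really uses):

```cpp
#include "parquet/size_statistics.h"

#include <algorithm>

#include "parquet/exception.h"
#include "parquet/schema.h"
```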
########## cpp/src/parquet/size_statistics.h: ########## @@ -0,0 +1,85 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include <optional> + +#include "arrow/util/span.h" +#include "parquet/platform.h" +#include "parquet/type_fwd.h" + +namespace parquet { + +/// A structure for capturing metadata for estimating the unencoded, +/// uncompressed size of data written. This is useful for readers to estimate +/// how much memory is needed to reconstruct data in their memory model and for +/// fine-grained filter push down on nested structures (the histograms contained +/// in this structure can help determine the number of nulls at a particular +/// nesting level and maximum length of lists). +struct PARQUET_EXPORT SizeStatistics { + /// When present, there is expected to be one element corresponding to each + /// repetition (i.e. size=max repetition_level+1) where each element + /// represents the number of times the repetition level was observed in the + /// data. + /// + /// This field may be omitted (a.k.a. zero-length vector) if max_repetition_level + /// is 0 without loss of information. + std::vector<int64_t> repetition_level_histogram; + + /// Same as repetition_level_histogram except for definition levels. + /// + /// This field may be omitted (a.k.a. zero-length vector) if max_definition_level + /// is 0 without loss of information. + std::vector<int64_t> definition_level_histogram; + + /// The number of physical bytes stored for BYTE_ARRAY data values assuming + /// no encoding. This is exclusive of the bytes needed to store the length of + /// each byte array. In other words, this field is equivalent to the `(size + /// of PLAIN-ENCODING the byte array values) - (4 bytes * number of values + /// written)`. To determine unencoded sizes of other types readers can use + /// schema information multiplied by the number of non-null and null values. + /// The number of null/non-null values can be inferred from the histograms + /// below. + /// + /// For example, if a column chunk is dictionary-encoded with dictionary + /// ["a", "bc", "cde"], and a data page contains the indices [0, 0, 1, 2], + /// then this value for that data page should be 7 (1 + 1 + 2 + 3). + /// + /// This field should only be set for types that use BYTE_ARRAY as their + /// physical type. + std::optional<int64_t> unencoded_byte_array_data_bytes; + + /// \brief Check if the SizeStatistics is set. + bool is_set() const { + return !repetition_level_histogram.empty() || !definition_level_histogram.empty() || + unencoded_byte_array_data_bytes.has_value(); + } + + /// \brief Increment the unencoded byte array data bytes. + void IncrementUnencodedByteArrayDataBytes(int64_t value); + + /// \brief Merge two SizeStatistics. 
+ void Merge(const SizeStatistics& other); + + void Reset(); +}; + +PARQUET_EXPORT std::unique_ptr<SizeStatistics> MakeSizeStatistics( + const ColumnDescriptor* descr); Review Comment: Two API nits: 1. why not a factory static method `SizeStatistics::Make` 2. why return a `std::unique_ptr` rather than a plain value? Other parts of the code use a plain `SizeStatistics` value. ########## cpp/src/parquet/properties.h: ########## @@ -237,7 +244,8 @@ class PARQUET_EXPORT WriterProperties { data_page_version_(ParquetDataPageVersion::V1), created_by_(DEFAULT_CREATED_BY), store_decimal_as_integer_(false), - page_checksum_enabled_(false) {} + page_checksum_enabled_(false), + size_statistics_level_(SizeStatisticsLevel::None) {} Review Comment: Can you perhaps run a file write benchmark to see if default to `ColumnChunk` or even `Page` would reduce performance? Ideally, writing those statistics should be mostly costless, and they can be useful on read side. ########## cpp/src/parquet/size_statistics.h: ########## @@ -0,0 +1,85 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include <optional> + +#include "arrow/util/span.h" +#include "parquet/platform.h" +#include "parquet/type_fwd.h" + +namespace parquet { + +/// A structure for capturing metadata for estimating the unencoded, +/// uncompressed size of data written. This is useful for readers to estimate +/// how much memory is needed to reconstruct data in their memory model and for +/// fine-grained filter push down on nested structures (the histograms contained +/// in this structure can help determine the number of nulls at a particular +/// nesting level and maximum length of lists). +struct PARQUET_EXPORT SizeStatistics { + /// When present, there is expected to be one element corresponding to each + /// repetition (i.e. size=max repetition_level+1) where each element + /// represents the number of times the repetition level was observed in the + /// data. + /// + /// This field may be omitted (a.k.a. zero-length vector) if max_repetition_level + /// is 0 without loss of information. + std::vector<int64_t> repetition_level_histogram; + + /// Same as repetition_level_histogram except for definition levels. + /// + /// This field may be omitted (a.k.a. zero-length vector) if max_definition_level + /// is 0 without loss of information. + std::vector<int64_t> definition_level_histogram; + + /// The number of physical bytes stored for BYTE_ARRAY data values assuming + /// no encoding. This is exclusive of the bytes needed to store the length of + /// each byte array. In other words, this field is equivalent to the `(size + /// of PLAIN-ENCODING the byte array values) - (4 bytes * number of values + /// written)`. 
To determine unencoded sizes of other types readers can use + /// schema information multiplied by the number of non-null and null values. + /// The number of null/non-null values can be inferred from the histograms + /// below. + /// + /// For example, if a column chunk is dictionary-encoded with dictionary + /// ["a", "bc", "cde"], and a data page contains the indices [0, 0, 1, 2], + /// then this value for that data page should be 7 (1 + 1 + 2 + 3). + /// + /// This field should only be set for types that use BYTE_ARRAY as their + /// physical type. + std::optional<int64_t> unencoded_byte_array_data_bytes; + + /// \brief Check if the SizeStatistics is set. + bool is_set() const { + return !repetition_level_histogram.empty() || !definition_level_histogram.empty() || + unencoded_byte_array_data_bytes.has_value(); + } + + /// \brief Increment the unencoded byte array data bytes. + void IncrementUnencodedByteArrayDataBytes(int64_t value); + + /// \brief Merge two SizeStatistics. + void Merge(const SizeStatistics& other); + Review Comment: Perhaps we can add a `void Validate(const ColumnDescriptor*) const` method that would throw if the histograms don't have the right length, or if `unencoded_byte_array_data_bytes` is present for a non-BYTE_ARRAY column? This would help on the read side. ########## cpp/src/parquet/properties.h: ########## @@ -47,6 +47,13 @@ namespace parquet { /// DataPageV2 at all. enum class ParquetDataPageVersion { V1, V2 }; +/// Controls the level of size statistics that are written to the file. +enum class SizeStatisticsLevel : uint8_t { + None = 0, // No size statistics are written. + ColumnChunk, // Only column chunk size statistics are written. + Page // Both size statistics in the column chunk and page index are written. Review Comment: `PageAndColumnChunk`, then, perhaps? ########## cpp/src/parquet/page_index.h: ########## @@ -299,15 +315,14 @@ class PARQUET_EXPORT OffsetIndexBuilder { virtual ~OffsetIndexBuilder() = default; - /// \brief Add page location of a data page. - virtual void AddPage(int64_t offset, int32_t compressed_page_size, - int64_t first_row_index) = 0; + /// \brief Add page location and size stats of a data page. + virtual void AddPage( + int64_t offset, int32_t compressed_page_size, int64_t first_row_index, + std::optional<int64_t> unencoded_byte_array_length = std::nullopt) = 0; - /// \brief Add page location of a data page. - void AddPage(const PageLocation& page_location) { - AddPage(page_location.offset, page_location.compressed_page_size, - page_location.first_row_index); - } + /// \brief Add page location and size stats of a data page. + void AddPage(const PageLocation& page_location, + const SizeStatistics* size_stats = NULLPTR); Review Comment: Why is the convention to take `const SizeStatistics&` above and `const SizeStatistics*` here? (I have no preference, but I also see no reason for being inconsistent) ########## cpp/src/parquet/properties.h: ########## @@ -47,6 +47,13 @@ namespace parquet { /// DataPageV2 at all. enum class ParquetDataPageVersion { V1, V2 }; +/// Controls the level of size statistics that are written to the file. +enum class SizeStatisticsLevel : uint8_t { Review Comment: Nit-pickingly, I would suggest `SizeStatisticsGranularity` except that it's too long :) So we can probably keep "level"... ########## cpp/src/parquet/page_index.h: ########## @@ -299,15 +315,14 @@ class PARQUET_EXPORT OffsetIndexBuilder { virtual ~OffsetIndexBuilder() = default; - /// \brief Add page location of a data page. 
- virtual void AddPage(int64_t offset, int32_t compressed_page_size, - int64_t first_row_index) = 0; + /// \brief Add page location and size stats of a data page. + virtual void AddPage( + int64_t offset, int32_t compressed_page_size, int64_t first_row_index, + std::optional<int64_t> unencoded_byte_array_length = std::nullopt) = 0; Review Comment: Slightly clearer perhaps: ```suggestion virtual void AddPage( int64_t offset, int32_t compressed_page_size, int64_t first_row_index, std::optional<int64_t> unencoded_byte_array_length = {}) = 0; ``` ########## cpp/src/parquet/encoding_test.cc: ########## @@ -197,6 +198,18 @@ class TestEncodingBase : public ::testing::Test { draws_[nvalues * j + i] = draws_[i]; } } + + // Calculate expected unencoded bytes based on type + if constexpr (std::is_same_v<Type, ByteArrayType>) { + unencoded_data_bytes_ = 0; + for (int i = 0; i < num_values_; i++) { + unencoded_data_bytes_ += draws_[i].len; + } + } else if constexpr (std::is_same_v<Type, FLBAType>) { + unencoded_data_bytes_ = num_values_ * type_length_; + } else { + unencoded_data_bytes_ = num_values_ * sizeof(c_type); + } Review Comment: Can we factor this out in a method that can also be called by `TestDeltaByteArrayEncoding`? ########## cpp/src/parquet/metadata.cc: ########## @@ -308,6 +309,14 @@ class ColumnChunkMetaData::ColumnChunkMetaDataImpl { return is_stats_set() ? possible_stats_ : nullptr; } + inline std::unique_ptr<SizeStatistics> size_statistics() const { + if (!column_metadata_->__isset.size_statistics) { + return nullptr; + } + return std::make_unique<SizeStatistics>( + FromThrift(column_metadata_->size_statistics)); Review Comment: I think we should also sanity-check the SizeStatistics (as suggested in `SizeStatistics::Validate`). ########## cpp/src/parquet/page_index.cc: ########## @@ -636,6 +689,19 @@ class OffsetIndexBuilderImpl final : public OffsetIndexBuilder { page_location.__set_offset(page_location.offset + final_position); } } + + // Finalize unencoded_byte_array_data_bytes and make sure page sizes match. + if (offset_index_.page_locations.size() == + offset_index_.unencoded_byte_array_data_bytes.size()) { + offset_index_.__isset.unencoded_byte_array_data_bytes = true; + } else if (!offset_index_.unencoded_byte_array_data_bytes.empty()) { + std::stringstream ss; + ss << "Invalid count of unencoded BYTE_ARRAY data bytes: " + << offset_index_.unencoded_byte_array_data_bytes.size() + << ", expected page count: " << offset_index_.page_locations.size(); + throw ParquetException(ss.str()); Review Comment: Why do we throw an exception here but use `ARROW_CHECK` above when checking a similar condition for histograms? We should probably choose a consistent idiom (both are ok with me, but a dedicated exception message might be more useful for the user). ########## cpp/src/parquet/size_statistics.h: ########## @@ -0,0 +1,85 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include <optional> + +#include "arrow/util/span.h" +#include "parquet/platform.h" +#include "parquet/type_fwd.h" + +namespace parquet { + +/// A structure for capturing metadata for estimating the unencoded, +/// uncompressed size of data written. This is useful for readers to estimate +/// how much memory is needed to reconstruct data in their memory model and for +/// fine-grained filter push down on nested structures (the histograms contained +/// in this structure can help determine the number of nulls at a particular +/// nesting level and maximum length of lists). +struct PARQUET_EXPORT SizeStatistics { + /// When present, there is expected to be one element corresponding to each + /// repetition (i.e. size=max repetition_level+1) where each element + /// represents the number of times the repetition level was observed in the + /// data. + /// + /// This field may be omitted (a.k.a. zero-length vector) if max_repetition_level + /// is 0 without loss of information. + std::vector<int64_t> repetition_level_histogram; + + /// Same as repetition_level_histogram except for definition levels. + /// + /// This field may be omitted (a.k.a. zero-length vector) if max_definition_level + /// is 0 without loss of information. + std::vector<int64_t> definition_level_histogram; Review Comment: It's gonna be confusing if some APIs put def levels first and others rep levels first. Given that current Parquet C++ APIs put def levels before rep levels, let's do that here also? (the Thrift ordering is irrelevant IMHO, it's just an implementation detail for the user) ########## cpp/src/parquet/page_index_test.cc: ########## @@ -651,6 +724,28 @@ TEST(PageIndex, WriteColumnIndexWithCorruptedStats) { EXPECT_EQ(0, buffer->size()); } +TEST(PageIndex, WriteInt64ColumnIndexWithSizeStats) { + auto encode = [=](int64_t value) { + return std::string(reinterpret_cast<const char*>(&value), sizeof(int64_t)); + }; + + // Integer values in the descending order. + std::vector<EncodedStatistics> page_stats(3); + page_stats.at(0).set_null_count(4).set_min(encode(-1)).set_max(encode(-2)); + page_stats.at(1).set_null_count(0).set_min(encode(-2)).set_max(encode(-3)); + page_stats.at(2).set_null_count(4).set_min(encode(-3)).set_max(encode(-4)); + + // Page level histograms. + std::vector<PageLevelHistogram> page_levels; + page_levels.push_back(PageLevelHistogram{{1, 2, 3}, {2, 4, 6, 8}}); Review Comment: Interestingly these histograms are not valid, since we would expect `sum(rep_levels_histogram) == sum(def_levels_histogram)` :) It probably also doesn't matter for this test, but someone reading this code might be puzzled. ########## cpp/src/parquet/column_writer.cc: ########## @@ -1063,13 +1072,14 @@ void ColumnWriterImpl::BuildDataPageV2(int64_t definition_levels_rle_size, std::unique_ptr<DataPage> page_ptr = std::make_unique<DataPageV2>( combined, num_values, null_count, num_rows, encoding_, def_levels_byte_length, rep_levels_byte_length, uncompressed_size, pager_->has_compressor(), page_stats, Review Comment: Also `std::move(page_stats)`? 
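That is, presumably something like the following sketch (assuming `page_stats` is not used again in this branch and the `DataPageV2` constructor accepts it by value):

```cpp
std::unique_ptr<DataPage> page_ptr = std::make_unique<DataPageV2>(
    combined, num_values, null_count, num_rows, encoding_, def_levels_byte_length,
    rep_levels_byte_length, uncompressed_size, pager_->has_compressor(),
    std::move(page_stats), first_row_index, std::move(page_size_stats));
```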
########## cpp/src/parquet/page_index_test.cc: ########## @@ -446,23 +451,76 @@ TEST(PageIndex, WriteOffsetIndex) { /// Verify the data of the offset index. for (const auto& offset_index : offset_indexes) { ASSERT_EQ(num_pages, offset_index->page_locations().size()); + if (write_size_stats) { + ASSERT_EQ(num_pages, offset_index->unencoded_byte_array_data_bytes().size()); + } else { + ASSERT_TRUE(offset_index->unencoded_byte_array_data_bytes().empty()); + } for (size_t i = 0; i < num_pages; ++i) { const auto& page_location = offset_index->page_locations().at(i); ASSERT_EQ(offsets[i] + final_position, page_location.offset); ASSERT_EQ(page_sizes[i], page_location.compressed_page_size); ASSERT_EQ(first_row_indices[i], page_location.first_row_index); + if (write_size_stats) { + ASSERT_EQ(unencoded_byte_array_lengths[i], + offset_index->unencoded_byte_array_data_bytes()[i]); + } } } } +TEST(PageIndex, WriteOffsetIndexWithoutSizeStats) { + TestWriteOffsetIndex(/*write_size_stats=*/false); +} + +TEST(PageIndex, WriteOffsetIndexWithSizeStats) { + TestWriteOffsetIndex(/*write_size_stats=*/true); +} + +struct PageLevelHistogram { + std::vector<int16_t> rep_levels; + std::vector<int16_t> def_levels; Review Comment: Should we swap those fields for consistency? (see comment on `size_statistics.h`) ########## cpp/src/parquet/column_writer.cc: ########## @@ -1063,13 +1072,14 @@ void ColumnWriterImpl::BuildDataPageV2(int64_t definition_levels_rle_size, std::unique_ptr<DataPage> page_ptr = std::make_unique<DataPageV2>( combined, num_values, null_count, num_rows, encoding_, def_levels_byte_length, rep_levels_byte_length, uncompressed_size, pager_->has_compressor(), page_stats, - first_row_index); + first_row_index, std::move(page_size_stats)); total_compressed_bytes_ += page_ptr->size() + sizeof(format::PageHeader); data_pages_.push_back(std::move(page_ptr)); } else { DataPageV2 page(combined, num_values, null_count, num_rows, encoding_, def_levels_byte_length, rep_levels_byte_length, uncompressed_size, - pager_->has_compressor(), page_stats, first_row_index); + pager_->has_compressor(), page_stats, first_row_index, Review Comment: Same here. ########## cpp/src/parquet/size_statistics_test.cc: ########## @@ -0,0 +1,307 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include "gmock/gmock.h" +#include "gtest/gtest.h" + +#include <algorithm> +#include <random> + +#include "arrow/buffer.h" +#include "arrow/table.h" +#include "arrow/testing/builder.h" +#include "arrow/testing/gtest_util.h" +#include "arrow/util/bit_util.h" +#include "arrow/util/span.h" +#include "parquet/arrow/reader.h" +#include "parquet/arrow/reader_internal.h" +#include "parquet/arrow/schema.h" +#include "parquet/arrow/writer.h" +#include "parquet/column_writer.h" +#include "parquet/file_writer.h" +#include "parquet/page_index.h" +#include "parquet/schema.h" +#include "parquet/size_statistics.h" +#include "parquet/test_util.h" +#include "parquet/thrift_internal.h" +#include "parquet/types.h" + +namespace parquet { + +TEST(SizeStatistics, ThriftSerDe) { + const std::vector<int64_t> kDefLevels = {128, 64, 32, 16}; + const std::vector<int64_t> kRepLevels = {100, 80, 60, 40, 20}; + constexpr int64_t kUnencodedByteArrayDataBytes = 1234; + + for (const auto& descr : + {std::make_unique<ColumnDescriptor>(schema::Int32("a"), /*max_def_level=*/3, + /*max_rep_level=*/4), + std::make_unique<ColumnDescriptor>(schema::ByteArray("a"), /*max_def_level=*/3, + /*max_rep_level=*/4)}) { + auto size_statistics = MakeSizeStatistics(descr.get()); + size_statistics->repetition_level_histogram = kRepLevels; + size_statistics->definition_level_histogram = kDefLevels; + if (descr->physical_type() == Type::BYTE_ARRAY) { + size_statistics->IncrementUnencodedByteArrayDataBytes(kUnencodedByteArrayDataBytes); + } + auto thrift_statistics = ToThrift(*size_statistics); + auto restored_statistics = FromThrift(thrift_statistics); + EXPECT_EQ(restored_statistics.definition_level_histogram, kDefLevels); + EXPECT_EQ(restored_statistics.repetition_level_histogram, kRepLevels); + if (descr->physical_type() == Type::BYTE_ARRAY) { + EXPECT_TRUE(restored_statistics.unencoded_byte_array_data_bytes.has_value()); + EXPECT_EQ(restored_statistics.unencoded_byte_array_data_bytes.value(), + kUnencodedByteArrayDataBytes); + } else { + EXPECT_FALSE(restored_statistics.unencoded_byte_array_data_bytes.has_value()); + } + } +} + +bool operator==(const SizeStatistics& lhs, const SizeStatistics& rhs) { + return lhs.repetition_level_histogram == rhs.repetition_level_histogram && + lhs.definition_level_histogram == rhs.definition_level_histogram && + lhs.unencoded_byte_array_data_bytes == rhs.unencoded_byte_array_data_bytes; +} + +struct PageSizeStatistics { + std::vector<int64_t> ref_levels; + std::vector<int64_t> def_levels; + std::vector<int64_t> byte_array_bytes; + bool operator==(const PageSizeStatistics& other) const { + return ref_levels == other.ref_levels && def_levels == other.def_levels && + byte_array_bytes == other.byte_array_bytes; + } +}; + +class SizeStatisticsRoundTripTest : public ::testing::Test { + public: + void WriteFile(SizeStatisticsLevel level, + const std::shared_ptr<::arrow::Table>& table) { + auto writer_properties = WriterProperties::Builder() + .max_row_group_length(2) /* every row group has 2 rows */ + ->data_pagesize(1) /* every page has 1 row */ + ->enable_write_page_index() + ->enable_statistics() + ->set_size_statistics_level(level) + ->build(); + + // Get schema from table. 
+ auto schema = table->schema(); + std::shared_ptr<SchemaDescriptor> parquet_schema; + auto arrow_writer_properties = default_arrow_writer_properties(); + ASSERT_OK_NO_THROW(arrow::ToParquetSchema(schema.get(), *writer_properties, + *arrow_writer_properties, &parquet_schema)); + auto schema_node = + std::static_pointer_cast<schema::GroupNode>(parquet_schema->schema_root()); + + // Write table to buffer. + auto sink = CreateOutputStream(); + auto pool = ::arrow::default_memory_pool(); + auto writer = ParquetFileWriter::Open(sink, schema_node, writer_properties); + std::unique_ptr<arrow::FileWriter> arrow_writer; + ASSERT_OK(arrow::FileWriter::Make(pool, std::move(writer), schema, + arrow_writer_properties, &arrow_writer)); + ASSERT_OK_NO_THROW(arrow_writer->WriteTable(*table)); + ASSERT_OK_NO_THROW(arrow_writer->Close()); + ASSERT_OK_AND_ASSIGN(buffer_, sink->Finish()); + } + + void ReadSizeStatistics() { + auto read_properties = default_arrow_reader_properties(); + auto reader = + ParquetFileReader::Open(std::make_shared<::arrow::io::BufferReader>(buffer_)); + + // Read row group size statistics in order. + auto metadata = reader->metadata(); + for (int i = 0; i < metadata->num_row_groups(); ++i) { + auto row_group_metadata = metadata->RowGroup(i); + for (int j = 0; j < metadata->num_columns(); ++j) { + auto column_metadata = row_group_metadata->ColumnChunk(j); + auto size_stats = column_metadata->size_statistics(); + SizeStatistics row_group_stats; + if (size_stats != nullptr) { + row_group_stats = {size_stats->repetition_level_histogram, + size_stats->definition_level_histogram, + size_stats->unencoded_byte_array_data_bytes}; + } + row_group_stats_.emplace_back(std::move(row_group_stats)); + } + } + + // Read page size statistics in order. + auto page_index_reader = reader->GetPageIndexReader(); + ASSERT_NE(page_index_reader, nullptr); + + for (int i = 0; i < metadata->num_row_groups(); ++i) { + auto row_group_index_reader = page_index_reader->RowGroup(i); + ASSERT_NE(row_group_index_reader, nullptr); + + for (int j = 0; j < metadata->num_columns(); ++j) { + PageSizeStatistics page_stats; + + auto column_index = row_group_index_reader->GetColumnIndex(j); + if (column_index != nullptr) { + if (column_index->has_repetition_level_histograms()) { + page_stats.ref_levels = column_index->repetition_level_histograms(); + } + if (column_index->has_definition_level_histograms()) { + page_stats.def_levels = column_index->definition_level_histograms(); + } + } + + auto offset_index = row_group_index_reader->GetOffsetIndex(j); + if (offset_index != nullptr) { + page_stats.byte_array_bytes = offset_index->unencoded_byte_array_data_bytes(); + } + + page_stats_.emplace_back(std::move(page_stats)); + } + } + } + + protected: + std::shared_ptr<Buffer> buffer_; + std::vector<SizeStatistics> row_group_stats_; + std::vector<PageSizeStatistics> page_stats_; + inline static const SizeStatistics kEmptyRowGroupStats{}; + inline static const PageSizeStatistics kEmptyPageStats{}; +}; + +TEST_F(SizeStatisticsRoundTripTest, DisableSizeStats) { + auto schema = ::arrow::schema({ + ::arrow::field("a", ::arrow::list(::arrow::list(::arrow::int32()))), + ::arrow::field("b", ::arrow::list(::arrow::list(::arrow::utf8()))), + }); + WriteFile(SizeStatisticsLevel::None, ::arrow::TableFromJSON(schema, {R"([ + [ [[1],[1,1],[1,1,1]], [["a"],["a","a"],["a","a","a"]] ], + [ [[0,1,null]], [["foo","bar",null]] ], Review Comment: So these 2 batches make one row group, and the 2 other batches below another row group, right? 
Can you add comments so that this is more immediately understood? ########## cpp/src/parquet/encoder.cc: ########## @@ -181,7 +191,8 @@ inline void PlainEncoder<ByteArrayType>::Put(const ByteArray* src, int num_value } template <typename ArrayType> -void DirectPutImpl(const ::arrow::Array& values, ::arrow::BufferBuilder* sink) { +[[nodiscard]] int64_t DirectPutImpl(const ::arrow::Array& values, Review Comment: I think we used to have one before we mandated C++17. Now we just use `[[nodiscard]]`. ########## cpp/src/parquet/size_statistics_test.cc: ########## @@ -0,0 +1,307 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "gmock/gmock.h" +#include "gtest/gtest.h" + +#include <algorithm> +#include <random> + +#include "arrow/buffer.h" +#include "arrow/table.h" +#include "arrow/testing/builder.h" +#include "arrow/testing/gtest_util.h" +#include "arrow/util/bit_util.h" +#include "arrow/util/span.h" +#include "parquet/arrow/reader.h" +#include "parquet/arrow/reader_internal.h" +#include "parquet/arrow/schema.h" +#include "parquet/arrow/writer.h" +#include "parquet/column_writer.h" +#include "parquet/file_writer.h" +#include "parquet/page_index.h" +#include "parquet/schema.h" +#include "parquet/size_statistics.h" +#include "parquet/test_util.h" +#include "parquet/thrift_internal.h" +#include "parquet/types.h" + +namespace parquet { + +TEST(SizeStatistics, ThriftSerDe) { + const std::vector<int64_t> kDefLevels = {128, 64, 32, 16}; + const std::vector<int64_t> kRepLevels = {100, 80, 60, 40, 20}; + constexpr int64_t kUnencodedByteArrayDataBytes = 1234; + + for (const auto& descr : + {std::make_unique<ColumnDescriptor>(schema::Int32("a"), /*max_def_level=*/3, + /*max_rep_level=*/4), + std::make_unique<ColumnDescriptor>(schema::ByteArray("a"), /*max_def_level=*/3, + /*max_rep_level=*/4)}) { + auto size_statistics = MakeSizeStatistics(descr.get()); + size_statistics->repetition_level_histogram = kRepLevels; + size_statistics->definition_level_histogram = kDefLevels; + if (descr->physical_type() == Type::BYTE_ARRAY) { + size_statistics->IncrementUnencodedByteArrayDataBytes(kUnencodedByteArrayDataBytes); + } + auto thrift_statistics = ToThrift(*size_statistics); + auto restored_statistics = FromThrift(thrift_statistics); + EXPECT_EQ(restored_statistics.definition_level_histogram, kDefLevels); + EXPECT_EQ(restored_statistics.repetition_level_histogram, kRepLevels); + if (descr->physical_type() == Type::BYTE_ARRAY) { + EXPECT_TRUE(restored_statistics.unencoded_byte_array_data_bytes.has_value()); + EXPECT_EQ(restored_statistics.unencoded_byte_array_data_bytes.value(), + kUnencodedByteArrayDataBytes); + } else { + EXPECT_FALSE(restored_statistics.unencoded_byte_array_data_bytes.has_value()); + } + } +} + +bool operator==(const SizeStatistics& 
lhs, const SizeStatistics& rhs) { + return lhs.repetition_level_histogram == rhs.repetition_level_histogram && + lhs.definition_level_histogram == rhs.definition_level_histogram && + lhs.unencoded_byte_array_data_bytes == rhs.unencoded_byte_array_data_bytes; +} + +struct PageSizeStatistics { + std::vector<int64_t> ref_levels; + std::vector<int64_t> def_levels; + std::vector<int64_t> byte_array_bytes; + bool operator==(const PageSizeStatistics& other) const { + return ref_levels == other.ref_levels && def_levels == other.def_levels && + byte_array_bytes == other.byte_array_bytes; + } +}; + +class SizeStatisticsRoundTripTest : public ::testing::Test { + public: + void WriteFile(SizeStatisticsLevel level, + const std::shared_ptr<::arrow::Table>& table) { + auto writer_properties = WriterProperties::Builder() + .max_row_group_length(2) /* every row group has 2 rows */ + ->data_pagesize(1) /* every page has 1 row */ + ->enable_write_page_index() + ->enable_statistics() + ->set_size_statistics_level(level) + ->build(); + + // Get schema from table. + auto schema = table->schema(); + std::shared_ptr<SchemaDescriptor> parquet_schema; + auto arrow_writer_properties = default_arrow_writer_properties(); + ASSERT_OK_NO_THROW(arrow::ToParquetSchema(schema.get(), *writer_properties, + *arrow_writer_properties, &parquet_schema)); + auto schema_node = + std::static_pointer_cast<schema::GroupNode>(parquet_schema->schema_root()); + + // Write table to buffer. + auto sink = CreateOutputStream(); + auto pool = ::arrow::default_memory_pool(); + auto writer = ParquetFileWriter::Open(sink, schema_node, writer_properties); + std::unique_ptr<arrow::FileWriter> arrow_writer; + ASSERT_OK(arrow::FileWriter::Make(pool, std::move(writer), schema, + arrow_writer_properties, &arrow_writer)); + ASSERT_OK_NO_THROW(arrow_writer->WriteTable(*table)); + ASSERT_OK_NO_THROW(arrow_writer->Close()); + ASSERT_OK_AND_ASSIGN(buffer_, sink->Finish()); + } + + void ReadSizeStatistics() { + auto read_properties = default_arrow_reader_properties(); + auto reader = + ParquetFileReader::Open(std::make_shared<::arrow::io::BufferReader>(buffer_)); + + // Read row group size statistics in order. + auto metadata = reader->metadata(); + for (int i = 0; i < metadata->num_row_groups(); ++i) { + auto row_group_metadata = metadata->RowGroup(i); + for (int j = 0; j < metadata->num_columns(); ++j) { + auto column_metadata = row_group_metadata->ColumnChunk(j); + auto size_stats = column_metadata->size_statistics(); + SizeStatistics row_group_stats; + if (size_stats != nullptr) { + row_group_stats = {size_stats->repetition_level_histogram, + size_stats->definition_level_histogram, + size_stats->unencoded_byte_array_data_bytes}; + } + row_group_stats_.emplace_back(std::move(row_group_stats)); + } + } + + // Read page size statistics in order. 
+ auto page_index_reader = reader->GetPageIndexReader(); + ASSERT_NE(page_index_reader, nullptr); + + for (int i = 0; i < metadata->num_row_groups(); ++i) { + auto row_group_index_reader = page_index_reader->RowGroup(i); + ASSERT_NE(row_group_index_reader, nullptr); + + for (int j = 0; j < metadata->num_columns(); ++j) { + PageSizeStatistics page_stats; + + auto column_index = row_group_index_reader->GetColumnIndex(j); + if (column_index != nullptr) { + if (column_index->has_repetition_level_histograms()) { + page_stats.ref_levels = column_index->repetition_level_histograms(); + } + if (column_index->has_definition_level_histograms()) { + page_stats.def_levels = column_index->definition_level_histograms(); + } + } + + auto offset_index = row_group_index_reader->GetOffsetIndex(j); + if (offset_index != nullptr) { + page_stats.byte_array_bytes = offset_index->unencoded_byte_array_data_bytes(); + } + + page_stats_.emplace_back(std::move(page_stats)); + } + } + } + + protected: + std::shared_ptr<Buffer> buffer_; + std::vector<SizeStatistics> row_group_stats_; + std::vector<PageSizeStatistics> page_stats_; + inline static const SizeStatistics kEmptyRowGroupStats{}; + inline static const PageSizeStatistics kEmptyPageStats{}; +}; + +TEST_F(SizeStatisticsRoundTripTest, DisableSizeStats) { + auto schema = ::arrow::schema({ + ::arrow::field("a", ::arrow::list(::arrow::list(::arrow::int32()))), + ::arrow::field("b", ::arrow::list(::arrow::list(::arrow::utf8()))), + }); + WriteFile(SizeStatisticsLevel::None, ::arrow::TableFromJSON(schema, {R"([ + [ [[1],[1,1],[1,1,1]], [["a"],["a","a"],["a","a","a"]] ], + [ [[0,1,null]], [["foo","bar",null]] ], + [ [], [] ], + [ [[],[null],null], [[],[null],null] ] + ])"})); + + ReadSizeStatistics(); + EXPECT_THAT(row_group_stats_, + ::testing::ElementsAre(kEmptyRowGroupStats, kEmptyRowGroupStats, + kEmptyRowGroupStats, kEmptyRowGroupStats)); + EXPECT_THAT(page_stats_, ::testing::ElementsAre(kEmptyPageStats, kEmptyPageStats, + kEmptyPageStats, kEmptyPageStats)); +} + +TEST_F(SizeStatisticsRoundTripTest, EnableColumnChunkSizeStats) { + auto schema = ::arrow::schema({ + ::arrow::field("a", ::arrow::list(::arrow::list(::arrow::int32()))), + ::arrow::field("b", ::arrow::list(::arrow::list(::arrow::utf8()))), + }); + WriteFile(SizeStatisticsLevel::ColumnChunk, ::arrow::TableFromJSON(schema, {R"([ + [ [[1],[1,1],[1,1,1]], [["a"],["a","a"],["a","a","a"]] ], + [ [[0,1,null]], [["foo","bar",null]] ], + [ [], [] ], + [ [[],[null],null], [[],[null],null] ] + ])"})); + + ReadSizeStatistics(); + EXPECT_THAT(row_group_stats_, + ::testing::ElementsAre(SizeStatistics{/*ref_levels=*/{2, 2, 5}, + /*def_levels=*/{0, 0, 0, 0, 1, 8}, + /*byte_array_bytes=*/std::nullopt}, + SizeStatistics{/*ref_levels=*/{2, 2, 5}, + /*def_levels=*/{0, 0, 0, 0, 1, 8}, + /*byte_array_bytes=*/12}, + SizeStatistics{/*ref_levels=*/{2, 2, 0}, + /*def_levels=*/{0, 1, 1, 1, 1, 0}, + /*byte_array_bytes=*/std::nullopt}, + SizeStatistics{/*ref_levels=*/{2, 2, 0}, + /*def_levels=*/{0, 1, 1, 1, 1, 0}, + /*byte_array_bytes=*/0})); + EXPECT_THAT(page_stats_, ::testing::ElementsAre(kEmptyPageStats, kEmptyPageStats, + kEmptyPageStats, kEmptyPageStats)); +} + +TEST_F(SizeStatisticsRoundTripTest, EnablePageSizeStats) { Review Comment: So the three tests here use the exact same data and scaffolding with some minor variations, how about factoring things out? 
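(perhaps even as a single test) For example, something along these lines, where `RunRoundTrip` and `kTestJson` are hypothetical names for a shared fixture helper and the shared JSON literal:

```cpp
// Sketch of a member helper on SizeStatisticsRoundTripTest: build the same
// two-column table once and drive any SizeStatisticsLevel through one path.
void RunRoundTrip(SizeStatisticsLevel level,
                  const std::vector<SizeStatistics>& expected_row_group_stats,
                  const std::vector<PageSizeStatistics>& expected_page_stats) {
  auto schema = ::arrow::schema({
      ::arrow::field("a", ::arrow::list(::arrow::list(::arrow::int32()))),
      ::arrow::field("b", ::arrow::list(::arrow::list(::arrow::utf8()))),
  });
  WriteFile(level, ::arrow::TableFromJSON(schema, {kTestJson}));
  ReadSizeStatistics();
  EXPECT_EQ(row_group_stats_, expected_row_group_stats);
  EXPECT_EQ(page_stats_, expected_page_stats);
}
```

Each of the three cases would then collapse to a single call with its expected statistics.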
########## cpp/src/parquet/page_index.h: ########## @@ -76,6 +74,18 @@ class PARQUET_EXPORT ColumnIndex { /// \brief A vector of page indices for non-null pages. virtual const std::vector<int32_t>& non_null_page_indices() const = 0; + + /// \brief Whether repetition level histogram is available. + virtual bool has_repetition_level_histograms() const = 0; + + /// \brief Whether definition level histogram is available. + virtual bool has_definition_level_histograms() const = 0; + + /// \brief List of repetition level histograms for each page concatenated together. + virtual const std::vector<int64_t>& repetition_level_histograms() const = 0; Review Comment: This is the kind of place where we could instead return `util::span<const int64_t>`, though I'm not sure it's useful here (we're unlikely to use something else than `std::vector` internally for this). ########## cpp/src/parquet/page_index_benchmark.cc: ########## @@ -82,7 +82,7 @@ void BM_ReadColumnIndex(::benchmark::State& state) { GenerateBenchmarkData(values_per_page, /*seed=*/0, values.data(), &heap, kDataStringLength); stats->Update(values.data(), values_per_page, /*null_count=*/0); - builder->AddPage(stats->Encode()); + builder->AddPage(stats->Encode(), {}); Review Comment: ```suggestion builder->AddPage(stats->Encode(), /*size_statistics=*/{}); ``` ########## cpp/src/parquet/page_index_test.cc: ########## @@ -446,23 +451,76 @@ TEST(PageIndex, WriteOffsetIndex) { /// Verify the data of the offset index.
for (const auto& offset_index : offset_indexes) { ASSERT_EQ(num_pages, offset_index->page_locations().size()); + if (write_size_stats) { + ASSERT_EQ(num_pages, offset_index->unencoded_byte_array_data_bytes().size()); + } else { + ASSERT_TRUE(offset_index->unencoded_byte_array_data_bytes().empty()); + } for (size_t i = 0; i < num_pages; ++i) { const auto& page_location = offset_index->page_locations().at(i); ASSERT_EQ(offsets[i] + final_position, page_location.offset); ASSERT_EQ(page_sizes[i], page_location.compressed_page_size); ASSERT_EQ(first_row_indices[i], page_location.first_row_index); + if (write_size_stats) { + ASSERT_EQ(unencoded_byte_array_lengths[i], + offset_index->unencoded_byte_array_data_bytes()[i]); + } } } } +TEST(PageIndex, WriteOffsetIndexWithoutSizeStats) { + TestWriteOffsetIndex(/*write_size_stats=*/false); +} + +TEST(PageIndex, WriteOffsetIndexWithSizeStats) { + TestWriteOffsetIndex(/*write_size_stats=*/true); +} + +struct PageLevelHistogram { + std::vector<int16_t> rep_levels; + std::vector<int16_t> def_levels; +}; + +std::unique_ptr<SizeStatistics> ConstructFakeSizeStatistics( + const ColumnDescriptor* descr, const PageLevelHistogram& page_level_histogram) { + auto stats = MakeSizeStatistics(descr); + for (int16_t level = 0; level <= descr->max_repetition_level(); ++level) { + stats->repetition_level_histogram[level] = page_level_histogram.rep_levels[level]; + } + for (int16_t level = 0; level <= descr->max_definition_level(); ++level) { + stats->definition_level_histogram[level] = page_level_histogram.def_levels[level]; + } Review Comment: Should this just be: ```suggestion stats->definition_level_histogram = page_level_histogram.def_levels; stats->repetition_level_histogram = page_level_histogram.rep_levels; ``` ########## cpp/src/parquet/metadata.h: ########## @@ -156,6 +142,7 @@ class PARQUET_EXPORT ColumnChunkMetaData { std::shared_ptr<schema::ColumnPath> path_in_schema() const; bool is_stats_set() const; std::shared_ptr<Statistics> statistics() const; + std::unique_ptr<SizeStatistics> size_statistics() const; Review Comment: I find it weird that each call to `size_statistics()` creates a new `SizeStatistics` instance. This forces the caller to cache it on their own if they don't want to pay the cost of Thrift deserialization every time. Why not simply deserialize it when creating the `ColumnChunkMetaData`? Then you can return either a `const SizeStatistics&` or a `std::shared_ptr<SizeStatistics>`. ########## cpp/src/parquet/size_statistics_test.cc: ########## @@ -0,0 +1,307 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include "gmock/gmock.h" +#include "gtest/gtest.h" + +#include <algorithm> +#include <random> + +#include "arrow/buffer.h" +#include "arrow/table.h" +#include "arrow/testing/builder.h" +#include "arrow/testing/gtest_util.h" +#include "arrow/util/bit_util.h" +#include "arrow/util/span.h" +#include "parquet/arrow/reader.h" +#include "parquet/arrow/reader_internal.h" +#include "parquet/arrow/schema.h" +#include "parquet/arrow/writer.h" +#include "parquet/column_writer.h" +#include "parquet/file_writer.h" +#include "parquet/page_index.h" +#include "parquet/schema.h" +#include "parquet/size_statistics.h" +#include "parquet/test_util.h" +#include "parquet/thrift_internal.h" +#include "parquet/types.h" + +namespace parquet { + +TEST(SizeStatistics, ThriftSerDe) { + const std::vector<int64_t> kDefLevels = {128, 64, 32, 16}; + const std::vector<int64_t> kRepLevels = {100, 80, 60, 40, 20}; + constexpr int64_t kUnencodedByteArrayDataBytes = 1234; + + for (const auto& descr : + {std::make_unique<ColumnDescriptor>(schema::Int32("a"), /*max_def_level=*/3, + /*max_rep_level=*/4), + std::make_unique<ColumnDescriptor>(schema::ByteArray("a"), /*max_def_level=*/3, + /*max_rep_level=*/4)}) { + auto size_statistics = MakeSizeStatistics(descr.get()); + size_statistics->repetition_level_histogram = kRepLevels; + size_statistics->definition_level_histogram = kDefLevels; + if (descr->physical_type() == Type::BYTE_ARRAY) { + size_statistics->IncrementUnencodedByteArrayDataBytes(kUnencodedByteArrayDataBytes); + } + auto thrift_statistics = ToThrift(*size_statistics); + auto restored_statistics = FromThrift(thrift_statistics); + EXPECT_EQ(restored_statistics.definition_level_histogram, kDefLevels); + EXPECT_EQ(restored_statistics.repetition_level_histogram, kRepLevels); + if (descr->physical_type() == Type::BYTE_ARRAY) { + EXPECT_TRUE(restored_statistics.unencoded_byte_array_data_bytes.has_value()); + EXPECT_EQ(restored_statistics.unencoded_byte_array_data_bytes.value(), + kUnencodedByteArrayDataBytes); + } else { + EXPECT_FALSE(restored_statistics.unencoded_byte_array_data_bytes.has_value()); + } + } +} + +bool operator==(const SizeStatistics& lhs, const SizeStatistics& rhs) { + return lhs.repetition_level_histogram == rhs.repetition_level_histogram && + lhs.definition_level_histogram == rhs.definition_level_histogram && + lhs.unencoded_byte_array_data_bytes == rhs.unencoded_byte_array_data_bytes; +} + +struct PageSizeStatistics { + std::vector<int64_t> ref_levels; Review Comment: Typo: `rep_levels` ########## cpp/src/parquet/size_statistics_test.cc: ########## @@ -0,0 +1,307 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include "gmock/gmock.h" +#include "gtest/gtest.h" + +#include <algorithm> +#include <random> + +#include "arrow/buffer.h" +#include "arrow/table.h" +#include "arrow/testing/builder.h" +#include "arrow/testing/gtest_util.h" +#include "arrow/util/bit_util.h" +#include "arrow/util/span.h" +#include "parquet/arrow/reader.h" +#include "parquet/arrow/reader_internal.h" +#include "parquet/arrow/schema.h" +#include "parquet/arrow/writer.h" +#include "parquet/column_writer.h" +#include "parquet/file_writer.h" +#include "parquet/page_index.h" +#include "parquet/schema.h" +#include "parquet/size_statistics.h" +#include "parquet/test_util.h" +#include "parquet/thrift_internal.h" +#include "parquet/types.h" + +namespace parquet { + +TEST(SizeStatistics, ThriftSerDe) { + const std::vector<int64_t> kDefLevels = {128, 64, 32, 16}; + const std::vector<int64_t> kRepLevels = {100, 80, 60, 40, 20}; + constexpr int64_t kUnencodedByteArrayDataBytes = 1234; + + for (const auto& descr : + {std::make_unique<ColumnDescriptor>(schema::Int32("a"), /*max_def_level=*/3, + /*max_rep_level=*/4), + std::make_unique<ColumnDescriptor>(schema::ByteArray("a"), /*max_def_level=*/3, + /*max_rep_level=*/4)}) { + auto size_statistics = MakeSizeStatistics(descr.get()); + size_statistics->repetition_level_histogram = kRepLevels; + size_statistics->definition_level_histogram = kDefLevels; + if (descr->physical_type() == Type::BYTE_ARRAY) { + size_statistics->IncrementUnencodedByteArrayDataBytes(kUnencodedByteArrayDataBytes); + } + auto thrift_statistics = ToThrift(*size_statistics); + auto restored_statistics = FromThrift(thrift_statistics); + EXPECT_EQ(restored_statistics.definition_level_histogram, kDefLevels); + EXPECT_EQ(restored_statistics.repetition_level_histogram, kRepLevels); + if (descr->physical_type() == Type::BYTE_ARRAY) { + EXPECT_TRUE(restored_statistics.unencoded_byte_array_data_bytes.has_value()); + EXPECT_EQ(restored_statistics.unencoded_byte_array_data_bytes.value(), + kUnencodedByteArrayDataBytes); + } else { + EXPECT_FALSE(restored_statistics.unencoded_byte_array_data_bytes.has_value()); + } + } +} + +bool operator==(const SizeStatistics& lhs, const SizeStatistics& rhs) { + return lhs.repetition_level_histogram == rhs.repetition_level_histogram && + lhs.definition_level_histogram == rhs.definition_level_histogram && + lhs.unencoded_byte_array_data_bytes == rhs.unencoded_byte_array_data_bytes; +} + +struct PageSizeStatistics { + std::vector<int64_t> ref_levels; + std::vector<int64_t> def_levels; + std::vector<int64_t> byte_array_bytes; + bool operator==(const PageSizeStatistics& other) const { + return ref_levels == other.ref_levels && def_levels == other.def_levels && + byte_array_bytes == other.byte_array_bytes; + } +}; + +class SizeStatisticsRoundTripTest : public ::testing::Test { + public: + void WriteFile(SizeStatisticsLevel level, + const std::shared_ptr<::arrow::Table>& table) { + auto writer_properties = WriterProperties::Builder() + .max_row_group_length(2) /* every row group has 2 rows */ + ->data_pagesize(1) /* every page has 1 row */ + ->enable_write_page_index() + ->enable_statistics() + ->set_size_statistics_level(level) + ->build(); + + // Get schema from table. 
+ auto schema = table->schema(); + std::shared_ptr<SchemaDescriptor> parquet_schema; + auto arrow_writer_properties = default_arrow_writer_properties(); + ASSERT_OK_NO_THROW(arrow::ToParquetSchema(schema.get(), *writer_properties, + *arrow_writer_properties, &parquet_schema)); + auto schema_node = + std::static_pointer_cast<schema::GroupNode>(parquet_schema->schema_root()); + + // Write table to buffer. + auto sink = CreateOutputStream(); + auto pool = ::arrow::default_memory_pool(); + auto writer = ParquetFileWriter::Open(sink, schema_node, writer_properties); + std::unique_ptr<arrow::FileWriter> arrow_writer; + ASSERT_OK(arrow::FileWriter::Make(pool, std::move(writer), schema, + arrow_writer_properties, &arrow_writer)); + ASSERT_OK_NO_THROW(arrow_writer->WriteTable(*table)); + ASSERT_OK_NO_THROW(arrow_writer->Close()); + ASSERT_OK_AND_ASSIGN(buffer_, sink->Finish()); + } + + void ReadSizeStatistics() { + auto read_properties = default_arrow_reader_properties(); + auto reader = + ParquetFileReader::Open(std::make_shared<::arrow::io::BufferReader>(buffer_)); + + // Read row group size statistics in order. + auto metadata = reader->metadata(); + for (int i = 0; i < metadata->num_row_groups(); ++i) { + auto row_group_metadata = metadata->RowGroup(i); + for (int j = 0; j < metadata->num_columns(); ++j) { + auto column_metadata = row_group_metadata->ColumnChunk(j); + auto size_stats = column_metadata->size_statistics(); + SizeStatistics row_group_stats; + if (size_stats != nullptr) { + row_group_stats = {size_stats->repetition_level_histogram, + size_stats->definition_level_histogram, + size_stats->unencoded_byte_array_data_bytes}; + } + row_group_stats_.emplace_back(std::move(row_group_stats)); Review Comment: Or more simply perhaps ```suggestion row_group_stats_.push_back(size_stats ? *size_stats : SizeStatistics{}); ``` ########## cpp/src/arrow/util/hashing.h: ########## @@ -843,6 +843,14 @@ class BinaryMemoTable : public MemoTable { } } + // Visit the stored value at a specific index in insertion order. + // The visitor function should have the signature `void(std::string_view)` Review Comment: I suppose it's costless to do so anyway. ########## cpp/src/parquet/encoding.h: ########## @@ -158,6 +158,10 @@ class Encoder { virtual Encoding::type encoding() const = 0; virtual void Put(const ::arrow::Array& values) = 0; + // Report the number of bytes before encoding that have been written + // to the encoder since the last report. Note that this call is not + // idempotent because it resets the internal counter. + virtual int64_t ReportUnencodedDataBytes() = 0; Review Comment: Should it be supported for anything else than `BYTE_ARRAY`? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@arrow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org