pitrou commented on code in PR #37400: URL: https://github.com/apache/arrow/pull/37400#discussion_r2686170586
########## cpp/src/parquet/bloom_filter_writer.cc: ########## @@ -0,0 +1,266 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "parquet/bloom_filter_writer.h" + +#include <map> +#include <utility> + +#include "arrow/array.h" +#include "arrow/io/interfaces.h" +#include "arrow/type_traits.h" +#include "arrow/util/bit_run_reader.h" +#include "arrow/util/checked_cast.h" + +#include "parquet/exception.h" +#include "parquet/metadata.h" +#include "parquet/properties.h" +#include "parquet/schema.h" +#include "parquet/types.h" + +namespace parquet { + +constexpr int64_t kHashBatchSize = 256; + +template <typename ParquetType> +TypedBloomFilterWriter<ParquetType>::TypedBloomFilterWriter(const ColumnDescriptor* descr, + BloomFilter* bloom_filter) + : descr_(descr), bloom_filter_(bloom_filter) {} + +template <typename ParquetType> +void TypedBloomFilterWriter<ParquetType>::Update(const T* values, int64_t num_values) { + if constexpr (std::is_same_v<ParquetType, BooleanType>) { + throw ParquetException("Bloom filter is not supported for boolean type"); + } Review Comment: You're specializing `Update` for `BooleanType` below, is this snippet still necessary? ########## cpp/src/parquet/file_writer.cc: ########## @@ -456,10 +472,24 @@ class FileSerializer : public ParquetFileWriter::Contents { if (page_index_builder_ != nullptr) { // Serialize page index after all row groups have been written and report // location to the file metadata. - PageIndexLocation page_index_location; page_index_builder_->Finish(); - page_index_builder_->WriteTo(sink_.get(), &page_index_location); - metadata_->SetPageIndexLocation(page_index_location); + auto write_result = page_index_builder_->WriteTo(sink_.get()); + metadata_->SetIndexLocations(IndexKind::kColumnIndex, + write_result.column_index_locations); + metadata_->SetIndexLocations(IndexKind::kOffsetIndex, + write_result.offset_index_locations); + } + } + + void WriteBloomFilter() { + if (bloom_filter_builder_ != nullptr) { + if (properties_->file_encryption_properties()) { + ParquetException::NYI("Encryption is not supported with bloom filter"); Review Comment: ```suggestion ParquetException::NYI("Encryption is currently not supported with bloom filter"); ``` ########## cpp/src/parquet/properties.h: ########## @@ -169,6 +169,25 @@ static constexpr bool DEFAULT_IS_PAGE_INDEX_ENABLED = true; static constexpr SizeStatisticsLevel DEFAULT_SIZE_STATISTICS_LEVEL = SizeStatisticsLevel::PageAndColumnChunk; +struct PARQUET_EXPORT BloomFilterOptions { + // Expected number of distinct values (NDV) in the bloom filter. + // + // Bloom filters are most effective for high-cardinality columns. A good default + // is to set ndv equal to the number of rows. 
Lower values reduce disk usage but + // may not be worthwhile for very small NDVs. + // + // Increasing ndv (without increasing fpp) increases disk and memory usage. Review Comment: Let's use docstring syntax (i.e. `///` instead of `//`)? Also, can we give a rough guideline of disk space per bloom filter depending on ndv and fpp? ########## cpp/src/parquet/column_writer.cc: ########## @@ -2660,32 +2689,38 @@ std::shared_ptr<ColumnWriter> ColumnWriter::Make(ColumnChunkMetaDataBuilder* met encoding = properties->dictionary_index_encoding(); } switch (descr->physical_type()) { - case Type::BOOLEAN: + case Type::BOOLEAN: { + if (bloom_filter != nullptr) { + throw ParquetException("Bloom filter is not supported for boolean type"); + } return std::make_shared<TypedColumnWriterImpl<BooleanType>>( - metadata, std::move(pager), use_dictionary, encoding, properties); + metadata, std::move(pager), use_dictionary, encoding, properties, + /*bloom_filter=*/nullptr); + } case Type::INT32: return std::make_shared<TypedColumnWriterImpl<Int32Type>>( - metadata, std::move(pager), use_dictionary, encoding, properties); + metadata, std::move(pager), use_dictionary, encoding, properties, bloom_filter); case Type::INT64: return std::make_shared<TypedColumnWriterImpl<Int64Type>>( - metadata, std::move(pager), use_dictionary, encoding, properties); + metadata, std::move(pager), use_dictionary, encoding, properties, bloom_filter); case Type::INT96: return std::make_shared<TypedColumnWriterImpl<Int96Type>>( - metadata, std::move(pager), use_dictionary, encoding, properties); + metadata, std::move(pager), use_dictionary, encoding, properties, bloom_filter); case Type::FLOAT: return std::make_shared<TypedColumnWriterImpl<FloatType>>( - metadata, std::move(pager), use_dictionary, encoding, properties); + metadata, std::move(pager), use_dictionary, encoding, properties, bloom_filter); case Type::DOUBLE: return std::make_shared<TypedColumnWriterImpl<DoubleType>>( - metadata, std::move(pager), use_dictionary, encoding, properties); + metadata, std::move(pager), use_dictionary, encoding, properties, bloom_filter); case Type::BYTE_ARRAY: return std::make_shared<TypedColumnWriterImpl<ByteArrayType>>( - metadata, std::move(pager), use_dictionary, encoding, properties); + metadata, std::move(pager), use_dictionary, encoding, properties, bloom_filter); case Type::FIXED_LEN_BYTE_ARRAY: return std::make_shared<TypedColumnWriterImpl<FLBAType>>( - metadata, std::move(pager), use_dictionary, encoding, properties); + metadata, std::move(pager), use_dictionary, encoding, properties, bloom_filter); default: - ParquetException::NYI("type reader not implemented"); + ParquetException::NYI("Column writer not implemented for type: " + + TypeToString(descr->physical_type())); Review Comment: +1 :-) ########## cpp/src/parquet/arrow/index_test.cc: ########## @@ -0,0 +1,680 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#ifdef _MSC_VER +# pragma warning(push) +// Disable forcing value to bool warnings +# pragma warning(disable : 4800) +#endif + +#include <gmock/gmock.h> +#include <gtest/gtest.h> + +#include <cstdint> +#include <set> +#include <vector> + +#include "arrow/chunked_array.h" +#include "arrow/compute/api.h" +#include "arrow/scalar.h" +#include "arrow/table.h" +#include "arrow/testing/builder.h" +#include "arrow/testing/gtest_util.h" +#include "arrow/testing/random.h" +#include "arrow/util/checked_cast.h" + +#include "parquet/arrow/reader.h" +#include "parquet/arrow/reader_internal.h" +#include "parquet/arrow/schema.h" +#include "parquet/arrow/test_util.h" +#include "parquet/arrow/writer.h" +#include "parquet/bloom_filter.h" +#include "parquet/bloom_filter_reader.h" +#include "parquet/file_writer.h" +#include "parquet/page_index.h" +#include "parquet/properties.h" + +using arrow::Array; +using arrow::Buffer; +using arrow::ChunkedArray; +using arrow::default_memory_pool; +using arrow::Result; +using arrow::Status; +using arrow::Table; +using arrow::internal::checked_cast; +using arrow::io::BufferReader; + +using ParquetType = parquet::Type; + +using parquet::arrow::FromParquetSchema; +using parquet::schema::GroupNode; +using parquet::schema::NodePtr; +using parquet::schema::PrimitiveNode; + +namespace parquet::arrow { + +namespace { + +struct ColumnIndexObject { + std::vector<bool> null_pages; + std::vector<std::string> min_values; + std::vector<std::string> max_values; + BoundaryOrder::type boundary_order = BoundaryOrder::Unordered; + std::vector<int64_t> null_counts; + + ColumnIndexObject() = default; + + ColumnIndexObject(const std::vector<bool>& null_pages, + const std::vector<std::string>& min_values, + const std::vector<std::string>& max_values, + BoundaryOrder::type boundary_order, + const std::vector<int64_t>& null_counts) + : null_pages(null_pages), + min_values(min_values), + max_values(max_values), + boundary_order(boundary_order), + null_counts(null_counts) {} + + explicit ColumnIndexObject(const ColumnIndex* column_index) { + if (column_index == nullptr) { + return; + } + null_pages = column_index->null_pages(); + min_values = column_index->encoded_min_values(); + max_values = column_index->encoded_max_values(); + boundary_order = column_index->boundary_order(); + if (column_index->has_null_counts()) { + null_counts = column_index->null_counts(); + } + } + + bool operator==(const ColumnIndexObject& b) const { + return null_pages == b.null_pages && min_values == b.min_values && + max_values == b.max_values && boundary_order == b.boundary_order && + null_counts == b.null_counts; + } +}; + +auto encode_int64 = [](int64_t value) { + return std::string(reinterpret_cast<const char*>(&value), sizeof(int64_t)); +}; + +auto encode_double = [](double value) { + return std::string(reinterpret_cast<const char*>(&value), sizeof(double)); +}; + +} // namespace + +class TestingWithPageIndex { + public: + void WriteFile(const std::shared_ptr<WriterProperties>& writer_properties, + const std::shared_ptr<::arrow::Table>& table) { + // Get schema from table. 
+ auto schema = table->schema(); + std::shared_ptr<SchemaDescriptor> parquet_schema; + auto arrow_writer_properties = default_arrow_writer_properties(); + ASSERT_OK_NO_THROW(ToParquetSchema(schema.get(), *writer_properties, + *arrow_writer_properties, &parquet_schema)); + auto schema_node = std::static_pointer_cast<GroupNode>(parquet_schema->schema_root()); + + // Write table to buffer. + auto sink = CreateOutputStream(); + auto pool = ::arrow::default_memory_pool(); + auto writer = ParquetFileWriter::Open(sink, schema_node, writer_properties); + std::unique_ptr<FileWriter> arrow_writer; + ASSERT_OK(FileWriter::Make(pool, std::move(writer), schema, arrow_writer_properties, + &arrow_writer)); + ASSERT_OK_NO_THROW(arrow_writer->WriteTable(*table)); + ASSERT_OK_NO_THROW(arrow_writer->Close()); + ASSERT_OK_AND_ASSIGN(buffer_, sink->Finish()); + } + + protected: + std::shared_ptr<Buffer> buffer_; +}; + +class ParquetPageIndexRoundTripTest : public ::testing::Test, + public TestingWithPageIndex { + public: + void ReadPageIndexes(int expect_num_row_groups, int expect_num_pages, + const std::set<int>& expect_columns_without_index = {}) { + auto read_properties = default_arrow_reader_properties(); + auto reader = ParquetFileReader::Open(std::make_shared<BufferReader>(buffer_)); + + auto metadata = reader->metadata(); + ASSERT_EQ(expect_num_row_groups, metadata->num_row_groups()); + + auto page_index_reader = reader->GetPageIndexReader(); + ASSERT_NE(page_index_reader, nullptr); + + int64_t offset_lower_bound = 0; + for (int rg = 0; rg < metadata->num_row_groups(); ++rg) { + auto row_group_index_reader = page_index_reader->RowGroup(rg); + ASSERT_NE(row_group_index_reader, nullptr); + + auto row_group_reader = reader->RowGroup(rg); + ASSERT_NE(row_group_reader, nullptr); + + for (int col = 0; col < metadata->num_columns(); ++col) { + auto column_index = row_group_index_reader->GetColumnIndex(col); + column_indexes_.emplace_back(column_index.get()); + + bool expect_no_page_index = + expect_columns_without_index.find(col) != expect_columns_without_index.cend(); + + auto offset_index = row_group_index_reader->GetOffsetIndex(col); + if (expect_no_page_index) { + ASSERT_EQ(offset_index, nullptr); + } else { + CheckOffsetIndex(offset_index.get(), expect_num_pages, &offset_lower_bound); + } + + // Verify page stats are not written to page header if page index is enabled. + auto page_reader = row_group_reader->GetColumnPageReader(col); + ASSERT_NE(page_reader, nullptr); + std::shared_ptr<Page> page = nullptr; + while ((page = page_reader->NextPage()) != nullptr) { + if (page->type() == PageType::DATA_PAGE || + page->type() == PageType::DATA_PAGE_V2) { + ASSERT_EQ(std::static_pointer_cast<DataPage>(page)->statistics().is_set(), + expect_no_page_index); + } + } + } + } + } + + private: + void CheckOffsetIndex(const OffsetIndex* offset_index, int expect_num_pages, + int64_t* offset_lower_bound_in_out) { + ASSERT_NE(offset_index, nullptr); + const auto& locations = offset_index->page_locations(); + ASSERT_EQ(static_cast<size_t>(expect_num_pages), locations.size()); + int64_t prev_first_row_index = -1; + for (const auto& location : locations) { + // Make sure first_row_index is in the ascending order within a row group. + ASSERT_GT(location.first_row_index, prev_first_row_index); + // Make sure page offset is in the ascending order across the file. + ASSERT_GE(location.offset, *offset_lower_bound_in_out); + // Make sure page size is positive. 
+ ASSERT_GT(location.compressed_page_size, 0); + prev_first_row_index = location.first_row_index; + *offset_lower_bound_in_out = location.offset + location.compressed_page_size; + } + } + + protected: + std::vector<ColumnIndexObject> column_indexes_; +}; + +TEST_F(ParquetPageIndexRoundTripTest, SimpleRoundTrip) { + auto writer_properties = WriterProperties::Builder() + .enable_write_page_index() + ->max_row_group_length(4) + ->build(); + auto schema = ::arrow::schema({::arrow::field("c0", ::arrow::int64()), + ::arrow::field("c1", ::arrow::utf8()), + ::arrow::field("c2", ::arrow::list(::arrow::int64()))}); + WriteFile(writer_properties, ::arrow::TableFromJSON(schema, {R"([ + [1, "a", [1] ], + [2, "b", [1, 2] ], + [3, "c", [null] ], + [null, "d", [] ], + [5, null, [3, 3, 3]], + [6, "f", null ] + ])"})); + + ReadPageIndexes(/*expect_num_row_groups=*/2, /*expect_num_pages=*/1); + + EXPECT_THAT( + column_indexes_, + ::testing::ElementsAre( + ColumnIndexObject{/*null_pages=*/{false}, /*min_values=*/{encode_int64(1)}, + /*max_values=*/{encode_int64(3)}, BoundaryOrder::Ascending, + /*null_counts=*/{1}}, + ColumnIndexObject{/*null_pages=*/{false}, /*min_values=*/{"a"}, + /*max_values=*/{"d"}, BoundaryOrder::Ascending, + /*null_counts=*/{0}}, + ColumnIndexObject{/*null_pages=*/{false}, /*min_values=*/{encode_int64(1)}, + /*max_values=*/{encode_int64(2)}, BoundaryOrder::Ascending, + /*null_counts=*/{2}}, + ColumnIndexObject{/*null_pages=*/{false}, /*min_values=*/{encode_int64(5)}, + /*max_values=*/{encode_int64(6)}, BoundaryOrder::Ascending, + /*null_counts=*/{0}}, + ColumnIndexObject{/*null_pages=*/{false}, /*min_values=*/{"f"}, + /*max_values=*/{"f"}, BoundaryOrder::Ascending, + /*null_counts=*/{1}}, + ColumnIndexObject{/*null_pages=*/{false}, /*min_values=*/{encode_int64(3)}, + /*max_values=*/{encode_int64(3)}, BoundaryOrder::Ascending, + /*null_counts=*/{1}})); +} + +TEST_F(ParquetPageIndexRoundTripTest, SimpleRoundTripWithStatsDisabled) { + auto writer_properties = WriterProperties::Builder() + .enable_write_page_index() + ->disable_statistics() + ->build(); + auto schema = ::arrow::schema({::arrow::field("c0", ::arrow::int64()), + ::arrow::field("c1", ::arrow::utf8()), + ::arrow::field("c2", ::arrow::list(::arrow::int64()))}); + WriteFile(writer_properties, ::arrow::TableFromJSON(schema, {R"([ + [1, "a", [1] ], + [2, "b", [1, 2] ], + [3, "c", [null] ], + [null, "d", [] ], + [5, null, [3, 3, 3]], + [6, "f", null ] + ])"})); + + ReadPageIndexes(/*expect_num_row_groups=*/1, /*expect_num_pages=*/1); + for (auto& column_index : column_indexes_) { + // Means page index is empty. 
+ EXPECT_EQ(ColumnIndexObject{}, column_index); + } +} + +TEST_F(ParquetPageIndexRoundTripTest, SimpleRoundTripWithColumnStatsDisabled) { + auto writer_properties = WriterProperties::Builder() + .enable_write_page_index() + ->disable_statistics("c0") + ->max_row_group_length(4) + ->build(); + auto schema = ::arrow::schema({::arrow::field("c0", ::arrow::int64()), + ::arrow::field("c1", ::arrow::utf8()), + ::arrow::field("c2", ::arrow::list(::arrow::int64()))}); + WriteFile(writer_properties, ::arrow::TableFromJSON(schema, {R"([ + [1, "a", [1] ], + [2, "b", [1, 2] ], + [3, "c", [null] ], + [null, "d", [] ], + [5, null, [3, 3, 3]], + [6, "f", null ] + ])"})); + + ReadPageIndexes(/*expect_num_row_groups=*/2, /*expect_num_pages=*/1); + + ColumnIndexObject empty_column_index{}; + EXPECT_THAT( + column_indexes_, + ::testing::ElementsAre( + empty_column_index, + ColumnIndexObject{/*null_pages=*/{false}, /*min_values=*/{"a"}, + /*max_values=*/{"d"}, BoundaryOrder::Ascending, + /*null_counts=*/{0}}, + ColumnIndexObject{/*null_pages=*/{false}, /*min_values=*/{encode_int64(1)}, + /*max_values=*/{encode_int64(2)}, BoundaryOrder::Ascending, + /*null_counts=*/{2}}, + empty_column_index, + ColumnIndexObject{/*null_pages=*/{false}, /*min_values=*/{"f"}, + /*max_values=*/{"f"}, BoundaryOrder::Ascending, + /*null_counts=*/{1}}, + ColumnIndexObject{/*null_pages=*/{false}, /*min_values=*/{encode_int64(3)}, + /*max_values=*/{encode_int64(3)}, BoundaryOrder::Ascending, + /*null_counts=*/{1}})); +} + +TEST_F(ParquetPageIndexRoundTripTest, DropLargeStats) { + auto writer_properties = WriterProperties::Builder() + .enable_write_page_index() + ->max_row_group_length(1) /* write single-row row group */ + ->max_statistics_size(20) /* drop stats larger than it */ + ->build(); + auto schema = ::arrow::schema({::arrow::field("c0", ::arrow::utf8())}); + WriteFile(writer_properties, ::arrow::TableFromJSON(schema, {R"([ + ["short_string"], + ["very_large_string_to_drop_stats"] + ])"})); + + ReadPageIndexes(/*expect_num_row_groups=*/2, /*expect_num_pages=*/1); + + EXPECT_THAT( + column_indexes_, + ::testing::ElementsAre( + ColumnIndexObject{/*null_pages=*/{false}, /*min_values=*/{"short_string"}, + /*max_values=*/{"short_string"}, BoundaryOrder::Ascending, + /*null_counts=*/{0}}, + ColumnIndexObject{})); +} + +TEST_F(ParquetPageIndexRoundTripTest, MultiplePages) { + auto writer_properties = WriterProperties::Builder() + .enable_write_page_index() + ->data_pagesize(1) /* write multiple pages */ + ->build(); + auto schema = ::arrow::schema( + {::arrow::field("c0", ::arrow::int64()), ::arrow::field("c1", ::arrow::utf8())}); + WriteFile( + writer_properties, + ::arrow::TableFromJSON( + schema, {R"([[1, "a"], [2, "b"]])", R"([[3, "c"], [4, "d"]])", + R"([[null, null], [6, "f"]])", R"([[null, null], [null, null]])"})); + + ReadPageIndexes(/*expect_num_row_groups=*/1, /*expect_num_pages=*/4); + + EXPECT_THAT( + column_indexes_, + ::testing::ElementsAre( + ColumnIndexObject{ + /*null_pages=*/{false, false, false, true}, + /*min_values=*/{encode_int64(1), encode_int64(3), encode_int64(6), ""}, + /*max_values=*/{encode_int64(2), encode_int64(4), encode_int64(6), ""}, + BoundaryOrder::Ascending, + /*null_counts=*/{0, 0, 1, 2}}, + ColumnIndexObject{/*null_pages=*/{false, false, false, true}, + /*min_values=*/{"a", "c", "f", ""}, + /*max_values=*/{"b", "d", "f", ""}, BoundaryOrder::Ascending, + /*null_counts=*/{0, 0, 1, 2}})); +} + +TEST_F(ParquetPageIndexRoundTripTest, DoubleWithNaNs) { + auto writer_properties = 
WriterProperties::Builder() + .enable_write_page_index() + ->max_row_group_length(3) /* 3 rows per row group */ + ->build(); + + // Create table to write with NaNs. + auto vectors = std::vector<std::shared_ptr<Array>>(4); + // NaN will be ignored in min/max stats. + ::arrow::ArrayFromVector<::arrow::DoubleType>({1.0, NAN, 0.1}, &vectors[0]); + // Lower bound will use -0.0. + ::arrow::ArrayFromVector<::arrow::DoubleType>({+0.0, NAN, +0.0}, &vectors[1]); + // Upper bound will use -0.0. + ::arrow::ArrayFromVector<::arrow::DoubleType>({-0.0, NAN, -0.0}, &vectors[2]); + // Pages with all NaNs will not build column index. + ::arrow::ArrayFromVector<::arrow::DoubleType>({NAN, NAN, NAN}, &vectors[3]); + ASSERT_OK_AND_ASSIGN(auto chunked_array, + arrow::ChunkedArray::Make(vectors, ::arrow::float64())); + + auto schema = ::arrow::schema({::arrow::field("c0", ::arrow::float64())}); + auto table = Table::Make(schema, {chunked_array}); + WriteFile(writer_properties, table); + + ReadPageIndexes(/*expect_num_row_groups=*/4, /*expect_num_pages=*/1); + + EXPECT_THAT( + column_indexes_, + ::testing::ElementsAre( + ColumnIndexObject{/*null_pages=*/{false}, + /*min_values=*/{encode_double(0.1)}, + /*max_values=*/{encode_double(1.0)}, BoundaryOrder::Ascending, + /*null_counts=*/{0}}, + ColumnIndexObject{/*null_pages=*/{false}, + /*min_values=*/{encode_double(-0.0)}, + /*max_values=*/{encode_double(+0.0)}, + BoundaryOrder::Ascending, + /*null_counts=*/{0}}, + ColumnIndexObject{/*null_pages=*/{false}, + /*min_values=*/{encode_double(-0.0)}, + /*max_values=*/{encode_double(+0.0)}, + BoundaryOrder::Ascending, + /*null_counts=*/{0}}, + ColumnIndexObject{ + /* Page with only NaN values does not have column index built */})); +} + +TEST_F(ParquetPageIndexRoundTripTest, EnablePerColumn) { + auto schema = ::arrow::schema({::arrow::field("c0", ::arrow::int64()), + ::arrow::field("c1", ::arrow::int64()), + ::arrow::field("c2", ::arrow::int64())}); + auto writer_properties = + WriterProperties::Builder() + .enable_write_page_index() /* enable by default */ + ->enable_write_page_index("c0") /* enable c0 explicitly */ + ->disable_write_page_index("c1") /* disable c1 explicitly */ + ->build(); + WriteFile(writer_properties, ::arrow::TableFromJSON(schema, {R"([[0, 1, 2]])"})); + + ReadPageIndexes(/*expect_num_row_groups=*/1, /*expect_num_pages=*/1, + /*expect_columns_without_index=*/{1}); + + EXPECT_THAT( + column_indexes_, + ::testing::ElementsAre( + ColumnIndexObject{/*null_pages=*/{false}, /*min_values=*/{encode_int64(0)}, + /*max_values=*/{encode_int64(0)}, BoundaryOrder::Ascending, + /*null_counts=*/{0}}, + ColumnIndexObject{/* page index of c1 is disabled */}, + ColumnIndexObject{/*null_pages=*/{false}, /*min_values=*/{encode_int64(2)}, + /*max_values=*/{encode_int64(2)}, BoundaryOrder::Ascending, + /*null_counts=*/{0}})); +} + +class ParquetBloomFilterRoundTripTest : public ::testing::Test, + public TestingWithPageIndex { + public: + void ReadBloomFilters(int expect_num_row_groups, + const std::set<int>& expect_columns_without_bloom_filter = {}) { + auto reader = ParquetFileReader::Open(std::make_shared<BufferReader>(buffer_)); + auto metadata = reader->metadata(); + auto& bloom_filter_reader = reader->GetBloomFilterReader(); + ASSERT_EQ(expect_num_row_groups, metadata->num_row_groups()); + + for (int rg = 0; rg < metadata->num_row_groups(); ++rg) { + auto row_group_reader = bloom_filter_reader.RowGroup(rg); + ASSERT_NE(row_group_reader, nullptr); + + for (int col = 0; col < metadata->num_columns(); ++col) { + if 
(expect_columns_without_bloom_filter.find(col) != + expect_columns_without_bloom_filter.cend()) { + ASSERT_EQ(row_group_reader->GetColumnBloomFilter(col), nullptr); + } else { + bloom_filters_.push_back(row_group_reader->GetColumnBloomFilter(col)); + ASSERT_NE(bloom_filters_.back(), nullptr); + } + } + } + } + + template <typename ArrowType> + void VerifyBloomFilterContains(const BloomFilter& bloom_filter, + const ::arrow::ChunkedArray& chunked_array) { + for (auto value : ::arrow::stl::Iterate<ArrowType>(chunked_array)) { + if (value == std::nullopt) { + continue; + } + EXPECT_TRUE(bloom_filter.FindHash(bloom_filter.Hash(value.value()))); + } + } + + template <typename ArrowType> + void VerifyBloomFilterNotContains(const BloomFilter& bloom_filter, + const ::arrow::ChunkedArray& chunked_array) { + for (auto value : ::arrow::stl::Iterate<ArrowType>(chunked_array)) { + if (value == std::nullopt) { + continue; + } + EXPECT_FALSE(bloom_filter.FindHash(bloom_filter.Hash(value.value()))); + } + } + + template <typename ArrowType> + void VerifyBloomFilterAcrossRowGroups(const std::shared_ptr<ChunkedArray>& column, + int num_row_groups, + const std::vector<int64_t>& row_group_row_counts, + int num_bf_columns, int col_idx) { + int64_t current_row_offset = 0; + for (int rg = 0; rg < num_row_groups; ++rg) { + int bf_idx = rg * num_bf_columns + col_idx; + auto col_slice = column->Slice(current_row_offset, row_group_row_counts[rg]); + + // Verify this bloom filter contains values from this row group + VerifyBloomFilterContains<ArrowType>(*bloom_filters_[bf_idx], *col_slice); + + // Verify other row groups' bloom filters don't contain these values + for (int other_rg = 0; other_rg < num_row_groups; ++other_rg) { + if (other_rg != rg) { + int other_bf_idx = other_rg * num_bf_columns + col_idx; + VerifyBloomFilterNotContains<ArrowType>(*bloom_filters_[other_bf_idx], + *col_slice); + } + } + current_row_offset += row_group_row_counts[rg]; + } + } + + protected: + // Bloom filters for each column in each row group. + std::vector<std::unique_ptr<BloomFilter>> bloom_filters_; +}; + +TEST_F(ParquetBloomFilterRoundTripTest, SimpleRoundTrip) { + auto schema = ::arrow::schema( + {::arrow::field("c0", ::arrow::int64()), ::arrow::field("c1", ::arrow::utf8())}); Review Comment: Do we want to also test with `large_utf8` and `utf8_view` as below? ########## cpp/src/parquet/bloom_filter_writer.h: ########## @@ -0,0 +1,104 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include "arrow/type_fwd.h" + +#include "parquet/bloom_filter.h" +#include "parquet/index_location.h" +#include "parquet/type_fwd.h" + +namespace parquet { + +/// \brief Writer for updating a bloom filter with values of a specific Parquet type. 
Review Comment: Perhaps mention that `BooleanType` is not supported? ########## cpp/src/parquet/bloom_filter_writer.cc: ########## @@ -0,0 +1,266 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "parquet/bloom_filter_writer.h" + +#include <map> +#include <utility> + +#include "arrow/array.h" +#include "arrow/io/interfaces.h" +#include "arrow/type_traits.h" +#include "arrow/util/bit_run_reader.h" +#include "arrow/util/checked_cast.h" + +#include "parquet/exception.h" +#include "parquet/metadata.h" +#include "parquet/properties.h" +#include "parquet/schema.h" +#include "parquet/types.h" + +namespace parquet { + +constexpr int64_t kHashBatchSize = 256; + +template <typename ParquetType> +TypedBloomFilterWriter<ParquetType>::TypedBloomFilterWriter(const ColumnDescriptor* descr, + BloomFilter* bloom_filter) + : descr_(descr), bloom_filter_(bloom_filter) {} + +template <typename ParquetType> +void TypedBloomFilterWriter<ParquetType>::Update(const T* values, int64_t num_values) { + if constexpr (std::is_same_v<ParquetType, BooleanType>) { + throw ParquetException("Bloom filter is not supported for boolean type"); + } + + ARROW_DCHECK(bloom_filter_ != nullptr); + std::array<uint64_t, kHashBatchSize> hashes; + for (int64_t i = 0; i < num_values; i += kHashBatchSize) { + auto batch_size = static_cast<int>(std::min(kHashBatchSize, num_values - i)); + if constexpr (std::is_same_v<ParquetType, FLBAType>) { + bloom_filter_->Hashes(values + i, descr_->type_length(), batch_size, hashes.data()); + } else { + bloom_filter_->Hashes(values + i, batch_size, hashes.data()); + } + bloom_filter_->InsertHashes(hashes.data(), batch_size); + } +} + +template <> +void TypedBloomFilterWriter<BooleanType>::Update(const bool*, int64_t) { + throw ParquetException("Bloom filter is not supported for boolean type"); +} + +template <typename ParquetType> +void TypedBloomFilterWriter<ParquetType>::UpdateSpaced(const T* values, + int64_t num_values, + const uint8_t* valid_bits, + int64_t valid_bits_offset) { + ARROW_DCHECK(bloom_filter_ != nullptr); + std::array<uint64_t, kHashBatchSize> hashes; + ::arrow::internal::VisitSetBitRunsVoid( + valid_bits, valid_bits_offset, num_values, [&](int64_t position, int64_t length) { + for (int64_t i = 0; i < length; i += kHashBatchSize) { + auto batch_size = static_cast<int>(std::min(kHashBatchSize, length - i)); + if constexpr (std::is_same_v<ParquetType, FLBAType>) { + bloom_filter_->Hashes(values + i + position, descr_->type_length(), + batch_size, hashes.data()); + } else { + bloom_filter_->Hashes(values + i + position, batch_size, hashes.data()); + } + bloom_filter_->InsertHashes(hashes.data(), batch_size); + } + }); +} + +template <> +void TypedBloomFilterWriter<BooleanType>::UpdateSpaced(const bool*, int64_t, + const 
uint8_t*, int64_t) { + throw ParquetException("Bloom filter is not supported for boolean type"); +} + +template <typename ParquetType> +void TypedBloomFilterWriter<ParquetType>::Update(const ::arrow::Array& values) { + ParquetException::NYI("Updating bloom filter is not implemented for array of type: " + + values.type()->ToString()); +} + +namespace { + +template <typename ArrayType> +void UpdateBinaryBloomFilter(BloomFilter& bloom_filter, const ArrayType& array) { + std::array<ByteArray, kHashBatchSize> byte_arrays; + std::array<uint64_t, kHashBatchSize> hashes; + ::arrow::internal::VisitSetBitRunsVoid( + array.null_bitmap_data(), array.offset(), array.length(), + [&](int64_t position, int64_t length) { + for (int64_t i = 0; i < length; i += kHashBatchSize) { + auto batch_size = static_cast<int>(std::min(kHashBatchSize, length - i)); + for (int j = 0; j < batch_size; j++) { + byte_arrays[j] = array.GetView(position + i + j); + } + bloom_filter.Hashes(byte_arrays.data(), batch_size, hashes.data()); + bloom_filter.InsertHashes(hashes.data(), batch_size); + } + }); +} + +} // namespace + +template <> +void TypedBloomFilterWriter<ByteArrayType>::Update(const ::arrow::Array& values) { + ARROW_DCHECK(bloom_filter_ != nullptr); + if (::arrow::is_binary_view_like(values.type_id())) { + UpdateBinaryBloomFilter( + *bloom_filter_, + ::arrow::internal::checked_cast<const ::arrow::BinaryViewArray&>(values)); + } else if (::arrow::is_binary_like(values.type_id())) { + UpdateBinaryBloomFilter( + *bloom_filter_, + ::arrow::internal::checked_cast<const ::arrow::BinaryArray&>(values)); + } else if (::arrow::is_large_binary_like(values.type_id())) { + UpdateBinaryBloomFilter( + *bloom_filter_, + ::arrow::internal::checked_cast<const ::arrow::LargeBinaryArray&>(values)); + } else { + ParquetException::NYI("Bloom filter is not supported for this Arrow type: " + + values.type()->ToString()); + } +} + +template class TypedBloomFilterWriter<BooleanType>; +template class TypedBloomFilterWriter<Int32Type>; +template class TypedBloomFilterWriter<Int64Type>; +template class TypedBloomFilterWriter<Int96Type>; +template class TypedBloomFilterWriter<FloatType>; +template class TypedBloomFilterWriter<DoubleType>; +template class TypedBloomFilterWriter<ByteArrayType>; +template class TypedBloomFilterWriter<FLBAType>; + +namespace { + +/// \brief A concrete implementation of BloomFilterBuilder. +/// +/// \note Column encryption for bloom filter is not implemented yet. +class BloomFilterBuilderImpl : public BloomFilterBuilder { + public: + BloomFilterBuilderImpl(const SchemaDescriptor* schema, + const WriterProperties* properties) + : schema_(schema), properties_(properties) {} + + void AppendRowGroup() override; + + BloomFilter* CreateBloomFilter(int32_t column_ordinal) override; + + IndexLocations WriteTo(::arrow::io::OutputStream* sink) override; + + private: + /// Make sure column ordinal is not out of bound and the builder is in good state. 
+ void CheckState(int32_t column_ordinal) const { + if (finished_) { + throw ParquetException("BloomFilterBuilder is already finished."); + } + if (bloom_filters_.empty()) { + throw ParquetException("No row group appended to BloomFilterBuilder"); + } + if (column_ordinal < 0 || column_ordinal >= schema_->num_columns()) { + throw ParquetException("Invalid column ordinal: " + std::to_string(column_ordinal)); + } + if (schema_->Column(column_ordinal)->physical_type() == Type::BOOLEAN) { + throw ParquetException("BloomFilterBuilder does not support boolean type."); + } + } + + const SchemaDescriptor* schema_; + const WriterProperties* properties_; + bool finished_ = false; + + using RowGroupBloomFilters = std::map<int32_t, std::shared_ptr<BloomFilter>>; + std::map<int32_t, RowGroupBloomFilters> bloom_filters_; Review Comment: Why not a vector of pairs? I understand we want an ordered container, which is why we can't use `unordered_map`, but we don't access this container by key AFAICT. ########## cpp/src/parquet/type_fwd.h: ########## @@ -62,8 +62,6 @@ struct ParquetVersion { }; }; -struct PageIndexLocation; Review Comment: Do we want to also fwd-declare the new index location types? It can be useful at some point. ########## cpp/src/parquet/properties.h: ########## @@ -215,6 +234,15 @@ class PARQUET_EXPORT ColumnProperties { page_index_enabled_ = page_index_enabled; } + void set_bloom_filter_options(const BloomFilterOptions& bloom_filter_options) { + if (bloom_filter_options.fpp >= 1.0 || bloom_filter_options.fpp <= 0.0) { + throw ParquetException( + "Bloom filter false positive probability must be in (0.0, 1.0), got " + Review Comment: Isn't it `[0.0, 1.0]`? I'm not sure about the US convention for noting intervals, sorry. ########## cpp/src/parquet/arrow/index_test.cc: ########## @@ -0,0 +1,680 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License.
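For reference, the vector-of-pairs layout suggested in the `bloom_filter_writer.cc` comment above could look roughly like this; a sketch of the reviewer's suggestion under the same usage pattern (append in order, iterate in order, no lookup by key), not code from the PR:

```cpp
// Hypothetical replacement for the nested std::map in BloomFilterBuilderImpl:
// filters are appended per row group and per column ordinal, then iterated
// in insertion order when serializing, so ordered vectors would suffice.
using RowGroupBloomFilters =
    std::vector<std::pair<int32_t, std::shared_ptr<BloomFilter>>>;
std::vector<RowGroupBloomFilters> bloom_filters_;
```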
+ +#ifdef _MSC_VER +# pragma warning(push) +// Disable forcing value to bool warnings +# pragma warning(disable : 4800) +#endif + +#include <gmock/gmock.h> +#include <gtest/gtest.h> + +#include <cstdint> +#include <set> +#include <vector> + +#include "arrow/chunked_array.h" +#include "arrow/compute/api.h" +#include "arrow/scalar.h" +#include "arrow/table.h" +#include "arrow/testing/builder.h" +#include "arrow/testing/gtest_util.h" +#include "arrow/testing/random.h" +#include "arrow/util/checked_cast.h" + +#include "parquet/arrow/reader.h" +#include "parquet/arrow/reader_internal.h" +#include "parquet/arrow/schema.h" +#include "parquet/arrow/test_util.h" +#include "parquet/arrow/writer.h" +#include "parquet/bloom_filter.h" +#include "parquet/bloom_filter_reader.h" +#include "parquet/file_writer.h" +#include "parquet/page_index.h" +#include "parquet/properties.h" + +using arrow::Array; +using arrow::Buffer; +using arrow::ChunkedArray; +using arrow::default_memory_pool; +using arrow::Result; +using arrow::Status; +using arrow::Table; +using arrow::internal::checked_cast; +using arrow::io::BufferReader; + +using ParquetType = parquet::Type; + +using parquet::arrow::FromParquetSchema; +using parquet::schema::GroupNode; +using parquet::schema::NodePtr; +using parquet::schema::PrimitiveNode; + +namespace parquet::arrow { + +namespace { + +struct ColumnIndexObject { + std::vector<bool> null_pages; + std::vector<std::string> min_values; + std::vector<std::string> max_values; + BoundaryOrder::type boundary_order = BoundaryOrder::Unordered; + std::vector<int64_t> null_counts; + + ColumnIndexObject() = default; + + ColumnIndexObject(const std::vector<bool>& null_pages, + const std::vector<std::string>& min_values, + const std::vector<std::string>& max_values, + BoundaryOrder::type boundary_order, + const std::vector<int64_t>& null_counts) + : null_pages(null_pages), + min_values(min_values), + max_values(max_values), + boundary_order(boundary_order), + null_counts(null_counts) {} + + explicit ColumnIndexObject(const ColumnIndex* column_index) { + if (column_index == nullptr) { + return; + } + null_pages = column_index->null_pages(); + min_values = column_index->encoded_min_values(); + max_values = column_index->encoded_max_values(); + boundary_order = column_index->boundary_order(); + if (column_index->has_null_counts()) { + null_counts = column_index->null_counts(); + } + } + + bool operator==(const ColumnIndexObject& b) const { + return null_pages == b.null_pages && min_values == b.min_values && + max_values == b.max_values && boundary_order == b.boundary_order && + null_counts == b.null_counts; + } +}; + +auto encode_int64 = [](int64_t value) { + return std::string(reinterpret_cast<const char*>(&value), sizeof(int64_t)); +}; + +auto encode_double = [](double value) { + return std::string(reinterpret_cast<const char*>(&value), sizeof(double)); +}; + +} // namespace + +class TestingWithPageIndex { + public: + void WriteFile(const std::shared_ptr<WriterProperties>& writer_properties, + const std::shared_ptr<::arrow::Table>& table) { + // Get schema from table. + auto schema = table->schema(); + std::shared_ptr<SchemaDescriptor> parquet_schema; + auto arrow_writer_properties = default_arrow_writer_properties(); + ASSERT_OK_NO_THROW(ToParquetSchema(schema.get(), *writer_properties, + *arrow_writer_properties, &parquet_schema)); + auto schema_node = std::static_pointer_cast<GroupNode>(parquet_schema->schema_root()); + + // Write table to buffer. 
+ auto sink = CreateOutputStream(); + auto pool = ::arrow::default_memory_pool(); + auto writer = ParquetFileWriter::Open(sink, schema_node, writer_properties); + std::unique_ptr<FileWriter> arrow_writer; + ASSERT_OK(FileWriter::Make(pool, std::move(writer), schema, arrow_writer_properties, + &arrow_writer)); + ASSERT_OK_NO_THROW(arrow_writer->WriteTable(*table)); + ASSERT_OK_NO_THROW(arrow_writer->Close()); + ASSERT_OK_AND_ASSIGN(buffer_, sink->Finish()); + } + + protected: + std::shared_ptr<Buffer> buffer_; +}; + +class ParquetPageIndexRoundTripTest : public ::testing::Test, + public TestingWithPageIndex { + public: + void ReadPageIndexes(int expect_num_row_groups, int expect_num_pages, + const std::set<int>& expect_columns_without_index = {}) { + auto read_properties = default_arrow_reader_properties(); + auto reader = ParquetFileReader::Open(std::make_shared<BufferReader>(buffer_)); + + auto metadata = reader->metadata(); + ASSERT_EQ(expect_num_row_groups, metadata->num_row_groups()); + + auto page_index_reader = reader->GetPageIndexReader(); + ASSERT_NE(page_index_reader, nullptr); + + int64_t offset_lower_bound = 0; + for (int rg = 0; rg < metadata->num_row_groups(); ++rg) { + auto row_group_index_reader = page_index_reader->RowGroup(rg); + ASSERT_NE(row_group_index_reader, nullptr); + + auto row_group_reader = reader->RowGroup(rg); + ASSERT_NE(row_group_reader, nullptr); + + for (int col = 0; col < metadata->num_columns(); ++col) { + auto column_index = row_group_index_reader->GetColumnIndex(col); + column_indexes_.emplace_back(column_index.get()); + + bool expect_no_page_index = + expect_columns_without_index.find(col) != expect_columns_without_index.cend(); + + auto offset_index = row_group_index_reader->GetOffsetIndex(col); + if (expect_no_page_index) { + ASSERT_EQ(offset_index, nullptr); + } else { + CheckOffsetIndex(offset_index.get(), expect_num_pages, &offset_lower_bound); + } + + // Verify page stats are not written to page header if page index is enabled. + auto page_reader = row_group_reader->GetColumnPageReader(col); + ASSERT_NE(page_reader, nullptr); + std::shared_ptr<Page> page = nullptr; + while ((page = page_reader->NextPage()) != nullptr) { + if (page->type() == PageType::DATA_PAGE || + page->type() == PageType::DATA_PAGE_V2) { + ASSERT_EQ(std::static_pointer_cast<DataPage>(page)->statistics().is_set(), + expect_no_page_index); + } + } + } + } + } + + private: + void CheckOffsetIndex(const OffsetIndex* offset_index, int expect_num_pages, + int64_t* offset_lower_bound_in_out) { + ASSERT_NE(offset_index, nullptr); + const auto& locations = offset_index->page_locations(); + ASSERT_EQ(static_cast<size_t>(expect_num_pages), locations.size()); + int64_t prev_first_row_index = -1; + for (const auto& location : locations) { + // Make sure first_row_index is in the ascending order within a row group. + ASSERT_GT(location.first_row_index, prev_first_row_index); + // Make sure page offset is in the ascending order across the file. + ASSERT_GE(location.offset, *offset_lower_bound_in_out); + // Make sure page size is positive. 
+ ASSERT_GT(location.compressed_page_size, 0); + prev_first_row_index = location.first_row_index; + *offset_lower_bound_in_out = location.offset + location.compressed_page_size; + } + } + + protected: + std::vector<ColumnIndexObject> column_indexes_; +}; + +TEST_F(ParquetPageIndexRoundTripTest, SimpleRoundTrip) { + auto writer_properties = WriterProperties::Builder() + .enable_write_page_index() + ->max_row_group_length(4) + ->build(); + auto schema = ::arrow::schema({::arrow::field("c0", ::arrow::int64()), + ::arrow::field("c1", ::arrow::utf8()), + ::arrow::field("c2", ::arrow::list(::arrow::int64()))}); + WriteFile(writer_properties, ::arrow::TableFromJSON(schema, {R"([ + [1, "a", [1] ], + [2, "b", [1, 2] ], + [3, "c", [null] ], + [null, "d", [] ], + [5, null, [3, 3, 3]], + [6, "f", null ] + ])"})); + + ReadPageIndexes(/*expect_num_row_groups=*/2, /*expect_num_pages=*/1); + + EXPECT_THAT( + column_indexes_, + ::testing::ElementsAre( + ColumnIndexObject{/*null_pages=*/{false}, /*min_values=*/{encode_int64(1)}, + /*max_values=*/{encode_int64(3)}, BoundaryOrder::Ascending, + /*null_counts=*/{1}}, + ColumnIndexObject{/*null_pages=*/{false}, /*min_values=*/{"a"}, + /*max_values=*/{"d"}, BoundaryOrder::Ascending, + /*null_counts=*/{0}}, + ColumnIndexObject{/*null_pages=*/{false}, /*min_values=*/{encode_int64(1)}, + /*max_values=*/{encode_int64(2)}, BoundaryOrder::Ascending, + /*null_counts=*/{2}}, + ColumnIndexObject{/*null_pages=*/{false}, /*min_values=*/{encode_int64(5)}, + /*max_values=*/{encode_int64(6)}, BoundaryOrder::Ascending, + /*null_counts=*/{0}}, + ColumnIndexObject{/*null_pages=*/{false}, /*min_values=*/{"f"}, + /*max_values=*/{"f"}, BoundaryOrder::Ascending, + /*null_counts=*/{1}}, + ColumnIndexObject{/*null_pages=*/{false}, /*min_values=*/{encode_int64(3)}, + /*max_values=*/{encode_int64(3)}, BoundaryOrder::Ascending, + /*null_counts=*/{1}})); +} + +TEST_F(ParquetPageIndexRoundTripTest, SimpleRoundTripWithStatsDisabled) { + auto writer_properties = WriterProperties::Builder() + .enable_write_page_index() + ->disable_statistics() + ->build(); + auto schema = ::arrow::schema({::arrow::field("c0", ::arrow::int64()), + ::arrow::field("c1", ::arrow::utf8()), + ::arrow::field("c2", ::arrow::list(::arrow::int64()))}); + WriteFile(writer_properties, ::arrow::TableFromJSON(schema, {R"([ + [1, "a", [1] ], + [2, "b", [1, 2] ], + [3, "c", [null] ], + [null, "d", [] ], + [5, null, [3, 3, 3]], + [6, "f", null ] + ])"})); + + ReadPageIndexes(/*expect_num_row_groups=*/1, /*expect_num_pages=*/1); + for (auto& column_index : column_indexes_) { + // Means page index is empty. 
+ EXPECT_EQ(ColumnIndexObject{}, column_index); + } +} + +TEST_F(ParquetPageIndexRoundTripTest, SimpleRoundTripWithColumnStatsDisabled) { + auto writer_properties = WriterProperties::Builder() + .enable_write_page_index() + ->disable_statistics("c0") + ->max_row_group_length(4) + ->build(); + auto schema = ::arrow::schema({::arrow::field("c0", ::arrow::int64()), + ::arrow::field("c1", ::arrow::utf8()), + ::arrow::field("c2", ::arrow::list(::arrow::int64()))}); + WriteFile(writer_properties, ::arrow::TableFromJSON(schema, {R"([ + [1, "a", [1] ], + [2, "b", [1, 2] ], + [3, "c", [null] ], + [null, "d", [] ], + [5, null, [3, 3, 3]], + [6, "f", null ] + ])"})); + + ReadPageIndexes(/*expect_num_row_groups=*/2, /*expect_num_pages=*/1); + + ColumnIndexObject empty_column_index{}; + EXPECT_THAT( + column_indexes_, + ::testing::ElementsAre( + empty_column_index, + ColumnIndexObject{/*null_pages=*/{false}, /*min_values=*/{"a"}, + /*max_values=*/{"d"}, BoundaryOrder::Ascending, + /*null_counts=*/{0}}, + ColumnIndexObject{/*null_pages=*/{false}, /*min_values=*/{encode_int64(1)}, + /*max_values=*/{encode_int64(2)}, BoundaryOrder::Ascending, + /*null_counts=*/{2}}, + empty_column_index, + ColumnIndexObject{/*null_pages=*/{false}, /*min_values=*/{"f"}, + /*max_values=*/{"f"}, BoundaryOrder::Ascending, + /*null_counts=*/{1}}, + ColumnIndexObject{/*null_pages=*/{false}, /*min_values=*/{encode_int64(3)}, + /*max_values=*/{encode_int64(3)}, BoundaryOrder::Ascending, + /*null_counts=*/{1}})); +} + +TEST_F(ParquetPageIndexRoundTripTest, DropLargeStats) { + auto writer_properties = WriterProperties::Builder() + .enable_write_page_index() + ->max_row_group_length(1) /* write single-row row group */ + ->max_statistics_size(20) /* drop stats larger than it */ + ->build(); + auto schema = ::arrow::schema({::arrow::field("c0", ::arrow::utf8())}); + WriteFile(writer_properties, ::arrow::TableFromJSON(schema, {R"([ + ["short_string"], + ["very_large_string_to_drop_stats"] + ])"})); + + ReadPageIndexes(/*expect_num_row_groups=*/2, /*expect_num_pages=*/1); + + EXPECT_THAT( + column_indexes_, + ::testing::ElementsAre( + ColumnIndexObject{/*null_pages=*/{false}, /*min_values=*/{"short_string"}, + /*max_values=*/{"short_string"}, BoundaryOrder::Ascending, + /*null_counts=*/{0}}, + ColumnIndexObject{})); +} + +TEST_F(ParquetPageIndexRoundTripTest, MultiplePages) { + auto writer_properties = WriterProperties::Builder() + .enable_write_page_index() + ->data_pagesize(1) /* write multiple pages */ + ->build(); + auto schema = ::arrow::schema( + {::arrow::field("c0", ::arrow::int64()), ::arrow::field("c1", ::arrow::utf8())}); + WriteFile( + writer_properties, + ::arrow::TableFromJSON( + schema, {R"([[1, "a"], [2, "b"]])", R"([[3, "c"], [4, "d"]])", + R"([[null, null], [6, "f"]])", R"([[null, null], [null, null]])"})); + + ReadPageIndexes(/*expect_num_row_groups=*/1, /*expect_num_pages=*/4); + + EXPECT_THAT( + column_indexes_, + ::testing::ElementsAre( + ColumnIndexObject{ + /*null_pages=*/{false, false, false, true}, + /*min_values=*/{encode_int64(1), encode_int64(3), encode_int64(6), ""}, + /*max_values=*/{encode_int64(2), encode_int64(4), encode_int64(6), ""}, + BoundaryOrder::Ascending, + /*null_counts=*/{0, 0, 1, 2}}, + ColumnIndexObject{/*null_pages=*/{false, false, false, true}, + /*min_values=*/{"a", "c", "f", ""}, + /*max_values=*/{"b", "d", "f", ""}, BoundaryOrder::Ascending, + /*null_counts=*/{0, 0, 1, 2}})); +} + +TEST_F(ParquetPageIndexRoundTripTest, DoubleWithNaNs) { + auto writer_properties = 
WriterProperties::Builder() + .enable_write_page_index() + ->max_row_group_length(3) /* 3 rows per row group */ + ->build(); + + // Create table to write with NaNs. + auto vectors = std::vector<std::shared_ptr<Array>>(4); + // NaN will be ignored in min/max stats. + ::arrow::ArrayFromVector<::arrow::DoubleType>({1.0, NAN, 0.1}, &vectors[0]); + // Lower bound will use -0.0. + ::arrow::ArrayFromVector<::arrow::DoubleType>({+0.0, NAN, +0.0}, &vectors[1]); + // Upper bound will use -0.0. + ::arrow::ArrayFromVector<::arrow::DoubleType>({-0.0, NAN, -0.0}, &vectors[2]); + // Pages with all NaNs will not build column index. + ::arrow::ArrayFromVector<::arrow::DoubleType>({NAN, NAN, NAN}, &vectors[3]); + ASSERT_OK_AND_ASSIGN(auto chunked_array, + arrow::ChunkedArray::Make(vectors, ::arrow::float64())); + + auto schema = ::arrow::schema({::arrow::field("c0", ::arrow::float64())}); + auto table = Table::Make(schema, {chunked_array}); + WriteFile(writer_properties, table); + + ReadPageIndexes(/*expect_num_row_groups=*/4, /*expect_num_pages=*/1); + + EXPECT_THAT( + column_indexes_, + ::testing::ElementsAre( + ColumnIndexObject{/*null_pages=*/{false}, + /*min_values=*/{encode_double(0.1)}, + /*max_values=*/{encode_double(1.0)}, BoundaryOrder::Ascending, + /*null_counts=*/{0}}, + ColumnIndexObject{/*null_pages=*/{false}, + /*min_values=*/{encode_double(-0.0)}, + /*max_values=*/{encode_double(+0.0)}, + BoundaryOrder::Ascending, + /*null_counts=*/{0}}, + ColumnIndexObject{/*null_pages=*/{false}, + /*min_values=*/{encode_double(-0.0)}, + /*max_values=*/{encode_double(+0.0)}, + BoundaryOrder::Ascending, + /*null_counts=*/{0}}, + ColumnIndexObject{ + /* Page with only NaN values does not have column index built */})); +} + +TEST_F(ParquetPageIndexRoundTripTest, EnablePerColumn) { + auto schema = ::arrow::schema({::arrow::field("c0", ::arrow::int64()), + ::arrow::field("c1", ::arrow::int64()), + ::arrow::field("c2", ::arrow::int64())}); + auto writer_properties = + WriterProperties::Builder() + .enable_write_page_index() /* enable by default */ + ->enable_write_page_index("c0") /* enable c0 explicitly */ + ->disable_write_page_index("c1") /* disable c1 explicitly */ + ->build(); + WriteFile(writer_properties, ::arrow::TableFromJSON(schema, {R"([[0, 1, 2]])"})); + + ReadPageIndexes(/*expect_num_row_groups=*/1, /*expect_num_pages=*/1, + /*expect_columns_without_index=*/{1}); + + EXPECT_THAT( + column_indexes_, + ::testing::ElementsAre( + ColumnIndexObject{/*null_pages=*/{false}, /*min_values=*/{encode_int64(0)}, + /*max_values=*/{encode_int64(0)}, BoundaryOrder::Ascending, + /*null_counts=*/{0}}, + ColumnIndexObject{/* page index of c1 is disabled */}, + ColumnIndexObject{/*null_pages=*/{false}, /*min_values=*/{encode_int64(2)}, + /*max_values=*/{encode_int64(2)}, BoundaryOrder::Ascending, + /*null_counts=*/{0}})); +} + +class ParquetBloomFilterRoundTripTest : public ::testing::Test, + public TestingWithPageIndex { + public: + void ReadBloomFilters(int expect_num_row_groups, + const std::set<int>& expect_columns_without_bloom_filter = {}) { + auto reader = ParquetFileReader::Open(std::make_shared<BufferReader>(buffer_)); + auto metadata = reader->metadata(); + auto& bloom_filter_reader = reader->GetBloomFilterReader(); + ASSERT_EQ(expect_num_row_groups, metadata->num_row_groups()); + + for (int rg = 0; rg < metadata->num_row_groups(); ++rg) { + auto row_group_reader = bloom_filter_reader.RowGroup(rg); + ASSERT_NE(row_group_reader, nullptr); + + for (int col = 0; col < metadata->num_columns(); ++col) { + if 
(expect_columns_without_bloom_filter.find(col) != + expect_columns_without_bloom_filter.cend()) { + ASSERT_EQ(row_group_reader->GetColumnBloomFilter(col), nullptr); + } else { + bloom_filters_.push_back(row_group_reader->GetColumnBloomFilter(col)); + ASSERT_NE(bloom_filters_.back(), nullptr); + } + } + } + } + + template <typename ArrowType> + void VerifyBloomFilterContains(const BloomFilter& bloom_filter, + const ::arrow::ChunkedArray& chunked_array) { + for (auto value : ::arrow::stl::Iterate<ArrowType>(chunked_array)) { + if (value == std::nullopt) { + continue; + } + EXPECT_TRUE(bloom_filter.FindHash(bloom_filter.Hash(value.value()))); + } + } + + template <typename ArrowType> + void VerifyBloomFilterNotContains(const BloomFilter& bloom_filter, + const ::arrow::ChunkedArray& chunked_array) { + for (auto value : ::arrow::stl::Iterate<ArrowType>(chunked_array)) { + if (value == std::nullopt) { + continue; + } + EXPECT_FALSE(bloom_filter.FindHash(bloom_filter.Hash(value.value()))); + } + } + + template <typename ArrowType> + void VerifyBloomFilterAcrossRowGroups(const std::shared_ptr<ChunkedArray>& column, + int num_row_groups, + const std::vector<int64_t>& row_group_row_counts, + int num_bf_columns, int col_idx) { + int64_t current_row_offset = 0; + for (int rg = 0; rg < num_row_groups; ++rg) { + int bf_idx = rg * num_bf_columns + col_idx; + auto col_slice = column->Slice(current_row_offset, row_group_row_counts[rg]); + + // Verify this bloom filter contains values from this row group + VerifyBloomFilterContains<ArrowType>(*bloom_filters_[bf_idx], *col_slice); + + // Verify other row groups' bloom filters don't contain these values + for (int other_rg = 0; other_rg < num_row_groups; ++other_rg) { + if (other_rg != rg) { + int other_bf_idx = other_rg * num_bf_columns + col_idx; + VerifyBloomFilterNotContains<ArrowType>(*bloom_filters_[other_bf_idx], + *col_slice); + } + } + current_row_offset += row_group_row_counts[rg]; + } + } + + protected: + // Bloom filters for each column in each row group. 
+ std::vector<std::unique_ptr<BloomFilter>> bloom_filters_; +}; + +TEST_F(ParquetBloomFilterRoundTripTest, SimpleRoundTrip) { + auto schema = ::arrow::schema( + {::arrow::field("c0", ::arrow::int64()), ::arrow::field("c1", ::arrow::utf8())}); + BloomFilterOptions options{10, 0.05}; + auto writer_properties = WriterProperties::Builder() + .enable_bloom_filter("c0", options) + ->enable_bloom_filter("c1", options) + ->max_row_group_length(4) + ->build(); + auto table = ::arrow::TableFromJSON( + schema, {R"([[1,"a"],[2,"b"],[3,"c"],[null,"d"],[5,null],[6,"f"]])"}); + WriteFile(writer_properties, table); + + constexpr int kNumRowGroups = 2; + constexpr int kNumBFColumns = 2; + const std::vector<int64_t> kRowGroupRowCount{4, 2}; + + ReadBloomFilters(kNumRowGroups); + ASSERT_EQ(kNumRowGroups * kNumBFColumns, bloom_filters_.size()); + + VerifyBloomFilterAcrossRowGroups<::arrow::Int64Type>(table->column(0), kNumRowGroups, + kRowGroupRowCount, kNumBFColumns, + /*col_idx=*/0); + VerifyBloomFilterAcrossRowGroups<::arrow::StringType>(table->column(1), kNumRowGroups, + kRowGroupRowCount, kNumBFColumns, + /*col_idx=*/1); +} + +TEST_F(ParquetBloomFilterRoundTripTest, SimpleRoundTripDictionary) { + const std::vector<std::string> json_contents = { + R"([[1,"a"],[2,"b"],[3,"c"],[null,"d"],[5,null],[6,"f"]])"}; + + BloomFilterOptions options{10, 0.05}; + auto writer_properties = WriterProperties::Builder() + .enable_bloom_filter("c0", options) + ->enable_bloom_filter("c1", options) + ->max_row_group_length(4) + ->build(); + + constexpr int kNumRowGroups = 2; + constexpr int kNumBFColumns = 2; + const std::vector<int64_t> kRowGroupRowCount{4, 2}; + + struct StringTypeTestCase { + std::shared_ptr<::arrow::DataType> arrow_type; + std::function<void(ParquetBloomFilterRoundTripTest*, + const std::shared_ptr<ChunkedArray>&, const std::vector<int64_t>&)> + verify_func; + }; + + std::vector<StringTypeTestCase> test_cases = { + {::arrow::utf8(), + [&](auto* test, auto& col, auto& counts) { + test->template VerifyBloomFilterAcrossRowGroups<::arrow::StringType>( + col, kNumRowGroups, kRowGroupRowCount, kNumBFColumns, /*col_idx=*/1); + }}, + {::arrow::large_utf8(), + [&](auto* test, auto& col, auto& counts) { + test->template VerifyBloomFilterAcrossRowGroups<::arrow::LargeStringType>( + col, kNumRowGroups, kRowGroupRowCount, kNumBFColumns, /*col_idx=*/1); + }}, + {::arrow::utf8_view(), [&](auto* test, auto& col, auto& counts) { + test->template VerifyBloomFilterAcrossRowGroups<::arrow::StringViewType>( + col, kNumRowGroups, kRowGroupRowCount, kNumBFColumns, /*col_idx=*/1); + }}}; + + for (const auto& test_case : test_cases) { + bloom_filters_.clear(); + + auto dict_schema = ::arrow::schema( + {::arrow::field("c0", ::arrow::dictionary(::arrow::int64(), ::arrow::int64())), + ::arrow::field("c1", ::arrow::dictionary(::arrow::int64(), ::arrow::utf8()))}); + auto origin_schema = ::arrow::schema({::arrow::field("c0", ::arrow::int64()), + ::arrow::field("c1", test_case.arrow_type)}); + + auto dict_encoded_table = ::arrow::TableFromJSON(dict_schema, json_contents); + auto table = ::arrow::TableFromJSON(origin_schema, json_contents); + WriteFile(writer_properties, dict_encoded_table); + + ReadBloomFilters(kNumRowGroups); + ASSERT_EQ(kNumRowGroups * kNumBFColumns, bloom_filters_.size()); + + VerifyBloomFilterAcrossRowGroups<::arrow::Int64Type>(table->column(0), kNumRowGroups, + kRowGroupRowCount, kNumBFColumns, + /*col_idx=*/0); + test_case.verify_func(this, table->column(1), kRowGroupRowCount); + } +} + 
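As an aside to these round-trip tests, the reader-side lookup they exercise boils down to a few calls; a minimal sketch using the APIs that appear above (`GetBloomFilterReader`, `RowGroup`, `GetColumnBloomFilter`, `Hash`, `FindHash`), where `MayContain` is a hypothetical helper name:

```cpp
#include "parquet/bloom_filter.h"
#include "parquet/bloom_filter_reader.h"
#include "parquet/file_reader.h"

// Returns false only when the row group definitely does not contain `probe`:
// a bloom filter can yield false positives but never false negatives.
bool MayContain(parquet::ParquetFileReader& reader, int row_group, int column,
                int64_t probe) {
  auto& bf_reader = reader.GetBloomFilterReader();
  auto rg_reader = bf_reader.RowGroup(row_group);
  auto filter = rg_reader->GetColumnBloomFilter(column);
  if (filter == nullptr) {
    return true;  // No bloom filter was written for this column chunk.
  }
  return filter->FindHash(filter->Hash(probe));
}
```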
+TEST_F(ParquetBloomFilterRoundTripTest, SimpleRoundTripWithOneFilter) { + auto schema = ::arrow::schema( + {::arrow::field("c0", ::arrow::int64()), ::arrow::field("c1", ::arrow::utf8())}); + BloomFilterOptions options{10, 0.05}; + auto writer_properties = WriterProperties::Builder() + .enable_bloom_filter("c0", options) + ->disable_bloom_filter("c1")
Review Comment:
Can we disable the first column instead, to ensure that we don't mix column ordinals and bloom filter ordinals?
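To make the suggested change concrete, a hedged sketch of that variant follows, reusing the fixture and helpers quoted in this review (`WriteFile`, `ReadBloomFilters`, `VerifyBloomFilterAcrossRowGroups`). The test name is invented, and the expected counts assume the same 4+2 row-group split as the original test.

```cpp
// Hypothetical variant: disable the bloom filter on the *first* column (c0)
// and keep it on the second (c1). The only bloom filter in the file then
// belongs to column ordinal 1 but bloom filter ordinal 0, so code that mixes
// up the two kinds of ordinals would fail the assertions below.
TEST_F(ParquetBloomFilterRoundTripTest, SimpleRoundTripWithOneFilterOnSecondColumn) {
  auto schema = ::arrow::schema(
      {::arrow::field("c0", ::arrow::int64()), ::arrow::field("c1", ::arrow::utf8())});
  BloomFilterOptions options{10, 0.05};
  auto writer_properties = WriterProperties::Builder()
                               .disable_bloom_filter("c0")
                               ->enable_bloom_filter("c1", options)
                               ->max_row_group_length(4)
                               ->build();
  auto table = ::arrow::TableFromJSON(
      schema, {R"([[1,"a"],[2,"b"],[3,"c"],[null,"d"],[5,null],[6,"f"]])"});
  WriteFile(writer_properties, table);

  constexpr int kNumRowGroups = 2;
  constexpr int kNumBFColumns = 1;
  const std::vector<int64_t> kRowGroupRowCount{4, 2};

  ReadBloomFilters(kNumRowGroups, /*expect_columns_without_bloom_filter=*/{0});
  ASSERT_EQ(kNumRowGroups * kNumBFColumns, bloom_filters_.size());

  // c1 is the only column with bloom filters, so they sit at bloom filter
  // ordinal 0 within each row group even though c1 is column ordinal 1.
  VerifyBloomFilterAcrossRowGroups<::arrow::StringType>(
      table->column(1), kNumRowGroups, kRowGroupRowCount, kNumBFColumns,
      /*col_idx=*/0);
}
```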
########## cpp/src/parquet/arrow/index_test.cc: ##########
@@ -0,0 +1,680 @@
+// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#ifdef _MSC_VER +# pragma warning(push) +// Disable forcing value to bool warnings +# pragma warning(disable : 4800) +#endif + +#include <gmock/gmock.h> +#include <gtest/gtest.h> + +#include <cstdint> +#include <set> +#include <vector> + +#include "arrow/chunked_array.h" +#include "arrow/compute/api.h" +#include "arrow/scalar.h" +#include "arrow/table.h" +#include "arrow/testing/builder.h" +#include "arrow/testing/gtest_util.h" +#include "arrow/testing/random.h" +#include "arrow/util/checked_cast.h" + +#include "parquet/arrow/reader.h" +#include "parquet/arrow/reader_internal.h" +#include "parquet/arrow/schema.h" +#include "parquet/arrow/test_util.h" +#include "parquet/arrow/writer.h" +#include "parquet/bloom_filter.h" +#include "parquet/bloom_filter_reader.h" +#include "parquet/file_writer.h" +#include "parquet/page_index.h" +#include "parquet/properties.h" + +using arrow::Array; +using arrow::Buffer; +using arrow::ChunkedArray; +using arrow::default_memory_pool; +using arrow::Result; +using arrow::Status; +using arrow::Table; +using arrow::internal::checked_cast; +using arrow::io::BufferReader; + +using ParquetType = parquet::Type; + +using parquet::arrow::FromParquetSchema; +using parquet::schema::GroupNode; +using parquet::schema::NodePtr; +using parquet::schema::PrimitiveNode; + +namespace parquet::arrow { + +namespace { + +struct ColumnIndexObject { + std::vector<bool> null_pages; + std::vector<std::string> min_values; + std::vector<std::string> max_values; + BoundaryOrder::type boundary_order = BoundaryOrder::Unordered; + std::vector<int64_t> null_counts; + + ColumnIndexObject() = default; + + ColumnIndexObject(const std::vector<bool>& null_pages, + const std::vector<std::string>& min_values, + const std::vector<std::string>& max_values, + BoundaryOrder::type boundary_order, + const std::vector<int64_t>& null_counts) + : null_pages(null_pages), + min_values(min_values), + max_values(max_values), + boundary_order(boundary_order), + null_counts(null_counts) {} + + explicit ColumnIndexObject(const ColumnIndex* column_index) { + if (column_index == nullptr) { + return; + } + null_pages = column_index->null_pages(); + min_values =
column_index->encoded_min_values(); + max_values = column_index->encoded_max_values(); + boundary_order = column_index->boundary_order(); + if (column_index->has_null_counts()) { + null_counts = column_index->null_counts(); + } + } + + bool operator==(const ColumnIndexObject& b) const { + return null_pages == b.null_pages && min_values == b.min_values && + max_values == b.max_values && boundary_order == b.boundary_order && + null_counts == b.null_counts; + } +}; + +auto encode_int64 = [](int64_t value) { + return std::string(reinterpret_cast<const char*>(&value), sizeof(int64_t)); +}; + +auto encode_double = [](double value) { + return std::string(reinterpret_cast<const char*>(&value), sizeof(double)); +}; + +} // namespace + +class TestingWithPageIndex { + public: + void WriteFile(const std::shared_ptr<WriterProperties>& writer_properties, + const std::shared_ptr<::arrow::Table>& table) { + // Get schema from table. + auto schema = table->schema(); + std::shared_ptr<SchemaDescriptor> parquet_schema; + auto arrow_writer_properties = default_arrow_writer_properties(); + ASSERT_OK_NO_THROW(ToParquetSchema(schema.get(), *writer_properties, + *arrow_writer_properties, &parquet_schema)); + auto schema_node = std::static_pointer_cast<GroupNode>(parquet_schema->schema_root()); + + // Write table to buffer. + auto sink = CreateOutputStream(); + auto pool = ::arrow::default_memory_pool(); + auto writer = ParquetFileWriter::Open(sink, schema_node, writer_properties); + std::unique_ptr<FileWriter> arrow_writer; + ASSERT_OK(FileWriter::Make(pool, std::move(writer), schema, arrow_writer_properties, + &arrow_writer)); + ASSERT_OK_NO_THROW(arrow_writer->WriteTable(*table)); + ASSERT_OK_NO_THROW(arrow_writer->Close()); + ASSERT_OK_AND_ASSIGN(buffer_, sink->Finish()); + } + + protected: + std::shared_ptr<Buffer> buffer_; +}; + +class ParquetPageIndexRoundTripTest : public ::testing::Test, + public TestingWithPageIndex { + public: + void ReadPageIndexes(int expect_num_row_groups, int expect_num_pages, + const std::set<int>& expect_columns_without_index = {}) { + auto read_properties = default_arrow_reader_properties(); + auto reader = ParquetFileReader::Open(std::make_shared<BufferReader>(buffer_)); + + auto metadata = reader->metadata(); + ASSERT_EQ(expect_num_row_groups, metadata->num_row_groups()); + + auto page_index_reader = reader->GetPageIndexReader(); + ASSERT_NE(page_index_reader, nullptr); + + int64_t offset_lower_bound = 0; + for (int rg = 0; rg < metadata->num_row_groups(); ++rg) { + auto row_group_index_reader = page_index_reader->RowGroup(rg); + ASSERT_NE(row_group_index_reader, nullptr); + + auto row_group_reader = reader->RowGroup(rg); + ASSERT_NE(row_group_reader, nullptr); + + for (int col = 0; col < metadata->num_columns(); ++col) { + auto column_index = row_group_index_reader->GetColumnIndex(col); + column_indexes_.emplace_back(column_index.get()); + + bool expect_no_page_index = + expect_columns_without_index.find(col) != expect_columns_without_index.cend(); + + auto offset_index = row_group_index_reader->GetOffsetIndex(col); + if (expect_no_page_index) { + ASSERT_EQ(offset_index, nullptr); + } else { + CheckOffsetIndex(offset_index.get(), expect_num_pages, &offset_lower_bound); + } + + // Verify page stats are not written to page header if page index is enabled.
+ auto page_reader = row_group_reader->GetColumnPageReader(col); + ASSERT_NE(page_reader, nullptr); + std::shared_ptr<Page> page = nullptr; + while ((page = page_reader->NextPage()) != nullptr) { + if (page->type() == PageType::DATA_PAGE || + page->type() == PageType::DATA_PAGE_V2) { + ASSERT_EQ(std::static_pointer_cast<DataPage>(page)->statistics().is_set(), + expect_no_page_index); + } + } + } + } + } + + private: + void CheckOffsetIndex(const OffsetIndex* offset_index, int expect_num_pages, + int64_t* offset_lower_bound_in_out) { + ASSERT_NE(offset_index, nullptr); + const auto& locations = offset_index->page_locations(); + ASSERT_EQ(static_cast<size_t>(expect_num_pages), locations.size()); + int64_t prev_first_row_index = -1; + for (const auto& location : locations) { + // Make sure first_row_index is in the ascending order within a row group. + ASSERT_GT(location.first_row_index, prev_first_row_index); + // Make sure page offset is in the ascending order across the file. + ASSERT_GE(location.offset, *offset_lower_bound_in_out); + // Make sure page size is positive. + ASSERT_GT(location.compressed_page_size, 0); + prev_first_row_index = location.first_row_index; + *offset_lower_bound_in_out = location.offset + location.compressed_page_size; + } + } + + protected: + std::vector<ColumnIndexObject> column_indexes_; +}; + +TEST_F(ParquetPageIndexRoundTripTest, SimpleRoundTrip) { + auto writer_properties = WriterProperties::Builder() + .enable_write_page_index() + ->max_row_group_length(4) + ->build(); + auto schema = ::arrow::schema({::arrow::field("c0", ::arrow::int64()), + ::arrow::field("c1", ::arrow::utf8()), + ::arrow::field("c2", ::arrow::list(::arrow::int64()))}); + WriteFile(writer_properties, ::arrow::TableFromJSON(schema, {R"([ + [1, "a", [1] ], + [2, "b", [1, 2] ], + [3, "c", [null] ], + [null, "d", [] ], + [5, null, [3, 3, 3]], + [6, "f", null ] + ])"})); + + ReadPageIndexes(/*expect_num_row_groups=*/2, /*expect_num_pages=*/1); + + EXPECT_THAT( + column_indexes_, + ::testing::ElementsAre( + ColumnIndexObject{/*null_pages=*/{false}, /*min_values=*/{encode_int64(1)}, + /*max_values=*/{encode_int64(3)}, BoundaryOrder::Ascending, + /*null_counts=*/{1}}, + ColumnIndexObject{/*null_pages=*/{false}, /*min_values=*/{"a"}, + /*max_values=*/{"d"}, BoundaryOrder::Ascending, + /*null_counts=*/{0}}, + ColumnIndexObject{/*null_pages=*/{false}, /*min_values=*/{encode_int64(1)}, + /*max_values=*/{encode_int64(2)}, BoundaryOrder::Ascending, + /*null_counts=*/{2}}, + ColumnIndexObject{/*null_pages=*/{false}, /*min_values=*/{encode_int64(5)}, + /*max_values=*/{encode_int64(6)}, BoundaryOrder::Ascending, + /*null_counts=*/{0}}, + ColumnIndexObject{/*null_pages=*/{false}, /*min_values=*/{"f"}, + /*max_values=*/{"f"}, BoundaryOrder::Ascending, + /*null_counts=*/{1}}, + ColumnIndexObject{/*null_pages=*/{false}, /*min_values=*/{encode_int64(3)}, + /*max_values=*/{encode_int64(3)}, BoundaryOrder::Ascending, + /*null_counts=*/{1}})); +} + +TEST_F(ParquetPageIndexRoundTripTest, SimpleRoundTripWithStatsDisabled) { + auto writer_properties = WriterProperties::Builder() + .enable_write_page_index() + ->disable_statistics() + ->build(); + auto schema = ::arrow::schema({::arrow::field("c0", ::arrow::int64()), + ::arrow::field("c1", ::arrow::utf8()), + ::arrow::field("c2", ::arrow::list(::arrow::int64()))}); + WriteFile(writer_properties, ::arrow::TableFromJSON(schema, {R"([ + [1, "a", [1] ], + [2, "b", [1, 2] ], + [3, "c", [null] ], + [null, "d", [] ], + [5, null, [3, 3, 3]], + [6, "f", null ] + ])"})); + + 
ReadPageIndexes(/*expect_num_row_groups=*/1, /*expect_num_pages=*/1); + for (auto& column_index : column_indexes_) { + // Means page index is empty. + EXPECT_EQ(ColumnIndexObject{}, column_index); + } +} + +TEST_F(ParquetPageIndexRoundTripTest, SimpleRoundTripWithColumnStatsDisabled) { + auto writer_properties = WriterProperties::Builder() + .enable_write_page_index() + ->disable_statistics("c0") + ->max_row_group_length(4) + ->build(); + auto schema = ::arrow::schema({::arrow::field("c0", ::arrow::int64()), + ::arrow::field("c1", ::arrow::utf8()), + ::arrow::field("c2", ::arrow::list(::arrow::int64()))}); + WriteFile(writer_properties, ::arrow::TableFromJSON(schema, {R"([ + [1, "a", [1] ], + [2, "b", [1, 2] ], + [3, "c", [null] ], + [null, "d", [] ], + [5, null, [3, 3, 3]], + [6, "f", null ] + ])"})); + + ReadPageIndexes(/*expect_num_row_groups=*/2, /*expect_num_pages=*/1); + + ColumnIndexObject empty_column_index{}; + EXPECT_THAT( + column_indexes_, + ::testing::ElementsAre( + empty_column_index, + ColumnIndexObject{/*null_pages=*/{false}, /*min_values=*/{"a"}, + /*max_values=*/{"d"}, BoundaryOrder::Ascending, + /*null_counts=*/{0}}, + ColumnIndexObject{/*null_pages=*/{false}, /*min_values=*/{encode_int64(1)}, + /*max_values=*/{encode_int64(2)}, BoundaryOrder::Ascending, + /*null_counts=*/{2}}, + empty_column_index, + ColumnIndexObject{/*null_pages=*/{false}, /*min_values=*/{"f"}, + /*max_values=*/{"f"}, BoundaryOrder::Ascending, + /*null_counts=*/{1}}, + ColumnIndexObject{/*null_pages=*/{false}, /*min_values=*/{encode_int64(3)}, + /*max_values=*/{encode_int64(3)}, BoundaryOrder::Ascending, + /*null_counts=*/{1}})); +} + +TEST_F(ParquetPageIndexRoundTripTest, DropLargeStats) { + auto writer_properties = WriterProperties::Builder() + .enable_write_page_index() + ->max_row_group_length(1) /* write single-row row group */ + ->max_statistics_size(20) /* drop stats larger than it */ + ->build(); + auto schema = ::arrow::schema({::arrow::field("c0", ::arrow::utf8())}); + WriteFile(writer_properties, ::arrow::TableFromJSON(schema, {R"([ + ["short_string"], + ["very_large_string_to_drop_stats"] + ])"})); + + ReadPageIndexes(/*expect_num_row_groups=*/2, /*expect_num_pages=*/1); + + EXPECT_THAT( + column_indexes_, + ::testing::ElementsAre( + ColumnIndexObject{/*null_pages=*/{false}, /*min_values=*/{"short_string"}, + /*max_values=*/{"short_string"}, BoundaryOrder::Ascending, + /*null_counts=*/{0}}, + ColumnIndexObject{})); +} + +TEST_F(ParquetPageIndexRoundTripTest, MultiplePages) { + auto writer_properties = WriterProperties::Builder() + .enable_write_page_index() + ->data_pagesize(1) /* write multiple pages */ + ->build(); + auto schema = ::arrow::schema( + {::arrow::field("c0", ::arrow::int64()), ::arrow::field("c1", ::arrow::utf8())}); + WriteFile( + writer_properties, + ::arrow::TableFromJSON( + schema, {R"([[1, "a"], [2, "b"]])", R"([[3, "c"], [4, "d"]])", + R"([[null, null], [6, "f"]])", R"([[null, null], [null, null]])"})); + + ReadPageIndexes(/*expect_num_row_groups=*/1, /*expect_num_pages=*/4); + + EXPECT_THAT( + column_indexes_, + ::testing::ElementsAre( + ColumnIndexObject{ + /*null_pages=*/{false, false, false, true}, + /*min_values=*/{encode_int64(1), encode_int64(3), encode_int64(6), ""}, + /*max_values=*/{encode_int64(2), encode_int64(4), encode_int64(6), ""}, + BoundaryOrder::Ascending, + /*null_counts=*/{0, 0, 1, 2}}, + ColumnIndexObject{/*null_pages=*/{false, false, false, true}, + /*min_values=*/{"a", "c", "f", ""}, + /*max_values=*/{"b", "d", "f", ""}, 
BoundaryOrder::Ascending, + /*null_counts=*/{0, 0, 1, 2}})); +} + +TEST_F(ParquetPageIndexRoundTripTest, DoubleWithNaNs) { + auto writer_properties = WriterProperties::Builder() + .enable_write_page_index() + ->max_row_group_length(3) /* 3 rows per row group */ + ->build(); + + // Create table to write with NaNs. + auto vectors = std::vector<std::shared_ptr<Array>>(4); + // NaN will be ignored in min/max stats. + ::arrow::ArrayFromVector<::arrow::DoubleType>({1.0, NAN, 0.1}, &vectors[0]); + // Lower bound will use -0.0. + ::arrow::ArrayFromVector<::arrow::DoubleType>({+0.0, NAN, +0.0}, &vectors[1]); + // Upper bound will use +0.0. + ::arrow::ArrayFromVector<::arrow::DoubleType>({-0.0, NAN, -0.0}, &vectors[2]); + // Pages with all NaNs will not build a column index. + ::arrow::ArrayFromVector<::arrow::DoubleType>({NAN, NAN, NAN}, &vectors[3]); + ASSERT_OK_AND_ASSIGN(auto chunked_array, + arrow::ChunkedArray::Make(vectors, ::arrow::float64())); + + auto schema = ::arrow::schema({::arrow::field("c0", ::arrow::float64())}); + auto table = Table::Make(schema, {chunked_array}); + WriteFile(writer_properties, table); + + ReadPageIndexes(/*expect_num_row_groups=*/4, /*expect_num_pages=*/1); + + EXPECT_THAT( + column_indexes_, + ::testing::ElementsAre( + ColumnIndexObject{/*null_pages=*/{false}, + /*min_values=*/{encode_double(0.1)}, + /*max_values=*/{encode_double(1.0)}, BoundaryOrder::Ascending, + /*null_counts=*/{0}}, + ColumnIndexObject{/*null_pages=*/{false}, + /*min_values=*/{encode_double(-0.0)}, + /*max_values=*/{encode_double(+0.0)}, + BoundaryOrder::Ascending, + /*null_counts=*/{0}}, + ColumnIndexObject{/*null_pages=*/{false}, + /*min_values=*/{encode_double(-0.0)}, + /*max_values=*/{encode_double(+0.0)}, + BoundaryOrder::Ascending, + /*null_counts=*/{0}}, + ColumnIndexObject{ + /* Page with only NaN values does not have a column index built */})); +} + +TEST_F(ParquetPageIndexRoundTripTest, EnablePerColumn) { + auto schema = ::arrow::schema({::arrow::field("c0", ::arrow::int64()), + ::arrow::field("c1", ::arrow::int64()), + ::arrow::field("c2", ::arrow::int64())}); + auto writer_properties = + WriterProperties::Builder() + .enable_write_page_index() /* enable by default */ + ->enable_write_page_index("c0") /* enable c0 explicitly */ + ->disable_write_page_index("c1") /* disable c1 explicitly */ + ->build(); + WriteFile(writer_properties, ::arrow::TableFromJSON(schema, {R"([[0, 1, 2]])"})); + + ReadPageIndexes(/*expect_num_row_groups=*/1, /*expect_num_pages=*/1, + /*expect_columns_without_index=*/{1}); + + EXPECT_THAT( + column_indexes_, + ::testing::ElementsAre( + ColumnIndexObject{/*null_pages=*/{false}, /*min_values=*/{encode_int64(0)}, + /*max_values=*/{encode_int64(0)}, BoundaryOrder::Ascending, + /*null_counts=*/{0}}, + ColumnIndexObject{/* page index of c1 is disabled */}, + ColumnIndexObject{/*null_pages=*/{false}, /*min_values=*/{encode_int64(2)}, + /*max_values=*/{encode_int64(2)}, BoundaryOrder::Ascending, + /*null_counts=*/{0}})); +} + +class ParquetBloomFilterRoundTripTest : public ::testing::Test, + public TestingWithPageIndex { + public: + void ReadBloomFilters(int expect_num_row_groups, + const std::set<int>& expect_columns_without_bloom_filter = {}) { + auto reader = ParquetFileReader::Open(std::make_shared<BufferReader>(buffer_)); + auto metadata = reader->metadata(); + auto& bloom_filter_reader = reader->GetBloomFilterReader(); + ASSERT_EQ(expect_num_row_groups, metadata->num_row_groups()); + + for (int rg = 0; rg < metadata->num_row_groups(); ++rg) { + auto
row_group_reader = bloom_filter_reader.RowGroup(rg); + ASSERT_NE(row_group_reader, nullptr); + + for (int col = 0; col < metadata->num_columns(); ++col) { + if (expect_columns_without_bloom_filter.find(col) != + expect_columns_without_bloom_filter.cend()) { + ASSERT_EQ(row_group_reader->GetColumnBloomFilter(col), nullptr); + } else { + bloom_filters_.push_back(row_group_reader->GetColumnBloomFilter(col)); + ASSERT_NE(bloom_filters_.back(), nullptr); + } + } + } + } + + template <typename ArrowType> + void VerifyBloomFilterContains(const BloomFilter& bloom_filter, + const ::arrow::ChunkedArray& chunked_array) { + for (auto value : ::arrow::stl::Iterate<ArrowType>(chunked_array)) { + if (value == std::nullopt) { + continue; + } + EXPECT_TRUE(bloom_filter.FindHash(bloom_filter.Hash(value.value()))); + } + } + + template <typename ArrowType> + void VerifyBloomFilterNotContains(const BloomFilter& bloom_filter, + const ::arrow::ChunkedArray& chunked_array) { + for (auto value : ::arrow::stl::Iterate<ArrowType>(chunked_array)) { + if (value == std::nullopt) { + continue; + } + EXPECT_FALSE(bloom_filter.FindHash(bloom_filter.Hash(value.value()))); + } + } + + template <typename ArrowType> + void VerifyBloomFilterAcrossRowGroups(const std::shared_ptr<ChunkedArray>& column, + int num_row_groups, + const std::vector<int64_t>& row_group_row_counts, + int num_bf_columns, int col_idx) { + int64_t current_row_offset = 0; + for (int rg = 0; rg < num_row_groups; ++rg) { + int bf_idx = rg * num_bf_columns + col_idx; + auto col_slice = column->Slice(current_row_offset, row_group_row_counts[rg]); + + // Verify this bloom filter contains values from this row group + VerifyBloomFilterContains<ArrowType>(*bloom_filters_[bf_idx], *col_slice); + + // Verify other row groups' bloom filters don't contain these values + for (int other_rg = 0; other_rg < num_row_groups; ++other_rg) { + if (other_rg != rg) { + int other_bf_idx = other_rg * num_bf_columns + col_idx; + VerifyBloomFilterNotContains<ArrowType>(*bloom_filters_[other_bf_idx], + *col_slice); + } + } + current_row_offset += row_group_row_counts[rg]; + } + } + + protected: + // Bloom filters for each column in each row group. 
+ std::vector<std::unique_ptr<BloomFilter>> bloom_filters_; +}; + +TEST_F(ParquetBloomFilterRoundTripTest, SimpleRoundTrip) { + auto schema = ::arrow::schema( + {::arrow::field("c0", ::arrow::int64()), ::arrow::field("c1", ::arrow::utf8())}); + BloomFilterOptions options{10, 0.05}; + auto writer_properties = WriterProperties::Builder() + .enable_bloom_filter("c0", options) + ->enable_bloom_filter("c1", options) + ->max_row_group_length(4) + ->build(); + auto table = ::arrow::TableFromJSON( + schema, {R"([[1,"a"],[2,"b"],[3,"c"],[null,"d"],[5,null],[6,"f"]])"}); + WriteFile(writer_properties, table); + + constexpr int kNumRowGroups = 2; + constexpr int kNumBFColumns = 2; + const std::vector<int64_t> kRowGroupRowCount{4, 2}; + + ReadBloomFilters(kNumRowGroups); + ASSERT_EQ(kNumRowGroups * kNumBFColumns, bloom_filters_.size()); + + VerifyBloomFilterAcrossRowGroups<::arrow::Int64Type>(table->column(0), kNumRowGroups, + kRowGroupRowCount, kNumBFColumns, + /*col_idx=*/0); + VerifyBloomFilterAcrossRowGroups<::arrow::StringType>(table->column(1), kNumRowGroups, + kRowGroupRowCount, kNumBFColumns, + /*col_idx=*/1); +} + +TEST_F(ParquetBloomFilterRoundTripTest, SimpleRoundTripDictionary) { + const std::vector<std::string> json_contents = { + R"([[1,"a"],[2,"b"],[3,"c"],[null,"d"],[5,null],[6,"f"]])"}; + + BloomFilterOptions options{10, 0.05}; + auto writer_properties = WriterProperties::Builder() + .enable_bloom_filter("c0", options) + ->enable_bloom_filter("c1", options) + ->max_row_group_length(4) + ->build(); + + constexpr int kNumRowGroups = 2; + constexpr int kNumBFColumns = 2; + const std::vector<int64_t> kRowGroupRowCount{4, 2}; + + struct StringTypeTestCase { + std::shared_ptr<::arrow::DataType> arrow_type; + std::function<void(ParquetBloomFilterRoundTripTest*, + const std::shared_ptr<ChunkedArray>&, const std::vector<int64_t>&)> + verify_func; + }; + + std::vector<StringTypeTestCase> test_cases = { + {::arrow::utf8(), + [&](auto* test, auto& col, auto& counts) { + test->template VerifyBloomFilterAcrossRowGroups<::arrow::StringType>( + col, kNumRowGroups, kRowGroupRowCount, kNumBFColumns, /*col_idx=*/1); + }}, + {::arrow::large_utf8(), + [&](auto* test, auto& col, auto& counts) { + test->template VerifyBloomFilterAcrossRowGroups<::arrow::LargeStringType>( + col, kNumRowGroups, kRowGroupRowCount, kNumBFColumns, /*col_idx=*/1); + }}, + {::arrow::utf8_view(), [&](auto* test, auto& col, auto& counts) { + test->template VerifyBloomFilterAcrossRowGroups<::arrow::StringViewType>( + col, kNumRowGroups, kRowGroupRowCount, kNumBFColumns, /*col_idx=*/1); + }}}; + + for (const auto& test_case : test_cases) { + bloom_filters_.clear(); + + auto dict_schema = ::arrow::schema( + {::arrow::field("c0", ::arrow::dictionary(::arrow::int64(), ::arrow::int64())), + ::arrow::field("c1", ::arrow::dictionary(::arrow::int64(), ::arrow::utf8()))}); + auto origin_schema = ::arrow::schema({::arrow::field("c0", ::arrow::int64()), + ::arrow::field("c1", test_case.arrow_type)}); + + auto dict_encoded_table = ::arrow::TableFromJSON(dict_schema, json_contents); + auto table = ::arrow::TableFromJSON(origin_schema, json_contents); + WriteFile(writer_properties, dict_encoded_table); + + ReadBloomFilters(kNumRowGroups); + ASSERT_EQ(kNumRowGroups * kNumBFColumns, bloom_filters_.size()); + + VerifyBloomFilterAcrossRowGroups<::arrow::Int64Type>(table->column(0), kNumRowGroups, + kRowGroupRowCount, kNumBFColumns, + /*col_idx=*/0); + test_case.verify_func(this, table->column(1), kRowGroupRowCount); + } +} + 
+TEST_F(ParquetBloomFilterRoundTripTest, SimpleRoundTripWithOneFilter) { + auto schema = ::arrow::schema( + {::arrow::field("c0", ::arrow::int64()), ::arrow::field("c1", ::arrow::utf8())}); + BloomFilterOptions options{10, 0.05}; + auto writer_properties = WriterProperties::Builder() + .enable_bloom_filter("c0", options) + ->disable_bloom_filter("c1") + ->max_row_group_length(4) + ->build(); + auto table = ::arrow::TableFromJSON( + schema, {R"([[1,"a"],[2,"b"],[3,"c"],[null,"d"],[5,null],[6,"f"]])"}); + WriteFile(writer_properties, table); + + constexpr int kNumRowGroups = 2; + constexpr int kNumBFColumns = 1; + const std::vector<int64_t> kRowGroupRowCount{4, 2}; + + ReadBloomFilters(kNumRowGroups, + /*expect_columns_without_bloom_filter=*/{1}); + ASSERT_EQ(kNumRowGroups * kNumBFColumns, bloom_filters_.size()); + + // Only verify c0 since c1 doesn't have a bloom filter + VerifyBloomFilterAcrossRowGroups<::arrow::Int64Type>(table->column(0), kNumRowGroups, + kRowGroupRowCount, kNumBFColumns, + /*col_idx=*/0); +} + +TEST_F(ParquetBloomFilterRoundTripTest, ThrowForBoolean) {
Review Comment:
What happens if there is a boolean column and I enable bloom filters for all columns, instead of that column specifically? Does it work?
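As a concrete probe of that question, a hedged sketch is below. The no-argument enable call is hypothetical (it assumes a builder overload that turns bloom filters on for every column by default, analogous to `enable_write_page_index()`); the test name is invented, and whether the boolean column throws or is skipped is exactly what the question leaves open.

```cpp
// Hedged sketch of the scenario raised above. The no-argument enable call is
// hypothetical -- it assumes a builder overload that enables bloom filters for
// every column by default. Whether the boolean column then throws at write
// time (like ThrowForBoolean) or is silently skipped is the open question;
// the expectations below assume the skip behavior and are not verified.
TEST_F(ParquetBloomFilterRoundTripTest, BooleanWithBloomFilterEnabledForAll) {
  auto schema = ::arrow::schema({::arrow::field("c0", ::arrow::boolean()),
                                 ::arrow::field("c1", ::arrow::int64())});
  BloomFilterOptions options{10, 0.05};
  auto writer_properties = WriterProperties::Builder()
                               .enable_bloom_filter(options)  // hypothetical overload
                               ->build();
  auto table = ::arrow::TableFromJSON(schema, {R"([[true, 1], [false, 2]])"});
  WriteFile(writer_properties, table);  // would fail here if booleans are rejected

  // If unsupported physical types are skipped, only c1 carries a bloom filter.
  ReadBloomFilters(/*expect_num_row_groups=*/1,
                   /*expect_columns_without_bloom_filter=*/{0});
  ASSERT_EQ(1U, bloom_filters_.size());
}
```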
########## cpp/src/parquet/arrow/index_test.cc: ##########
@@ -0,0 +1,680 @@
+// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#ifdef _MSC_VER +# pragma warning(push) +// Disable forcing value to bool warnings +# pragma warning(disable : 4800) +#endif + +#include <gmock/gmock.h> +#include <gtest/gtest.h> + +#include <cstdint> +#include <set> +#include <vector> + +#include "arrow/chunked_array.h" +#include "arrow/compute/api.h" +#include "arrow/scalar.h" +#include "arrow/table.h" +#include "arrow/testing/builder.h" +#include "arrow/testing/gtest_util.h" +#include "arrow/testing/random.h" +#include "arrow/util/checked_cast.h" + +#include "parquet/arrow/reader.h" +#include "parquet/arrow/reader_internal.h" +#include "parquet/arrow/schema.h" +#include "parquet/arrow/test_util.h" +#include "parquet/arrow/writer.h" +#include "parquet/bloom_filter.h" +#include "parquet/bloom_filter_reader.h" +#include "parquet/file_writer.h" +#include "parquet/page_index.h" +#include "parquet/properties.h" + +using arrow::Array; +using arrow::Buffer; +using arrow::ChunkedArray; +using arrow::default_memory_pool; +using arrow::Result; +using arrow::Status; +using arrow::Table; +using arrow::internal::checked_cast; +using arrow::io::BufferReader; + +using ParquetType = parquet::Type; + +using parquet::arrow::FromParquetSchema; +using parquet::schema::GroupNode; +using parquet::schema::NodePtr; +using parquet::schema::PrimitiveNode; + +namespace parquet::arrow { + +namespace { + +struct ColumnIndexObject { + std::vector<bool> null_pages; + std::vector<std::string> min_values; + std::vector<std::string> max_values; + BoundaryOrder::type boundary_order = BoundaryOrder::Unordered; + std::vector<int64_t> null_counts; + + ColumnIndexObject() = default; + + ColumnIndexObject(const std::vector<bool>& null_pages, + const std::vector<std::string>& min_values, + const std::vector<std::string>& max_values, + BoundaryOrder::type boundary_order, + const std::vector<int64_t>& null_counts) + : null_pages(null_pages), + min_values(min_values), + max_values(max_values), + boundary_order(boundary_order), + null_counts(null_counts) {} + + explicit ColumnIndexObject(const ColumnIndex* column_index) { + if (column_index == nullptr) { + return; + } + null_pages = column_index->null_pages(); + min_values = column_index->encoded_min_values(); + max_values = column_index->encoded_max_values(); + boundary_order = column_index->boundary_order(); + if (column_index->has_null_counts()) { + null_counts = column_index->null_counts(); + } + } + + bool operator==(const ColumnIndexObject& b) const { + return null_pages == b.null_pages && min_values == b.min_values && + max_values == b.max_values && boundary_order == b.boundary_order && + null_counts == b.null_counts; + } +}; + +auto encode_int64 = [](int64_t value) { + return std::string(reinterpret_cast<const char*>(&value), sizeof(int64_t)); +}; + +auto encode_double = [](double value) { + return std::string(reinterpret_cast<const char*>(&value), sizeof(double)); +}; + +} // namespace + +class TestingWithPageIndex { + public: + void WriteFile(const std::shared_ptr<WriterProperties>& writer_properties, + const std::shared_ptr<::arrow::Table>& table) { + // Get schema from table. + auto schema = table->schema(); + std::shared_ptr<SchemaDescriptor> parquet_schema; + auto arrow_writer_properties = default_arrow_writer_properties(); + ASSERT_OK_NO_THROW(ToParquetSchema(schema.get(), *writer_properties, + *arrow_writer_properties, &parquet_schema)); + auto schema_node = std::static_pointer_cast<GroupNode>(parquet_schema->schema_root()); + + // Write table to buffer.
+ auto sink = CreateOutputStream(); + auto pool = ::arrow::default_memory_pool(); + auto writer = ParquetFileWriter::Open(sink, schema_node, writer_properties); + std::unique_ptr<FileWriter> arrow_writer; + ASSERT_OK(FileWriter::Make(pool, std::move(writer), schema, arrow_writer_properties, + &arrow_writer)); + ASSERT_OK_NO_THROW(arrow_writer->WriteTable(*table)); + ASSERT_OK_NO_THROW(arrow_writer->Close()); + ASSERT_OK_AND_ASSIGN(buffer_, sink->Finish()); + } + + protected: + std::shared_ptr<Buffer> buffer_; +}; + +class ParquetPageIndexRoundTripTest : public ::testing::Test, + public TestingWithPageIndex { + public: + void ReadPageIndexes(int expect_num_row_groups, int expect_num_pages, + const std::set<int>& expect_columns_without_index = {}) { + auto read_properties = default_arrow_reader_properties(); + auto reader = ParquetFileReader::Open(std::make_shared<BufferReader>(buffer_)); + + auto metadata = reader->metadata(); + ASSERT_EQ(expect_num_row_groups, metadata->num_row_groups()); + + auto page_index_reader = reader->GetPageIndexReader(); + ASSERT_NE(page_index_reader, nullptr); + + int64_t offset_lower_bound = 0; + for (int rg = 0; rg < metadata->num_row_groups(); ++rg) { + auto row_group_index_reader = page_index_reader->RowGroup(rg); + ASSERT_NE(row_group_index_reader, nullptr); + + auto row_group_reader = reader->RowGroup(rg); + ASSERT_NE(row_group_reader, nullptr); + + for (int col = 0; col < metadata->num_columns(); ++col) { + auto column_index = row_group_index_reader->GetColumnIndex(col); + column_indexes_.emplace_back(column_index.get()); + + bool expect_no_page_index = + expect_columns_without_index.find(col) != expect_columns_without_index.cend(); + + auto offset_index = row_group_index_reader->GetOffsetIndex(col); + if (expect_no_page_index) { + ASSERT_EQ(offset_index, nullptr); + } else { + CheckOffsetIndex(offset_index.get(), expect_num_pages, &offset_lower_bound); + } + + // Verify page stats are not written to page header if page index is enabled. + auto page_reader = row_group_reader->GetColumnPageReader(col); + ASSERT_NE(page_reader, nullptr); + std::shared_ptr<Page> page = nullptr; + while ((page = page_reader->NextPage()) != nullptr) { + if (page->type() == PageType::DATA_PAGE || + page->type() == PageType::DATA_PAGE_V2) { + ASSERT_EQ(std::static_pointer_cast<DataPage>(page)->statistics().is_set(), + expect_no_page_index); + } + } + } + } + } + + private: + void CheckOffsetIndex(const OffsetIndex* offset_index, int expect_num_pages, + int64_t* offset_lower_bound_in_out) { + ASSERT_NE(offset_index, nullptr); + const auto& locations = offset_index->page_locations(); + ASSERT_EQ(static_cast<size_t>(expect_num_pages), locations.size()); + int64_t prev_first_row_index = -1; + for (const auto& location : locations) { + // Make sure first_row_index is in the ascending order within a row group. + ASSERT_GT(location.first_row_index, prev_first_row_index); + // Make sure page offset is in the ascending order across the file. + ASSERT_GE(location.offset, *offset_lower_bound_in_out); + // Make sure page size is positive. 
+ ASSERT_GT(location.compressed_page_size, 0); + prev_first_row_index = location.first_row_index; + *offset_lower_bound_in_out = location.offset + location.compressed_page_size; + } + } + + protected: + std::vector<ColumnIndexObject> column_indexes_; +}; + +TEST_F(ParquetPageIndexRoundTripTest, SimpleRoundTrip) { + auto writer_properties = WriterProperties::Builder() + .enable_write_page_index() + ->max_row_group_length(4) + ->build(); + auto schema = ::arrow::schema({::arrow::field("c0", ::arrow::int64()), + ::arrow::field("c1", ::arrow::utf8()), + ::arrow::field("c2", ::arrow::list(::arrow::int64()))}); + WriteFile(writer_properties, ::arrow::TableFromJSON(schema, {R"([ + [1, "a", [1] ], + [2, "b", [1, 2] ], + [3, "c", [null] ], + [null, "d", [] ], + [5, null, [3, 3, 3]], + [6, "f", null ] + ])"})); + + ReadPageIndexes(/*expect_num_row_groups=*/2, /*expect_num_pages=*/1); + + EXPECT_THAT( + column_indexes_, + ::testing::ElementsAre( + ColumnIndexObject{/*null_pages=*/{false}, /*min_values=*/{encode_int64(1)}, + /*max_values=*/{encode_int64(3)}, BoundaryOrder::Ascending, + /*null_counts=*/{1}}, + ColumnIndexObject{/*null_pages=*/{false}, /*min_values=*/{"a"}, + /*max_values=*/{"d"}, BoundaryOrder::Ascending, + /*null_counts=*/{0}}, + ColumnIndexObject{/*null_pages=*/{false}, /*min_values=*/{encode_int64(1)}, + /*max_values=*/{encode_int64(2)}, BoundaryOrder::Ascending, + /*null_counts=*/{2}}, + ColumnIndexObject{/*null_pages=*/{false}, /*min_values=*/{encode_int64(5)}, + /*max_values=*/{encode_int64(6)}, BoundaryOrder::Ascending, + /*null_counts=*/{0}}, + ColumnIndexObject{/*null_pages=*/{false}, /*min_values=*/{"f"}, + /*max_values=*/{"f"}, BoundaryOrder::Ascending, + /*null_counts=*/{1}}, + ColumnIndexObject{/*null_pages=*/{false}, /*min_values=*/{encode_int64(3)}, + /*max_values=*/{encode_int64(3)}, BoundaryOrder::Ascending, + /*null_counts=*/{1}})); +} + +TEST_F(ParquetPageIndexRoundTripTest, SimpleRoundTripWithStatsDisabled) { + auto writer_properties = WriterProperties::Builder() + .enable_write_page_index() + ->disable_statistics() + ->build(); + auto schema = ::arrow::schema({::arrow::field("c0", ::arrow::int64()), + ::arrow::field("c1", ::arrow::utf8()), + ::arrow::field("c2", ::arrow::list(::arrow::int64()))}); + WriteFile(writer_properties, ::arrow::TableFromJSON(schema, {R"([ + [1, "a", [1] ], + [2, "b", [1, 2] ], + [3, "c", [null] ], + [null, "d", [] ], + [5, null, [3, 3, 3]], + [6, "f", null ] + ])"})); + + ReadPageIndexes(/*expect_num_row_groups=*/1, /*expect_num_pages=*/1); + for (auto& column_index : column_indexes_) { + // Means page index is empty. 
+ EXPECT_EQ(ColumnIndexObject{}, column_index); + } +} + +TEST_F(ParquetPageIndexRoundTripTest, SimpleRoundTripWithColumnStatsDisabled) { + auto writer_properties = WriterProperties::Builder() + .enable_write_page_index() + ->disable_statistics("c0") + ->max_row_group_length(4) + ->build(); + auto schema = ::arrow::schema({::arrow::field("c0", ::arrow::int64()), + ::arrow::field("c1", ::arrow::utf8()), + ::arrow::field("c2", ::arrow::list(::arrow::int64()))}); + WriteFile(writer_properties, ::arrow::TableFromJSON(schema, {R"([ + [1, "a", [1] ], + [2, "b", [1, 2] ], + [3, "c", [null] ], + [null, "d", [] ], + [5, null, [3, 3, 3]], + [6, "f", null ] + ])"})); + + ReadPageIndexes(/*expect_num_row_groups=*/2, /*expect_num_pages=*/1); + + ColumnIndexObject empty_column_index{}; + EXPECT_THAT( + column_indexes_, + ::testing::ElementsAre( + empty_column_index, + ColumnIndexObject{/*null_pages=*/{false}, /*min_values=*/{"a"}, + /*max_values=*/{"d"}, BoundaryOrder::Ascending, + /*null_counts=*/{0}}, + ColumnIndexObject{/*null_pages=*/{false}, /*min_values=*/{encode_int64(1)}, + /*max_values=*/{encode_int64(2)}, BoundaryOrder::Ascending, + /*null_counts=*/{2}}, + empty_column_index, + ColumnIndexObject{/*null_pages=*/{false}, /*min_values=*/{"f"}, + /*max_values=*/{"f"}, BoundaryOrder::Ascending, + /*null_counts=*/{1}}, + ColumnIndexObject{/*null_pages=*/{false}, /*min_values=*/{encode_int64(3)}, + /*max_values=*/{encode_int64(3)}, BoundaryOrder::Ascending, + /*null_counts=*/{1}})); +} + +TEST_F(ParquetPageIndexRoundTripTest, DropLargeStats) { + auto writer_properties = WriterProperties::Builder() + .enable_write_page_index() + ->max_row_group_length(1) /* write single-row row group */ + ->max_statistics_size(20) /* drop stats larger than it */ + ->build(); + auto schema = ::arrow::schema({::arrow::field("c0", ::arrow::utf8())}); + WriteFile(writer_properties, ::arrow::TableFromJSON(schema, {R"([ + ["short_string"], + ["very_large_string_to_drop_stats"] + ])"})); + + ReadPageIndexes(/*expect_num_row_groups=*/2, /*expect_num_pages=*/1); + + EXPECT_THAT( + column_indexes_, + ::testing::ElementsAre( + ColumnIndexObject{/*null_pages=*/{false}, /*min_values=*/{"short_string"}, + /*max_values=*/{"short_string"}, BoundaryOrder::Ascending, + /*null_counts=*/{0}}, + ColumnIndexObject{})); +} + +TEST_F(ParquetPageIndexRoundTripTest, MultiplePages) { + auto writer_properties = WriterProperties::Builder() + .enable_write_page_index() + ->data_pagesize(1) /* write multiple pages */ + ->build(); + auto schema = ::arrow::schema( + {::arrow::field("c0", ::arrow::int64()), ::arrow::field("c1", ::arrow::utf8())}); + WriteFile( + writer_properties, + ::arrow::TableFromJSON( + schema, {R"([[1, "a"], [2, "b"]])", R"([[3, "c"], [4, "d"]])", + R"([[null, null], [6, "f"]])", R"([[null, null], [null, null]])"})); + + ReadPageIndexes(/*expect_num_row_groups=*/1, /*expect_num_pages=*/4); + + EXPECT_THAT( + column_indexes_, + ::testing::ElementsAre( + ColumnIndexObject{ + /*null_pages=*/{false, false, false, true}, + /*min_values=*/{encode_int64(1), encode_int64(3), encode_int64(6), ""}, + /*max_values=*/{encode_int64(2), encode_int64(4), encode_int64(6), ""}, + BoundaryOrder::Ascending, + /*null_counts=*/{0, 0, 1, 2}}, + ColumnIndexObject{/*null_pages=*/{false, false, false, true}, + /*min_values=*/{"a", "c", "f", ""}, + /*max_values=*/{"b", "d", "f", ""}, BoundaryOrder::Ascending, + /*null_counts=*/{0, 0, 1, 2}})); +} + +TEST_F(ParquetPageIndexRoundTripTest, DoubleWithNaNs) { + auto writer_properties = 
WriterProperties::Builder() + .enable_write_page_index() + ->max_row_group_length(3) /* 3 rows per row group */ + ->build(); + + // Create table to write with NaNs. + auto vectors = std::vector<std::shared_ptr<Array>>(4); + // NaN will be ignored in min/max stats. + ::arrow::ArrayFromVector<::arrow::DoubleType>({1.0, NAN, 0.1}, &vectors[0]); + // Lower bound will use -0.0. + ::arrow::ArrayFromVector<::arrow::DoubleType>({+0.0, NAN, +0.0}, &vectors[1]); + // Upper bound will use +0.0. + ::arrow::ArrayFromVector<::arrow::DoubleType>({-0.0, NAN, -0.0}, &vectors[2]); + // Pages with all NaNs will not build a column index. + ::arrow::ArrayFromVector<::arrow::DoubleType>({NAN, NAN, NAN}, &vectors[3]); + ASSERT_OK_AND_ASSIGN(auto chunked_array, + arrow::ChunkedArray::Make(vectors, ::arrow::float64())); + + auto schema = ::arrow::schema({::arrow::field("c0", ::arrow::float64())}); + auto table = Table::Make(schema, {chunked_array}); + WriteFile(writer_properties, table); + + ReadPageIndexes(/*expect_num_row_groups=*/4, /*expect_num_pages=*/1); + + EXPECT_THAT( + column_indexes_, + ::testing::ElementsAre( + ColumnIndexObject{/*null_pages=*/{false}, + /*min_values=*/{encode_double(0.1)}, + /*max_values=*/{encode_double(1.0)}, BoundaryOrder::Ascending, + /*null_counts=*/{0}}, + ColumnIndexObject{/*null_pages=*/{false}, + /*min_values=*/{encode_double(-0.0)}, + /*max_values=*/{encode_double(+0.0)}, + BoundaryOrder::Ascending, + /*null_counts=*/{0}}, + ColumnIndexObject{/*null_pages=*/{false}, + /*min_values=*/{encode_double(-0.0)}, + /*max_values=*/{encode_double(+0.0)}, + BoundaryOrder::Ascending, + /*null_counts=*/{0}}, + ColumnIndexObject{ + /* Page with only NaN values does not have a column index built */})); +} + +TEST_F(ParquetPageIndexRoundTripTest, EnablePerColumn) { + auto schema = ::arrow::schema({::arrow::field("c0", ::arrow::int64()), + ::arrow::field("c1", ::arrow::int64()), + ::arrow::field("c2", ::arrow::int64())}); + auto writer_properties = + WriterProperties::Builder() + .enable_write_page_index() /* enable by default */ + ->enable_write_page_index("c0") /* enable c0 explicitly */ + ->disable_write_page_index("c1") /* disable c1 explicitly */ + ->build(); + WriteFile(writer_properties, ::arrow::TableFromJSON(schema, {R"([[0, 1, 2]])"})); + + ReadPageIndexes(/*expect_num_row_groups=*/1, /*expect_num_pages=*/1, + /*expect_columns_without_index=*/{1}); + + EXPECT_THAT( + column_indexes_, + ::testing::ElementsAre( + ColumnIndexObject{/*null_pages=*/{false}, /*min_values=*/{encode_int64(0)}, + /*max_values=*/{encode_int64(0)}, BoundaryOrder::Ascending, + /*null_counts=*/{0}}, + ColumnIndexObject{/* page index of c1 is disabled */}, + ColumnIndexObject{/*null_pages=*/{false}, /*min_values=*/{encode_int64(2)}, + /*max_values=*/{encode_int64(2)}, BoundaryOrder::Ascending, + /*null_counts=*/{0}})); +} + +class ParquetBloomFilterRoundTripTest : public ::testing::Test, + public TestingWithPageIndex { + public: + void ReadBloomFilters(int expect_num_row_groups, + const std::set<int>& expect_columns_without_bloom_filter = {}) { + auto reader = ParquetFileReader::Open(std::make_shared<BufferReader>(buffer_)); + auto metadata = reader->metadata(); + auto& bloom_filter_reader = reader->GetBloomFilterReader(); + ASSERT_EQ(expect_num_row_groups, metadata->num_row_groups()); + + for (int rg = 0; rg < metadata->num_row_groups(); ++rg) { + auto row_group_reader = bloom_filter_reader.RowGroup(rg); + ASSERT_NE(row_group_reader, nullptr); + + for (int col = 0; col < metadata->num_columns(); ++col) { + if
(expect_columns_without_bloom_filter.find(col) != + expect_columns_without_bloom_filter.cend()) { + ASSERT_EQ(row_group_reader->GetColumnBloomFilter(col), nullptr); + } else { + bloom_filters_.push_back(row_group_reader->GetColumnBloomFilter(col)); + ASSERT_NE(bloom_filters_.back(), nullptr); + } + } + } + } + + template <typename ArrowType> + void VerifyBloomFilterContains(const BloomFilter& bloom_filter, + const ::arrow::ChunkedArray& chunked_array) { + for (auto value : ::arrow::stl::Iterate<ArrowType>(chunked_array)) { + if (value == std::nullopt) { + continue; + } + EXPECT_TRUE(bloom_filter.FindHash(bloom_filter.Hash(value.value())));
Review Comment:
Not mandatory, but if we had `FindHash(const Scalar&)`, it could remove the need for templating in these test functions.
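To make that idea concrete: if `BloomFilter` grew a hypothetical `Hash(const ::arrow::Scalar&)` overload that dispatched on the scalar's physical type internally, the templated helpers above could collapse into one plain function. A minimal sketch under that assumption (no such overload exists in the current API) follows.

```cpp
// Hedged sketch only: assumes a hypothetical BloomFilter::Hash(const ::arrow::Scalar&)
// overload that hashes according to the column's physical type. No such overload
// exists today; the PR uses the typed Hash(T) overloads instead.
void VerifyBloomFilterContains(const BloomFilter& bloom_filter,
                               const ::arrow::ChunkedArray& chunked_array) {
  for (const auto& chunk : chunked_array.chunks()) {
    for (int64_t i = 0; i < chunk->length(); ++i) {
      if (chunk->IsNull(i)) {
        continue;
      }
      ASSERT_OK_AND_ASSIGN(std::shared_ptr<::arrow::Scalar> scalar, chunk->GetScalar(i));
      EXPECT_TRUE(bloom_filter.FindHash(bloom_filter.Hash(*scalar)));  // hypothetical
    }
  }
}
```

With that in place, `VerifyBloomFilterNotContains` and `VerifyBloomFilterAcrossRowGroups` would shed their template parameters the same way, and callers would no longer need to name an Arrow type per column.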
