wgtmac commented on code in PR #37400: URL: https://github.com/apache/arrow/pull/37400#discussion_r2255905490
########## cpp/src/parquet/properties.h: ########## @@ -214,6 +240,17 @@ class PARQUET_EXPORT ColumnProperties { page_index_enabled_ = page_index_enabled; } + void set_bloom_filter_options(std::optional<BloomFilterOptions> bloom_filter_options) { + if (bloom_filter_options) { + if (bloom_filter_options->fpp >= 1.0 || bloom_filter_options->fpp <= 0.0) { Review Comment: Should we hide the logic by adding `Status BloomFilterOptions::Validate()` and call it here? ########## cpp/src/parquet/properties.h: ########## @@ -167,6 +167,32 @@ static constexpr Compression::type DEFAULT_COMPRESSION_TYPE = Compression::UNCOM static constexpr bool DEFAULT_IS_PAGE_INDEX_ENABLED = true; static constexpr SizeStatisticsLevel DEFAULT_SIZE_STATISTICS_LEVEL = SizeStatisticsLevel::PageAndColumnChunk; +static constexpr int32_t DEFAULT_BLOOM_FILTER_NDV = 1024 * 1024; +static constexpr double DEFAULT_BLOOM_FILTER_FPP = 0.05; + +struct PARQUET_EXPORT BloomFilterOptions { + /// Expected number of distinct values to be inserted into the bloom filter. + /// + /// Usage of bloom filter is most beneficial for columns with large cardinality, + /// so a good heuristic is to set ndv to number of rows. However, it can reduce + /// disk size if you know in advance a smaller number of distinct values. + /// For very small ndv value it is probably not worth it to use bloom filter anyway. + /// + /// Increasing this value (without increasing fpp) will result in an increase in + /// disk or memory size. + int32_t ndv = DEFAULT_BLOOM_FILTER_NDV; + /// False-positive probability of the bloom filter. + /// + /// The bloom filter data structure is a trade-off between disk and memory space + /// versus fpp, the smaller the fpp, the more memory and disk space is required, + /// thus setting it to a reasonable value e.g. 0.1, 0.05, or 0.001 is recommended. + /// + /// Setting to very small number diminishes the value of the filter itself, + /// as the bitset size is even larger than just storing the whole value. + /// User is also expected to set ndv if it can be known in advance in order + /// to largely reduce space usage. + double fpp = DEFAULT_BLOOM_FILTER_FPP; +}; Review Comment: ```suggestion struct PARQUET_EXPORT BloomFilterOptions { /// Expected number of distinct values, default 1M. /// /// Bloom filters are most effective for high-cardinality columns. /// A larger value increases memory and disk usage. int32_t ndv = DEFAULT_BLOOM_FILTER_NDV; /// False-positive probability, default 0.05. /// /// Lower values require more memory and disk space. /// Acceptable values are in the range (0.0, 1.0). double fpp = DEFAULT_BLOOM_FILTER_FPP; }; ``` I still think that we don't need too much detail about bloom filter. ########## cpp/src/parquet/properties.h: ########## @@ -186,6 +200,24 @@ class PARQUET_EXPORT ColumnProperties { page_index_enabled_ = page_index_enabled; } + void set_bloom_filter_options(std::optional<BloomFilterOptions> bloom_filter_options) { Review Comment: The link seems pointing to a weird place. Could you explain it again? ########## cpp/src/parquet/arrow/arrow_parquet_index_test.cc: ########## @@ -0,0 +1,736 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#ifdef _MSC_VER +# pragma warning(push) +// Disable forcing value to bool warnings +# pragma warning(disable : 4800) +#endif + +#include "gmock/gmock.h" +#include "gtest/gtest.h" + +#include <cstdint> +#include <functional> +#include <set> +#include <sstream> +#include <vector> + +#include "arrow/array/builder_binary.h" +#include "arrow/array/builder_dict.h" +#include "arrow/array/builder_nested.h" +#include "arrow/chunked_array.h" +#include "arrow/compute/api.h" +#include "arrow/extension/json.h" +#include "arrow/io/api.h" +#include "arrow/record_batch.h" +#include "arrow/scalar.h" +#include "arrow/table.h" +#include "arrow/testing/builder.h" +#include "arrow/testing/extension_type.h" +#include "arrow/testing/gtest_util.h" +#include "arrow/testing/random.h" +#include "arrow/testing/util.h" +#include "arrow/type_fwd.h" +#include "arrow/util/checked_cast.h" +#include "arrow/util/future.h" +#include "arrow/util/key_value_metadata.h" +#include "arrow/util/range.h" + +#ifdef ARROW_CSV +# include "arrow/csv/api.h" +#endif + +#include "parquet/api/reader.h" + +#include "parquet/arrow/reader.h" +#include "parquet/arrow/reader_internal.h" +#include "parquet/arrow/schema.h" +#include "parquet/arrow/test_util.h" +#include "parquet/arrow/writer.h" +#include "parquet/bloom_filter.h" +#include "parquet/bloom_filter_reader.h" +#include "parquet/file_writer.h" +#include "parquet/page_index.h" +#include "parquet/properties.h" +#include "parquet/test_util.h" + +using arrow::Array; +using arrow::Buffer; +using arrow::ChunkedArray; +using arrow::default_memory_pool; +using arrow::Result; +using arrow::Status; +using arrow::Table; +using arrow::internal::checked_cast; +using arrow::io::BufferReader; + +using ParquetType = parquet::Type; + +using parquet::arrow::FromParquetSchema; +using parquet::schema::GroupNode; +using parquet::schema::NodePtr; +using parquet::schema::PrimitiveNode; + +namespace parquet::arrow { + +namespace { + +struct ColumnIndexObject { + std::vector<bool> null_pages; + std::vector<std::string> min_values; + std::vector<std::string> max_values; + BoundaryOrder::type boundary_order = BoundaryOrder::Unordered; + std::vector<int64_t> null_counts; + + ColumnIndexObject() = default; + + ColumnIndexObject(const std::vector<bool>& null_pages, + const std::vector<std::string>& min_values, + const std::vector<std::string>& max_values, + BoundaryOrder::type boundary_order, + const std::vector<int64_t>& null_counts) + : null_pages(null_pages), + min_values(min_values), + max_values(max_values), + boundary_order(boundary_order), + null_counts(null_counts) {} + + explicit ColumnIndexObject(const ColumnIndex* column_index) { + if (column_index == nullptr) { + return; + } + null_pages = column_index->null_pages(); + min_values = column_index->encoded_min_values(); + max_values = column_index->encoded_max_values(); + boundary_order = column_index->boundary_order(); + if (column_index->has_null_counts()) { + null_counts = column_index->null_counts(); + } + } + + bool operator==(const ColumnIndexObject& b) const { + return null_pages == b.null_pages && min_values == b.min_values && + 
max_values == b.max_values && boundary_order == b.boundary_order && + null_counts == b.null_counts; + } +}; + +auto encode_int64 = [](int64_t value) { + return std::string(reinterpret_cast<const char*>(&value), sizeof(int64_t)); +}; + +auto encode_double = [](double value) { + return std::string(reinterpret_cast<const char*>(&value), sizeof(double)); +}; + +} // namespace + +class TestingWithPageIndex { + public: + void WriteFile(const std::shared_ptr<WriterProperties>& writer_properties, + const std::shared_ptr<::arrow::Table>& table) { + // Get schema from table. + auto schema = table->schema(); + std::shared_ptr<SchemaDescriptor> parquet_schema; + auto arrow_writer_properties = default_arrow_writer_properties(); + ASSERT_OK_NO_THROW(ToParquetSchema(schema.get(), *writer_properties, + *arrow_writer_properties, &parquet_schema)); + auto schema_node = std::static_pointer_cast<GroupNode>(parquet_schema->schema_root()); + + // Write table to buffer. + auto sink = CreateOutputStream(); + auto pool = ::arrow::default_memory_pool(); + auto writer = ParquetFileWriter::Open(sink, schema_node, writer_properties); + std::unique_ptr<FileWriter> arrow_writer; + ASSERT_OK(FileWriter::Make(pool, std::move(writer), schema, arrow_writer_properties, + &arrow_writer)); + ASSERT_OK_NO_THROW(arrow_writer->WriteTable(*table)); + ASSERT_OK_NO_THROW(arrow_writer->Close()); + ASSERT_OK_AND_ASSIGN(buffer_, sink->Finish()); + } + + protected: + std::shared_ptr<Buffer> buffer_; +}; + +class ParquetPageIndexRoundTripTest : public ::testing::Test, + public TestingWithPageIndex { + public: + void ReadPageIndexes(int expect_num_row_groups, int expect_num_pages, + const std::set<int>& expect_columns_without_index = {}) { + auto read_properties = default_arrow_reader_properties(); + auto reader = ParquetFileReader::Open(std::make_shared<BufferReader>(buffer_)); + + auto metadata = reader->metadata(); + ASSERT_EQ(expect_num_row_groups, metadata->num_row_groups()); + + auto page_index_reader = reader->GetPageIndexReader(); + ASSERT_NE(page_index_reader, nullptr); + + int64_t offset_lower_bound = 0; + for (int rg = 0; rg < metadata->num_row_groups(); ++rg) { + auto row_group_index_reader = page_index_reader->RowGroup(rg); + ASSERT_NE(row_group_index_reader, nullptr); + + auto row_group_reader = reader->RowGroup(rg); + ASSERT_NE(row_group_reader, nullptr); + + for (int col = 0; col < metadata->num_columns(); ++col) { + auto column_index = row_group_index_reader->GetColumnIndex(col); + column_indexes_.emplace_back(column_index.get()); + + bool expect_no_page_index = + expect_columns_without_index.find(col) != expect_columns_without_index.cend(); + + auto offset_index = row_group_index_reader->GetOffsetIndex(col); + if (expect_no_page_index) { + ASSERT_EQ(offset_index, nullptr); + } else { + CheckOffsetIndex(offset_index.get(), expect_num_pages, &offset_lower_bound); + } + + // Verify page stats are not written to page header if page index is enabled. 
+ auto page_reader = row_group_reader->GetColumnPageReader(col); + ASSERT_NE(page_reader, nullptr); + std::shared_ptr<Page> page = nullptr; + while ((page = page_reader->NextPage()) != nullptr) { + if (page->type() == PageType::DATA_PAGE || + page->type() == PageType::DATA_PAGE_V2) { + ASSERT_EQ(std::static_pointer_cast<DataPage>(page)->statistics().is_set(), + expect_no_page_index); + } + } + } + } + } + + private: + void CheckOffsetIndex(const OffsetIndex* offset_index, int expect_num_pages, + int64_t* offset_lower_bound_in_out) { + ASSERT_NE(offset_index, nullptr); + const auto& locations = offset_index->page_locations(); + ASSERT_EQ(static_cast<size_t>(expect_num_pages), locations.size()); + int64_t prev_first_row_index = -1; + for (const auto& location : locations) { + // Make sure first_row_index is in the ascending order within a row group. + ASSERT_GT(location.first_row_index, prev_first_row_index); + // Make sure page offset is in the ascending order across the file. + ASSERT_GE(location.offset, *offset_lower_bound_in_out); + // Make sure page size is positive. + ASSERT_GT(location.compressed_page_size, 0); + prev_first_row_index = location.first_row_index; + *offset_lower_bound_in_out = location.offset + location.compressed_page_size; + } + } + + protected: + std::vector<ColumnIndexObject> column_indexes_; +}; + +TEST_F(ParquetPageIndexRoundTripTest, SimpleRoundTrip) { + auto writer_properties = WriterProperties::Builder() + .enable_write_page_index() + ->max_row_group_length(4) + ->build(); + auto schema = ::arrow::schema({::arrow::field("c0", ::arrow::int64()), + ::arrow::field("c1", ::arrow::utf8()), + ::arrow::field("c2", ::arrow::list(::arrow::int64()))}); + WriteFile(writer_properties, ::arrow::TableFromJSON(schema, {R"([ + [1, "a", [1] ], + [2, "b", [1, 2] ], + [3, "c", [null] ], + [null, "d", [] ], + [5, null, [3, 3, 3]], + [6, "f", null ] + ])"})); + + ReadPageIndexes(/*expect_num_row_groups=*/2, /*expect_num_pages=*/1); + + EXPECT_THAT( + column_indexes_, + ::testing::ElementsAre( + ColumnIndexObject{/*null_pages=*/{false}, /*min_values=*/{encode_int64(1)}, + /*max_values=*/{encode_int64(3)}, BoundaryOrder::Ascending, + /*null_counts=*/{1}}, + ColumnIndexObject{/*null_pages=*/{false}, /*min_values=*/{"a"}, + /*max_values=*/{"d"}, BoundaryOrder::Ascending, + /*null_counts=*/{0}}, + ColumnIndexObject{/*null_pages=*/{false}, /*min_values=*/{encode_int64(1)}, + /*max_values=*/{encode_int64(2)}, BoundaryOrder::Ascending, + /*null_counts=*/{2}}, + ColumnIndexObject{/*null_pages=*/{false}, /*min_values=*/{encode_int64(5)}, + /*max_values=*/{encode_int64(6)}, BoundaryOrder::Ascending, + /*null_counts=*/{0}}, + ColumnIndexObject{/*null_pages=*/{false}, /*min_values=*/{"f"}, + /*max_values=*/{"f"}, BoundaryOrder::Ascending, + /*null_counts=*/{1}}, + ColumnIndexObject{/*null_pages=*/{false}, /*min_values=*/{encode_int64(3)}, + /*max_values=*/{encode_int64(3)}, BoundaryOrder::Ascending, + /*null_counts=*/{1}})); +} + +TEST_F(ParquetPageIndexRoundTripTest, SimpleRoundTripWithStatsDisabled) { + auto writer_properties = WriterProperties::Builder() + .enable_write_page_index() + ->disable_statistics() + ->build(); + auto schema = ::arrow::schema({::arrow::field("c0", ::arrow::int64()), + ::arrow::field("c1", ::arrow::utf8()), + ::arrow::field("c2", ::arrow::list(::arrow::int64()))}); + WriteFile(writer_properties, ::arrow::TableFromJSON(schema, {R"([ + [1, "a", [1] ], + [2, "b", [1, 2] ], + [3, "c", [null] ], + [null, "d", [] ], + [5, null, [3, 3, 3]], + [6, "f", null ] + ])"})); + + 
ReadPageIndexes(/*expect_num_row_groups=*/1, /*expect_num_pages=*/1); + for (auto& column_index : column_indexes_) { + // Means page index is empty. + EXPECT_EQ(ColumnIndexObject{}, column_index); + } +} + +TEST_F(ParquetPageIndexRoundTripTest, SimpleRoundTripWithColumnStatsDisabled) { + auto writer_properties = WriterProperties::Builder() + .enable_write_page_index() + ->disable_statistics("c0") + ->max_row_group_length(4) + ->build(); + auto schema = ::arrow::schema({::arrow::field("c0", ::arrow::int64()), + ::arrow::field("c1", ::arrow::utf8()), + ::arrow::field("c2", ::arrow::list(::arrow::int64()))}); + WriteFile(writer_properties, ::arrow::TableFromJSON(schema, {R"([ + [1, "a", [1] ], + [2, "b", [1, 2] ], + [3, "c", [null] ], + [null, "d", [] ], + [5, null, [3, 3, 3]], + [6, "f", null ] + ])"})); + + ReadPageIndexes(/*expect_num_row_groups=*/2, /*expect_num_pages=*/1); + + ColumnIndexObject empty_column_index{}; + EXPECT_THAT( + column_indexes_, + ::testing::ElementsAre( + empty_column_index, + ColumnIndexObject{/*null_pages=*/{false}, /*min_values=*/{"a"}, + /*max_values=*/{"d"}, BoundaryOrder::Ascending, + /*null_counts=*/{0}}, + ColumnIndexObject{/*null_pages=*/{false}, /*min_values=*/{encode_int64(1)}, + /*max_values=*/{encode_int64(2)}, BoundaryOrder::Ascending, + /*null_counts=*/{2}}, + empty_column_index, + ColumnIndexObject{/*null_pages=*/{false}, /*min_values=*/{"f"}, + /*max_values=*/{"f"}, BoundaryOrder::Ascending, + /*null_counts=*/{1}}, + ColumnIndexObject{/*null_pages=*/{false}, /*min_values=*/{encode_int64(3)}, + /*max_values=*/{encode_int64(3)}, BoundaryOrder::Ascending, + /*null_counts=*/{1}})); +} + +TEST_F(ParquetPageIndexRoundTripTest, DropLargeStats) { + auto writer_properties = WriterProperties::Builder() + .enable_write_page_index() + ->max_row_group_length(1) /* write single-row row group */ + ->max_statistics_size(20) /* drop stats larger than it */ + ->build(); + auto schema = ::arrow::schema({::arrow::field("c0", ::arrow::utf8())}); + WriteFile(writer_properties, ::arrow::TableFromJSON(schema, {R"([ + ["short_string"], + ["very_large_string_to_drop_stats"] + ])"})); + + ReadPageIndexes(/*expect_num_row_groups=*/2, /*expect_num_pages=*/1); + + EXPECT_THAT( + column_indexes_, + ::testing::ElementsAre( + ColumnIndexObject{/*null_pages=*/{false}, /*min_values=*/{"short_string"}, + /*max_values=*/{"short_string"}, BoundaryOrder::Ascending, + /*null_counts=*/{0}}, + ColumnIndexObject{})); +} + +TEST_F(ParquetPageIndexRoundTripTest, MultiplePages) { + auto writer_properties = WriterProperties::Builder() + .enable_write_page_index() + ->data_pagesize(1) /* write multiple pages */ + ->build(); + auto schema = ::arrow::schema( + {::arrow::field("c0", ::arrow::int64()), ::arrow::field("c1", ::arrow::utf8())}); + WriteFile( + writer_properties, + ::arrow::TableFromJSON( + schema, {R"([[1, "a"], [2, "b"]])", R"([[3, "c"], [4, "d"]])", + R"([[null, null], [6, "f"]])", R"([[null, null], [null, null]])"})); + + ReadPageIndexes(/*expect_num_row_groups=*/1, /*expect_num_pages=*/4); + + EXPECT_THAT( + column_indexes_, + ::testing::ElementsAre( + ColumnIndexObject{ + /*null_pages=*/{false, false, false, true}, + /*min_values=*/{encode_int64(1), encode_int64(3), encode_int64(6), ""}, + /*max_values=*/{encode_int64(2), encode_int64(4), encode_int64(6), ""}, + BoundaryOrder::Ascending, + /*null_counts=*/{0, 0, 1, 2}}, + ColumnIndexObject{/*null_pages=*/{false, false, false, true}, + /*min_values=*/{"a", "c", "f", ""}, + /*max_values=*/{"b", "d", "f", ""}, 
BoundaryOrder::Ascending, + /*null_counts=*/{0, 0, 1, 2}})); +} + +TEST_F(ParquetPageIndexRoundTripTest, DoubleWithNaNs) { + auto writer_properties = WriterProperties::Builder() + .enable_write_page_index() + ->max_row_group_length(3) /* 3 rows per row group */ + ->build(); + + // Create table to write with NaNs. + auto vectors = std::vector<std::shared_ptr<Array>>(4); + // NaN will be ignored in min/max stats. + ::arrow::ArrayFromVector<::arrow::DoubleType>({1.0, NAN, 0.1}, &vectors[0]); + // Lower bound will use -0.0. + ::arrow::ArrayFromVector<::arrow::DoubleType>({+0.0, NAN, +0.0}, &vectors[1]); + // Upper bound will use -0.0. + ::arrow::ArrayFromVector<::arrow::DoubleType>({-0.0, NAN, -0.0}, &vectors[2]); + // Pages with all NaNs will not build column index. + ::arrow::ArrayFromVector<::arrow::DoubleType>({NAN, NAN, NAN}, &vectors[3]); + ASSERT_OK_AND_ASSIGN(auto chunked_array, + arrow::ChunkedArray::Make(vectors, ::arrow::float64())); + + auto schema = ::arrow::schema({::arrow::field("c0", ::arrow::float64())}); + auto table = Table::Make(schema, {chunked_array}); + WriteFile(writer_properties, table); + + ReadPageIndexes(/*expect_num_row_groups=*/4, /*expect_num_pages=*/1); + + EXPECT_THAT( + column_indexes_, + ::testing::ElementsAre( + ColumnIndexObject{/*null_pages=*/{false}, + /*min_values=*/{encode_double(0.1)}, + /*max_values=*/{encode_double(1.0)}, BoundaryOrder::Ascending, + /*null_counts=*/{0}}, + ColumnIndexObject{/*null_pages=*/{false}, + /*min_values=*/{encode_double(-0.0)}, + /*max_values=*/{encode_double(+0.0)}, + BoundaryOrder::Ascending, + /*null_counts=*/{0}}, + ColumnIndexObject{/*null_pages=*/{false}, + /*min_values=*/{encode_double(-0.0)}, + /*max_values=*/{encode_double(+0.0)}, + BoundaryOrder::Ascending, + /*null_counts=*/{0}}, + ColumnIndexObject{ + /* Page with only NaN values does not have column index built */})); +} + +TEST_F(ParquetPageIndexRoundTripTest, EnablePerColumn) { + auto schema = ::arrow::schema({::arrow::field("c0", ::arrow::int64()), + ::arrow::field("c1", ::arrow::int64()), + ::arrow::field("c2", ::arrow::int64())}); + auto writer_properties = + WriterProperties::Builder() + .enable_write_page_index() /* enable by default */ + ->enable_write_page_index("c0") /* enable c0 explicitly */ + ->disable_write_page_index("c1") /* disable c1 explicitly */ + ->build(); + WriteFile(writer_properties, ::arrow::TableFromJSON(schema, {R"([[0, 1, 2]])"})); + + ReadPageIndexes(/*expect_num_row_groups=*/1, /*expect_num_pages=*/1, + /*expect_columns_without_index=*/{1}); + + EXPECT_THAT( + column_indexes_, + ::testing::ElementsAre( + ColumnIndexObject{/*null_pages=*/{false}, /*min_values=*/{encode_int64(0)}, + /*max_values=*/{encode_int64(0)}, BoundaryOrder::Ascending, + /*null_counts=*/{0}}, + ColumnIndexObject{/* page index of c1 is disabled */}, + ColumnIndexObject{/*null_pages=*/{false}, /*min_values=*/{encode_int64(2)}, + /*max_values=*/{encode_int64(2)}, BoundaryOrder::Ascending, + /*null_counts=*/{0}})); +} + +class ParquetBloomFilterRoundTripTest : public ::testing::Test, Review Comment: Split this and below into a separate file? ########## cpp/src/parquet/bloom_filter_writer_internal.cc: ########## @@ -0,0 +1,195 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "parquet/bloom_filter_writer_internal.h" + +#include "parquet/exception.h" +#include "parquet/schema.h" + +#include "arrow/util/bit_run_reader.h" +#include "arrow/util/checked_cast.h" +#include "arrow/util/unreachable.h" +#include "arrow/visit_data_inline.h" + +namespace parquet::internal { + +constexpr int64_t kHashBatchSize = 256; + +template <typename ParquetType> +BloomFilterWriterImpl<ParquetType>::BloomFilterWriterImpl(const ColumnDescriptor* descr, + BloomFilter* bloom_filter) + : descr_(descr), bloom_filter_(bloom_filter) {} + +template <typename ParquetType> +bool BloomFilterWriterImpl<ParquetType>::HasBloomFilter() const { + return bloom_filter_ != nullptr; +} + +template <typename ParquetType> +void BloomFilterWriterImpl<ParquetType>::UpdateBloomFilter(const T* values, + int64_t num_values) { + if (bloom_filter_ == nullptr) { + return; + } + std::array<uint64_t, kHashBatchSize> hashes; + for (int64_t i = 0; i < num_values; i += kHashBatchSize) { + int64_t current_hash_batch_size = std::min(kHashBatchSize, num_values - i); + bloom_filter_->Hashes(values, static_cast<int>(current_hash_batch_size), + hashes.data()); + bloom_filter_->InsertHashes(hashes.data(), static_cast<int>(current_hash_batch_size)); + } +} + +template <> +void BloomFilterWriterImpl<FLBAType>::UpdateBloomFilter(const FLBA* values, + int64_t num_values) { + if (bloom_filter_ == nullptr) { + return; + } + std::array<uint64_t, kHashBatchSize> hashes; + for (int64_t i = 0; i < num_values; i += kHashBatchSize) { + int64_t current_hash_batch_size = std::min(kHashBatchSize, num_values - i); + bloom_filter_->Hashes(values, descr_->type_length(), + static_cast<int>(current_hash_batch_size), hashes.data()); + bloom_filter_->InsertHashes(hashes.data(), static_cast<int>(current_hash_batch_size)); + } +} + +template <> +void BloomFilterWriterImpl<BooleanType>::UpdateBloomFilter(const bool*, int64_t) { + if (ARROW_PREDICT_FALSE(bloom_filter_ != nullptr)) { + throw ParquetException("BooleanType does not support bloom filters"); + } +} + +template <typename ParquetType> +void BloomFilterWriterImpl<ParquetType>::UpdateBloomFilterSpaced( + const T* values, int64_t num_values, const uint8_t* valid_bits, + int64_t valid_bits_offset) { + if (bloom_filter_ == nullptr) { + // No bloom filter to update + return; + } + std::array<uint64_t, kHashBatchSize> hashes; + ::arrow::internal::VisitSetBitRunsVoid( + valid_bits, valid_bits_offset, num_values, [&](int64_t position, int64_t length) { + for (int64_t i = 0; i < length; i += kHashBatchSize) { + auto current_hash_batch_size = std::min(kHashBatchSize, length - i); + bloom_filter_->Hashes(values + i + position, + static_cast<int>(current_hash_batch_size), hashes.data()); + bloom_filter_->InsertHashes(hashes.data(), + static_cast<int>(current_hash_batch_size)); + } + }); +} + +template <> +void BloomFilterWriterImpl<BooleanType>::UpdateBloomFilterSpaced(const bool*, int64_t, + const uint8_t*, + 
int64_t) {} + +template <> +void BloomFilterWriterImpl<FLBAType>::UpdateBloomFilterSpaced(const FLBA* values, + int64_t num_values, + const uint8_t* valid_bits, + int64_t valid_bits_offset) { + if (bloom_filter_ == nullptr) { + return; + } + std::array<uint64_t, kHashBatchSize> hashes; + ::arrow::internal::VisitSetBitRunsVoid( + valid_bits, valid_bits_offset, num_values, [&](int64_t position, int64_t length) { + for (int64_t i = 0; i < length; i += kHashBatchSize) { + auto current_hash_batch_size = std::min(kHashBatchSize, length - i); + bloom_filter_->Hashes(values + i + position, descr_->type_length(), + static_cast<int>(current_hash_batch_size), hashes.data()); + bloom_filter_->InsertHashes(hashes.data(), + static_cast<int>(current_hash_batch_size)); + } + }); +} + +template <typename ArrayType> +void UpdateBinaryBloomFilter(BloomFilter& bloom_filter, const ArrayType& array) { + // Using a smaller size because an extra `byte_arrays` is used. + constexpr int64_t kBinaryHashBatchSize = 64; + std::array<ByteArray, kBinaryHashBatchSize> byte_arrays;
Review Comment: @pitrou `VisitArraySpanInline` has already been used in the Parquet encoder for binary types. The indirection to build a list of string views cannot be eliminated anyway if we want to leverage batch hashing. It is also worth doing this to simplify the code.
########## cpp/src/parquet/bloom_filter_builder_internal.h: ########## @@ -0,0 +1,74 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include "arrow/io/type_fwd.h" +#include "parquet/types.h" + +namespace parquet { + +/// @brief Interface for collecting bloom filter of a parquet file. +class PARQUET_EXPORT BloomFilterBuilder { + public: + /// @brief API to create a BloomFilterBuilder. + /// + /// @param schema The schema of the file and it must outlive the created + /// BloomFilterBuilder. + /// @param properties The properties of the file with a set of `BloomFilterOption`s + /// for columns enabling bloom filters. It must outlive the created BloomFilterBuilder. + static std::unique_ptr<BloomFilterBuilder> Make(const SchemaDescriptor* schema, + const WriterProperties* properties); + + /// @brief Start a new row group to host all bloom filters belong to it. + /// + /// This method must be called before `GetOrCreateBloomFilter` for columns of a new row + /// group. + /// + /// @throws ParquetException if WriteTo() has been called to flush bloom filters. + virtual void AppendRowGroup() = 0;
Review Comment: Bloom filters cannot be flushed until all row group data has been flushed, so we need a stateful tracker that keeps all bloom filters in memory and reports their flushed positions to the FileMetaDataBuilder.
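To make that idea concrete, here is a minimal, self-contained sketch of this kind of stateful tracking. All names (`StatefulBloomFilterTracker`, `FakeBloomFilter`, `FilterLocation`) are illustrative placeholders and the logic is greatly simplified; it is not the code in this PR:

```cpp
#include <cstddef>
#include <cstdint>
#include <map>
#include <vector>

// Stand-ins for the real parquet classes, only to illustrate the shape of the
// stateful tracking: one filter map per row group, flushed once at file close.
struct FakeBloomFilter {
  int64_t SerializedSize() const { return 32; }  // pretend the filter is 32 bytes
};
struct FilterLocation {
  int64_t offset;
  int32_t length;
};

class StatefulBloomFilterTracker {
 public:
  // Called by the file writer whenever it starts a new row group.
  void AppendRowGroup() { row_groups_.emplace_back(); }

  // Lazily creates the filter for (current row group, column ordinal).
  FakeBloomFilter* GetOrCreateBloomFilter(int32_t column_ordinal) {
    return &row_groups_.back()[column_ordinal];
  }

  // Flushes every filter after the last row group and reports where each one
  // landed, so the metadata builder can fill in the bloom filter offset/length.
  std::map<std::size_t, std::map<int32_t, FilterLocation>> WriteTo(int64_t* file_pos) {
    std::map<std::size_t, std::map<int32_t, FilterLocation>> locations;
    for (std::size_t rg = 0; rg < row_groups_.size(); ++rg) {
      for (auto& [col, filter] : row_groups_[rg]) {
        auto length = static_cast<int32_t>(filter.SerializedSize());
        locations[rg][col] = {*file_pos, length};
        *file_pos += length;  // the real code would serialize to the sink here
      }
    }
    return locations;
  }

 private:
  std::vector<std::map<int32_t, FakeBloomFilter>> row_groups_;
};

int main() {
  StatefulBloomFilterTracker tracker;
  tracker.AppendRowGroup();  // row group 0
  tracker.GetOrCreateBloomFilter(/*column_ordinal=*/2);
  tracker.AppendRowGroup();  // row group 1
  tracker.GetOrCreateBloomFilter(/*column_ordinal=*/2);
  int64_t pos = 1024;  // pretend the data pages ended at offset 1024
  auto locations = tracker.WriteTo(&pos);  // locations[0][2] = {1024, 32}, ...
  return static_cast<int>(locations.size());
}
```

The real builder additionally guards against updates after `WriteTo()` and is created from the schema and writer properties, but the per-row-group bookkeeping above is the essential part.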
########## cpp/src/parquet/bloom_filter_writer_internal.h: ########## @@ -0,0 +1,49 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include "parquet/bloom_filter.h" +#include "parquet/type_fwd.h" + +namespace arrow { +class Array; +} + +namespace parquet::internal { + +template <typename ParquetType> +class PARQUET_EXPORT BloomFilterWriterImpl { Review Comment: ```suggestion class PARQUET_EXPORT BloomFilterWriter { ``` We don't need a Impl suffix. ########## cpp/src/parquet/properties.h: ########## @@ -801,9 +884,10 @@ class PARQUET_EXPORT WriterProperties { std::unordered_map<std::string, bool> dictionary_enabled_; std::unordered_map<std::string, bool> statistics_enabled_; std::unordered_map<std::string, bool> page_index_enabled_; - bool content_defined_chunking_enabled_; CdcOptions content_defined_chunking_options_; + std::unordered_map<std::string, std::optional<BloomFilterOptions>> + bloom_filter_options_; Review Comment: ```suggestion std::unordered_map<std::string, BloomFilterOptions> bloom_filter_options_; ``` Should we simplify it like this? ########## cpp/src/parquet/file_writer.cc: ########## @@ -332,6 +339,13 @@ class FileSerializer : public ParquetFileWriter::Contents { } row_group_writer_.reset(); + // In Parquet standard, the Bloom filter data can be stored before the page indexes + // after all row groups or stored between row groups. We choose to store it before + // the page indexes after all row groups. + // Also, Putting all bloom filters together may provide a good chance to coalesce + // I/Os of different bloom filters. Especially when only one column has enabled it, + // which is the common case. Review Comment: ```suggestion // Per the Parquet spec, Bloom filter data can be stored before the page indexes // after all row groups or stored between row groups. We choose the former layout // which may provide a good chance to coalesce I/Os of adjacent bloom filters. ``` ########## cpp/src/parquet/CMakeLists.txt: ########## @@ -408,6 +410,8 @@ add_parquet_test(arrow-reader-writer-test arrow/arrow_statistics_test.cc arrow/variant_test.cc) +add_parquet_test(arrow-parquet-index-test SOURCES arrow/arrow_parquet_index_test.cc) Review Comment: No bloom filter equivalent test file? ########## cpp/src/parquet/bloom_filter_writer_internal.cc: ########## @@ -0,0 +1,199 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "parquet/bloom_filter_writer_internal.h" + +#include "parquet/exception.h" +#include "parquet/schema.h" + +#include "arrow/util/bit_run_reader.h" +#include "arrow/util/checked_cast.h" +#include "arrow/util/unreachable.h" +#include "arrow/visit_data_inline.h" + +namespace parquet::internal { + +constexpr int64_t kHashBatchSize = 256; + +template <typename ParquetType> +BloomFilterWriterImpl<ParquetType>::BloomFilterWriterImpl(const ColumnDescriptor* descr, + BloomFilter* bloom_filter) + : descr_(descr), bloom_filter_(bloom_filter) {} + +template <typename ParquetType> +bool BloomFilterWriterImpl<ParquetType>::HasBloomFilter() const { + return bloom_filter_ != nullptr; +} + +template <typename ParquetType> +void BloomFilterWriterImpl<ParquetType>::UpdateBloomFilter(const T* values, + int64_t num_values) { + if (bloom_filter_ == nullptr) { + return; + } + std::array<uint64_t, kHashBatchSize> hashes; + for (int64_t i = 0; i < num_values; i += kHashBatchSize) { + int64_t current_hash_batch_size = std::min(kHashBatchSize, num_values - i); + bloom_filter_->Hashes(values, static_cast<int>(current_hash_batch_size), + hashes.data()); + bloom_filter_->InsertHashes(hashes.data(), static_cast<int>(current_hash_batch_size)); + } +} + +template <> +void BloomFilterWriterImpl<FLBAType>::UpdateBloomFilter(const FLBA* values, + int64_t num_values) { + if (bloom_filter_ == nullptr) { + return; + } + std::array<uint64_t, kHashBatchSize> hashes; + for (int64_t i = 0; i < num_values; i += kHashBatchSize) { + int64_t current_hash_batch_size = std::min(kHashBatchSize, num_values - i); + bloom_filter_->Hashes(values, descr_->type_length(), + static_cast<int>(current_hash_batch_size), hashes.data()); + bloom_filter_->InsertHashes(hashes.data(), static_cast<int>(current_hash_batch_size)); + } +} + +template <> +void BloomFilterWriterImpl<BooleanType>::UpdateBloomFilter(const bool*, int64_t) { + if (ARROW_PREDICT_FALSE(bloom_filter_ != nullptr)) { + throw ParquetException("BooleanType does not support bloom filters"); + } +} + +template <typename ParquetType> +void BloomFilterWriterImpl<ParquetType>::UpdateBloomFilterSpaced( + const T* values, int64_t num_values, const uint8_t* valid_bits, + int64_t valid_bits_offset) { + if (bloom_filter_ == nullptr) { + // No bloom filter to update + return; + } + std::array<uint64_t, kHashBatchSize> hashes; + ::arrow::internal::VisitSetBitRunsVoid( + valid_bits, valid_bits_offset, num_values, [&](int64_t position, int64_t length) { + for (int64_t i = 0; i < length; i += kHashBatchSize) { + auto current_hash_batch_size = std::min(kHashBatchSize, length - i); + bloom_filter_->Hashes(values + i + position, + static_cast<int>(current_hash_batch_size), hashes.data()); + bloom_filter_->InsertHashes(hashes.data(), + static_cast<int>(current_hash_batch_size)); + } + }); +} + +template <> +void BloomFilterWriterImpl<BooleanType>::UpdateBloomFilterSpaced(const bool*, int64_t, + const uint8_t*, + int64_t) { + if (ARROW_PREDICT_FALSE(bloom_filter_ != nullptr)) { + throw ParquetException("BooleanType does not support bloom filters"); + } +} + +template <> +void 
BloomFilterWriterImpl<FLBAType>::UpdateBloomFilterSpaced(const FLBA* values, + int64_t num_values, + const uint8_t* valid_bits, + int64_t valid_bits_offset) { + if (bloom_filter_ == nullptr) { + return; + } + std::array<uint64_t, kHashBatchSize> hashes; + ::arrow::internal::VisitSetBitRunsVoid( + valid_bits, valid_bits_offset, num_values, [&](int64_t position, int64_t length) { + for (int64_t i = 0; i < length; i += kHashBatchSize) { + auto current_hash_batch_size = std::min(kHashBatchSize, length - i); + bloom_filter_->Hashes(values + i + position, descr_->type_length(), + static_cast<int>(current_hash_batch_size), hashes.data()); + bloom_filter_->InsertHashes(hashes.data(), + static_cast<int>(current_hash_batch_size)); + } + }); +} + +template <typename ArrayType> +void UpdateBinaryBloomFilter(BloomFilter& bloom_filter, const ArrayType& array) { + // Using a smaller size because an extra `byte_arrays` is used. + constexpr int64_t kBinaryHashBatchSize = 64; + std::array<ByteArray, kBinaryHashBatchSize> byte_arrays; + std::array<uint64_t, kBinaryHashBatchSize> hashes; + int hashes_idx = 0; + auto flush_hashes = [&]() { + ARROW_DCHECK_NE(0, hashes_idx); + bloom_filter.Hashes(byte_arrays.data(), static_cast<int>(hashes_idx), hashes.data()); + bloom_filter.InsertHashes(hashes.data(), static_cast<int>(hashes_idx)); + hashes_idx = 0; + }; + PARQUET_THROW_NOT_OK(::arrow::VisitArraySpanInline<typename ArrayType::TypeClass>( + *array.data(), + [&](std::string_view view) { + if (hashes_idx == kHashBatchSize) { + flush_hashes(); + } + byte_arrays[hashes_idx] = view; + ++hashes_idx; + return ::arrow::Status::OK(); + }, + []() { return ::arrow::Status::OK(); })); + if (hashes_idx != 0) { + flush_hashes(); + } +} + +template <> +void BloomFilterWriterImpl<ByteArrayType>::UpdateBloomFilterArray( + const ::arrow::Array& values) { + if (bloom_filter_ == nullptr) { + return; + } + if (::arrow::is_binary_view_like(values.type_id())) { + UpdateBinaryBloomFilter( + *bloom_filter_, + ::arrow::internal::checked_cast<const ::arrow::BinaryViewArray&>(values)); + } else if (::arrow::is_binary_like(values.type_id())) { + UpdateBinaryBloomFilter( + *bloom_filter_, + ::arrow::internal::checked_cast<const ::arrow::BinaryArray&>(values)); + } else if (::arrow::is_large_binary_like(values.type_id())) { + UpdateBinaryBloomFilter( + *bloom_filter_, + ::arrow::internal::checked_cast<const ::arrow::LargeBinaryArray&>(values)); + } else { + throw ParquetException("Bloom filter is not supported for this Arrow type: " + + values.type()->ToString()); + } +} + +template <typename ParquetType> +void BloomFilterWriterImpl<ParquetType>::UpdateBloomFilterArray( + const ::arrow::Array& values) { + // Only ByteArray type would write ::arrow::Array directly. + ::arrow::Unreachable("UpdateBloomFilterArray for non ByteArray type is unreachable"); Review Comment: ```suggestion ::arrow::Unreachable("UpdateBloomFilterArray for non-ByteArray type is unreachable"); ``` ########## cpp/src/parquet/bloom_filter_writer_internal.h: ########## @@ -0,0 +1,51 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include "parquet/bloom_filter.h" + +namespace arrow { +class Array; +} + +namespace parquet { +class ColumnDescriptor; +} + +namespace parquet::internal { + +template <typename ParquetType> +class PARQUET_EXPORT BloomFilterWriterImpl { + public: + using T = typename ParquetType::c_type; + + BloomFilterWriterImpl(const ColumnDescriptor* descr, BloomFilter* bloom_filter); + + void UpdateBloomFilter(const T* values, int64_t num_values); + void UpdateBloomFilterSpaced(const T* values, int64_t num_values, + const uint8_t* valid_bits, int64_t valid_bits_offset); + void UpdateBloomFilterArray(const ::arrow::Array& values); + + bool HasBloomFilter() const; Review Comment: TBH, I think it should be renamed to `bool bloom_filter_enabled() const;` ########## cpp/src/parquet/bloom_filter_builder_internal.h: ########## @@ -0,0 +1,74 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include "arrow/io/type_fwd.h" +#include "parquet/types.h" + +namespace parquet { + +/// @brief Interface for collecting bloom filter of a parquet file. +class PARQUET_EXPORT BloomFilterBuilder { + public: + /// @brief API to create a BloomFilterBuilder. + /// + /// @param schema The schema of the file and it must outlive the created + /// BloomFilterBuilder. + /// @param properties The properties of the file with a set of `BloomFilterOption`s + /// for columns enabling bloom filters. It must outlive the created BloomFilterBuilder. + static std::unique_ptr<BloomFilterBuilder> Make(const SchemaDescriptor* schema, + const WriterProperties* properties); + + /// @brief Start a new row group to host all bloom filters belong to it. + /// + /// This method must be called before `GetOrCreateBloomFilter` for columns of a new row + /// group. + /// + /// @throws ParquetException if WriteTo() has been called to flush bloom filters. + virtual void AppendRowGroup() = 0; Review Comment: If we do not want to have a stateful `AppendRowGroup()` function, we can change the function below to `BloomFilter* GetOrCreateBloomFilter(int32_t row_group_ordinal, int32_t column_ordinal)`. 
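For comparison, a small sketch of what the stateless, ordinal-keyed variant could look like; `KeyedBloomFilterBuilder` and `FakeBloomFilter` are hypothetical names, not code from this PR:

```cpp
#include <cstdint>
#include <map>
#include <memory>
#include <utility>

// Placeholder for parquet::BloomFilter, just so the sketch compiles on its own.
struct FakeBloomFilter {};

// Keying the filters by (row group ordinal, column ordinal) removes the need
// for an AppendRowGroup() call that tracks the "current" row group.
class KeyedBloomFilterBuilder {
 public:
  FakeBloomFilter* GetOrCreateBloomFilter(int32_t row_group_ordinal,
                                          int32_t column_ordinal) {
    auto& slot = filters_[{row_group_ordinal, column_ordinal}];
    if (slot == nullptr) {
      slot = std::make_unique<FakeBloomFilter>();
    }
    return slot.get();
  }

 private:
  std::map<std::pair<int32_t, int32_t>, std::unique_ptr<FakeBloomFilter>> filters_;
};

int main() {
  KeyedBloomFilterBuilder builder;
  // The column writer asks for its filter with explicit ordinals; no row-group
  // state needs to live inside the builder itself.
  auto* filter = builder.GetOrCreateBloomFilter(/*row_group_ordinal=*/0,
                                                /*column_ordinal=*/2);
  return filter != nullptr ? 0 : 1;
}
```

The trade-off is that every caller must thread the row group ordinal through, whereas the stateful variant only needs the column ordinal at update time.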
########## cpp/src/parquet/properties.h: ########## @@ -909,6 +993,19 @@ class PARQUET_EXPORT WriterProperties { return false; } + bool bloom_filter_enabled() const {
Review Comment: If we change the per-column config to `std::unordered_map<std::string, BloomFilterOptions> bloom_filter_options_`, then we can simply check `bloom_filter_options_.empty()` and remove this function, can't we?
########## cpp/src/parquet/properties.h: ########## @@ -801,9 +884,10 @@ class PARQUET_EXPORT WriterProperties { std::unordered_map<std::string, bool> dictionary_enabled_; std::unordered_map<std::string, bool> statistics_enabled_; std::unordered_map<std::string, bool> page_index_enabled_; - bool content_defined_chunking_enabled_; CdcOptions content_defined_chunking_options_; + std::unordered_map<std::string, std::optional<BloomFilterOptions>> + bloom_filter_options_;
Review Comment: BTW, it should be moved to line 887.
########## cpp/src/parquet/metadata.h: ########## @@ -505,21 +504,40 @@ class PARQUET_EXPORT RowGroupMetaDataBuilder { std::unique_ptr<RowGroupMetaDataBuilderImpl> impl_; }; +/// Alias type of page index location of a row group. The index location +/// is located by column ordinal. If a column does not have a page index, +/// its value is set to std::nullopt. +using RowGroupIndexLocation = std::vector<std::optional<IndexLocation>>; + +/// Alias type of bloom filter location of a row group. The filter location +/// is located by column ordinal. +/// +/// Number of columns with a bloom filter to be relatively small compared to +/// the number of overall columns, so map is used. +using RowGroupBloomFilterLocation = std::map<int32_t, IndexLocation>; + +/// Alias type of page index and location of a parquet file. The +/// index location is located by the row group ordinal. +using FileIndexLocation = std::map<size_t, RowGroupIndexLocation>; + +/// Alias type of bloom filter and location of a parquet file. The +/// index location is located by the row group ordinal. +using FileBloomFilterLocation = std::map<size_t, RowGroupBloomFilterLocation>;
Review Comment: What about adding a new function like the one below?
```cpp
void FileMetaDataBuilder::SetIndexLocation(IndexType type, int32_t row_group_id,
                                           int column_id, int64_t offset,
                                           int32_t length);
```
Then add
```cpp
void BloomFilterBuilder::WriteTo(OutputStream* sink, FileMetadataBuilder* builder);
```
so the file writer can directly do this:
```cpp
bloom_filter_builder_->WriteTo(sink_.get(), metadata_.get());
```
In this way, we don't need to define any index location structure (a rough sketch of this flow follows at the end of this message). We could also migrate the page index code to the same approach.
########## cpp/src/parquet/CMakeLists.txt: ########## @@ -159,6 +159,8 @@ set(PARQUET_SRCS arrow/variant_internal.cc arrow/writer.cc bloom_filter.cc + bloom_filter_builder_internal.cc + bloom_filter_writer_internal.cc
Review Comment: I still think we should merge them into a single file. They are all internal APIs for writing bloom filters. We can call the merged file `bloom_filter_internal.h/.cc` so that it holds everything related to internal bloom filter functionality.
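To make the `SetIndexLocation` / `WriteTo` idea from the metadata.h comment above concrete, here is a rough, self-contained sketch. Every type in it (`FakeMetadataBuilder`, `FakeSink`, `FakeBloomFilterBuilder`, the simplified `IndexType`) is a hypothetical stand-in, not the actual Parquet API:

```cpp
#include <cstdint>
#include <map>
#include <vector>

// Simplified stand-ins so the sketch compiles on its own; the real interfaces
// would take the actual output stream and metadata builder types.
enum class IndexType { kColumnIndex, kOffsetIndex, kBloomFilter };

struct FakeMetadataBuilder {
  struct Entry {
    IndexType type;
    int32_t row_group;
    int column;
    int64_t offset;
    int32_t length;
  };
  std::vector<Entry> entries;
  void SetIndexLocation(IndexType type, int32_t row_group_id, int column_id,
                        int64_t offset, int32_t length) {
    entries.push_back({type, row_group_id, column_id, offset, length});
  }
};

struct FakeSink {
  int64_t position = 0;
  void Write(int32_t num_bytes) { position += num_bytes; }  // pretend serialization
};

// The builder flushes each filter and immediately reports its location to the
// metadata builder, so no FileBloomFilterLocation map has to be passed around.
struct FakeBloomFilterBuilder {
  // filters_[row_group][column] = serialized size, as a stand-in for real filters.
  std::map<int32_t, std::map<int, int32_t>> filters_;

  void WriteTo(FakeSink* sink, FakeMetadataBuilder* metadata) {
    for (const auto& [row_group, columns] : filters_) {
      for (const auto& [column, size] : columns) {
        int64_t offset = sink->position;
        sink->Write(size);
        metadata->SetIndexLocation(IndexType::kBloomFilter, row_group, column,
                                   offset, size);
      }
    }
  }
};

int main() {
  FakeBloomFilterBuilder builder;
  builder.filters_[0][2] = 64;  // one 64-byte filter for row group 0, column 2
  FakeSink sink;
  FakeMetadataBuilder metadata;
  builder.WriteTo(&sink, &metadata);  // mirrors the proposed WriteTo(sink, metadata) call
  return static_cast<int>(metadata.entries.size());
}
```

With this shape, the page index writer could report its locations through the same `SetIndexLocation` entry point, which is what the comment above suggests.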