wgtmac commented on code in PR #37400:
URL: https://github.com/apache/arrow/pull/37400#discussion_r1551306728


##########
cpp/src/parquet/bloom_filter_builder.cc:
##########
@@ -0,0 +1,155 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// This module defines an abstract interface for iterating through pages in a
+// Parquet column chunk within a row group. It could be extended in the future
+// to iterate through all data pages in all chunks in a file.
+
+#include "parquet/bloom_filter_builder.h"
+
+#include <map>
+#include <utility>
+#include <vector>
+
+#include "arrow/io/interfaces.h"
+
+#include "parquet/bloom_filter.h"
+#include "parquet/exception.h"
+#include "parquet/metadata.h"
+#include "parquet/properties.h"
+
+namespace parquet {
+
+class BloomFilterBuilderImpl : public BloomFilterBuilder {
+ public:
+  explicit BloomFilterBuilderImpl(const SchemaDescriptor* schema,
+                                  WriterProperties properties)

Review Comment:
   What's the point of making a copy here?



##########
cpp/src/parquet/properties.h:
##########
@@ -145,6 +145,15 @@ static constexpr Encoding::type DEFAULT_ENCODING = 
Encoding::UNKNOWN;
 static const char DEFAULT_CREATED_BY[] = CREATED_BY_VERSION;
 static constexpr Compression::type DEFAULT_COMPRESSION_TYPE = 
Compression::UNCOMPRESSED;
 static constexpr bool DEFAULT_IS_PAGE_INDEX_ENABLED = false;
+static constexpr int32_t DEFAULT_BLOOM_FILTER_NDV = 1024 * 1024;
+static constexpr double DEFAULT_BLOOM_FILTER_FPP = 0.05;
+
+struct PARQUET_EXPORT BloomFilterOptions {

Review Comment:
   Do we really need to add a new struct to this header file and set a precedent? I think we have two options:
   1. Add 3 separate methods to ColumnProperties
     - enable_bloom_filter()
     - set_bloom_filter_ndv()
     - set_bloom_filter_fpp()
   2. Define the struct BloomFilterOptions in a separate bloom_filter_type.h, together with all the other new structs.
   
   I prefer option 1. WDYT?



##########
cpp/src/parquet/bloom_filter.h:
##########
@@ -167,6 +167,30 @@ class PARQUET_EXPORT BloomFilter {
 
   virtual ~BloomFilter() = default;
 
+  // Variant of const reference argument to facilitate template
+
+  /// Compute hash for ByteArray value by using its plain encoding result.
+  ///
+  /// @param value the value to hash.
+  uint64_t Hash(const ByteArray& value) const { return Hash(&value); }
+  /// Compute hash for fixed byte array value by using its plain encoding 
result.
+  ///
+  /// @param value the value to hash.
+  uint64_t Hash(const FLBA& value, uint32_t type_len) const {
+    return Hash(&value, type_len);
+  }
+  /// Compute hash for Int96 value by using its plain encoding result.
+  ///
+  /// @param value the value to hash.
+  uint64_t Hash(const Int96& value) const { return Hash(&value); }
+  /// Compute hash for std::string_view value by using its plain encoding 
result.
+  ///
+  /// @param value the value to hash.
+  uint64_t Hash(const std::string_view& value) const {

Review Comment:
   ```suggestion
     uint64_t Hash(std::string_view value) const {
   ```



##########
cpp/src/parquet/bloom_filter_builder.cc:
##########
@@ -0,0 +1,155 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// This module defines an abstract interface for iterating through pages in a
+// Parquet column chunk within a row group. It could be extended in the future
+// to iterate through all data pages in all chunks in a file.
+
+#include "parquet/bloom_filter_builder.h"
+
+#include <map>
+#include <utility>
+#include <vector>
+
+#include "arrow/io/interfaces.h"
+
+#include "parquet/bloom_filter.h"
+#include "parquet/exception.h"
+#include "parquet/metadata.h"
+#include "parquet/properties.h"
+
+namespace parquet {
+
+class BloomFilterBuilderImpl : public BloomFilterBuilder {
+ public:
+  explicit BloomFilterBuilderImpl(const SchemaDescriptor* schema,
+                                  WriterProperties properties)
+      : schema_(schema), properties_(std::move(properties)) {}
+  /// Append a new row group to host all incoming bloom filters.
+  void AppendRowGroup() override;
+
+  BloomFilter* GetOrCreateBloomFilter(int32_t column_ordinal) override;
+
+  /// Serialize all bloom filters with header and bitset in the order of row 
group and
+  /// column id. Column encryption is not implemented yet. The side effect is 
that it
+  /// deletes all bloom filters after they have been flushed.
+  void WriteTo(::arrow::io::OutputStream* sink, BloomFilterLocation* location) 
override;
+
+  BloomFilterBuilderImpl(const BloomFilterBuilderImpl&) = delete;
+  BloomFilterBuilderImpl(BloomFilterBuilderImpl&&) = default;
+
+ private:
+  /// Make sure column ordinal is not out of bound and the builder is in good 
state.
+  void CheckState(int32_t column_ordinal) const {
+    if (finished_) {
+      throw ParquetException("BloomFilterBuilder is already finished.");
+    }
+    if (column_ordinal < 0 || column_ordinal >= schema_->num_columns()) {
+      throw ParquetException("Invalid column ordinal: ", column_ordinal);
+    }
+    if (file_bloom_filters_.empty()) {
+      throw ParquetException("No row group appended to BloomFilterBuilder.");
+    }
+    if (schema_->Column(column_ordinal)->physical_type() == Type::BOOLEAN) {
+      throw ParquetException("BloomFilterBuilder does not support boolean 
type.");
+    }
+  }
+
+  const SchemaDescriptor* schema_;
+  WriterProperties properties_;
+  bool finished_ = false;
+
+  using RowGroupBloomFilters = std::map<int32_t, std::unique_ptr<BloomFilter>>;
+  // Using unique_ptr because the `std::unique_ptr<BloomFilter>` is not 
copyable.
+  // MSVC has the issue below: https://github.com/microsoft/STL/issues/1036
+  // So we use `std::unique_ptr<std::map<>>` to avoid the issue.
+  std::vector<std::unique_ptr<RowGroupBloomFilters>> file_bloom_filters_;
+};
+
+std::unique_ptr<BloomFilterBuilder> BloomFilterBuilder::Make(
+    const SchemaDescriptor* schema, const WriterProperties& properties) {
+  return std::make_unique<BloomFilterBuilderImpl>(schema, properties);
+}
+
+void BloomFilterBuilderImpl::AppendRowGroup() {
+  if (finished_) {
+    throw ParquetException(
+        "Cannot call AppendRowGroup() to finished BloomFilterBuilder.");
+  }
+  file_bloom_filters_.emplace_back(std::make_unique<RowGroupBloomFilters>());
+}
+
+BloomFilter* BloomFilterBuilderImpl::GetOrCreateBloomFilter(int32_t 
column_ordinal) {
+  CheckState(column_ordinal);
+  const ColumnDescriptor* column_descr = schema_->Column(column_ordinal);
+  DCHECK_NE(column_descr->physical_type(), Type::BOOLEAN);
+  auto bloom_filter_options_opt = 
properties_.bloom_filter_options(column_descr->path());
+  if (bloom_filter_options_opt == std::nullopt) {
+    return nullptr;
+  }
+  BloomFilterOptions bloom_filter_options = *bloom_filter_options_opt;
+  RowGroupBloomFilters& row_group_bloom_filter = *file_bloom_filters_.back();
+  auto iter = row_group_bloom_filter.find(column_ordinal);
+  if (iter == row_group_bloom_filter.end()) {
+    auto block_split_bloom_filter =
+        std::make_unique<BlockSplitBloomFilter>(properties_.memory_pool());
+    block_split_bloom_filter->Init(BlockSplitBloomFilter::OptimalNumOfBytes(
+        bloom_filter_options.ndv, bloom_filter_options.fpp));
+    auto insert_result = row_group_bloom_filter.emplace(
+        column_ordinal, std::move(block_split_bloom_filter));
+    ARROW_CHECK(insert_result.second);

Review Comment:
   Do we really need to check it?



##########
cpp/src/parquet/arrow/arrow_reader_writer_test.cc:
##########
@@ -5723,5 +5731,213 @@ TEST_F(ParquetPageIndexRoundTripTest, EnablePerColumn) {
                             /*null_counts=*/{0}}));
 }
 
+class ParquetBloomFilterRoundTripTest : public ::testing::Test,
+                                        public ParquetIndexRoundTripTest {
+ public:
+  void ReadBloomFilters(int expect_num_row_groups,
+                        const std::set<int>& expect_columns_without_filter = 
{}) {
+    auto reader = 
ParquetFileReader::Open(std::make_shared<BufferReader>(buffer_));
+
+    auto metadata = reader->metadata();
+    ASSERT_EQ(expect_num_row_groups, metadata->num_row_groups());
+
+    auto& bloom_filter_reader = reader->GetBloomFilterReader();
+
+    for (int rg = 0; rg < metadata->num_row_groups(); ++rg) {
+      auto row_group_reader = bloom_filter_reader.RowGroup(rg);
+      ASSERT_NE(row_group_reader, nullptr);
+
+      for (int col = 0; col < metadata->num_columns(); ++col) {
+        bool expect_no_bloom_filter = expect_columns_without_filter.find(col) 
!=
+                                      expect_columns_without_filter.cend();
+
+        auto bloom_filter = row_group_reader->GetColumnBloomFilter(col);
+        if (expect_no_bloom_filter) {
+          ASSERT_EQ(nullptr, bloom_filter);
+        } else {
+          ASSERT_NE(nullptr, bloom_filter);
+          bloom_filters_.push_back(std::move(bloom_filter));
+        }
+      }
+    }
+  }
+
+  template <typename ArrowType>
+  void VerifyBloomFilter(const BloomFilter* bloom_filter,
+                         const ::arrow::ChunkedArray& chunked_array) {
+    for (auto value : ::arrow::stl::Iterate<ArrowType>(chunked_array)) {
+      if (value == std::nullopt) {
+        continue;
+      }
+      EXPECT_TRUE(bloom_filter->FindHash(bloom_filter->Hash(value.value())));
+    }
+  }
+
+ protected:
+  std::vector<std::unique_ptr<BloomFilter>> bloom_filters_;
+};
+
+TEST_F(ParquetBloomFilterRoundTripTest, SimpleRoundTrip) {
+  auto schema = ::arrow::schema(
+      {::arrow::field("c0", ::arrow::int64()), ::arrow::field("c1", 
::arrow::utf8())});
+  BloomFilterOptions options;
+  options.ndv = 100;
+  auto writer_properties = WriterProperties::Builder()
+                               .enable_bloom_filter_options(options, "c0")
+                               ->enable_bloom_filter_options(options, "c1")
+                               ->max_row_group_length(4)
+                               ->build();
+  auto table = ::arrow::TableFromJSON(schema, {R"([
+        [1,     "a"],
+        [2,     "b"],
+        [3,     "c"],
+        [null,  "d"],
+        [5,     null],
+        [6,     "f"]
+  ])"});
+  WriteFile(writer_properties, table);
+
+  ReadBloomFilters(/*expect_num_row_groups=*/2);
+  ASSERT_EQ(4, bloom_filters_.size());
+  std::vector<int64_t> row_group_row_count{4, 2};
+  int64_t current_row = 0;
+  int64_t bloom_filter_idx = 0;  // current index in `bloom_filters_`
+  for (int64_t row_group_id = 0; row_group_id < 2; ++row_group_id) {
+    {
+      ASSERT_NE(nullptr, bloom_filters_[bloom_filter_idx]);
+      auto col = table->column(0)->Slice(current_row, 
row_group_row_count[row_group_id]);
+      
VerifyBloomFilter<::arrow::Int64Type>(bloom_filters_[bloom_filter_idx].get(), 
*col);
+      ++bloom_filter_idx;
+    }
+    {
+      ASSERT_NE(nullptr, bloom_filters_[bloom_filter_idx]);
+      auto col = table->column(1)->Slice(current_row, 
row_group_row_count[row_group_id]);
+      
VerifyBloomFilter<::arrow::StringType>(bloom_filters_[bloom_filter_idx].get(),
+                                             *col);
+      ++bloom_filter_idx;
+    }
+    current_row += row_group_row_count[row_group_id];
+  }
+}
+
+TEST_F(ParquetBloomFilterRoundTripTest, SimpleRoundTripDictionary) {
+  auto origin_schema = ::arrow::schema(
+      {::arrow::field("c0", ::arrow::int64()), ::arrow::field("c1", 
::arrow::utf8())});
+  auto schema = ::arrow::schema(
+      {::arrow::field("c0", ::arrow::dictionary(::arrow::int64(), 
::arrow::int64())),
+       ::arrow::field("c1", ::arrow::dictionary(::arrow::int64(), 
::arrow::utf8()))});
+  bloom_filters_.clear();
+  BloomFilterOptions options;
+  options.ndv = 100;
+  auto writer_properties = WriterProperties::Builder()
+                               .enable_bloom_filter_options(options, "c0")
+                               ->enable_bloom_filter_options(options, "c1")
+                               ->max_row_group_length(4)
+                               ->build();
+  std::vector<std::string> contents = {R"([
+        [1,     "a"],
+        [2,     "b"],
+        [3,     "c"],
+        [null,  "d"],
+        [5,     null],
+        [6,     "f"]
+  ])"};
+  auto table = ::arrow::TableFromJSON(schema, contents);
+  auto non_dict_table = ::arrow::TableFromJSON(origin_schema, contents);
+  WriteFile(writer_properties, table);
+
+  ReadBloomFilters(/*expect_num_row_groups=*/2);
+  ASSERT_EQ(4, bloom_filters_.size());
+  std::vector<int64_t> row_group_row_count{4, 2};
+  int64_t current_row = 0;
+  int64_t bloom_filter_idx = 0;  // current index in `bloom_filters_`
+  for (int64_t row_group_id = 0; row_group_id < 2; ++row_group_id) {
+    {
+      ASSERT_NE(nullptr, bloom_filters_[bloom_filter_idx]);
+      auto col = non_dict_table->column(0)->Slice(current_row,
+                                                  
row_group_row_count[row_group_id]);
+      
VerifyBloomFilter<::arrow::Int64Type>(bloom_filters_[bloom_filter_idx].get(), 
*col);
+      ++bloom_filter_idx;
+    }
+    {
+      ASSERT_NE(nullptr, bloom_filters_[bloom_filter_idx]);
+      auto col = non_dict_table->column(1)->Slice(current_row,
+                                                  
row_group_row_count[row_group_id]);
+      
VerifyBloomFilter<::arrow::StringType>(bloom_filters_[bloom_filter_idx].get(),
+                                             *col);
+      ++bloom_filter_idx;
+    }
+    current_row += row_group_row_count[row_group_id];
+  }
+}
+
+TEST_F(ParquetBloomFilterRoundTripTest, SimpleRoundTripWithOneFilter) {
+  auto schema = ::arrow::schema(
+      {::arrow::field("c0", ::arrow::int64()), ::arrow::field("c1", 
::arrow::utf8())});
+  BloomFilterOptions options;
+  options.ndv = 100;
+  auto writer_properties = WriterProperties::Builder()
+                               .enable_bloom_filter_options(options, "c0")
+                               ->disable_bloom_filter("c1")
+                               ->max_row_group_length(4)
+                               ->build();
+  auto table = ::arrow::TableFromJSON(schema, {R"([
+        [1,     "a"],
+        [2,     "b"],
+        [3,     "c"],
+        [null,  "d"],
+        [5,     null],
+        [6,     "f"]
+  ])"});
+  WriteFile(writer_properties, table);
+
+  ReadBloomFilters(/*expect_num_row_groups=*/2, 
/*expect_columns_without_filter=*/{1});
+  ASSERT_EQ(2, bloom_filters_.size());
+  std::vector<int64_t> row_group_row_count{4, 2};
+  int64_t current_row = 0;
+  int64_t bloom_filter_idx = 0;  // current index in `bloom_filters_`
+  for (int64_t row_group_id = 0; row_group_id < 2; ++row_group_id) {
+    {
+      ASSERT_NE(nullptr, bloom_filters_[bloom_filter_idx]);
+      auto col = table->column(0)->Slice(current_row, 
row_group_row_count[row_group_id]);
+      
VerifyBloomFilter<::arrow::Int64Type>(bloom_filters_[bloom_filter_idx].get(), 
*col);
+      ++bloom_filter_idx;
+    }
+    current_row += row_group_row_count[row_group_id];
+  }
+}
+
+TEST_F(ParquetBloomFilterRoundTripTest, ThrowForBoolean) {

Review Comment:
   It seems that you have already added a similar test for the boolean type in bloom_filter_reader_writer_test.cc.



##########
cpp/src/parquet/bloom_filter_reader_writer_test.cc:
##########
@@ -0,0 +1,216 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <gmock/gmock.h>
+#include <gtest/gtest.h>
+
+#include "arrow/testing/gtest_util.h"
+
+#include "parquet/bloom_filter.h"
+#include "parquet/bloom_filter_builder.h"
+#include "parquet/bloom_filter_reader.h"
+#include "parquet/file_reader.h"
+#include "parquet/test_util.h"
+
+namespace parquet::test {
+
+TEST(BloomFilterReader, ReadBloomFilter) {
+  std::vector<std::string> files = {"data_index_bloom_encoding_stats.parquet",
+                                    
"data_index_bloom_encoding_with_length.parquet"};
+  for (const auto& test_file : files) {
+    std::string dir_string(get_data_dir());
+    std::string path = dir_string + "/" + test_file;
+    auto reader = ParquetFileReader::OpenFile(path, /*memory_map=*/false);
+    auto file_metadata = reader->metadata();
+    EXPECT_FALSE(file_metadata->is_encryption_algorithm_set());
+    auto& bloom_filter_reader = reader->GetBloomFilterReader();
+    auto row_group_0 = bloom_filter_reader.RowGroup(0);
+    ASSERT_NE(nullptr, row_group_0);
+    EXPECT_THROW_THAT(
+        [&]() { bloom_filter_reader.RowGroup(1); }, ParquetException,
+        ::testing::Property(&ParquetException::what,
+                            ::testing::HasSubstr("Invalid row group 
ordinal")));
+    auto bloom_filter = row_group_0->GetColumnBloomFilter(0);
+    ASSERT_NE(nullptr, bloom_filter);
+    EXPECT_THROW_THAT([&]() { row_group_0->GetColumnBloomFilter(1); }, 
ParquetException,
+                      ::testing::Property(&ParquetException::what,
+                                          ::testing::HasSubstr(
+                                              "Invalid column index at column 
ordinal")));
+
+    // assert exists
+    {
+      std::string_view sv = "Hello";
+      ByteArray ba{sv};
+      EXPECT_TRUE(bloom_filter->FindHash(bloom_filter->Hash(&ba)));
+    }
+
+    // no exists
+    {
+      std::string_view sv = "NOT_EXISTS";
+      ByteArray ba{sv};
+      EXPECT_FALSE(bloom_filter->FindHash(bloom_filter->Hash(&ba)));
+    }
+  }
+}
+
+TEST(BloomFilterReader, FileNotHaveBloomFilter) {
+  // Can still get a BloomFilterReader and a RowGroupBloomFilter
+  // reader, but cannot get a non-null BloomFilter.
+  std::string dir_string(get_data_dir());
+  std::string path = dir_string + "/alltypes_plain.parquet";
+  auto reader = ParquetFileReader::OpenFile(path, false);
+  auto file_metadata = reader->metadata();
+  EXPECT_FALSE(file_metadata->is_encryption_algorithm_set());
+  auto& bloom_filter_reader = reader->GetBloomFilterReader();
+  auto row_group_0 = bloom_filter_reader.RowGroup(0);
+  ASSERT_NE(nullptr, row_group_0);
+  EXPECT_THROW(bloom_filter_reader.RowGroup(1), ParquetException);
+  auto bloom_filter = row_group_0->GetColumnBloomFilter(0);
+  ASSERT_EQ(nullptr, bloom_filter);
+}
+
+// <c1:BYTE_ARRAY, c2:BYTE_ARRAY>, c1 has bloom filter.
+TEST(BloomFilterBuilderTest, BasicRoundTrip) {
+  SchemaDescriptor schema;
+  schema::NodePtr root = schema::GroupNode::Make(
+      "schema", Repetition::REPEATED, {schema::ByteArray("c1"), 
schema::ByteArray("c2")});
+  schema.Init(root);
+  WriterProperties::Builder properties_builder;
+  BloomFilterOptions bloom_filter_options;
+  bloom_filter_options.ndv = 100;
+  properties_builder.enable_bloom_filter_options(bloom_filter_options, "c1");
+  auto writer_properties = properties_builder.build();
+  auto builder = BloomFilterBuilder::Make(&schema, *writer_properties);
+
+  auto append_values_to_bloom_filter = [&](const std::vector<uint64_t>& 
insert_hashes) {
+    builder->AppendRowGroup();
+    auto bloom_filter = builder->GetOrCreateBloomFilter(0);
+    ASSERT_NE(nullptr, bloom_filter);
+    ASSERT_EQ(bloom_filter->GetBitsetSize(),
+              
BlockSplitBloomFilter::OptimalNumOfBytes(bloom_filter_options.ndv,
+                                                       
bloom_filter_options.fpp));
+    for (uint64_t hash : insert_hashes) {
+      bloom_filter->InsertHash(hash);
+    }
+  };
+  // First row-group
+  append_values_to_bloom_filter({100, 200});
+  // Second row-group
+  append_values_to_bloom_filter({300, 400});
+  auto sink = CreateOutputStream();
+  BloomFilterLocation location;
+  builder->WriteTo(sink.get(), &location);
+  EXPECT_EQ(2, location.bloom_filter_location.size());
+  for (auto& [row_group_id, row_group_bloom_filter] : 
location.bloom_filter_location) {
+    EXPECT_EQ(2, row_group_bloom_filter.size());
+    EXPECT_TRUE(row_group_bloom_filter[0].has_value());
+    EXPECT_FALSE(row_group_bloom_filter[1].has_value());
+  }
+
+  struct RowGroupBloomFilterCase {
+    int32_t row_group_id;
+    std::vector<uint64_t> exists_hashes;
+    std::vector<uint64_t> unexists_hashes;
+  };
+
+  ASSERT_OK_AND_ASSIGN(auto buffer, sink->Finish());
+
+  std::vector<RowGroupBloomFilterCase> cases = {
+      RowGroupBloomFilterCase{/*row_group_id=*/0, /*exists_hashes=*/{100, 200},
+                              /*unexists_hashes=*/{300, 400}},
+      RowGroupBloomFilterCase{/*row_group_id=*/1, /*exists_hashes=*/{300, 400},
+                              /*unexists_hashes=*/{100, 200}}};
+  for (const auto& c : cases) {
+    int64_t bloom_filter_offset =
+        location.bloom_filter_location[c.row_group_id][0]->offset;
+    int32_t bloom_filter_length =
+        location.bloom_filter_location[c.row_group_id][0]->length;
+
+    ReaderProperties reader_properties;
+    ::arrow::io::BufferReader reader(
+        ::arrow::SliceBuffer(buffer, bloom_filter_offset, 
bloom_filter_length));
+    auto filter = 
parquet::BlockSplitBloomFilter::Deserialize(reader_properties, &reader);
+    for (uint64_t hash : c.exists_hashes) {
+      EXPECT_TRUE(filter.FindHash(hash));
+    }
+    for (uint64_t hash : c.unexists_hashes) {
+      EXPECT_FALSE(filter.FindHash(hash));
+    }
+  }
+}
+
+TEST(BloomFilterBuilderTest, InvalidOperations) {
+  SchemaDescriptor schema;
+  schema::NodePtr root = schema::GroupNode::Make(
+      "schema", Repetition::REPEATED, {schema::ByteArray("c1"), 
schema::Boolean("c2")});
+  schema.Init(root);
+  WriterProperties::Builder properties_builder;
+  BloomFilterOptions bloom_filter_options;
+  bloom_filter_options.ndv = 100;
+  properties_builder.enable_bloom_filter_options(bloom_filter_options, "c1");
+  properties_builder.enable_bloom_filter_options(bloom_filter_options, "c2");
+  auto properties = properties_builder.build();
+  auto builder = BloomFilterBuilder::Make(&schema, *properties);
+  // AppendRowGroup() is not called and expect throw.
+  ASSERT_THROW(builder->GetOrCreateBloomFilter(0), ParquetException);
+
+  builder->AppendRowGroup();
+  // GetOrCreateBloomFilter() with wrong column ordinal expect throw.
+  ASSERT_THROW(builder->GetOrCreateBloomFilter(2), ParquetException);

Review Comment:
   Could the test also match the expected exception message for this line and line 175?



##########
cpp/src/parquet/bloom_filter_builder.h:
##########
@@ -0,0 +1,86 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include "arrow/io/type_fwd.h"
+#include "parquet/types.h"
+
+namespace parquet {
+
+class BloomFilter;
+class SchemaDescriptor;
+struct BloomFilterOptions;
+struct BloomFilterLocation;
+
+/// \brief Interface for collecting bloom filter of a parquet file.
+///
+/// ```
+/// auto bloom_filter_builder = BloomFilterBuilder::Make(schema, properties);
+/// for (int i = 0; i < num_row_groups; i++) {
+///   bloom_filter_builder->AppendRowGroup();
+///   auto* bloom_filter =
+///   bloom_filter_builder->GetOrCreateBloomFilter(bloom_filter_column);
+///   // Add bloom filter entries in `bloom_filter`.
+///   // ...
+/// }
+/// bloom_filter_builder->WriteTo(sink, location);
+/// ```
+class PARQUET_EXPORT BloomFilterBuilder {
+ public:
+  /// \brief API to create a BloomFilterBuilder.
+  static std::unique_ptr<BloomFilterBuilder> Make(const SchemaDescriptor* 
schema,
+                                                  const WriterProperties& 
properties);
+
+  /// Append a new row group to host all incoming bloom filters.
+  ///
+  /// This method must be called before `GetOrCreateBloomFilter`
+  /// in a row group.
+  ///
+  /// \throws ParquetException It will throw an exception if the BloomFilter 
already
+  /// called `WriteTo`.
+  virtual void AppendRowGroup() = 0;
+
+  /// \brief Get the BloomFilter from column ordinal.
+  ///
+  /// \param column_ordinal Column ordinal in schema, which is only for leaf 
columns.
+  ///
+  /// \return BloomFilter for the column and its memory ownership belongs to 
the
+  /// BloomFilterBuilder. It will return nullptr if bloom filter is not 
enabled for the
+  /// column.
+  ///
+  /// \throws ParquetException It will throw an exception if the BloomFilter 
already
+  /// called `WriteTo`, column_ordinal is out of bound, or without calling
+  /// `AppendRowGroup` before `GetOrCreateBloomFilter`.

Review Comment:
   ```suggestion
     /// \throws ParquetException if any of the following conditions applies:
     /// 1) column_ordinal is out of bound.
     /// 2) `WriteTo()` has been called already.
     /// 3) `AppendRowGroup()` is not called before `GetOrCreateBloomFilter()`.
   ```



##########
cpp/src/parquet/file_writer.cc:
##########
@@ -451,6 +469,21 @@ class FileSerializer : public ParquetFileWriter::Contents {
     }
   }
 
+  void WriteBloomFilter() {
+    if (bloom_filter_builder_ != nullptr) {
+      if (properties_->file_encryption_properties()) {
+        ParquetException::NYI("Encryption is not supported with bloom filter");
+      }
+      // Serialize page index after all row groups have been written and report

Review Comment:
   Please also modify comments at line 463 above.



##########
cpp/src/parquet/file_writer.cc:
##########
@@ -451,6 +469,21 @@ class FileSerializer : public ParquetFileWriter::Contents {
     }
   }
 
+  void WriteBloomFilter() {
+    if (bloom_filter_builder_ != nullptr) {
+      if (properties_->file_encryption_properties()) {
+        ParquetException::NYI("Encryption is not supported with bloom filter");
+      }
+      // Serialize page index after all row groups have been written and report

Review Comment:
   ```suggestion
         // Serialize bloom filters after all row groups have been written and report
   ```



##########
cpp/src/parquet/type_fwd.h:
##########
@@ -82,6 +82,12 @@ class WriterPropertiesBuilder;
 class ArrowWriterProperties;
 class ArrowWriterPropertiesBuilder;
 
+class BloomFilter;
+
+namespace schema {
+class ColumnPath;

Review Comment:
   IMO, this class does not need to appear here, as users normally do not use it.



##########
cpp/src/parquet/bloom_filter_builder.h:
##########
@@ -0,0 +1,86 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include "arrow/io/type_fwd.h"
+#include "parquet/types.h"
+
+namespace parquet {
+
+class BloomFilter;
+class SchemaDescriptor;
+struct BloomFilterOptions;
+struct BloomFilterLocation;
+
+/// \brief Interface for collecting bloom filter of a parquet file.
+///
+/// ```
+/// auto bloom_filter_builder = BloomFilterBuilder::Make(schema, properties);
+/// for (int i = 0; i < num_row_groups; i++) {
+///   bloom_filter_builder->AppendRowGroup();
+///   auto* bloom_filter =
+///   bloom_filter_builder->GetOrCreateBloomFilter(bloom_filter_column);
+///   // Add bloom filter entries in `bloom_filter`.
+///   // ...
+/// }
+/// bloom_filter_builder->WriteTo(sink, location);
+/// ```
+class PARQUET_EXPORT BloomFilterBuilder {
+ public:
+  /// \brief API to create a BloomFilterBuilder.
+  static std::unique_ptr<BloomFilterBuilder> Make(const SchemaDescriptor* 
schema,
+                                                  const WriterProperties& 
properties);
+
+  /// Append a new row group to host all incoming bloom filters.
+  ///
+  /// This method must be called before `GetOrCreateBloomFilter`
+  /// in a row group.

Review Comment:
   ```suggestion
     /// This method must be called before `GetOrCreateBloomFilter` for a new row group.
   ```



##########
cpp/src/parquet/bloom_filter_builder.cc:
##########
@@ -0,0 +1,155 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// This module defines an abstract interface for iterating through pages in a
+// Parquet column chunk within a row group. It could be extended in the future
+// to iterate through all data pages in all chunks in a file.
+
+#include "parquet/bloom_filter_builder.h"
+
+#include <map>
+#include <utility>
+#include <vector>
+
+#include "arrow/io/interfaces.h"
+
+#include "parquet/bloom_filter.h"
+#include "parquet/exception.h"
+#include "parquet/metadata.h"
+#include "parquet/properties.h"
+
+namespace parquet {
+
+class BloomFilterBuilderImpl : public BloomFilterBuilder {
+ public:
+  explicit BloomFilterBuilderImpl(const SchemaDescriptor* schema,
+                                  WriterProperties properties)
+      : schema_(schema), properties_(std::move(properties)) {}
+  /// Append a new row group to host all incoming bloom filters.
+  void AppendRowGroup() override;
+
+  BloomFilter* GetOrCreateBloomFilter(int32_t column_ordinal) override;
+
+  /// Serialize all bloom filters with header and bitset in the order of row 
group and
+  /// column id. Column encryption is not implemented yet. The side effect is 
that it
+  /// deletes all bloom filters after they have been flushed.
+  void WriteTo(::arrow::io::OutputStream* sink, BloomFilterLocation* location) 
override;
+
+  BloomFilterBuilderImpl(const BloomFilterBuilderImpl&) = delete;
+  BloomFilterBuilderImpl(BloomFilterBuilderImpl&&) = default;
+
+ private:
+  /// Make sure column ordinal is not out of bound and the builder is in good 
state.
+  void CheckState(int32_t column_ordinal) const {
+    if (finished_) {
+      throw ParquetException("BloomFilterBuilder is already finished.");
+    }
+    if (column_ordinal < 0 || column_ordinal >= schema_->num_columns()) {
+      throw ParquetException("Invalid column ordinal: ", column_ordinal);
+    }
+    if (file_bloom_filters_.empty()) {
+      throw ParquetException("No row group appended to BloomFilterBuilder.");
+    }
+    if (schema_->Column(column_ordinal)->physical_type() == Type::BOOLEAN) {
+      throw ParquetException("BloomFilterBuilder does not support boolean 
type.");
+    }
+  }
+
+  const SchemaDescriptor* schema_;
+  WriterProperties properties_;
+  bool finished_ = false;
+
+  using RowGroupBloomFilters = std::map<int32_t, std::unique_ptr<BloomFilter>>;
+  // Using unique_ptr because the `std::unique_ptr<BloomFilter>` is not 
copyable.
+  // MSVC has the issue below: https://github.com/microsoft/STL/issues/1036
+  // So we use `std::unique_ptr<std::map<>>` to avoid the issue.
+  std::vector<std::unique_ptr<RowGroupBloomFilters>> file_bloom_filters_;
+};
+
+std::unique_ptr<BloomFilterBuilder> BloomFilterBuilder::Make(
+    const SchemaDescriptor* schema, const WriterProperties& properties) {
+  return std::make_unique<BloomFilterBuilderImpl>(schema, properties);
+}
+
+void BloomFilterBuilderImpl::AppendRowGroup() {
+  if (finished_) {
+    throw ParquetException(
+        "Cannot call AppendRowGroup() to finished BloomFilterBuilder.");
+  }
+  file_bloom_filters_.emplace_back(std::make_unique<RowGroupBloomFilters>());
+}
+
+BloomFilter* BloomFilterBuilderImpl::GetOrCreateBloomFilter(int32_t 
column_ordinal) {
+  CheckState(column_ordinal);
+  const ColumnDescriptor* column_descr = schema_->Column(column_ordinal);
+  DCHECK_NE(column_descr->physical_type(), Type::BOOLEAN);

Review Comment:
   This is already checked in `CheckState(column_ordinal);` above at line 97.



##########
cpp/src/parquet/bloom_filter_builder.cc:
##########
@@ -0,0 +1,155 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// This module defines an abstract interface for iterating through pages in a
+// Parquet column chunk within a row group. It could be extended in the future
+// to iterate through all data pages in all chunks in a file.
+
+#include "parquet/bloom_filter_builder.h"
+
+#include <map>
+#include <utility>
+#include <vector>
+
+#include "arrow/io/interfaces.h"
+
+#include "parquet/bloom_filter.h"
+#include "parquet/exception.h"
+#include "parquet/metadata.h"
+#include "parquet/properties.h"
+
+namespace parquet {
+
+class BloomFilterBuilderImpl : public BloomFilterBuilder {
+ public:
+  explicit BloomFilterBuilderImpl(const SchemaDescriptor* schema,
+                                  WriterProperties properties)
+      : schema_(schema), properties_(std::move(properties)) {}
+  /// Append a new row group to host all incoming bloom filters.
+  void AppendRowGroup() override;
+
+  BloomFilter* GetOrCreateBloomFilter(int32_t column_ordinal) override;
+
+  /// Serialize all bloom filters with header and bitset in the order of row 
group and
+  /// column id. Column encryption is not implemented yet. The side effect is 
that it

Review Comment:
   > Column encryption is not implemented yet
   
   What about moving this to the beginning of `class BloomFilterBuilder`?
   



##########
cpp/src/parquet/bloom_filter_builder.h:
##########
@@ -0,0 +1,86 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include "arrow/io/type_fwd.h"
+#include "parquet/types.h"
+
+namespace parquet {
+
+class BloomFilter;
+class SchemaDescriptor;
+struct BloomFilterOptions;
+struct BloomFilterLocation;
+
+/// \brief Interface for collecting bloom filter of a parquet file.
+///
+/// ```
+/// auto bloom_filter_builder = BloomFilterBuilder::Make(schema, properties);
+/// for (int i = 0; i < num_row_groups; i++) {
+///   bloom_filter_builder->AppendRowGroup();
+///   auto* bloom_filter =
+///   bloom_filter_builder->GetOrCreateBloomFilter(bloom_filter_column);
+///   // Add bloom filter entries in `bloom_filter`.
+///   // ...
+/// }
+/// bloom_filter_builder->WriteTo(sink, location);
+/// ```
+class PARQUET_EXPORT BloomFilterBuilder {
+ public:
+  /// \brief API to create a BloomFilterBuilder.
+  static std::unique_ptr<BloomFilterBuilder> Make(const SchemaDescriptor* 
schema,
+                                                  const WriterProperties& 
properties);
+
+  /// Append a new row group to host all incoming bloom filters.
+  ///
+  /// This method must be called before `GetOrCreateBloomFilter`
+  /// in a row group.
+  ///
+  /// \throws ParquetException It will throw an exception if the BloomFilter 
already
+  /// called `WriteTo`.

Review Comment:
   ```suggestion
     /// \throws ParquetException if WriteTo() has been called to flush bloom 
filters.
   ```



##########
cpp/src/parquet/column_writer.cc:
##########
@@ -154,6 +157,8 @@ inline const T* AddIfNotNull(const T* base, int64_t offset) 
{
   return nullptr;
 }
 
+constexpr int64_t kHashBatchSize = 256;

Review Comment:
   Should we move it to bloom_filter.h?



##########
cpp/src/parquet/bloom_filter_builder.cc:
##########
@@ -0,0 +1,155 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// This module defines an abstract interface for iterating through pages in a
+// Parquet column chunk within a row group. It could be extended in the future
+// to iterate through all data pages in all chunks in a file.
+
+#include "parquet/bloom_filter_builder.h"
+
+#include <map>
+#include <utility>
+#include <vector>
+
+#include "arrow/io/interfaces.h"
+
+#include "parquet/bloom_filter.h"
+#include "parquet/exception.h"
+#include "parquet/metadata.h"
+#include "parquet/properties.h"
+
+namespace parquet {
+
+class BloomFilterBuilderImpl : public BloomFilterBuilder {
+ public:
+  explicit BloomFilterBuilderImpl(const SchemaDescriptor* schema,
+                                  WriterProperties properties)
+      : schema_(schema), properties_(std::move(properties)) {}
+  /// Append a new row group to host all incoming bloom filters.
+  void AppendRowGroup() override;
+
+  BloomFilter* GetOrCreateBloomFilter(int32_t column_ordinal) override;
+
+  /// Serialize all bloom filters with header and bitset in the order of row 
group and
+  /// column id. Column encryption is not implemented yet. The side effect is 
that it
+  /// deletes all bloom filters after they have been flushed.
+  void WriteTo(::arrow::io::OutputStream* sink, BloomFilterLocation* location) 
override;
+
+  BloomFilterBuilderImpl(const BloomFilterBuilderImpl&) = delete;
+  BloomFilterBuilderImpl(BloomFilterBuilderImpl&&) = default;
+
+ private:
+  /// Make sure column ordinal is not out of bound and the builder is in good 
state.
+  void CheckState(int32_t column_ordinal) const {
+    if (finished_) {
+      throw ParquetException("BloomFilterBuilder is already finished.");
+    }
+    if (column_ordinal < 0 || column_ordinal >= schema_->num_columns()) {
+      throw ParquetException("Invalid column ordinal: ", column_ordinal);
+    }
+    if (file_bloom_filters_.empty()) {
+      throw ParquetException("No row group appended to BloomFilterBuilder.");
+    }
+    if (schema_->Column(column_ordinal)->physical_type() == Type::BOOLEAN) {
+      throw ParquetException("BloomFilterBuilder does not support boolean 
type.");
+    }
+  }
+
+  const SchemaDescriptor* schema_;
+  WriterProperties properties_;
+  bool finished_ = false;
+
+  using RowGroupBloomFilters = std::map<int32_t, std::unique_ptr<BloomFilter>>;
+  // Using unique_ptr because the `std::unique_ptr<BloomFilter>` is not 
copyable.
+  // MSVC has the issue below: https://github.com/microsoft/STL/issues/1036
+  // So we use `std::unique_ptr<std::map<>>` to avoid the issue.
+  std::vector<std::unique_ptr<RowGroupBloomFilters>> file_bloom_filters_;
+};
+
+std::unique_ptr<BloomFilterBuilder> BloomFilterBuilder::Make(
+    const SchemaDescriptor* schema, const WriterProperties& properties) {
+  return std::make_unique<BloomFilterBuilderImpl>(schema, properties);
+}
+
+void BloomFilterBuilderImpl::AppendRowGroup() {
+  if (finished_) {
+    throw ParquetException(
+        "Cannot call AppendRowGroup() to finished BloomFilterBuilder.");
+  }
+  file_bloom_filters_.emplace_back(std::make_unique<RowGroupBloomFilters>());
+}
+
+BloomFilter* BloomFilterBuilderImpl::GetOrCreateBloomFilter(int32_t 
column_ordinal) {
+  CheckState(column_ordinal);
+  const ColumnDescriptor* column_descr = schema_->Column(column_ordinal);
+  DCHECK_NE(column_descr->physical_type(), Type::BOOLEAN);
+  auto bloom_filter_options_opt = 
properties_.bloom_filter_options(column_descr->path());
+  if (bloom_filter_options_opt == std::nullopt) {
+    return nullptr;
+  }
+  BloomFilterOptions bloom_filter_options = *bloom_filter_options_opt;
+  RowGroupBloomFilters& row_group_bloom_filter = *file_bloom_filters_.back();
+  auto iter = row_group_bloom_filter.find(column_ordinal);
+  if (iter == row_group_bloom_filter.end()) {
+    auto block_split_bloom_filter =
+        std::make_unique<BlockSplitBloomFilter>(properties_.memory_pool());
+    block_split_bloom_filter->Init(BlockSplitBloomFilter::OptimalNumOfBytes(
+        bloom_filter_options.ndv, bloom_filter_options.fpp));
+    auto insert_result = row_group_bloom_filter.emplace(
+        column_ordinal, std::move(block_split_bloom_filter));
+    ARROW_CHECK(insert_result.second);
+    iter = insert_result.first;
+  }
+  ARROW_CHECK(iter->second != nullptr);
+  return iter->second.get();
+}
+
+void BloomFilterBuilderImpl::WriteTo(::arrow::io::OutputStream* sink,
+                                     BloomFilterLocation* location) {
+  if (finished_) {
+    throw ParquetException("Cannot call WriteTo() multiple times.");
+  }
+  finished_ = true;
+
+  for (size_t row_group_ordinal = 0; row_group_ordinal < 
file_bloom_filters_.size();
+       ++row_group_ordinal) {
+    RowGroupBloomFilters& row_group_bloom_filters =
+        *file_bloom_filters_[row_group_ordinal];
+    // the whole row group has no bloom filter
+    if (row_group_bloom_filters.empty()) {
+      continue;
+    }
+    bool has_valid_bloom_filter = false;
+    int num_columns = schema_->num_columns();
+    std::vector<std::optional<IndexLocation>> locations(num_columns, 
std::nullopt);
+
+    // serialize bloom filter in ascending order of column id
+    for (auto& [column_id, filter] : row_group_bloom_filters) {
+      ARROW_CHECK(filter != nullptr);
+      PARQUET_ASSIGN_OR_THROW(int64_t offset, sink->Tell());
+      filter->WriteTo(sink);
+      PARQUET_ASSIGN_OR_THROW(int64_t pos, sink->Tell());
+      has_valid_bloom_filter = true;
+      locations[column_id] = IndexLocation{offset, static_cast<int32_t>(pos - 
offset)};
+    }
+    if (has_valid_bloom_filter) {

Review Comment:
   This condition should always be true, shouldn't it?



##########
cpp/src/parquet/column_writer_test.cc:
##########
@@ -1705,5 +1706,93 @@ TEST(TestColumnWriter, WriteDataPageV2HeaderNullCount) {
   }
 }
 
+template <typename TestType>
+class TestBloomFilterWriter : public TestPrimitiveWriter<TestType> {
+ public:
+  void SetUp() override {
+    TestPrimitiveWriter<TestType>::SetUp();
+    builder_ = nullptr;
+    bloom_filter_ = nullptr;
+  }
+
+  std::shared_ptr<TypedColumnWriter<TestType>> BuildWriterWithBloomFilter(
+      int64_t output_size, const ColumnProperties& column_properties);
+
+  std::unique_ptr<BloomFilterBuilder> builder_;
+  BloomFilter* bloom_filter_;
+};
+
+template <typename TestType>
+std::shared_ptr<TypedColumnWriter<TestType>>
+TestBloomFilterWriter<TestType>::BuildWriterWithBloomFilter(
+    int64_t output_size, const ColumnProperties& column_properties) {
+  this->sink_ = CreateOutputStream();
+  WriterProperties::Builder wp_builder;
+  if (column_properties.encoding() == Encoding::PLAIN_DICTIONARY ||
+      column_properties.encoding() == Encoding::RLE_DICTIONARY) {
+    wp_builder.enable_dictionary();
+    wp_builder.dictionary_pagesize_limit(DICTIONARY_PAGE_SIZE);
+  } else {
+    wp_builder.disable_dictionary();
+    wp_builder.encoding(column_properties.encoding());
+  }
+  wp_builder.max_statistics_size(column_properties.max_statistics_size());
+  auto path = this->schema_.Column(0)->path();
+  if (column_properties.bloom_filter_enabled()) {
+    wp_builder.enable_bloom_filter_options(
+        column_properties.bloom_filter_options().value(), path);
+  } else {
+    wp_builder.disable_bloom_filter(path);
+  }
+  this->writer_properties_ = wp_builder.build();
+
+  this->metadata_ =
+      ColumnChunkMetaDataBuilder::Make(this->writer_properties_, this->descr_);
+  std::unique_ptr<PageWriter> pager = PageWriter::Open(
+      this->sink_, column_properties.compression(), this->metadata_.get());
+  builder_ = BloomFilterBuilder::Make(&this->schema_, 
*this->writer_properties_);
+  // Initial RowGroup
+  builder_->AppendRowGroup();
+  bloom_filter_ = builder_->GetOrCreateBloomFilter(0);
+  std::shared_ptr<ColumnWriter> writer =
+      ColumnWriter::Make(this->metadata_.get(), std::move(pager),
+                         this->writer_properties_.get(), bloom_filter_);
+  return std::static_pointer_cast<TypedColumnWriter<TestType>>(writer);
+}
+
+// Note: BooleanType is Excluded.
+using TestBloomFilterTypes = ::testing::Types<Int32Type, Int64Type, Int96Type, 
FloatType,
+                                              DoubleType, ByteArrayType, 
FLBAType>;
+
+TYPED_TEST_SUITE(TestBloomFilterWriter, TestBloomFilterTypes);
+
+TYPED_TEST(TestBloomFilterWriter, Basic) {
+  this->GenerateData(SMALL_SIZE);
+  ColumnProperties column_properties;
+  BloomFilterOptions options;
+  options.ndv = SMALL_SIZE;
+  column_properties.set_bloom_filter_options(options);
+
+  auto writer = this->BuildWriterWithBloomFilter(SMALL_SIZE, 
column_properties);
+  writer->WriteBatch(this->values_.size(), nullptr, nullptr, 
this->values_ptr_);
+  writer->Close();
+
+  // Read all rows so we are sure that also the non-dictionary pages are read 
correctly
+  this->SetupValuesOut(SMALL_SIZE);
+  this->ReadColumnFully();
+  ASSERT_EQ(SMALL_SIZE, this->values_read_);
+  ASSERT_EQ(this->values_, this->values_out_);

Review Comment:
   ```suggestion
   ```
   
   What about removing them, as we only care about the bloom filter in this case?



##########
cpp/src/parquet/bloom_filter_builder.cc:
##########
@@ -0,0 +1,155 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// This module defines an abstract interface for iterating through pages in a
+// Parquet column chunk within a row group. It could be extended in the future
+// to iterate through all data pages in all chunks in a file.
+
+#include "parquet/bloom_filter_builder.h"
+
+#include <map>
+#include <utility>
+#include <vector>
+
+#include "arrow/io/interfaces.h"
+
+#include "parquet/bloom_filter.h"
+#include "parquet/exception.h"
+#include "parquet/metadata.h"
+#include "parquet/properties.h"
+
+namespace parquet {
+
+class BloomFilterBuilderImpl : public BloomFilterBuilder {
+ public:
+  explicit BloomFilterBuilderImpl(const SchemaDescriptor* schema,
+                                  WriterProperties properties)
+      : schema_(schema), properties_(std::move(properties)) {}
+  /// Append a new row group to host all incoming bloom filters.
+  void AppendRowGroup() override;
+
+  BloomFilter* GetOrCreateBloomFilter(int32_t column_ordinal) override;
+
+  /// Serialize all bloom filters with header and bitset in the order of row 
group and
+  /// column id. Column encryption is not implemented yet. The side effect is 
that it
+  /// deletes all bloom filters after they have been flushed.
+  void WriteTo(::arrow::io::OutputStream* sink, BloomFilterLocation* location) 
override;
+
+  BloomFilterBuilderImpl(const BloomFilterBuilderImpl&) = delete;
+  BloomFilterBuilderImpl(BloomFilterBuilderImpl&&) = default;
+
+ private:
+  /// Make sure column ordinal is not out of bound and the builder is in good 
state.
+  void CheckState(int32_t column_ordinal) const {
+    if (finished_) {
+      throw ParquetException("BloomFilterBuilder is already finished.");
+    }
+    if (column_ordinal < 0 || column_ordinal >= schema_->num_columns()) {
+      throw ParquetException("Invalid column ordinal: ", column_ordinal);
+    }
+    if (file_bloom_filters_.empty()) {
+      throw ParquetException("No row group appended to BloomFilterBuilder.");
+    }
+    if (schema_->Column(column_ordinal)->physical_type() == Type::BOOLEAN) {
+      throw ParquetException("BloomFilterBuilder does not support boolean 
type.");
+    }
+  }
+
+  const SchemaDescriptor* schema_;
+  WriterProperties properties_;

Review Comment:
   It seems we don't have to copy the properties. The `FileSerializer` keeps a 
shared pointer to it to keep it alive, and the others just keep a const reference.



##########
cpp/src/parquet/column_writer_test.cc:
##########
@@ -1705,5 +1706,93 @@ TEST(TestColumnWriter, WriteDataPageV2HeaderNullCount) {
   }
 }
 
+template <typename TestType>
+class TestBloomFilterWriter : public TestPrimitiveWriter<TestType> {
+ public:
+  void SetUp() override {
+    TestPrimitiveWriter<TestType>::SetUp();
+    builder_ = nullptr;
+    bloom_filter_ = nullptr;
+  }
+
+  std::shared_ptr<TypedColumnWriter<TestType>> BuildWriterWithBloomFilter(
+      int64_t output_size, const ColumnProperties& column_properties);
+
+  std::unique_ptr<BloomFilterBuilder> builder_;
+  BloomFilter* bloom_filter_;
+};
+
+template <typename TestType>
+std::shared_ptr<TypedColumnWriter<TestType>>
+TestBloomFilterWriter<TestType>::BuildWriterWithBloomFilter(
+    int64_t output_size, const ColumnProperties& column_properties) {
+  this->sink_ = CreateOutputStream();
+  WriterProperties::Builder wp_builder;

Review Comment:
   Why do we need to set the dictionary encoding and max statistics size when 
the bloom filter has nothing to do with them? Let's simplify this by deleting the 
unrelated code.



##########
cpp/src/parquet/bloom_filter_builder.cc:
##########
@@ -0,0 +1,155 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// This module defines an abstract interface for iterating through pages in a
+// Parquet column chunk within a row group. It could be extended in the future
+// to iterate through all data pages in all chunks in a file.
+
+#include "parquet/bloom_filter_builder.h"
+
+#include <map>
+#include <utility>
+#include <vector>
+
+#include "arrow/io/interfaces.h"
+
+#include "parquet/bloom_filter.h"
+#include "parquet/exception.h"
+#include "parquet/metadata.h"
+#include "parquet/properties.h"
+
+namespace parquet {
+
+class BloomFilterBuilderImpl : public BloomFilterBuilder {
+ public:
+  explicit BloomFilterBuilderImpl(const SchemaDescriptor* schema,
+                                  WriterProperties properties)
+      : schema_(schema), properties_(std::move(properties)) {}
+  /// Append a new row group to host all incoming bloom filters.
+  void AppendRowGroup() override;
+
+  BloomFilter* GetOrCreateBloomFilter(int32_t column_ordinal) override;
+
+  /// Serialize all bloom filters with header and bitset in the order of row 
group and
+  /// column id. Column encryption is not implemented yet. The side effect is 
that it
+  /// deletes all bloom filters after they have been flushed.
+  void WriteTo(::arrow::io::OutputStream* sink, BloomFilterLocation* location) 
override;
+
+  BloomFilterBuilderImpl(const BloomFilterBuilderImpl&) = delete;
+  BloomFilterBuilderImpl(BloomFilterBuilderImpl&&) = default;
+
+ private:
+  /// Make sure column ordinal is not out of bound and the builder is in good 
state.
+  void CheckState(int32_t column_ordinal) const {
+    if (finished_) {
+      throw ParquetException("BloomFilterBuilder is already finished.");
+    }
+    if (column_ordinal < 0 || column_ordinal >= schema_->num_columns()) {
+      throw ParquetException("Invalid column ordinal: ", column_ordinal);
+    }
+    if (file_bloom_filters_.empty()) {
+      throw ParquetException("No row group appended to BloomFilterBuilder.");
+    }
+    if (schema_->Column(column_ordinal)->physical_type() == Type::BOOLEAN) {
+      throw ParquetException("BloomFilterBuilder does not support boolean 
type.");
+    }
+  }
+
+  const SchemaDescriptor* schema_;
+  WriterProperties properties_;
+  bool finished_ = false;
+
+  using RowGroupBloomFilters = std::map<int32_t, std::unique_ptr<BloomFilter>>;
+  // Using unique_ptr because the `std::unique_ptr<BloomFilter>` is not 
copyable.
+  // MSVC has the issue below: https://github.com/microsoft/STL/issues/1036
+  // So we use `std::unique_ptr<std::map<>>` to avoid the issue.
+  std::vector<std::unique_ptr<RowGroupBloomFilters>> file_bloom_filters_;

Review Comment:
   Perhaps using a shared_ptr provides better readability?



##########
cpp/src/parquet/bloom_filter_builder.cc:
##########
@@ -0,0 +1,155 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// This module defines an abstract interface for iterating through pages in a
+// Parquet column chunk within a row group. It could be extended in the future
+// to iterate through all data pages in all chunks in a file.
+
+#include "parquet/bloom_filter_builder.h"
+
+#include <map>
+#include <utility>
+#include <vector>
+
+#include "arrow/io/interfaces.h"
+
+#include "parquet/bloom_filter.h"
+#include "parquet/exception.h"
+#include "parquet/metadata.h"
+#include "parquet/properties.h"
+
+namespace parquet {
+
+class BloomFilterBuilderImpl : public BloomFilterBuilder {
+ public:
+  explicit BloomFilterBuilderImpl(const SchemaDescriptor* schema,
+                                  WriterProperties properties)
+      : schema_(schema), properties_(std::move(properties)) {}
+  /// Append a new row group to host all incoming bloom filters.
+  void AppendRowGroup() override;
+
+  BloomFilter* GetOrCreateBloomFilter(int32_t column_ordinal) override;
+
+  /// Serialize all bloom filters with header and bitset in the order of row 
group and
+  /// column id. Column encryption is not implemented yet. The side effect is 
that it
+  /// deletes all bloom filters after they have been flushed.
+  void WriteTo(::arrow::io::OutputStream* sink, BloomFilterLocation* location) 
override;
+
+  BloomFilterBuilderImpl(const BloomFilterBuilderImpl&) = delete;
+  BloomFilterBuilderImpl(BloomFilterBuilderImpl&&) = default;
+
+ private:
+  /// Make sure column ordinal is not out of bound and the builder is in good 
state.
+  void CheckState(int32_t column_ordinal) const {
+    if (finished_) {
+      throw ParquetException("BloomFilterBuilder is already finished.");
+    }
+    if (column_ordinal < 0 || column_ordinal >= schema_->num_columns()) {
+      throw ParquetException("Invalid column ordinal: ", column_ordinal);
+    }
+    if (file_bloom_filters_.empty()) {
+      throw ParquetException("No row group appended to BloomFilterBuilder.");
+    }
+    if (schema_->Column(column_ordinal)->physical_type() == Type::BOOLEAN) {
+      throw ParquetException("BloomFilterBuilder does not support boolean 
type.");
+    }

Review Comment:
   I think this is fine.



##########
cpp/src/parquet/bloom_filter_reader_writer_test.cc:
##########
@@ -0,0 +1,216 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <gmock/gmock.h>
+#include <gtest/gtest.h>
+
+#include "arrow/testing/gtest_util.h"
+
+#include "parquet/bloom_filter.h"
+#include "parquet/bloom_filter_builder.h"
+#include "parquet/bloom_filter_reader.h"
+#include "parquet/file_reader.h"
+#include "parquet/test_util.h"
+
+namespace parquet::test {
+
+TEST(BloomFilterReader, ReadBloomFilter) {
+  std::vector<std::string> files = {"data_index_bloom_encoding_stats.parquet",
+                                    "data_index_bloom_encoding_with_length.parquet"};
+  for (const auto& test_file : files) {
+    std::string dir_string(get_data_dir());
+    std::string path = dir_string + "/" + test_file;
+    auto reader = ParquetFileReader::OpenFile(path, /*memory_map=*/false);
+    auto file_metadata = reader->metadata();
+    EXPECT_FALSE(file_metadata->is_encryption_algorithm_set());
+    auto& bloom_filter_reader = reader->GetBloomFilterReader();
+    auto row_group_0 = bloom_filter_reader.RowGroup(0);
+    ASSERT_NE(nullptr, row_group_0);
+    EXPECT_THROW_THAT(
+        [&]() { bloom_filter_reader.RowGroup(1); }, ParquetException,
+        ::testing::Property(&ParquetException::what,
+                            ::testing::HasSubstr("Invalid row group ordinal")));
+    auto bloom_filter = row_group_0->GetColumnBloomFilter(0);
+    ASSERT_NE(nullptr, bloom_filter);
+    EXPECT_THROW_THAT([&]() { row_group_0->GetColumnBloomFilter(1); }, ParquetException,
+                      ::testing::Property(&ParquetException::what,
+                                          ::testing::HasSubstr(
+                                              "Invalid column index at column ordinal")));
+
+    // assert exists
+    {
+      std::string_view sv = "Hello";
+      ByteArray ba{sv};
+      EXPECT_TRUE(bloom_filter->FindHash(bloom_filter->Hash(&ba)));
+    }
+
+    // no exists
+    {
+      std::string_view sv = "NOT_EXISTS";
+      ByteArray ba{sv};
+      EXPECT_FALSE(bloom_filter->FindHash(bloom_filter->Hash(&ba)));
+    }
+  }
+}
+
+TEST(BloomFilterReader, FileNotHaveBloomFilter) {
+  // Can still get a BloomFilterReader and a RowGroupBloomFilter
+  // reader, but cannot get a non-null BloomFilter.
+  std::string dir_string(get_data_dir());
+  std::string path = dir_string + "/alltypes_plain.parquet";
+  auto reader = ParquetFileReader::OpenFile(path, false);
+  auto file_metadata = reader->metadata();
+  EXPECT_FALSE(file_metadata->is_encryption_algorithm_set());
+  auto& bloom_filter_reader = reader->GetBloomFilterReader();
+  auto row_group_0 = bloom_filter_reader.RowGroup(0);
+  ASSERT_NE(nullptr, row_group_0);
+  EXPECT_THROW(bloom_filter_reader.RowGroup(1), ParquetException);
+  auto bloom_filter = row_group_0->GetColumnBloomFilter(0);
+  ASSERT_EQ(nullptr, bloom_filter);
+}
+
+// <c1:BYTE_ARRAY, c2:BYTE_ARRAY>, c1 has bloom filter.
+TEST(BloomFilterBuilderTest, BasicRoundTrip) {
+  SchemaDescriptor schema;
+  schema::NodePtr root = schema::GroupNode::Make(
+      "schema", Repetition::REPEATED, {schema::ByteArray("c1"), schema::ByteArray("c2")});
+  schema.Init(root);
+  WriterProperties::Builder properties_builder;
+  BloomFilterOptions bloom_filter_options;
+  bloom_filter_options.ndv = 100;
+  properties_builder.enable_bloom_filter_options(bloom_filter_options, "c1");
+  auto writer_properties = properties_builder.build();
+  auto builder = BloomFilterBuilder::Make(&schema, *writer_properties);
+
+  auto append_values_to_bloom_filter = [&](const std::vector<uint64_t>& insert_hashes) {
+    builder->AppendRowGroup();
+    auto bloom_filter = builder->GetOrCreateBloomFilter(0);
+    ASSERT_NE(nullptr, bloom_filter);
+    ASSERT_EQ(bloom_filter->GetBitsetSize(),
+              BlockSplitBloomFilter::OptimalNumOfBytes(bloom_filter_options.ndv,
+                                                       bloom_filter_options.fpp));
+    for (uint64_t hash : insert_hashes) {
+      bloom_filter->InsertHash(hash);
+    }
+  };
+  // First row-group
+  append_values_to_bloom_filter({100, 200});
+  // Second row-group
+  append_values_to_bloom_filter({300, 400});
+  auto sink = CreateOutputStream();
+  BloomFilterLocation location;
+  builder->WriteTo(sink.get(), &location);
+  EXPECT_EQ(2, location.bloom_filter_location.size());
+  for (auto& [row_group_id, row_group_bloom_filter] : location.bloom_filter_location) {
+    EXPECT_EQ(2, row_group_bloom_filter.size());
+    EXPECT_TRUE(row_group_bloom_filter[0].has_value());
+    EXPECT_FALSE(row_group_bloom_filter[1].has_value());
+  }
+
+  struct RowGroupBloomFilterCase {
+    int32_t row_group_id;
+    std::vector<uint64_t> exists_hashes;
+    std::vector<uint64_t> unexists_hashes;
+  };
+
+  ASSERT_OK_AND_ASSIGN(auto buffer, sink->Finish());
+
+  std::vector<RowGroupBloomFilterCase> cases = {
+      RowGroupBloomFilterCase{/*row_group_id=*/0, /*exists_hashes=*/{100, 200},
+                              /*unexists_hashes=*/{300, 400}},
+      RowGroupBloomFilterCase{/*row_group_id=*/1, /*exists_hashes=*/{300, 400},
+                              /*unexists_hashes=*/{100, 200}}};
+  for (const auto& c : cases) {
+    int64_t bloom_filter_offset =
+        location.bloom_filter_location[c.row_group_id][0]->offset;
+    int32_t bloom_filter_length =
+        location.bloom_filter_location[c.row_group_id][0]->length;
+
+    ReaderProperties reader_properties;
+    ::arrow::io::BufferReader reader(
+        ::arrow::SliceBuffer(buffer, bloom_filter_offset, bloom_filter_length));
+    auto filter = parquet::BlockSplitBloomFilter::Deserialize(reader_properties, &reader);
+    for (uint64_t hash : c.exists_hashes) {
+      EXPECT_TRUE(filter.FindHash(hash));
+    }
+    for (uint64_t hash : c.unexists_hashes) {
+      EXPECT_FALSE(filter.FindHash(hash));
+    }
+  }
+}
+
+TEST(BloomFilterBuilderTest, InvalidOperations) {
+  SchemaDescriptor schema;
+  schema::NodePtr root = schema::GroupNode::Make(
+      "schema", Repetition::REPEATED, {schema::ByteArray("c1"), schema::Boolean("c2")});
+  schema.Init(root);
+  WriterProperties::Builder properties_builder;
+  BloomFilterOptions bloom_filter_options;
+  bloom_filter_options.ndv = 100;
+  properties_builder.enable_bloom_filter_options(bloom_filter_options, "c1");
+  properties_builder.enable_bloom_filter_options(bloom_filter_options, "c2");
+  auto properties = properties_builder.build();
+  auto builder = BloomFilterBuilder::Make(&schema, *properties);
+  // AppendRowGroup() is not called and expect throw.
+  ASSERT_THROW(builder->GetOrCreateBloomFilter(0), ParquetException);
+
+  builder->AppendRowGroup();
+  // GetOrCreateBloomFilter() with wrong column ordinal expect throw.
+  ASSERT_THROW(builder->GetOrCreateBloomFilter(2), ParquetException);
+  // GetOrCreateBloomFilter() with boolean expect throw.
+  ASSERT_THROW(builder->GetOrCreateBloomFilter(1), ParquetException);
+  auto filter = builder->GetOrCreateBloomFilter(0);
+  // Call GetOrCreateBloomFilter the second time it is actually a cached version.
+  EXPECT_EQ(filter, builder->GetOrCreateBloomFilter(0));
+  auto sink = CreateOutputStream();
+  BloomFilterLocation location;
+  builder->WriteTo(sink.get(), &location);
+  EXPECT_EQ(1, location.bloom_filter_location.size());
+  // Multiple WriteTo() expect throw.
+  ASSERT_THROW(builder->WriteTo(sink.get(), &location), ParquetException);
+}
+
+TEST(BloomFilterBuilderTest, GetOrCreate) {
+  SchemaDescriptor schema;
+  schema::NodePtr root = schema::GroupNode::Make(
+      "schema", Repetition::REPEATED, {schema::ByteArray("c1"), schema::Boolean("c2")});
+  schema.Init(root);
+  WriterProperties::Builder properties_builder;
+  BloomFilterOptions bloom_filter_options;
+  bloom_filter_options.ndv = 100;
+  properties_builder.enable_bloom_filter_options(bloom_filter_options, "c1");
+  properties_builder.enable_bloom_filter_options(bloom_filter_options, "c2");
+  auto properties = properties_builder.build();
+  auto builder = BloomFilterBuilder::Make(&schema, *properties);
+  // AppendRowGroup() is not called and expect throw.
+  ASSERT_THROW(builder->GetOrCreateBloomFilter(0), ParquetException);

Review Comment:
   ditto



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to