wgtmac commented on code in PR #37400:
URL: https://github.com/apache/arrow/pull/37400#discussion_r1895256903


##########
cpp/src/parquet/CMakeLists.txt:
##########
@@ -160,6 +160,7 @@ set(PARQUET_SRCS
     arrow/writer.cc
     bloom_filter.cc
     bloom_filter_reader.cc
+    bloom_filter_builder.cc

Review Comment:
   ```suggestion
       bloom_filter_builder.cc
       bloom_filter_reader.cc
   ```



##########
cpp/src/parquet/arrow/arrow_reader_writer_test.cc:
##########
@@ -5869,5 +5877,244 @@ TEST_F(ParquetPageIndexRoundTripTest, EnablePerColumn) {
                             /*null_counts=*/{0}}));
 }
 
+class ParquetBloomFilterRoundTripTest : public ::testing::Test,
+                                        public ParquetIndexRoundTripTest {
+ public:
+  void ReadBloomFilters(int expect_num_row_groups,
+                        const std::set<int>& expect_columns_without_filter = 
{}) {
+    auto reader = 
ParquetFileReader::Open(std::make_shared<BufferReader>(buffer_));
+
+    auto metadata = reader->metadata();
+    ASSERT_EQ(expect_num_row_groups, metadata->num_row_groups());
+
+    auto& bloom_filter_reader = reader->GetBloomFilterReader();
+
+    for (int rg = 0; rg < metadata->num_row_groups(); ++rg) {
+      auto row_group_reader = bloom_filter_reader.RowGroup(rg);
+      ASSERT_NE(row_group_reader, nullptr);
+
+      for (int col = 0; col < metadata->num_columns(); ++col) {
+        bool expect_no_bloom_filter = expect_columns_without_filter.find(col) 
!=
+                                      expect_columns_without_filter.cend();
+
+        auto bloom_filter = row_group_reader->GetColumnBloomFilter(col);
+        if (expect_no_bloom_filter) {
+          ASSERT_EQ(nullptr, bloom_filter);
+        } else {
+          ASSERT_NE(nullptr, bloom_filter);
+          bloom_filters_.push_back(std::move(bloom_filter));

Review Comment:
   What about changing `bloom_filters_` to be an output parameter to function 
`ReadBloomFilters` instead of a class member variable?



##########
cpp/src/parquet/arrow/arrow_reader_writer_test.cc:
##########
@@ -5869,5 +5877,244 @@ TEST_F(ParquetPageIndexRoundTripTest, EnablePerColumn) {
                             /*null_counts=*/{0}}));
 }
 
+class ParquetBloomFilterRoundTripTest : public ::testing::Test,
+                                        public ParquetIndexRoundTripTest {
+ public:
+  void ReadBloomFilters(int expect_num_row_groups,
+                        const std::set<int>& expect_columns_without_filter = 
{}) {
+    auto reader = 
ParquetFileReader::Open(std::make_shared<BufferReader>(buffer_));
+
+    auto metadata = reader->metadata();
+    ASSERT_EQ(expect_num_row_groups, metadata->num_row_groups());
+
+    auto& bloom_filter_reader = reader->GetBloomFilterReader();
+
+    for (int rg = 0; rg < metadata->num_row_groups(); ++rg) {
+      auto row_group_reader = bloom_filter_reader.RowGroup(rg);
+      ASSERT_NE(row_group_reader, nullptr);
+
+      for (int col = 0; col < metadata->num_columns(); ++col) {
+        bool expect_no_bloom_filter = expect_columns_without_filter.find(col) 
!=
+                                      expect_columns_without_filter.cend();
+
+        auto bloom_filter = row_group_reader->GetColumnBloomFilter(col);
+        if (expect_no_bloom_filter) {
+          ASSERT_EQ(nullptr, bloom_filter);
+        } else {
+          ASSERT_NE(nullptr, bloom_filter);
+          bloom_filters_.push_back(std::move(bloom_filter));
+        }
+      }
+    }
+  }
+
+  template <typename ArrowType>
+  void VerifyBloomFilterContains(const BloomFilter* bloom_filter,
+                                 const ::arrow::ChunkedArray& chunked_array) {
+    for (auto value : ::arrow::stl::Iterate<ArrowType>(chunked_array)) {
+      if (value == std::nullopt) {
+        continue;
+      }
+      EXPECT_TRUE(bloom_filter->FindHash(bloom_filter->Hash(value.value())));
+    }
+  }
+
+  template <typename ArrowType>
+  void VerifyBloomFilterNotContains(const BloomFilter* bloom_filter,
+                                    const ::arrow::ChunkedArray& 
chunked_array) {
+    for (auto value : ::arrow::stl::Iterate<ArrowType>(chunked_array)) {
+      if (value == std::nullopt) {
+        continue;
+      }
+      EXPECT_FALSE(bloom_filter->FindHash(bloom_filter->Hash(value.value())));
+    }
+  }
+
+ protected:
+  std::vector<std::unique_ptr<BloomFilter>> bloom_filters_;
+};
+
+TEST_F(ParquetBloomFilterRoundTripTest, SimpleRoundTrip) {

Review Comment:
   The three test cases below share a lot of common logic (with exactly same 
data). Should we refactor them to eliminate the duplicate?



##########
cpp/src/parquet/bloom_filter_builder.cc:
##########
@@ -0,0 +1,170 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// This module defines an abstract interface for iterating through pages in a
+// Parquet column chunk within a row group. It could be extended in the future
+// to iterate through all data pages in all chunks in a file.
+
+#include "parquet/bloom_filter_builder.h"
+
+#include <map>
+#include <utility>
+#include <vector>
+
+#include "arrow/io/interfaces.h"
+
+#include "parquet/bloom_filter.h"
+#include "parquet/exception.h"
+#include "parquet/metadata.h"
+#include "parquet/properties.h"
+
+namespace parquet::internal {
+
+namespace {
+/// Column encryption for bloom filter is not implemented yet.
+class BloomFilterBuilderImpl : public BloomFilterBuilder {
+ public:
+  explicit BloomFilterBuilderImpl(const SchemaDescriptor* schema,
+                                  const WriterProperties* properties)
+      : schema_(schema), properties_(properties) {}
+  BloomFilterBuilderImpl(const BloomFilterBuilderImpl&) = delete;
+  BloomFilterBuilderImpl(BloomFilterBuilderImpl&&) = default;
+
+  /// Append a new row group to host all incoming bloom filters.
+  void AppendRowGroup() override;
+
+  BloomFilter* GetOrCreateBloomFilter(int32_t column_ordinal) override;
+
+  /// Serialize all bloom filters with header and bitset in the order of row 
group and
+  /// column id. The side effect is that it deletes all bloom filters after 
they have
+  /// been flushed.
+  void WriteTo(::arrow::io::OutputStream* sink, BloomFilterLocation* location) 
override;
+
+ private:
+  /// Make sure column ordinal is not out of bound and the builder is in good 
state.
+  void CheckState(int32_t column_ordinal) const {
+    if (finished_) {
+      throw ParquetException("Cannot call WriteTo() twice on 
BloomFilterBuilder.");
+    }
+    if (column_ordinal < 0 || column_ordinal >= schema_->num_columns()) {
+      throw ParquetException("Invalid column ordinal: ", column_ordinal);
+    }
+    if (file_bloom_filters_.empty()) {
+      throw ParquetException("No row group appended to BloomFilterBuilder.");
+    }
+    if (schema_->Column(column_ordinal)->physical_type() == Type::BOOLEAN) {
+      throw ParquetException("BloomFilterBuilder does not support boolean 
type.");
+    }
+  }
+
+  const SchemaDescriptor* schema_;
+  const WriterProperties* properties_;
+  bool finished_ = false;
+
+  using RowGroupBloomFilters = std::map<int32_t, std::unique_ptr<BloomFilter>>;
+  // Using unique_ptr because the `std::unique_ptr<BloomFilter>` is not 
copyable.
+  // MSVC has the issue below: https://github.com/microsoft/STL/issues/1036
+  // So we use `std::unique_ptr<std::map<>>` to avoid the issue.
+  std::vector<std::unique_ptr<RowGroupBloomFilters>> file_bloom_filters_;
+};
+
+void BloomFilterBuilderImpl::AppendRowGroup() {
+  if (finished_) {
+    throw ParquetException(
+        "Cannot call AppendRowGroup() to BloomFilterBuilder::WriteTo is 
called");
+  }
+  file_bloom_filters_.emplace_back(std::make_unique<RowGroupBloomFilters>());
+}
+
+BloomFilter* BloomFilterBuilderImpl::GetOrCreateBloomFilter(int32_t 
column_ordinal) {
+  CheckState(column_ordinal);
+  const ColumnDescriptor* column_descr = schema_->Column(column_ordinal);
+  // Bloom filter does not support boolean type, and this should be checked in
+  // CheckState() already.
+  DCHECK_NE(column_descr->physical_type(), Type::BOOLEAN);
+  auto bloom_filter_options_opt = 
properties_->bloom_filter_options(column_descr->path());
+  if (bloom_filter_options_opt == std::nullopt) {
+    return nullptr;
+  }
+  BloomFilterOptions bloom_filter_options = *bloom_filter_options_opt;
+  // CheckState() should have checked that file_bloom_filters_ is not empty.
+  DCHECK(!file_bloom_filters_.empty());
+  RowGroupBloomFilters& row_group_bloom_filter = *file_bloom_filters_.back();
+  auto iter = row_group_bloom_filter.find(column_ordinal);
+  if (iter == row_group_bloom_filter.end()) {
+    auto block_split_bloom_filter =
+        std::make_unique<BlockSplitBloomFilter>(properties_->memory_pool());
+    block_split_bloom_filter->Init(BlockSplitBloomFilter::OptimalNumOfBytes(
+        bloom_filter_options.ndv, bloom_filter_options.fpp));
+    auto insert_result = row_group_bloom_filter.emplace(
+        column_ordinal, std::move(block_split_bloom_filter));
+    DCHECK(insert_result.second);
+    iter = insert_result.first;
+  }
+  if (iter->second == nullptr) {
+    throw ParquetException("Bloom filter state is invalid for column ",
+                           column_descr->path());
+  }
+  return iter->second.get();
+}
+
+void BloomFilterBuilderImpl::WriteTo(::arrow::io::OutputStream* sink,
+                                     BloomFilterLocation* location) {
+  if (finished_) {
+    throw ParquetException("Cannot call WriteTo() multiple times.");
+  }
+  finished_ = true;
+
+  for (size_t row_group_ordinal = 0; row_group_ordinal < 
file_bloom_filters_.size();
+       ++row_group_ordinal) {
+    RowGroupBloomFilters& row_group_bloom_filters =
+        *file_bloom_filters_[row_group_ordinal];
+    // the whole row group has no bloom filter
+    if (row_group_bloom_filters.empty()) {
+      continue;
+    }
+    int num_columns = schema_->num_columns();
+    RowGroupBloomFilterLocation locations;
+
+    // serialize bloom filter in ascending order of column id
+    for (auto& [column_id, filter] : row_group_bloom_filters) {
+      if (ARROW_PREDICT_FALSE(filter == nullptr)) {
+        throw ParquetException("Bloom filter state is invalid for column ", 
column_id);

Review Comment:
   ```suggestion
           throw ParquetException("Bloom filter is null for column ", 
column_id);
   ```



##########
cpp/src/parquet/properties.h:
##########
@@ -165,6 +165,15 @@ static constexpr Encoding::type DEFAULT_ENCODING = 
Encoding::UNKNOWN;
 static const char DEFAULT_CREATED_BY[] = CREATED_BY_VERSION;
 static constexpr Compression::type DEFAULT_COMPRESSION_TYPE = 
Compression::UNCOMPRESSED;
 static constexpr bool DEFAULT_IS_PAGE_INDEX_ENABLED = false;
+static constexpr int32_t DEFAULT_BLOOM_FILTER_NDV = 1024 * 1024;
+static constexpr double DEFAULT_BLOOM_FILTER_FPP = 0.05;
+
+struct PARQUET_EXPORT BloomFilterOptions {
+  /// Expected number of distinct values to be inserted into the bloom filter.

Review Comment:
   What about explaining the risk of over-estimate and under-estimate?



##########
cpp/src/parquet/metadata.cc:
##########
@@ -1928,36 +1928,62 @@ class FileMetaDataBuilder::FileMetaDataBuilderImpl {
   }
 
   void SetPageIndexLocation(const PageIndexLocation& location) {
-    auto set_index_location =
+    auto set_index_location = [this](size_t row_group_ordinal,
+                                     const FileIndexLocation& 
file_index_location,
+                                     bool column_index) {
+      auto& row_group_metadata = this->row_groups_.at(row_group_ordinal);
+      auto iter = file_index_location.find(row_group_ordinal);
+      if (iter != file_index_location.cend()) {
+        const auto& row_group_index_location = iter->second;
+        for (size_t i = 0; i < row_group_index_location.size(); ++i) {
+          if (i >= row_group_metadata.columns.size()) {
+            throw ParquetException("Cannot find metadata for column ordinal ", 
i);
+          }
+          auto& column_metadata = row_group_metadata.columns.at(i);
+          const auto& index_location = row_group_index_location.at(i);
+          if (index_location.has_value()) {
+            if (column_index) {
+              
column_metadata.__set_column_index_offset(index_location->offset);
+              
column_metadata.__set_column_index_length(index_location->length);
+            } else {
+              
column_metadata.__set_offset_index_offset(index_location->offset);
+              
column_metadata.__set_offset_index_length(index_location->length);
+            }
+          }
+        }
+      }
+    };
+
+    for (size_t i = 0; i < row_groups_.size(); ++i) {
+      set_index_location(i, location.column_index_location, true);
+      set_index_location(i, location.offset_index_location, false);
+    }
+  }
+
+  // Update location to all bloom filters in the parquet file.
+  void SetBloomFilterLocation(const BloomFilterLocation& location) {
+    auto set_bloom_filter_location =
         [this](size_t row_group_ordinal,
-               const PageIndexLocation::FileIndexLocation& file_index_location,
-               bool column_index) {
+               const FileBloomFilterLocation& file_bloom_filter_location) {
           auto& row_group_metadata = this->row_groups_.at(row_group_ordinal);
-          auto iter = file_index_location.find(row_group_ordinal);
-          if (iter != file_index_location.cend()) {
-            const auto& row_group_index_location = iter->second;
-            for (size_t i = 0; i < row_group_index_location.size(); ++i) {
-              if (i >= row_group_metadata.columns.size()) {
-                throw ParquetException("Cannot find metadata for column 
ordinal ", i);
-              }
-              auto& column_metadata = row_group_metadata.columns.at(i);
-              const auto& index_location = row_group_index_location.at(i);
-              if (index_location.has_value()) {
-                if (column_index) {
-                  
column_metadata.__set_column_index_offset(index_location->offset);
-                  
column_metadata.__set_column_index_length(index_location->length);
-                } else {
-                  
column_metadata.__set_offset_index_offset(index_location->offset);
-                  
column_metadata.__set_offset_index_length(index_location->length);
-                }
-              }
+          auto iter = file_bloom_filter_location.find(row_group_ordinal);
+          if (iter != file_bloom_filter_location.cend()) {
+            const auto& row_group_bloom_filter_location = iter->second;
+            for (auto& [column_id, bloom_filter_location] :
+                 row_group_bloom_filter_location) {
+              DCHECK_LT(static_cast<size_t>(column_id),
+                        row_group_metadata.columns.size());
+              auto& column = row_group_metadata.columns[column_id];
+              auto& column_metadata = column.meta_data;
+              
column_metadata.__set_bloom_filter_offset(bloom_filter_location.offset);
+              // bloom_filter_length is added by Parquet format 2.10.0

Review Comment:
   I don't think we need this comment



##########
cpp/src/parquet/bloom_filter_builder.cc:
##########
@@ -0,0 +1,158 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// This module defines an abstract interface for iterating through pages in a
+// Parquet column chunk within a row group. It could be extended in the future
+// to iterate through all data pages in all chunks in a file.
+
+#include "parquet/bloom_filter_builder.h"
+
+#include <map>
+#include <utility>
+#include <vector>
+
+#include "arrow/io/interfaces.h"
+
+#include "parquet/bloom_filter.h"
+#include "parquet/exception.h"
+#include "parquet/metadata.h"
+#include "parquet/properties.h"
+
+namespace parquet::internal {
+
+namespace {
+/// Column encryption for bloom filter is not implemented yet.
+class BloomFilterBuilderImpl : public BloomFilterBuilder {
+ public:
+  explicit BloomFilterBuilderImpl(const SchemaDescriptor* schema,
+                                  const WriterProperties* properties)
+      : schema_(schema), properties_(properties) {}
+  /// Append a new row group to host all incoming bloom filters.
+  void AppendRowGroup() override;
+
+  BloomFilter* GetOrCreateBloomFilter(int32_t column_ordinal) override;
+
+  /// Serialize all bloom filters with header and bitset in the order of row 
group and
+  /// column id. The side effect is that it deletes all bloom filters after 
they have
+  /// been flushed.
+  void WriteTo(::arrow::io::OutputStream* sink, BloomFilterLocation* location) 
override;
+
+  BloomFilterBuilderImpl(const BloomFilterBuilderImpl&) = delete;
+  BloomFilterBuilderImpl(BloomFilterBuilderImpl&&) = default;
+
+ private:
+  /// Make sure column ordinal is not out of bound and the builder is in good 
state.
+  void CheckState(int32_t column_ordinal) const {
+    if (finished_) {
+      throw ParquetException("BloomFilterBuilder is already finished.");
+    }
+    if (column_ordinal < 0 || column_ordinal >= schema_->num_columns()) {
+      throw ParquetException("Invalid column ordinal: ", column_ordinal);
+    }
+    if (file_bloom_filters_.empty()) {
+      throw ParquetException("No row group appended to BloomFilterBuilder.");
+    }
+    if (schema_->Column(column_ordinal)->physical_type() == Type::BOOLEAN) {
+      throw ParquetException("BloomFilterBuilder does not support boolean 
type.");
+    }
+  }
+
+  const SchemaDescriptor* schema_;
+  const WriterProperties* properties_;
+  bool finished_ = false;
+
+  using RowGroupBloomFilters = std::map<int32_t, std::unique_ptr<BloomFilter>>;
+  // Using unique_ptr because the `std::unique_ptr<BloomFilter>` is not 
copyable.
+  // MSVC has the issue below: https://github.com/microsoft/STL/issues/1036
+  // So we use `std::unique_ptr<std::map<>>` to avoid the issue.
+  std::vector<std::unique_ptr<RowGroupBloomFilters>> file_bloom_filters_;
+};
+
+void BloomFilterBuilderImpl::AppendRowGroup() {
+  if (finished_) {
+    throw ParquetException(
+        "Cannot call AppendRowGroup() to finished BloomFilterBuilder.");
+  }
+  file_bloom_filters_.emplace_back(std::make_unique<RowGroupBloomFilters>());
+}
+
+BloomFilter* BloomFilterBuilderImpl::GetOrCreateBloomFilter(int32_t 
column_ordinal) {
+  CheckState(column_ordinal);
+  const ColumnDescriptor* column_descr = schema_->Column(column_ordinal);
+  DCHECK_NE(column_descr->physical_type(), Type::BOOLEAN);
+  auto bloom_filter_options_opt = 
properties_->bloom_filter_options(column_descr->path());
+  if (bloom_filter_options_opt == std::nullopt) {
+    return nullptr;
+  }
+  BloomFilterOptions bloom_filter_options = *bloom_filter_options_opt;
+  RowGroupBloomFilters& row_group_bloom_filter = *file_bloom_filters_.back();
+  auto iter = row_group_bloom_filter.find(column_ordinal);
+  if (iter == row_group_bloom_filter.end()) {
+    auto block_split_bloom_filter =
+        std::make_unique<BlockSplitBloomFilter>(properties_->memory_pool());
+    block_split_bloom_filter->Init(BlockSplitBloomFilter::OptimalNumOfBytes(
+        bloom_filter_options.ndv, bloom_filter_options.fpp));
+    auto insert_result = row_group_bloom_filter.emplace(
+        column_ordinal, std::move(block_split_bloom_filter));
+    DCHECK(insert_result.second);
+    iter = insert_result.first;
+  }
+  ARROW_CHECK(iter->second != nullptr);
+  return iter->second.get();
+}
+
+void BloomFilterBuilderImpl::WriteTo(::arrow::io::OutputStream* sink,
+                                     BloomFilterLocation* location) {
+  if (finished_) {
+    throw ParquetException("Cannot call WriteTo() multiple times.");
+  }
+  finished_ = true;
+
+  for (size_t row_group_ordinal = 0; row_group_ordinal < 
file_bloom_filters_.size();
+       ++row_group_ordinal) {
+    RowGroupBloomFilters& row_group_bloom_filters =
+        *file_bloom_filters_[row_group_ordinal];
+    // the whole row group has no bloom filter
+    if (row_group_bloom_filters.empty()) {
+      continue;
+    }
+    bool has_valid_bloom_filter = false;
+    int num_columns = schema_->num_columns();
+    std::vector<std::optional<IndexLocation>> locations(num_columns, 
std::nullopt);
+
+    // serialize bloom filter in ascending order of column id
+    for (auto& [column_id, filter] : row_group_bloom_filters) {
+      ARROW_CHECK(filter != nullptr);
+      PARQUET_ASSIGN_OR_THROW(int64_t offset, sink->Tell());
+      filter->WriteTo(sink);

Review Comment:
   Add a comment for this?



##########
cpp/src/parquet/arrow/arrow_reader_writer_test.cc:
##########
@@ -5541,7 +5543,7 @@ auto encode_double = [](double value) {
 
 }  // namespace
 
-class ParquetPageIndexRoundTripTest : public ::testing::Test {
+class ParquetIndexRoundTripTest {

Review Comment:
   ```suggestion
   class TestingWithPageIndex {
   ```
   
   ParquetIndexRoundTripTest is a little bit confusing since it is not a 
complete test.



##########
cpp/src/parquet/bloom_filter.h:
##########
@@ -106,6 +106,32 @@ class PARQUET_EXPORT BloomFilter {
   /// @return hash result.
   virtual uint64_t Hash(const FLBA* value, uint32_t len) const = 0;
 
+  // Variant of const reference argument to facilitate template
+
+  /// Compute hash for ByteArray value by using its plain encoding result.
+  ///
+  /// @param value the value to hash.
+  uint64_t Hash(const ByteArray& value) const { return Hash(&value); }
+
+  /// Compute hash for fixed byte array value by using its plain encoding 
result.
+  ///
+  /// @param value the value to hash.
+  /// @param type_len the value length.
+  uint64_t Hash(const FLBA& value, uint32_t type_len) const {
+    return Hash(&value, type_len);
+  }
+  /// Compute hash for Int96 value by using its plain encoding result.

Review Comment:
   ```suggestion
     }
   
     /// Compute hash for Int96 value by using its plain encoding result.
   ```



##########
cpp/src/parquet/bloom_filter_builder.cc:
##########
@@ -0,0 +1,170 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// This module defines an abstract interface for iterating through pages in a
+// Parquet column chunk within a row group. It could be extended in the future
+// to iterate through all data pages in all chunks in a file.
+
+#include "parquet/bloom_filter_builder.h"
+
+#include <map>
+#include <utility>
+#include <vector>
+
+#include "arrow/io/interfaces.h"
+
+#include "parquet/bloom_filter.h"
+#include "parquet/exception.h"
+#include "parquet/metadata.h"
+#include "parquet/properties.h"
+
+namespace parquet::internal {
+
+namespace {
+/// Column encryption for bloom filter is not implemented yet.
+class BloomFilterBuilderImpl : public BloomFilterBuilder {
+ public:
+  explicit BloomFilterBuilderImpl(const SchemaDescriptor* schema,
+                                  const WriterProperties* properties)
+      : schema_(schema), properties_(properties) {}
+  BloomFilterBuilderImpl(const BloomFilterBuilderImpl&) = delete;
+  BloomFilterBuilderImpl(BloomFilterBuilderImpl&&) = default;
+
+  /// Append a new row group to host all incoming bloom filters.
+  void AppendRowGroup() override;
+
+  BloomFilter* GetOrCreateBloomFilter(int32_t column_ordinal) override;
+
+  /// Serialize all bloom filters with header and bitset in the order of row 
group and
+  /// column id. The side effect is that it deletes all bloom filters after 
they have
+  /// been flushed.
+  void WriteTo(::arrow::io::OutputStream* sink, BloomFilterLocation* location) 
override;
+
+ private:
+  /// Make sure column ordinal is not out of bound and the builder is in good 
state.
+  void CheckState(int32_t column_ordinal) const {
+    if (finished_) {
+      throw ParquetException("Cannot call WriteTo() twice on 
BloomFilterBuilder.");

Review Comment:
   ```suggestion
         throw ParquetException("BloomFilterBuilder is already finished.");
   ```



##########
cpp/src/parquet/bloom_filter.h:
##########
@@ -106,6 +106,32 @@ class PARQUET_EXPORT BloomFilter {
   /// @return hash result.
   virtual uint64_t Hash(const FLBA* value, uint32_t len) const = 0;
 
+  // Variant of const reference argument to facilitate template
+
+  /// Compute hash for ByteArray value by using its plain encoding result.
+  ///
+  /// @param value the value to hash.
+  uint64_t Hash(const ByteArray& value) const { return Hash(&value); }
+
+  /// Compute hash for fixed byte array value by using its plain encoding 
result.
+  ///
+  /// @param value the value to hash.
+  /// @param type_len the value length.
+  uint64_t Hash(const FLBA& value, uint32_t type_len) const {
+    return Hash(&value, type_len);
+  }
+  /// Compute hash for Int96 value by using its plain encoding result.
+  ///
+  /// @param value the value to hash.
+  uint64_t Hash(const Int96& value) const { return Hash(&value); }
+  /// Compute hash for std::string_view value by using its plain encoding 
result.

Review Comment:
   ```suggestion
     uint64_t Hash(const Int96& value) const { return Hash(&value); }
   
     /// Compute hash for std::string_view value by using its plain encoding 
result.
   ```



##########
cpp/src/parquet/bloom_filter_builder.cc:
##########
@@ -0,0 +1,170 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// This module defines an abstract interface for iterating through pages in a
+// Parquet column chunk within a row group. It could be extended in the future
+// to iterate through all data pages in all chunks in a file.
+
+#include "parquet/bloom_filter_builder.h"
+
+#include <map>
+#include <utility>
+#include <vector>
+
+#include "arrow/io/interfaces.h"
+
+#include "parquet/bloom_filter.h"
+#include "parquet/exception.h"
+#include "parquet/metadata.h"
+#include "parquet/properties.h"
+
+namespace parquet::internal {
+
+namespace {
+/// Column encryption for bloom filter is not implemented yet.
+class BloomFilterBuilderImpl : public BloomFilterBuilder {
+ public:
+  explicit BloomFilterBuilderImpl(const SchemaDescriptor* schema,
+                                  const WriterProperties* properties)
+      : schema_(schema), properties_(properties) {}
+  BloomFilterBuilderImpl(const BloomFilterBuilderImpl&) = delete;
+  BloomFilterBuilderImpl(BloomFilterBuilderImpl&&) = default;
+
+  /// Append a new row group to host all incoming bloom filters.
+  void AppendRowGroup() override;
+
+  BloomFilter* GetOrCreateBloomFilter(int32_t column_ordinal) override;
+
+  /// Serialize all bloom filters with header and bitset in the order of row 
group and
+  /// column id. The side effect is that it deletes all bloom filters after 
they have
+  /// been flushed.
+  void WriteTo(::arrow::io::OutputStream* sink, BloomFilterLocation* location) 
override;
+
+ private:
+  /// Make sure column ordinal is not out of bound and the builder is in good 
state.
+  void CheckState(int32_t column_ordinal) const {
+    if (finished_) {
+      throw ParquetException("Cannot call WriteTo() twice on 
BloomFilterBuilder.");
+    }
+    if (column_ordinal < 0 || column_ordinal >= schema_->num_columns()) {
+      throw ParquetException("Invalid column ordinal: ", column_ordinal);
+    }
+    if (file_bloom_filters_.empty()) {
+      throw ParquetException("No row group appended to BloomFilterBuilder.");
+    }
+    if (schema_->Column(column_ordinal)->physical_type() == Type::BOOLEAN) {
+      throw ParquetException("BloomFilterBuilder does not support boolean 
type.");
+    }
+  }
+
+  const SchemaDescriptor* schema_;
+  const WriterProperties* properties_;
+  bool finished_ = false;
+
+  using RowGroupBloomFilters = std::map<int32_t, std::unique_ptr<BloomFilter>>;
+  // Using unique_ptr because the `std::unique_ptr<BloomFilter>` is not 
copyable.
+  // MSVC has the issue below: https://github.com/microsoft/STL/issues/1036
+  // So we use `std::unique_ptr<std::map<>>` to avoid the issue.
+  std::vector<std::unique_ptr<RowGroupBloomFilters>> file_bloom_filters_;
+};
+
+void BloomFilterBuilderImpl::AppendRowGroup() {
+  if (finished_) {
+    throw ParquetException(
+        "Cannot call AppendRowGroup() to BloomFilterBuilder::WriteTo is 
called");
+  }
+  file_bloom_filters_.emplace_back(std::make_unique<RowGroupBloomFilters>());
+}
+
+BloomFilter* BloomFilterBuilderImpl::GetOrCreateBloomFilter(int32_t 
column_ordinal) {
+  CheckState(column_ordinal);
+  const ColumnDescriptor* column_descr = schema_->Column(column_ordinal);
+  // Bloom filter does not support boolean type, and this should be checked in
+  // CheckState() already.
+  DCHECK_NE(column_descr->physical_type(), Type::BOOLEAN);
+  auto bloom_filter_options_opt = 
properties_->bloom_filter_options(column_descr->path());
+  if (bloom_filter_options_opt == std::nullopt) {
+    return nullptr;
+  }
+  BloomFilterOptions bloom_filter_options = *bloom_filter_options_opt;
+  // CheckState() should have checked that file_bloom_filters_ is not empty.
+  DCHECK(!file_bloom_filters_.empty());
+  RowGroupBloomFilters& row_group_bloom_filter = *file_bloom_filters_.back();
+  auto iter = row_group_bloom_filter.find(column_ordinal);
+  if (iter == row_group_bloom_filter.end()) {
+    auto block_split_bloom_filter =
+        std::make_unique<BlockSplitBloomFilter>(properties_->memory_pool());
+    block_split_bloom_filter->Init(BlockSplitBloomFilter::OptimalNumOfBytes(
+        bloom_filter_options.ndv, bloom_filter_options.fpp));
+    auto insert_result = row_group_bloom_filter.emplace(
+        column_ordinal, std::move(block_split_bloom_filter));
+    DCHECK(insert_result.second);
+    iter = insert_result.first;
+  }
+  if (iter->second == nullptr) {
+    throw ParquetException("Bloom filter state is invalid for column ",

Review Comment:
   This is vague. Should we just say that "bloom filter should not be null for 
column xxx"?



##########
cpp/src/parquet/bloom_filter_builder.cc:
##########
@@ -0,0 +1,170 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// This module defines an abstract interface for iterating through pages in a
+// Parquet column chunk within a row group. It could be extended in the future
+// to iterate through all data pages in all chunks in a file.
+
+#include "parquet/bloom_filter_builder.h"
+
+#include <map>
+#include <utility>
+#include <vector>
+
+#include "arrow/io/interfaces.h"
+
+#include "parquet/bloom_filter.h"
+#include "parquet/exception.h"
+#include "parquet/metadata.h"
+#include "parquet/properties.h"
+
+namespace parquet::internal {
+
+namespace {
+/// Column encryption for bloom filter is not implemented yet.
+class BloomFilterBuilderImpl : public BloomFilterBuilder {
+ public:
+  explicit BloomFilterBuilderImpl(const SchemaDescriptor* schema,
+                                  const WriterProperties* properties)
+      : schema_(schema), properties_(properties) {}
+  BloomFilterBuilderImpl(const BloomFilterBuilderImpl&) = delete;
+  BloomFilterBuilderImpl(BloomFilterBuilderImpl&&) = default;
+
+  /// Append a new row group to host all incoming bloom filters.
+  void AppendRowGroup() override;
+
+  BloomFilter* GetOrCreateBloomFilter(int32_t column_ordinal) override;
+
+  /// Serialize all bloom filters with header and bitset in the order of row 
group and
+  /// column id. The side effect is that it deletes all bloom filters after 
they have
+  /// been flushed.
+  void WriteTo(::arrow::io::OutputStream* sink, BloomFilterLocation* location) 
override;
+
+ private:
+  /// Make sure column ordinal is not out of bound and the builder is in good 
state.
+  void CheckState(int32_t column_ordinal) const {
+    if (finished_) {
+      throw ParquetException("Cannot call WriteTo() twice on 
BloomFilterBuilder.");
+    }
+    if (column_ordinal < 0 || column_ordinal >= schema_->num_columns()) {
+      throw ParquetException("Invalid column ordinal: ", column_ordinal);
+    }
+    if (file_bloom_filters_.empty()) {
+      throw ParquetException("No row group appended to BloomFilterBuilder.");
+    }
+    if (schema_->Column(column_ordinal)->physical_type() == Type::BOOLEAN) {
+      throw ParquetException("BloomFilterBuilder does not support boolean 
type.");
+    }
+  }
+
+  const SchemaDescriptor* schema_;
+  const WriterProperties* properties_;
+  bool finished_ = false;
+
+  using RowGroupBloomFilters = std::map<int32_t, std::unique_ptr<BloomFilter>>;
+  // Using unique_ptr because the `std::unique_ptr<BloomFilter>` is not 
copyable.
+  // MSVC has the issue below: https://github.com/microsoft/STL/issues/1036
+  // So we use `std::unique_ptr<std::map<>>` to avoid the issue.
+  std::vector<std::unique_ptr<RowGroupBloomFilters>> file_bloom_filters_;
+};
+
+void BloomFilterBuilderImpl::AppendRowGroup() {
+  if (finished_) {
+    throw ParquetException(
+        "Cannot call AppendRowGroup() to BloomFilterBuilder::WriteTo is 
called");
+  }
+  file_bloom_filters_.emplace_back(std::make_unique<RowGroupBloomFilters>());
+}
+
+BloomFilter* BloomFilterBuilderImpl::GetOrCreateBloomFilter(int32_t 
column_ordinal) {
+  CheckState(column_ordinal);
+  const ColumnDescriptor* column_descr = schema_->Column(column_ordinal);
+  // Bloom filter does not support boolean type, and this should be checked in
+  // CheckState() already.
+  DCHECK_NE(column_descr->physical_type(), Type::BOOLEAN);
+  auto bloom_filter_options_opt = 
properties_->bloom_filter_options(column_descr->path());
+  if (bloom_filter_options_opt == std::nullopt) {
+    return nullptr;
+  }
+  BloomFilterOptions bloom_filter_options = *bloom_filter_options_opt;
+  // CheckState() should have checked that file_bloom_filters_ is not empty.
+  DCHECK(!file_bloom_filters_.empty());
+  RowGroupBloomFilters& row_group_bloom_filter = *file_bloom_filters_.back();
+  auto iter = row_group_bloom_filter.find(column_ordinal);
+  if (iter == row_group_bloom_filter.end()) {
+    auto block_split_bloom_filter =
+        std::make_unique<BlockSplitBloomFilter>(properties_->memory_pool());
+    block_split_bloom_filter->Init(BlockSplitBloomFilter::OptimalNumOfBytes(
+        bloom_filter_options.ndv, bloom_filter_options.fpp));
+    auto insert_result = row_group_bloom_filter.emplace(
+        column_ordinal, std::move(block_split_bloom_filter));
+    DCHECK(insert_result.second);
+    iter = insert_result.first;
+  }
+  if (iter->second == nullptr) {
+    throw ParquetException("Bloom filter state is invalid for column ",
+                           column_descr->path());
+  }
+  return iter->second.get();
+}
+
+void BloomFilterBuilderImpl::WriteTo(::arrow::io::OutputStream* sink,
+                                     BloomFilterLocation* location) {
+  if (finished_) {
+    throw ParquetException("Cannot call WriteTo() multiple times.");

Review Comment:
   ```suggestion
       throw ParquetException("Cannot write a finished BloomFilterBuilder");
   ```



##########
cpp/src/parquet/bloom_filter_builder.cc:
##########
@@ -0,0 +1,170 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// This module defines an abstract interface for iterating through pages in a
+// Parquet column chunk within a row group. It could be extended in the future
+// to iterate through all data pages in all chunks in a file.
+
+#include "parquet/bloom_filter_builder.h"
+
+#include <map>
+#include <utility>
+#include <vector>
+
+#include "arrow/io/interfaces.h"
+
+#include "parquet/bloom_filter.h"
+#include "parquet/exception.h"
+#include "parquet/metadata.h"
+#include "parquet/properties.h"
+
+namespace parquet::internal {
+
+namespace {
+/// Column encryption for bloom filter is not implemented yet.
+class BloomFilterBuilderImpl : public BloomFilterBuilder {
+ public:
+  explicit BloomFilterBuilderImpl(const SchemaDescriptor* schema,
+                                  const WriterProperties* properties)
+      : schema_(schema), properties_(properties) {}
+  BloomFilterBuilderImpl(const BloomFilterBuilderImpl&) = delete;
+  BloomFilterBuilderImpl(BloomFilterBuilderImpl&&) = default;
+
+  /// Append a new row group to host all incoming bloom filters.
+  void AppendRowGroup() override;
+
+  BloomFilter* GetOrCreateBloomFilter(int32_t column_ordinal) override;
+
+  /// Serialize all bloom filters with header and bitset in the order of row 
group and
+  /// column id. The side effect is that it deletes all bloom filters after 
they have
+  /// been flushed.
+  void WriteTo(::arrow::io::OutputStream* sink, BloomFilterLocation* location) 
override;
+
+ private:
+  /// Make sure column ordinal is not out of bound and the builder is in good 
state.
+  void CheckState(int32_t column_ordinal) const {
+    if (finished_) {
+      throw ParquetException("Cannot call WriteTo() twice on 
BloomFilterBuilder.");
+    }
+    if (column_ordinal < 0 || column_ordinal >= schema_->num_columns()) {
+      throw ParquetException("Invalid column ordinal: ", column_ordinal);
+    }
+    if (file_bloom_filters_.empty()) {
+      throw ParquetException("No row group appended to BloomFilterBuilder.");
+    }
+    if (schema_->Column(column_ordinal)->physical_type() == Type::BOOLEAN) {
+      throw ParquetException("BloomFilterBuilder does not support boolean 
type.");
+    }
+  }
+
+  const SchemaDescriptor* schema_;
+  const WriterProperties* properties_;
+  bool finished_ = false;
+
+  using RowGroupBloomFilters = std::map<int32_t, std::unique_ptr<BloomFilter>>;
+  // Using unique_ptr because the `std::unique_ptr<BloomFilter>` is not 
copyable.
+  // MSVC has the issue below: https://github.com/microsoft/STL/issues/1036
+  // So we use `std::unique_ptr<std::map<>>` to avoid the issue.
+  std::vector<std::unique_ptr<RowGroupBloomFilters>> file_bloom_filters_;
+};
+
+void BloomFilterBuilderImpl::AppendRowGroup() {
+  if (finished_) {
+    throw ParquetException(
+        "Cannot call AppendRowGroup() to BloomFilterBuilder::WriteTo is 
called");

Review Comment:
   ```suggestion
       throw ParquetException("Cannot append to a finished BloomFilterBuilder");
   ```



##########
cpp/src/parquet/bloom_filter_builder.cc:
##########
@@ -0,0 +1,170 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// This module defines an abstract interface for iterating through pages in a
+// Parquet column chunk within a row group. It could be extended in the future
+// to iterate through all data pages in all chunks in a file.
+
+#include "parquet/bloom_filter_builder.h"
+
+#include <map>
+#include <utility>
+#include <vector>
+
+#include "arrow/io/interfaces.h"
+
+#include "parquet/bloom_filter.h"
+#include "parquet/exception.h"
+#include "parquet/metadata.h"
+#include "parquet/properties.h"
+
+namespace parquet::internal {
+
+namespace {
+/// Column encryption for bloom filter is not implemented yet.
+class BloomFilterBuilderImpl : public BloomFilterBuilder {
+ public:
+  explicit BloomFilterBuilderImpl(const SchemaDescriptor* schema,
+                                  const WriterProperties* properties)
+      : schema_(schema), properties_(properties) {}
+  BloomFilterBuilderImpl(const BloomFilterBuilderImpl&) = delete;
+  BloomFilterBuilderImpl(BloomFilterBuilderImpl&&) = default;
+
+  /// Append a new row group to host all incoming bloom filters.
+  void AppendRowGroup() override;
+
+  BloomFilter* GetOrCreateBloomFilter(int32_t column_ordinal) override;
+
+  /// Serialize all bloom filters with header and bitset in the order of row 
group and
+  /// column id. The side effect is that it deletes all bloom filters after 
they have
+  /// been flushed.
+  void WriteTo(::arrow::io::OutputStream* sink, BloomFilterLocation* location) 
override;
+
+ private:
+  /// Make sure column ordinal is not out of bound and the builder is in good 
state.
+  void CheckState(int32_t column_ordinal) const {
+    if (finished_) {
+      throw ParquetException("Cannot call WriteTo() twice on 
BloomFilterBuilder.");
+    }
+    if (column_ordinal < 0 || column_ordinal >= schema_->num_columns()) {
+      throw ParquetException("Invalid column ordinal: ", column_ordinal);
+    }
+    if (file_bloom_filters_.empty()) {
+      throw ParquetException("No row group appended to BloomFilterBuilder.");
+    }
+    if (schema_->Column(column_ordinal)->physical_type() == Type::BOOLEAN) {
+      throw ParquetException("BloomFilterBuilder does not support boolean 
type.");
+    }
+  }
+
+  const SchemaDescriptor* schema_;
+  const WriterProperties* properties_;
+  bool finished_ = false;
+
+  using RowGroupBloomFilters = std::map<int32_t, std::unique_ptr<BloomFilter>>;
+  // Using unique_ptr because the `std::unique_ptr<BloomFilter>` is not 
copyable.
+  // MSVC has the issue below: https://github.com/microsoft/STL/issues/1036
+  // So we use `std::unique_ptr<std::map<>>` to avoid the issue.
+  std::vector<std::unique_ptr<RowGroupBloomFilters>> file_bloom_filters_;
+};
+
+void BloomFilterBuilderImpl::AppendRowGroup() {
+  if (finished_) {
+    throw ParquetException(
+        "Cannot call AppendRowGroup() to BloomFilterBuilder::WriteTo is 
called");
+  }
+  file_bloom_filters_.emplace_back(std::make_unique<RowGroupBloomFilters>());
+}
+
+BloomFilter* BloomFilterBuilderImpl::GetOrCreateBloomFilter(int32_t 
column_ordinal) {
+  CheckState(column_ordinal);
+  const ColumnDescriptor* column_descr = schema_->Column(column_ordinal);
+  // Bloom filter does not support boolean type, and this should be checked in
+  // CheckState() already.
+  DCHECK_NE(column_descr->physical_type(), Type::BOOLEAN);
+  auto bloom_filter_options_opt = 
properties_->bloom_filter_options(column_descr->path());
+  if (bloom_filter_options_opt == std::nullopt) {
+    return nullptr;
+  }
+  BloomFilterOptions bloom_filter_options = *bloom_filter_options_opt;
+  // CheckState() should have checked that file_bloom_filters_ is not empty.
+  DCHECK(!file_bloom_filters_.empty());
+  RowGroupBloomFilters& row_group_bloom_filter = *file_bloom_filters_.back();
+  auto iter = row_group_bloom_filter.find(column_ordinal);
+  if (iter == row_group_bloom_filter.end()) {
+    auto block_split_bloom_filter =
+        std::make_unique<BlockSplitBloomFilter>(properties_->memory_pool());
+    block_split_bloom_filter->Init(BlockSplitBloomFilter::OptimalNumOfBytes(
+        bloom_filter_options.ndv, bloom_filter_options.fpp));
+    auto insert_result = row_group_bloom_filter.emplace(
+        column_ordinal, std::move(block_split_bloom_filter));
+    DCHECK(insert_result.second);

Review Comment:
   We'd better explicitly throw ParquetException like line 119



##########
cpp/src/parquet/bloom_filter_builder.cc:
##########
@@ -0,0 +1,170 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// This module defines an abstract interface for iterating through pages in a
+// Parquet column chunk within a row group. It could be extended in the future
+// to iterate through all data pages in all chunks in a file.
+
+#include "parquet/bloom_filter_builder.h"
+
+#include <map>
+#include <utility>
+#include <vector>
+
+#include "arrow/io/interfaces.h"
+
+#include "parquet/bloom_filter.h"
+#include "parquet/exception.h"
+#include "parquet/metadata.h"
+#include "parquet/properties.h"
+
+namespace parquet::internal {
+
+namespace {
+/// Column encryption for bloom filter is not implemented yet.
+class BloomFilterBuilderImpl : public BloomFilterBuilder {
+ public:
+  explicit BloomFilterBuilderImpl(const SchemaDescriptor* schema,
+                                  const WriterProperties* properties)
+      : schema_(schema), properties_(properties) {}
+  BloomFilterBuilderImpl(const BloomFilterBuilderImpl&) = delete;
+  BloomFilterBuilderImpl(BloomFilterBuilderImpl&&) = default;
+
+  /// Append a new row group to host all incoming bloom filters.
+  void AppendRowGroup() override;
+
+  BloomFilter* GetOrCreateBloomFilter(int32_t column_ordinal) override;
+
+  /// Serialize all bloom filters with header and bitset in the order of row 
group and
+  /// column id. The side effect is that it deletes all bloom filters after 
they have
+  /// been flushed.
+  void WriteTo(::arrow::io::OutputStream* sink, BloomFilterLocation* location) 
override;
+
+ private:
+  /// Make sure column ordinal is not out of bound and the builder is in good 
state.
+  void CheckState(int32_t column_ordinal) const {
+    if (finished_) {
+      throw ParquetException("Cannot call WriteTo() twice on 
BloomFilterBuilder.");
+    }
+    if (column_ordinal < 0 || column_ordinal >= schema_->num_columns()) {
+      throw ParquetException("Invalid column ordinal: ", column_ordinal);
+    }
+    if (file_bloom_filters_.empty()) {
+      throw ParquetException("No row group appended to BloomFilterBuilder.");
+    }
+    if (schema_->Column(column_ordinal)->physical_type() == Type::BOOLEAN) {
+      throw ParquetException("BloomFilterBuilder does not support boolean 
type.");
+    }
+  }
+
+  const SchemaDescriptor* schema_;
+  const WriterProperties* properties_;
+  bool finished_ = false;
+
+  using RowGroupBloomFilters = std::map<int32_t, std::unique_ptr<BloomFilter>>;
+  // Using unique_ptr because the `std::unique_ptr<BloomFilter>` is not 
copyable.
+  // MSVC has the issue below: https://github.com/microsoft/STL/issues/1036
+  // So we use `std::unique_ptr<std::map<>>` to avoid the issue.
+  std::vector<std::unique_ptr<RowGroupBloomFilters>> file_bloom_filters_;
+};
+
+void BloomFilterBuilderImpl::AppendRowGroup() {
+  if (finished_) {
+    throw ParquetException(
+        "Cannot call AppendRowGroup() to BloomFilterBuilder::WriteTo is 
called");
+  }
+  file_bloom_filters_.emplace_back(std::make_unique<RowGroupBloomFilters>());
+}
+
+BloomFilter* BloomFilterBuilderImpl::GetOrCreateBloomFilter(int32_t 
column_ordinal) {
+  CheckState(column_ordinal);
+  const ColumnDescriptor* column_descr = schema_->Column(column_ordinal);
+  // Bloom filter does not support boolean type, and this should be checked in
+  // CheckState() already.
+  DCHECK_NE(column_descr->physical_type(), Type::BOOLEAN);
+  auto bloom_filter_options_opt = 
properties_->bloom_filter_options(column_descr->path());
+  if (bloom_filter_options_opt == std::nullopt) {
+    return nullptr;
+  }
+  BloomFilterOptions bloom_filter_options = *bloom_filter_options_opt;
+  // CheckState() should have checked that file_bloom_filters_ is not empty.
+  DCHECK(!file_bloom_filters_.empty());
+  RowGroupBloomFilters& row_group_bloom_filter = *file_bloom_filters_.back();
+  auto iter = row_group_bloom_filter.find(column_ordinal);
+  if (iter == row_group_bloom_filter.end()) {
+    auto block_split_bloom_filter =
+        std::make_unique<BlockSplitBloomFilter>(properties_->memory_pool());
+    block_split_bloom_filter->Init(BlockSplitBloomFilter::OptimalNumOfBytes(
+        bloom_filter_options.ndv, bloom_filter_options.fpp));
+    auto insert_result = row_group_bloom_filter.emplace(
+        column_ordinal, std::move(block_split_bloom_filter));
+    DCHECK(insert_result.second);
+    iter = insert_result.first;
+  }
+  if (iter->second == nullptr) {
+    throw ParquetException("Bloom filter state is invalid for column ",
+                           column_descr->path());
+  }
+  return iter->second.get();
+}
+
+void BloomFilterBuilderImpl::WriteTo(::arrow::io::OutputStream* sink,
+                                     BloomFilterLocation* location) {
+  if (finished_) {
+    throw ParquetException("Cannot call WriteTo() multiple times.");
+  }
+  finished_ = true;
+
+  for (size_t row_group_ordinal = 0; row_group_ordinal < 
file_bloom_filters_.size();
+       ++row_group_ordinal) {
+    RowGroupBloomFilters& row_group_bloom_filters =
+        *file_bloom_filters_[row_group_ordinal];
+    // the whole row group has no bloom filter
+    if (row_group_bloom_filters.empty()) {
+      continue;
+    }
+    int num_columns = schema_->num_columns();
+    RowGroupBloomFilterLocation locations;
+
+    // serialize bloom filter in ascending order of column id
+    for (auto& [column_id, filter] : row_group_bloom_filters) {
+      if (ARROW_PREDICT_FALSE(filter == nullptr)) {
+        throw ParquetException("Bloom filter state is invalid for column ", 
column_id);
+      }
+      if (ARROW_PREDICT_FALSE(column_id < 0 || column_id >= num_columns)) {
+        throw ParquetException("Invalid column ordinal when serailizing: ", 
column_id);

Review Comment:
   ```suggestion
           throw ParquetException("Invalid column ordinal when serializing 
bloom filter: ", column_id);
   ```



##########
cpp/src/parquet/bloom_filter_reader_writer_test.cc:
##########
@@ -0,0 +1,228 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <gmock/gmock.h>
+#include <gtest/gtest.h>
+
+#include "arrow/testing/gtest_util.h"
+
+#include "parquet/bloom_filter.h"
+#include "parquet/bloom_filter_builder.h"
+#include "parquet/bloom_filter_reader.h"
+#include "parquet/file_reader.h"
+#include "parquet/test_util.h"
+
+namespace parquet::test {
+
+TEST(BloomFilterReader, ReadBloomFilter) {
+  std::vector<std::string> files = {"data_index_bloom_encoding_stats.parquet",
+                                    
"data_index_bloom_encoding_with_length.parquet"};
+  for (const auto& test_file : files) {
+    std::string dir_string(get_data_dir());
+    std::string path = dir_string + "/" + test_file;
+    auto reader = ParquetFileReader::OpenFile(path, /*memory_map=*/false);
+    auto file_metadata = reader->metadata();
+    EXPECT_FALSE(file_metadata->is_encryption_algorithm_set());
+    auto& bloom_filter_reader = reader->GetBloomFilterReader();
+    auto row_group_0 = bloom_filter_reader.RowGroup(0);
+    ASSERT_NE(nullptr, row_group_0);
+    EXPECT_THROW_THAT(
+        [&]() { bloom_filter_reader.RowGroup(1); }, ParquetException,
+        ::testing::Property(&ParquetException::what,
+                            ::testing::HasSubstr("Invalid row group 
ordinal")));
+    auto bloom_filter = row_group_0->GetColumnBloomFilter(0);
+    ASSERT_NE(nullptr, bloom_filter);
+    EXPECT_THROW_THAT([&]() { row_group_0->GetColumnBloomFilter(1); }, 
ParquetException,
+                      ::testing::Property(&ParquetException::what,
+                                          ::testing::HasSubstr(
+                                              "Invalid column index at column 
ordinal")));
+
+    // assert exists
+    {
+      std::string_view sv = "Hello";
+      ByteArray ba{sv};
+      EXPECT_TRUE(bloom_filter->FindHash(bloom_filter->Hash(&ba)));
+    }
+
+    // no exists
+    {
+      std::string_view sv = "NOT_EXISTS";
+      ByteArray ba{sv};
+      EXPECT_FALSE(bloom_filter->FindHash(bloom_filter->Hash(&ba)));
+    }
+  }
+}
+
+TEST(BloomFilterReader, FileNotHaveBloomFilter) {
+  // Can still get a BloomFilterReader and a RowGroupBloomFilter
+  // reader, but cannot get a non-null BloomFilter.
+  std::string dir_string(get_data_dir());
+  std::string path = dir_string + "/alltypes_plain.parquet";
+  auto reader = ParquetFileReader::OpenFile(path, false);
+  auto file_metadata = reader->metadata();
+  EXPECT_FALSE(file_metadata->is_encryption_algorithm_set());
+  auto& bloom_filter_reader = reader->GetBloomFilterReader();
+  auto row_group_0 = bloom_filter_reader.RowGroup(0);
+  ASSERT_NE(nullptr, row_group_0);
+  EXPECT_THROW(bloom_filter_reader.RowGroup(1), ParquetException);
+  auto bloom_filter = row_group_0->GetColumnBloomFilter(0);
+  ASSERT_EQ(nullptr, bloom_filter);
+}
+
+// <c1:BYTE_ARRAY, c2:BYTE_ARRAY>, c1 has bloom filter.
+TEST(BloomFilterBuilderTest, BasicRoundTrip) {

Review Comment:
   ```suggestion
   TEST(BloomFilterBuilder, BasicRoundTrip) {
   ```
   
   Keep the consistency at least for this file?



##########
cpp/src/parquet/bloom_filter_builder.cc:
##########
@@ -0,0 +1,170 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// This module defines an abstract interface for iterating through pages in a
+// Parquet column chunk within a row group. It could be extended in the future
+// to iterate through all data pages in all chunks in a file.
+
+#include "parquet/bloom_filter_builder.h"
+
+#include <map>
+#include <utility>
+#include <vector>
+
+#include "arrow/io/interfaces.h"
+
+#include "parquet/bloom_filter.h"
+#include "parquet/exception.h"
+#include "parquet/metadata.h"
+#include "parquet/properties.h"
+
+namespace parquet::internal {
+
+namespace {
+/// Column encryption for bloom filter is not implemented yet.
+class BloomFilterBuilderImpl : public BloomFilterBuilder {
+ public:
+  explicit BloomFilterBuilderImpl(const SchemaDescriptor* schema,
+                                  const WriterProperties* properties)
+      : schema_(schema), properties_(properties) {}
+  BloomFilterBuilderImpl(const BloomFilterBuilderImpl&) = delete;
+  BloomFilterBuilderImpl(BloomFilterBuilderImpl&&) = default;
+
+  /// Append a new row group to host all incoming bloom filters.
+  void AppendRowGroup() override;
+
+  BloomFilter* GetOrCreateBloomFilter(int32_t column_ordinal) override;
+
+  /// Serialize all bloom filters with header and bitset in the order of row 
group and
+  /// column id. The side effect is that it deletes all bloom filters after 
they have
+  /// been flushed.
+  void WriteTo(::arrow::io::OutputStream* sink, BloomFilterLocation* location) 
override;
+
+ private:
+  /// Make sure column ordinal is not out of bound and the builder is in good 
state.
+  void CheckState(int32_t column_ordinal) const {
+    if (finished_) {
+      throw ParquetException("Cannot call WriteTo() twice on 
BloomFilterBuilder.");
+    }
+    if (column_ordinal < 0 || column_ordinal >= schema_->num_columns()) {
+      throw ParquetException("Invalid column ordinal: ", column_ordinal);
+    }
+    if (file_bloom_filters_.empty()) {
+      throw ParquetException("No row group appended to BloomFilterBuilder.");
+    }
+    if (schema_->Column(column_ordinal)->physical_type() == Type::BOOLEAN) {
+      throw ParquetException("BloomFilterBuilder does not support boolean 
type.");
+    }
+  }
+
+  const SchemaDescriptor* schema_;
+  const WriterProperties* properties_;
+  bool finished_ = false;
+
+  using RowGroupBloomFilters = std::map<int32_t, std::unique_ptr<BloomFilter>>;
+  // Using unique_ptr because the `std::unique_ptr<BloomFilter>` is not 
copyable.
+  // MSVC has the issue below: https://github.com/microsoft/STL/issues/1036
+  // So we use `std::unique_ptr<std::map<>>` to avoid the issue.
+  std::vector<std::unique_ptr<RowGroupBloomFilters>> file_bloom_filters_;
+};
+
+void BloomFilterBuilderImpl::AppendRowGroup() {
+  if (finished_) {
+    throw ParquetException(
+        "Cannot call AppendRowGroup() to BloomFilterBuilder::WriteTo is 
called");
+  }
+  file_bloom_filters_.emplace_back(std::make_unique<RowGroupBloomFilters>());
+}
+
+BloomFilter* BloomFilterBuilderImpl::GetOrCreateBloomFilter(int32_t 
column_ordinal) {
+  CheckState(column_ordinal);
+  const ColumnDescriptor* column_descr = schema_->Column(column_ordinal);
+  // Bloom filter does not support boolean type, and this should be checked in
+  // CheckState() already.
+  DCHECK_NE(column_descr->physical_type(), Type::BOOLEAN);
+  auto bloom_filter_options_opt = 
properties_->bloom_filter_options(column_descr->path());
+  if (bloom_filter_options_opt == std::nullopt) {
+    return nullptr;
+  }
+  BloomFilterOptions bloom_filter_options = *bloom_filter_options_opt;
+  // CheckState() should have checked that file_bloom_filters_ is not empty.
+  DCHECK(!file_bloom_filters_.empty());
+  RowGroupBloomFilters& row_group_bloom_filter = *file_bloom_filters_.back();
+  auto iter = row_group_bloom_filter.find(column_ordinal);
+  if (iter == row_group_bloom_filter.end()) {
+    auto block_split_bloom_filter =
+        std::make_unique<BlockSplitBloomFilter>(properties_->memory_pool());
+    block_split_bloom_filter->Init(BlockSplitBloomFilter::OptimalNumOfBytes(
+        bloom_filter_options.ndv, bloom_filter_options.fpp));
+    auto insert_result = row_group_bloom_filter.emplace(
+        column_ordinal, std::move(block_split_bloom_filter));
+    DCHECK(insert_result.second);
+    iter = insert_result.first;
+  }
+  if (iter->second == nullptr) {
+    throw ParquetException("Bloom filter state is invalid for column ",
+                           column_descr->path());
+  }
+  return iter->second.get();
+}
+
+void BloomFilterBuilderImpl::WriteTo(::arrow::io::OutputStream* sink,
+                                     BloomFilterLocation* location) {
+  if (finished_) {
+    throw ParquetException("Cannot call WriteTo() multiple times.");
+  }
+  finished_ = true;
+
+  for (size_t row_group_ordinal = 0; row_group_ordinal < 
file_bloom_filters_.size();
+       ++row_group_ordinal) {
+    RowGroupBloomFilters& row_group_bloom_filters =
+        *file_bloom_filters_[row_group_ordinal];
+    // the whole row group has no bloom filter
+    if (row_group_bloom_filters.empty()) {
+      continue;
+    }
+    int num_columns = schema_->num_columns();
+    RowGroupBloomFilterLocation locations;
+
+    // serialize bloom filter in ascending order of column id
+    for (auto& [column_id, filter] : row_group_bloom_filters) {
+      if (ARROW_PREDICT_FALSE(filter == nullptr)) {
+        throw ParquetException("Bloom filter state is invalid for column ", 
column_id);
+      }
+      if (ARROW_PREDICT_FALSE(column_id < 0 || column_id >= num_columns)) {
+        throw ParquetException("Invalid column ordinal when serailizing: ", 
column_id);
+      }
+      PARQUET_ASSIGN_OR_THROW(int64_t offset, sink->Tell());
+      filter->WriteTo(sink);
+      PARQUET_ASSIGN_OR_THROW(int64_t pos, sink->Tell());
+      if (pos - offset > std::numeric_limits<int32_t>::max()) {
+        throw ParquetException("Bloom filter is too large to be serialized, 
size: ",

Review Comment:
   Perhaps add column id to the exception message?



##########
cpp/src/parquet/column_writer.cc:
##########
@@ -1791,11 +1810,15 @@ Status 
TypedColumnWriterImpl<DType>::WriteArrowDictionary(
                                  &exec_ctx));
       referenced_dictionary = referenced_dictionary_datum.make_array();
     }
-
-    int64_t non_null_count = chunk_indices->length() - 
chunk_indices->null_count();
-    page_statistics_->IncrementNullCount(num_chunk_levels - non_null_count);
-    page_statistics_->IncrementNumValues(non_null_count);
-    page_statistics_->Update(*referenced_dictionary, /*update_counts=*/false);
+    if (page_statistics_ != nullptr) {
+      int64_t non_null_count = chunk_indices->length() - 
chunk_indices->null_count();
+      page_statistics_->IncrementNullCount(num_chunk_levels - non_null_count);
+      page_statistics_->IncrementNumValues(non_null_count);
+      page_statistics_->Update(*referenced_dictionary, 
/*update_counts=*/false);
+    }
+    if (bloom_filter_ != nullptr) {
+      UpdateBloomFilterArray(*referenced_dictionary);

Review Comment:
   It seems that we don't have to use `referenced_dictionary`. If we can accept 
the bloom filter to contain more values than it should have, 
`data.dictionary()` seems to be sufficient.
   
   BTW, if we already have dictionary encoding, should we simply disable 
building bloom filter in this case?



##########
cpp/src/parquet/bloom_filter_builder.h:
##########
@@ -0,0 +1,88 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include "arrow/io/type_fwd.h"
+#include "parquet/types.h"
+
+namespace parquet {
+
+class BloomFilter;
+class SchemaDescriptor;
+struct BloomFilterOptions;
+struct BloomFilterLocation;
+
+namespace internal {

Review Comment:
   If this file is for internal use, then we should rename the file as 
`bloom_filter_builder_internal.h` or simply `bloom_filter_internal.h`. Then 
this file will not be installed publicly.



##########
cpp/src/parquet/bloom_filter_builder.h:
##########
@@ -0,0 +1,88 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include "arrow/io/type_fwd.h"
+#include "parquet/types.h"
+
+namespace parquet {
+
+class BloomFilter;

Review Comment:
   Move these forward declarations to parquet/type_fwd.h?



##########
cpp/src/parquet/metadata.h:
##########
@@ -499,21 +498,43 @@ class PARQUET_EXPORT RowGroupMetaDataBuilder {
   std::unique_ptr<RowGroupMetaDataBuilderImpl> impl_;
 };
 
+/// Alias type of page index location of a row group. The index location
+/// is located by column ordinal. If a column does not have a page index,
+/// its value is set to std::nullopt.
+using RowGroupIndexLocation = std::vector<std::optional<IndexLocation>>;
+
+/// Alias type of bloom filter location of a row group. The filter location
+/// is located by column ordinal.
+///
+/// Number of columns with a bloom filter to be relatively small compared to
+/// the number of overall columns, so map is used.
+using RowGroupBloomFilterLocation = std::map<int32_t, IndexLocation>;
+
+/// Alias type of page index and location of a parquet file. The
+/// index location is located by the row group ordinal.
+using FileIndexLocation = std::map<size_t, RowGroupIndexLocation>;
+
+/// Alias type of bloom filter and location of a parquet file. The
+/// index location is located by the row group ordinal.
+using FileBloomFilterLocation = std::map<size_t, RowGroupBloomFilterLocation>;
+
 /// \brief Public struct for location to all page indexes in a parquet file.
 struct PageIndexLocation {
-  /// Alias type of page index location of a row group. The index location
-  /// is located by column ordinal. If the column does not have the page index,
-  /// its value is set to std::nullopt.
-  using RowGroupIndexLocation = std::vector<std::optional<IndexLocation>>;
-  /// Alias type of page index location of a parquet file. The index location
-  /// is located by the row group ordinal.
-  using FileIndexLocation = std::map<size_t, RowGroupIndexLocation>;
   /// Row group column index locations which uses row group ordinal as the key.
   FileIndexLocation column_index_location;
   /// Row group offset index locations which uses row group ordinal as the key.
   FileIndexLocation offset_index_location;
 };
 
+/// \brief Public struct for location to all bloom filters in a parquet file.
+struct BloomFilterLocation {
+  /// Row group bloom filter index locations which uses row group ordinal as 
the key.
+  ///
+  /// Note: Before Parquet 2.10, the bloom filter index only have "offset". 
But here

Review Comment:
   ```suggestion
     /// Note: Before Parquet Format v2.10, the bloom filter index only have 
"offset". But here
   ```



##########
cpp/src/parquet/metadata.h:
##########
@@ -499,21 +498,43 @@ class PARQUET_EXPORT RowGroupMetaDataBuilder {
   std::unique_ptr<RowGroupMetaDataBuilderImpl> impl_;
 };
 
+/// Alias type of page index location of a row group. The index location
+/// is located by column ordinal. If a column does not have a page index,
+/// its value is set to std::nullopt.
+using RowGroupIndexLocation = std::vector<std::optional<IndexLocation>>;
+
+/// Alias type of bloom filter location of a row group. The filter location
+/// is located by column ordinal.
+///
+/// Number of columns with a bloom filter to be relatively small compared to
+/// the number of overall columns, so map is used.
+using RowGroupBloomFilterLocation = std::map<int32_t, IndexLocation>;

Review Comment:
   What about defining `RowGroupBloomFilterLocation` and 
`FileBloomFilterLocation` in the `BloomFilterLocation`?



##########
cpp/src/parquet/column_writer_test.cc:
##########
@@ -1719,6 +1720,94 @@ TEST(TestColumnWriter, WriteDataPageV2HeaderNullCount) {
   }
 }
 
+template <typename TestType>
+class TestBloomFilterWriter : public TestPrimitiveWriter<TestType> {
+ public:
+  void SetUp() override {
+    TestPrimitiveWriter<TestType>::SetUp();
+    builder_ = nullptr;
+    bloom_filter_ = nullptr;
+  }
+
+  std::shared_ptr<TypedColumnWriter<TestType>> BuildWriterWithBloomFilter(
+      int64_t output_size, const ColumnProperties& column_properties);
+
+  std::unique_ptr<internal::BloomFilterBuilder> builder_;
+  BloomFilter* bloom_filter_{nullptr};
+};
+
+template <typename TestType>
+std::shared_ptr<TypedColumnWriter<TestType>>
+TestBloomFilterWriter<TestType>::BuildWriterWithBloomFilter(
+    int64_t output_size, const ColumnProperties& column_properties) {
+  this->sink_ = CreateOutputStream();
+  WriterProperties::Builder wp_builder;
+  if (column_properties.encoding() == Encoding::PLAIN_DICTIONARY ||
+      column_properties.encoding() == Encoding::RLE_DICTIONARY) {
+    wp_builder.enable_dictionary();
+    wp_builder.dictionary_pagesize_limit(DICTIONARY_PAGE_SIZE);
+  } else {
+    wp_builder.disable_dictionary();
+    wp_builder.encoding(column_properties.encoding());
+  }
+  auto path = this->schema_.Column(0)->path();
+  if (column_properties.bloom_filter_enabled()) {
+    wp_builder.enable_bloom_filter_options(
+        column_properties.bloom_filter_options().value(), path);
+  } else {
+    wp_builder.disable_bloom_filter(path);
+  }
+  this->writer_properties_ = wp_builder.build();
+
+  this->metadata_ =
+      ColumnChunkMetaDataBuilder::Make(this->writer_properties_, this->descr_);
+  std::unique_ptr<PageWriter> pager = PageWriter::Open(
+      this->sink_, column_properties.compression(), this->metadata_.get());
+  builder_ =
+      internal::BloomFilterBuilder::Make(&this->schema_, 
this->writer_properties_.get());
+  // Initial RowGroup

Review Comment:
   Did you mean initialize?



##########
cpp/src/parquet/column_writer.cc:
##########
@@ -2475,32 +2651,37 @@ std::shared_ptr<ColumnWriter> 
ColumnWriter::Make(ColumnChunkMetaDataBuilder* met
     encoding = properties->dictionary_index_encoding();
   }
   switch (descr->physical_type()) {
-    case Type::BOOLEAN:
+    case Type::BOOLEAN: {
+      if (bloom_filter != nullptr) {
+        throw ParquetException("Bloom filter is not supported for boolean 
type");

Review Comment:
   Should we throw in the constructor of `TypedColumnWriterImpl<BooleanType>` 
instead to make here looks better?



##########
cpp/src/parquet/properties.h:
##########
@@ -145,6 +145,15 @@ static constexpr Encoding::type DEFAULT_ENCODING = 
Encoding::UNKNOWN;
 static const char DEFAULT_CREATED_BY[] = CREATED_BY_VERSION;
 static constexpr Compression::type DEFAULT_COMPRESSION_TYPE = 
Compression::UNCOMPRESSED;
 static constexpr bool DEFAULT_IS_PAGE_INDEX_ENABLED = false;
+static constexpr int32_t DEFAULT_BLOOM_FILTER_NDV = 1024 * 1024;
+static constexpr double DEFAULT_BLOOM_FILTER_FPP = 0.05;
+
+struct PARQUET_EXPORT BloomFilterOptions {

Review Comment:
   IMO, it is fine to provide a default FPP but users are rarely expected to 
use the default NDV.



##########
cpp/src/parquet/column_writer_test.cc:
##########
@@ -1719,6 +1720,94 @@ TEST(TestColumnWriter, WriteDataPageV2HeaderNullCount) {
   }
 }
 
+template <typename TestType>
+class TestBloomFilterWriter : public TestPrimitiveWriter<TestType> {
+ public:
+  void SetUp() override {
+    TestPrimitiveWriter<TestType>::SetUp();
+    builder_ = nullptr;
+    bloom_filter_ = nullptr;
+  }
+
+  std::shared_ptr<TypedColumnWriter<TestType>> BuildWriterWithBloomFilter(
+      int64_t output_size, const ColumnProperties& column_properties);
+
+  std::unique_ptr<internal::BloomFilterBuilder> builder_;
+  BloomFilter* bloom_filter_{nullptr};
+};
+
+template <typename TestType>
+std::shared_ptr<TypedColumnWriter<TestType>>
+TestBloomFilterWriter<TestType>::BuildWriterWithBloomFilter(
+    int64_t output_size, const ColumnProperties& column_properties) {
+  this->sink_ = CreateOutputStream();
+  WriterProperties::Builder wp_builder;
+  if (column_properties.encoding() == Encoding::PLAIN_DICTIONARY ||
+      column_properties.encoding() == Encoding::RLE_DICTIONARY) {
+    wp_builder.enable_dictionary();
+    wp_builder.dictionary_pagesize_limit(DICTIONARY_PAGE_SIZE);
+  } else {
+    wp_builder.disable_dictionary();
+    wp_builder.encoding(column_properties.encoding());
+  }
+  auto path = this->schema_.Column(0)->path();
+  if (column_properties.bloom_filter_enabled()) {
+    wp_builder.enable_bloom_filter_options(
+        column_properties.bloom_filter_options().value(), path);
+  } else {
+    wp_builder.disable_bloom_filter(path);
+  }
+  this->writer_properties_ = wp_builder.build();
+
+  this->metadata_ =
+      ColumnChunkMetaDataBuilder::Make(this->writer_properties_, this->descr_);
+  std::unique_ptr<PageWriter> pager = PageWriter::Open(
+      this->sink_, column_properties.compression(), this->metadata_.get());
+  builder_ =
+      internal::BloomFilterBuilder::Make(&this->schema_, 
this->writer_properties_.get());
+  // Initial RowGroup
+  builder_->AppendRowGroup();
+  bloom_filter_ = builder_->GetOrCreateBloomFilter(0);
+  std::shared_ptr<ColumnWriter> writer =
+      ColumnWriter::Make(this->metadata_.get(), std::move(pager),
+                         this->writer_properties_.get(), bloom_filter_);
+  return std::static_pointer_cast<TypedColumnWriter<TestType>>(writer);
+}
+
+// Note: BooleanType is Excluded.
+using TestBloomFilterTypes = ::testing::Types<Int32Type, Int64Type, Int96Type, 
FloatType,
+                                              DoubleType, ByteArrayType, 
FLBAType>;
+
+TYPED_TEST_SUITE(TestBloomFilterWriter, TestBloomFilterTypes);
+
+TYPED_TEST(TestBloomFilterWriter, Basic) {
+  this->GenerateData(SMALL_SIZE);
+  ColumnProperties column_properties;
+  BloomFilterOptions options;
+  options.ndv = SMALL_SIZE;
+  column_properties.set_bloom_filter_options(options);
+
+  auto writer = this->BuildWriterWithBloomFilter(SMALL_SIZE, 
column_properties);
+  writer->WriteBatch(this->values_.size(), nullptr, nullptr, 
this->values_ptr_);
+  writer->Close();
+
+  // Read all rows so we are sure that also the non-dictionary pages are read 
correctly

Review Comment:
   ```suggestion
     // Make sure that column values are read correctly
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to