pitrou commented on code in PR #37400:
URL: https://github.com/apache/arrow/pull/37400#discussion_r1311657835


##########
cpp/src/parquet/arrow/arrow_reader_writer_test.cc:
##########
@@ -5584,5 +5592,104 @@ TEST_F(ParquetPageIndexRoundTripTest, EnablePerColumn) {
                             /*null_counts=*/{0}}));
 }
 
+class ParquetBloomFilterRoundTripTest : public ::testing::Test,

Review Comment:
   `arrow_reader_writer_test.cc` is already too large and slow to compile; we 
should instead create a new test file for this and the page index test.



##########
cpp/src/parquet/bloom_filter_builder.h:
##########
@@ -0,0 +1,72 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// This module defines an abstract interface for iterating through pages in a
+// Parquet column chunk within a row group. It could be extended in the future
+// to iterate through all data pages in all chunks in a file.
+
+#pragma once
+
+#include "arrow/io/interfaces.h"
+#include "parquet/types.h"
+
+namespace parquet {
+
+class BloomFilter;
+class SchemaDescriptor;
+struct BloomFilterOptions;
+struct BloomFilterLocation;
+
+namespace schema {
+class ColumnPath;
+}

Review Comment:
   Can you add declarations to `parquet/type_fwd.h` instead?



##########
cpp/src/parquet/arrow/arrow_reader_writer_test.cc:
##########
@@ -5584,5 +5592,104 @@ TEST_F(ParquetPageIndexRoundTripTest, EnablePerColumn) {
                             /*null_counts=*/{0}}));
 }
 
+class ParquetBloomFilterRoundTripTest : public ::testing::Test,
+                                        public ParquetIndexRoundTripTest {
+ public:
+  void ReadBloomFilters(int expect_num_row_groups,
+                        const std::set<int>& expect_columns_without_filter = 
{}) {
+    auto read_properties = default_arrow_reader_properties();
+    auto reader = 
ParquetFileReader::Open(std::make_shared<BufferReader>(buffer_));
+
+    auto metadata = reader->metadata();
+    ASSERT_EQ(expect_num_row_groups, metadata->num_row_groups());
+
+    auto& bloom_filter_reader = reader->GetBloomFilterReader();
+
+    for (int rg = 0; rg < metadata->num_row_groups(); ++rg) {
+      auto row_group_reader = bloom_filter_reader.RowGroup(rg);
+      ASSERT_NE(row_group_reader, nullptr);
+
+      for (int col = 0; col < metadata->num_columns(); ++col) {
+        bool expect_no_bloom_filter = expect_columns_without_filter.find(col) 
!=
+                                      expect_columns_without_filter.cend();
+
+        auto bloom_filter = row_group_reader->GetColumnBloomFilter(col);
+        if (expect_no_bloom_filter) {
+          ASSERT_EQ(bloom_filter, nullptr);
+        } else {
+          bloom_filters_.push_back(std::move(bloom_filter));
+        }
+      }
+    }
+  }
+
+  template <typename ArrowType>
+  void verifyBloomFilter(const BloomFilter* bloom_filter,

Review Comment:
   Please use CamelCase for function names.



##########
cpp/src/parquet/arrow/arrow_reader_writer_test.cc:
##########
@@ -5584,5 +5592,104 @@ TEST_F(ParquetPageIndexRoundTripTest, EnablePerColumn) {
                             /*null_counts=*/{0}}));
 }
 
+class ParquetBloomFilterRoundTripTest : public ::testing::Test,
+                                        public ParquetIndexRoundTripTest {
+ public:
+  void ReadBloomFilters(int expect_num_row_groups,
+                        const std::set<int>& expect_columns_without_filter = 
{}) {
+    auto read_properties = default_arrow_reader_properties();
+    auto reader = 
ParquetFileReader::Open(std::make_shared<BufferReader>(buffer_));
+
+    auto metadata = reader->metadata();
+    ASSERT_EQ(expect_num_row_groups, metadata->num_row_groups());
+
+    auto& bloom_filter_reader = reader->GetBloomFilterReader();
+
+    for (int rg = 0; rg < metadata->num_row_groups(); ++rg) {
+      auto row_group_reader = bloom_filter_reader.RowGroup(rg);
+      ASSERT_NE(row_group_reader, nullptr);
+
+      for (int col = 0; col < metadata->num_columns(); ++col) {
+        bool expect_no_bloom_filter = expect_columns_without_filter.find(col) 
!=
+                                      expect_columns_without_filter.cend();
+
+        auto bloom_filter = row_group_reader->GetColumnBloomFilter(col);
+        if (expect_no_bloom_filter) {
+          ASSERT_EQ(bloom_filter, nullptr);
+        } else {
+          bloom_filters_.push_back(std::move(bloom_filter));
+        }
+      }
+    }
+  }
+
+  template <typename ArrowType>
+  void verifyBloomFilter(const BloomFilter* bloom_filter,
+                         const ::arrow::ChunkedArray& chunked_array) {
+    auto iter = ::arrow::stl::Begin<ArrowType>(chunked_array);
+    auto end = ::arrow::stl::End<ArrowType>(chunked_array);
+    while (iter != end) {
+      auto value = *iter;
+      if (value == std::nullopt) {
+        ++iter;
+        continue;
+      }
+      if constexpr (std::is_same_v<ArrowType, ::arrow::StringType>) {
+        ByteArray ba(value.value());
+        EXPECT_TRUE(bloom_filter->FindHash(bloom_filter->Hash(&ba)));

Review Comment:
   Perhaps you can instead add a `BloomFilter::Hash(std::string_view)` 
convenience method?



##########
cpp/src/parquet/properties.h:
##########
@@ -139,6 +139,18 @@ static constexpr Encoding::type DEFAULT_ENCODING = 
Encoding::PLAIN;
 static const char DEFAULT_CREATED_BY[] = CREATED_BY_VERSION;
 static constexpr Compression::type DEFAULT_COMPRESSION_TYPE = 
Compression::UNCOMPRESSED;
 static constexpr bool DEFAULT_IS_PAGE_INDEX_ENABLED = false;
+static constexpr int32_t DEFAULT_BLOOM_FILTER_NDV = 1024 * 1024;
+static constexpr double DEFAULT_BLOOM_FILTER_FPP = 0.05;
+
+/// Note: Currently we don't support bloom filter for boolean columns,
+/// so if enable bloom filter for boolean columns, it will be ignored.
+struct BloomFilterOptions {
+  int32_t ndv = DEFAULT_BLOOM_FILTER_NDV;
+  double fpp = DEFAULT_BLOOM_FILTER_FPP;

Review Comment:
   Can you describe each of these values?



##########
cpp/src/parquet/properties.h:
##########
@@ -139,6 +139,18 @@ static constexpr Encoding::type DEFAULT_ENCODING = 
Encoding::PLAIN;
 static const char DEFAULT_CREATED_BY[] = CREATED_BY_VERSION;
 static constexpr Compression::type DEFAULT_COMPRESSION_TYPE = 
Compression::UNCOMPRESSED;
 static constexpr bool DEFAULT_IS_PAGE_INDEX_ENABLED = false;
+static constexpr int32_t DEFAULT_BLOOM_FILTER_NDV = 1024 * 1024;
+static constexpr double DEFAULT_BLOOM_FILTER_FPP = 0.05;
+
+/// Note: Currently we don't support bloom filter for boolean columns,
+/// so if enable bloom filter for boolean columns, it will be ignored.

Review Comment:
   Can you move this comment elsewhere? It's not a description of 
`BloomFilterOptions`.



##########
cpp/src/parquet/metadata.h:
##########
@@ -508,21 +507,32 @@ class PARQUET_EXPORT RowGroupMetaDataBuilder {
   std::unique_ptr<RowGroupMetaDataBuilderImpl> impl_;
 };
 
+/// Alias type of page index and bloom filter location of a row group. The 
index
+/// location is located by column ordinal. If the column does not have the 
index,
+/// its value is set to std::nullopt.

Review Comment:
   ```suggestion
   /// location is located by column ordinal. If a column does not have a page 
index
   /// (respectively bloom filter), its value is set to std::nullopt.
   ```



##########
cpp/src/parquet/arrow/arrow_reader_writer_test.cc:
##########
@@ -5584,5 +5592,104 @@ TEST_F(ParquetPageIndexRoundTripTest, EnablePerColumn) {
                             /*null_counts=*/{0}}));
 }
 
+class ParquetBloomFilterRoundTripTest : public ::testing::Test,
+                                        public ParquetIndexRoundTripTest {
+ public:
+  void ReadBloomFilters(int expect_num_row_groups,
+                        const std::set<int>& expect_columns_without_filter = 
{}) {
+    auto read_properties = default_arrow_reader_properties();
+    auto reader = 
ParquetFileReader::Open(std::make_shared<BufferReader>(buffer_));
+
+    auto metadata = reader->metadata();
+    ASSERT_EQ(expect_num_row_groups, metadata->num_row_groups());
+
+    auto& bloom_filter_reader = reader->GetBloomFilterReader();
+
+    for (int rg = 0; rg < metadata->num_row_groups(); ++rg) {
+      auto row_group_reader = bloom_filter_reader.RowGroup(rg);
+      ASSERT_NE(row_group_reader, nullptr);
+
+      for (int col = 0; col < metadata->num_columns(); ++col) {
+        bool expect_no_bloom_filter = expect_columns_without_filter.find(col) 
!=
+                                      expect_columns_without_filter.cend();
+
+        auto bloom_filter = row_group_reader->GetColumnBloomFilter(col);
+        if (expect_no_bloom_filter) {
+          ASSERT_EQ(bloom_filter, nullptr);
+        } else {
+          bloom_filters_.push_back(std::move(bloom_filter));
+        }
+      }
+    }
+  }
+
+  template <typename ArrowType>
+  void verifyBloomFilter(const BloomFilter* bloom_filter,
+                         const ::arrow::ChunkedArray& chunked_array) {
+    auto iter = ::arrow::stl::Begin<ArrowType>(chunked_array);
+    auto end = ::arrow::stl::End<ArrowType>(chunked_array);
+    while (iter != end) {
+      auto value = *iter;

Review Comment:
   I think this can simply be:
   ```suggestion
       for (auto value : ::arrow::stl::Iterate<ArrowType>(chunked_array)) {
   ```



##########
cpp/src/parquet/bloom_filter_builder.h:
##########
@@ -0,0 +1,72 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// This module defines an abstract interface for iterating through pages in a
+// Parquet column chunk within a row group. It could be extended in the future
+// to iterate through all data pages in all chunks in a file.
+
+#pragma once
+
+#include "arrow/io/interfaces.h"
+#include "parquet/types.h"
+
+namespace parquet {
+
+class BloomFilter;
+class SchemaDescriptor;
+struct BloomFilterOptions;
+struct BloomFilterLocation;
+
+namespace schema {
+class ColumnPath;
+}
+
+/// \brief Interface for collecting bloom filter of a parquet file.
+class PARQUET_EXPORT BloomFilterBuilder {

Review Comment:
   Should this be `BloomFilterWriter` instead?



##########
cpp/src/parquet/properties.h:
##########
@@ -532,6 +571,50 @@ class PARQUET_EXPORT WriterProperties {
       return this->disable_statistics(path->ToDotString());
     }
 
+    /// Enable writing bloom filter in general for all columns. Default 
disabled.
+    ///
+    /// Please check the link below for more details:
+    /// https://github.com/apache/parquet-format/blob/master/BloomFilter.md
+    Builder* enable_bloom_filter() {
+      default_column_properties_.set_bloom_filter_enabled(true);
+      return this;
+    }
+
+    /// Enable bloom filter for the column specified by `path`.
+    /// Default disabled.
+    Builder* enable_bloom_filter(const std::string& path) {
+      auto iter = bloom_filter_options_.find(path);
+      if (iter == bloom_filter_options_.end() || iter->second == std::nullopt) 
{
+        bloom_filter_options_[path] = BloomFilterOptions();

Review Comment:
   I don't understand: you can change the default bloom filter options for all 
columns in `set_bloom_filter_options`, but if you enable the bloom filter for 
one column, then it gets the default ndv/fpp values above instead?



##########
cpp/src/parquet/metadata.cc:
##########
@@ -1793,36 +1793,65 @@ class FileMetaDataBuilder::FileMetaDataBuilderImpl {
   }
 
   void SetPageIndexLocation(const PageIndexLocation& location) {
-    auto set_index_location =
+    auto set_index_location = [this](size_t row_group_ordinal,
+                                     const FileIndexLocation& 
file_index_location,
+                                     bool column_index) {
+      auto& row_group_metadata = this->row_groups_.at(row_group_ordinal);
+      auto iter = file_index_location.find(row_group_ordinal);
+      if (iter != file_index_location.cend()) {
+        const auto& row_group_index_location = iter->second;
+        for (size_t i = 0; i < row_group_index_location.size(); ++i) {
+          if (i >= row_group_metadata.columns.size()) {
+            throw ParquetException("Cannot find metadata for column ordinal ", 
i);
+          }
+          auto& column_metadata = row_group_metadata.columns.at(i);
+          const auto& index_location = row_group_index_location.at(i);
+          if (index_location.has_value()) {
+            if (column_index) {
+              
column_metadata.__set_column_index_offset(index_location->offset);
+              
column_metadata.__set_column_index_length(index_location->length);
+            } else {
+              
column_metadata.__set_offset_index_offset(index_location->offset);
+              
column_metadata.__set_offset_index_length(index_location->length);
+            }
+          }
+        }
+      }
+    };
+
+    for (size_t i = 0; i < row_groups_.size(); ++i) {
+      set_index_location(i, location.column_index_location, true);
+      set_index_location(i, location.offset_index_location, false);
+    }
+  }
+
+  // Update location to all bloom filters in the parquet file.
+  void SetBloomFilterLocation(const BloomFilterLocation& location) {
+    auto set_bloom_filter_location =
         [this](size_t row_group_ordinal,
-               const PageIndexLocation::FileIndexLocation& file_index_location,
-               bool column_index) {
+               const FileIndexLocation& file_bloom_filter_location) {
           auto& row_group_metadata = this->row_groups_.at(row_group_ordinal);
-          auto iter = file_index_location.find(row_group_ordinal);
-          if (iter != file_index_location.cend()) {
-            const auto& row_group_index_location = iter->second;
-            for (size_t i = 0; i < row_group_index_location.size(); ++i) {
+          auto iter = file_bloom_filter_location.find(row_group_ordinal);
+          if (iter != file_bloom_filter_location.cend()) {
+            const auto& row_group_bloom_filter_location = iter->second;
+            for (size_t i = 0; i < row_group_bloom_filter_location.size(); 
++i) {
               if (i >= row_group_metadata.columns.size()) {

Review Comment:
   Why would this happen?



##########
cpp/src/parquet/metadata.cc:
##########
@@ -1793,36 +1793,65 @@ class FileMetaDataBuilder::FileMetaDataBuilderImpl {
   }
 
   void SetPageIndexLocation(const PageIndexLocation& location) {
-    auto set_index_location =
+    auto set_index_location = [this](size_t row_group_ordinal,
+                                     const FileIndexLocation& 
file_index_location,
+                                     bool column_index) {
+      auto& row_group_metadata = this->row_groups_.at(row_group_ordinal);
+      auto iter = file_index_location.find(row_group_ordinal);
+      if (iter != file_index_location.cend()) {
+        const auto& row_group_index_location = iter->second;
+        for (size_t i = 0; i < row_group_index_location.size(); ++i) {
+          if (i >= row_group_metadata.columns.size()) {
+            throw ParquetException("Cannot find metadata for column ordinal ", 
i);
+          }
+          auto& column_metadata = row_group_metadata.columns.at(i);
+          const auto& index_location = row_group_index_location.at(i);
+          if (index_location.has_value()) {
+            if (column_index) {
+              
column_metadata.__set_column_index_offset(index_location->offset);
+              
column_metadata.__set_column_index_length(index_location->length);
+            } else {
+              
column_metadata.__set_offset_index_offset(index_location->offset);
+              
column_metadata.__set_offset_index_length(index_location->length);
+            }
+          }
+        }
+      }
+    };
+
+    for (size_t i = 0; i < row_groups_.size(); ++i) {
+      set_index_location(i, location.column_index_location, true);
+      set_index_location(i, location.offset_index_location, false);
+    }
+  }
+
+  // Update location to all bloom filters in the parquet file.
+  void SetBloomFilterLocation(const BloomFilterLocation& location) {
+    auto set_bloom_filter_location =
         [this](size_t row_group_ordinal,
-               const PageIndexLocation::FileIndexLocation& file_index_location,
-               bool column_index) {
+               const FileIndexLocation& file_bloom_filter_location) {
           auto& row_group_metadata = this->row_groups_.at(row_group_ordinal);
-          auto iter = file_index_location.find(row_group_ordinal);
-          if (iter != file_index_location.cend()) {
-            const auto& row_group_index_location = iter->second;
-            for (size_t i = 0; i < row_group_index_location.size(); ++i) {
+          auto iter = file_bloom_filter_location.find(row_group_ordinal);
+          if (iter != file_bloom_filter_location.cend()) {
+            const auto& row_group_bloom_filter_location = iter->second;
+            for (size_t i = 0; i < row_group_bloom_filter_location.size(); 
++i) {
               if (i >= row_group_metadata.columns.size()) {
                 throw ParquetException("Cannot find metadata for column 
ordinal ", i);
               }
-              auto& column_metadata = row_group_metadata.columns.at(i);
-              const auto& index_location = row_group_index_location.at(i);
-              if (index_location.has_value()) {
-                if (column_index) {
-                  
column_metadata.__set_column_index_offset(index_location->offset);
-                  
column_metadata.__set_column_index_length(index_location->length);
-                } else {
-                  
column_metadata.__set_offset_index_offset(index_location->offset);
-                  
column_metadata.__set_offset_index_length(index_location->length);
-                }
+              auto& column = row_group_metadata.columns.at(i);
+              auto& column_metadata = column.meta_data;
+              const auto& bloom_filter_location = 
row_group_bloom_filter_location.at(i);
+              if (bloom_filter_location.has_value()) {
+                
column_metadata.__set_bloom_filter_offset(bloom_filter_location->offset);
+                // TODO(mwish): Allow this in Parquet 2.10.

Review Comment:
   Why not write this always? It shouldn't hurt anyway.



##########
cpp/src/parquet/column_writer_test.cc:
##########
@@ -373,7 +375,6 @@ class TestPrimitiveWriter : public 
PrimitiveTypedTest<TestType> {
 
   const ColumnDescriptor* descr_;
 
- private:

Review Comment:
   Mark them `protected` if you just want to inherit them?



##########
cpp/src/parquet/column_writer.cc:
##########
@@ -2319,12 +2343,119 @@ Status 
TypedColumnWriterImpl<FLBAType>::WriteArrowDense(
   return Status::OK();
 }
 
+template <typename DType>
+void TypedColumnWriterImpl<DType>::UpdateBloomFilter(const T* values,
+                                                     int64_t num_values) {
+  if (bloom_filter_) {
+    // TODO(mwish): Would it allocate too much memory? Would an 
std::array<uint64_t, 64>
+    //  better here?
+    std::vector<uint64_t> hashes(num_values);
+    bloom_filter_->Hashes(values, static_cast<int>(num_values), hashes.data());
+    bloom_filter_->InsertHashes(hashes.data(), static_cast<int>(num_values));
+  }
+}
+
+template <>
+void TypedColumnWriterImpl<FLBAType>::UpdateBloomFilter(const FLBA* values,
+                                                        int64_t num_values) {
+  if (bloom_filter_) {
+    for (int64_t i = 0; i < num_values; ++i) {

Review Comment:
   Why not use a `std::array` here too?



##########
cpp/src/parquet/column_writer_test.cc:
##########
@@ -1591,5 +1592,88 @@ TEST(TestColumnWriter, WriteDataPageV2HeaderNullCount) {
   }
 }
 
+template <typename TestType>
+class TestBloomFilterWriter : public TestPrimitiveWriter<TestType> {
+ public:
+  void SetUp() override {
+    TestPrimitiveWriter<TestType>::SetUp();
+    builder_ = nullptr;
+    bloom_filter_ = nullptr;
+  }
+
+  std::shared_ptr<TypedColumnWriter<TestType>> BuildWriterWithBloomFilter(
+      int64_t output_size = SMALL_SIZE,
+      const ColumnProperties& column_properties = ColumnProperties());
+
+  std::unique_ptr<BloomFilterBuilder> builder_;
+  BloomFilter* bloom_filter_;
+};
+
+template <typename TestType>
+std::shared_ptr<TypedColumnWriter<TestType>>
+TestBloomFilterWriter<TestType>::BuildWriterWithBloomFilter(
+    int64_t output_size, const ColumnProperties& column_properties) {
+  this->sink_ = CreateOutputStream();
+  WriterProperties::Builder wp_builder;
+  if (column_properties.encoding() == Encoding::PLAIN_DICTIONARY ||
+      column_properties.encoding() == Encoding::RLE_DICTIONARY) {
+    wp_builder.enable_dictionary();
+    wp_builder.dictionary_pagesize_limit(DICTIONARY_PAGE_SIZE);
+  } else {
+    wp_builder.disable_dictionary();
+    wp_builder.encoding(column_properties.encoding());
+  }
+  wp_builder.max_statistics_size(column_properties.max_statistics_size());
+  this->writer_properties_ = wp_builder.build();
+
+  this->metadata_ =
+      ColumnChunkMetaDataBuilder::Make(this->writer_properties_, this->descr_);
+  std::unique_ptr<PageWriter> pager = PageWriter::Open(
+      this->sink_, column_properties.compression(), this->metadata_.get());
+  builder_ = BloomFilterBuilder::Make(&this->schema_, 
*this->writer_properties_);
+  // Initial RowGroup
+  builder_->AppendRowGroup();
+  BloomFilterOptions options;
+  options.ndv = static_cast<int32_t>(output_size);
+  bloom_filter_ = builder_->GetOrCreateBloomFilter(0, options);

Review Comment:
   Why does this need to be done by hand? Can't `ColumnWriter::Make` take a 
`BloomFilterOptions` instead?



##########
cpp/src/parquet/column_writer.cc:
##########
@@ -2335,28 +2466,28 @@ std::shared_ptr<ColumnWriter> 
ColumnWriter::Make(ColumnChunkMetaDataBuilder* met
   switch (descr->physical_type()) {
     case Type::BOOLEAN:
       return std::make_shared<TypedColumnWriterImpl<BooleanType>>(
-          metadata, std::move(pager), use_dictionary, encoding, properties);
+          metadata, std::move(pager), use_dictionary, encoding, properties, 
bloom_filter);

Review Comment:
   Shouldn't we raise an error if `bloom_filter` is not null here?



##########
cpp/src/parquet/column_writer.cc:
##########
@@ -2163,6 +2183,9 @@ Status TypedColumnWriterImpl<DoubleType>::WriteArrowDense(
 
 // ----------------------------------------------------------------------
 // Write Arrow to BYTE_ARRAY
+template <>
+void TypedColumnWriterImpl<ByteArrayType>::UpdateBloomFilterArray(
+    const ::arrow::Array& values);

Review Comment:
   Why redeclare this method? It's already declared above.



##########
cpp/src/parquet/column_writer.cc:
##########
@@ -2335,28 +2466,28 @@ std::shared_ptr<ColumnWriter> 
ColumnWriter::Make(ColumnChunkMetaDataBuilder* met
   switch (descr->physical_type()) {
     case Type::BOOLEAN:
       return std::make_shared<TypedColumnWriterImpl<BooleanType>>(
-          metadata, std::move(pager), use_dictionary, encoding, properties);
+          metadata, std::move(pager), use_dictionary, encoding, properties, 
bloom_filter);
     case Type::INT32:
       return std::make_shared<TypedColumnWriterImpl<Int32Type>>(
-          metadata, std::move(pager), use_dictionary, encoding, properties);
+          metadata, std::move(pager), use_dictionary, encoding, properties, 
bloom_filter);
     case Type::INT64:
       return std::make_shared<TypedColumnWriterImpl<Int64Type>>(
-          metadata, std::move(pager), use_dictionary, encoding, properties);
+          metadata, std::move(pager), use_dictionary, encoding, properties, 
bloom_filter);
     case Type::INT96:
       return std::make_shared<TypedColumnWriterImpl<Int96Type>>(
-          metadata, std::move(pager), use_dictionary, encoding, properties);
+          metadata, std::move(pager), use_dictionary, encoding, properties, 
bloom_filter);
     case Type::FLOAT:
       return std::make_shared<TypedColumnWriterImpl<FloatType>>(
-          metadata, std::move(pager), use_dictionary, encoding, properties);
+          metadata, std::move(pager), use_dictionary, encoding, properties, 
bloom_filter);
     case Type::DOUBLE:
       return std::make_shared<TypedColumnWriterImpl<DoubleType>>(
-          metadata, std::move(pager), use_dictionary, encoding, properties);
+          metadata, std::move(pager), use_dictionary, encoding, properties, 
bloom_filter);
     case Type::BYTE_ARRAY:
       return std::make_shared<TypedColumnWriterImpl<ByteArrayType>>(
-          metadata, std::move(pager), use_dictionary, encoding, properties);
+          metadata, std::move(pager), use_dictionary, encoding, properties, 
bloom_filter);
     case Type::FIXED_LEN_BYTE_ARRAY:
       return std::make_shared<TypedColumnWriterImpl<FLBAType>>(
-          metadata, std::move(pager), use_dictionary, encoding, properties);
+          metadata, std::move(pager), use_dictionary, encoding, properties, 
bloom_filter);
     default:
       ParquetException::NYI("type reader not implemented");

Review Comment:
   Unrelated, but let's fix this error message?
   ```suggestion
         ParquetException::NYI("column writer not implemented for this Parquet 
type");
   ```



##########
cpp/src/parquet/column_writer.cc:
##########
@@ -2319,12 +2343,119 @@ Status 
TypedColumnWriterImpl<FLBAType>::WriteArrowDense(
   return Status::OK();
 }
 
+template <typename DType>
+void TypedColumnWriterImpl<DType>::UpdateBloomFilter(const T* values,
+                                                     int64_t num_values) {
+  if (bloom_filter_) {
+    // TODO(mwish): Would it allocate too much memory? Would an 
std::array<uint64_t, 64>
+    //  better here?
+    std::vector<uint64_t> hashes(num_values);
+    bloom_filter_->Hashes(values, static_cast<int>(num_values), hashes.data());
+    bloom_filter_->InsertHashes(hashes.data(), static_cast<int>(num_values));
+  }
+}
+
+template <>
+void TypedColumnWriterImpl<FLBAType>::UpdateBloomFilter(const FLBA* values,
+                                                        int64_t num_values) {
+  if (bloom_filter_) {
+    for (int64_t i = 0; i < num_values; ++i) {
+      bloom_filter_->InsertHash(bloom_filter_->Hash(values + i, 
descr_->type_length()));
+    }
+  }
+}
+
+template <>
+void TypedColumnWriterImpl<BooleanType>::UpdateBloomFilter(const bool*, 
int64_t) {
+  DCHECK(bloom_filter_ == nullptr);
+}
+
+template <typename DType>
+void TypedColumnWriterImpl<DType>::UpdateBloomFilterSpaced(const T* values,
+                                                           int64_t num_values,
+                                                           const uint8_t* 
valid_bits,
+                                                           int64_t 
valid_bits_offset) {
+  if (bloom_filter_) {
+    ::arrow::internal::VisitSetBitRunsVoid(
+        valid_bits, valid_bits_offset, num_values, [&](int64_t position, 
int64_t length) {
+          for (int64_t i = 0; i < length; i++) {
+            bloom_filter_->InsertHash(bloom_filter_->Hash(values + i + 
position));
+          }
+        });
+  }
+}
+
+template <>
+void TypedColumnWriterImpl<BooleanType>::UpdateBloomFilterSpaced(const bool*, 
int64_t,
+                                                                 const 
uint8_t*,
+                                                                 int64_t) {
+  DCHECK(bloom_filter_ == nullptr);
+}
+
+template <>
+void TypedColumnWriterImpl<FLBAType>::UpdateBloomFilterSpaced(const FLBA* 
values,
+                                                              int64_t 
num_values,
+                                                              const uint8_t* 
valid_bits,
+                                                              int64_t 
valid_bits_offset) {
+  if (bloom_filter_) {
+    ::arrow::internal::VisitSetBitRunsVoid(
+        valid_bits, valid_bits_offset, num_values, [&](int64_t position, 
int64_t length) {
+          for (int64_t i = 0; i < length; i++) {
+            bloom_filter_->InsertHash(
+                bloom_filter_->Hash(values + i + position, 
descr_->type_length()));
+          }
+        });
+  }
+}
+
+template <typename ArrayType>
+void UpdateBinaryBloomFilter(BloomFilter* bloom_filter, const ArrayType& 
array) {
+  PARQUET_THROW_NOT_OK(::arrow::VisitArraySpanInline<typename 
ArrayType::TypeClass>(
+      *array.data(),
+      [&](const std::string_view& view) {
+        const ByteArray value{view};
+        bloom_filter->InsertHash(bloom_filter->Hash(&value));
+        return Status::OK();
+      },
+      []() { return Status::OK(); }));
+}
+
+template <typename DType>
+void TypedColumnWriterImpl<DType>::UpdateBloomFilterArray(const 
::arrow::Array& values) {
+  // Only ByteArray type would write ::arrow::Array directly.
+  ParquetException::NYI("Unreachable");
+}
+
+template <>
+void TypedColumnWriterImpl<ByteArrayType>::UpdateBloomFilterArray(
+    const ::arrow::Array& values) {
+  if (bloom_filter_) {
+    if (!::arrow::is_base_binary_like(values.type_id())) {
+      throw ParquetException("Only BaseBinaryArray and subclasses supported");
+    }
+    if (::arrow::is_dictionary(values.type_id())) {
+      // Dictionary is handled in WriteArrowDense, so this should never happen.
+      throw ParquetException("UpdateBloomFilterArray not support dictionary");
+    }
+
+    if (::arrow::is_binary_like(values.type_id())) {
+      UpdateBinaryBloomFilter(bloom_filter_,
+                              checked_cast<const 
::arrow::BinaryArray&>(values));
+    } else {
+      DCHECK(::arrow::is_large_binary_like(values.type_id()));
+      UpdateBinaryBloomFilter(bloom_filter_,
+                              checked_cast<const 
::arrow::LargeBinaryArray&>(values));
+    }
+  }
+}
+

Review Comment:
   Can we put `ColumnWriterImpl` and `TypedColumnWriterImpl` in the anonymous 
namespace?



##########
cpp/src/parquet/column_writer.cc:
##########
@@ -2319,12 +2343,119 @@ Status 
TypedColumnWriterImpl<FLBAType>::WriteArrowDense(
   return Status::OK();
 }
 
+template <typename DType>
+void TypedColumnWriterImpl<DType>::UpdateBloomFilter(const T* values,
+                                                     int64_t num_values) {
+  if (bloom_filter_) {
+    // TODO(mwish): Would it allocate too much memory? Would an 
std::array<uint64_t, 64>
+    //  better here?
+    std::vector<uint64_t> hashes(num_values);
+    bloom_filter_->Hashes(values, static_cast<int>(num_values), hashes.data());
+    bloom_filter_->InsertHashes(hashes.data(), static_cast<int>(num_values));
+  }
+}
+
+template <>
+void TypedColumnWriterImpl<FLBAType>::UpdateBloomFilter(const FLBA* values,
+                                                        int64_t num_values) {
+  if (bloom_filter_) {
+    for (int64_t i = 0; i < num_values; ++i) {
+      bloom_filter_->InsertHash(bloom_filter_->Hash(values + i, 
descr_->type_length()));
+    }
+  }
+}
+
+template <>
+void TypedColumnWriterImpl<BooleanType>::UpdateBloomFilter(const bool*, 
int64_t) {
+  DCHECK(bloom_filter_ == nullptr);
+}
+
+template <typename DType>
+void TypedColumnWriterImpl<DType>::UpdateBloomFilterSpaced(const T* values,
+                                                           int64_t num_values,
+                                                           const uint8_t* 
valid_bits,
+                                                           int64_t 
valid_bits_offset) {
+  if (bloom_filter_) {
+    ::arrow::internal::VisitSetBitRunsVoid(
+        valid_bits, valid_bits_offset, num_values, [&](int64_t position, 
int64_t length) {
+          for (int64_t i = 0; i < length; i++) {
+            bloom_filter_->InsertHash(bloom_filter_->Hash(values + i + 
position));
+          }
+        });
+  }
+}
+
+template <>
+void TypedColumnWriterImpl<BooleanType>::UpdateBloomFilterSpaced(const bool*, 
int64_t,
+                                                                 const 
uint8_t*,
+                                                                 int64_t) {
+  DCHECK(bloom_filter_ == nullptr);
+}
+
+template <>
+void TypedColumnWriterImpl<FLBAType>::UpdateBloomFilterSpaced(const FLBA* 
values,
+                                                              int64_t 
num_values,
+                                                              const uint8_t* 
valid_bits,
+                                                              int64_t 
valid_bits_offset) {
+  if (bloom_filter_) {
+    ::arrow::internal::VisitSetBitRunsVoid(
+        valid_bits, valid_bits_offset, num_values, [&](int64_t position, 
int64_t length) {
+          for (int64_t i = 0; i < length; i++) {
+            bloom_filter_->InsertHash(
+                bloom_filter_->Hash(values + i + position, 
descr_->type_length()));
+          }
+        });
+  }
+}
+
+template <typename ArrayType>
+void UpdateBinaryBloomFilter(BloomFilter* bloom_filter, const ArrayType& 
array) {
+  PARQUET_THROW_NOT_OK(::arrow::VisitArraySpanInline<typename 
ArrayType::TypeClass>(
+      *array.data(),
+      [&](const std::string_view& view) {

Review Comment:
   Perhaps instead use `VisitSetBitRunsVoid` on the null bitmap, in order to 
batch hashes?



##########
cpp/src/parquet/column_writer.cc:
##########
@@ -2319,12 +2343,119 @@ Status 
TypedColumnWriterImpl<FLBAType>::WriteArrowDense(
   return Status::OK();
 }
 
+template <typename DType>
+void TypedColumnWriterImpl<DType>::UpdateBloomFilter(const T* values,
+                                                     int64_t num_values) {
+  if (bloom_filter_) {
+    // TODO(mwish): Would it allocate too much memory? Would an 
std::array<uint64_t, 64>
+    //  better here?
+    std::vector<uint64_t> hashes(num_values);
+    bloom_filter_->Hashes(values, static_cast<int>(num_values), hashes.data());
+    bloom_filter_->InsertHashes(hashes.data(), static_cast<int>(num_values));
+  }
+}
+
+template <>
+void TypedColumnWriterImpl<FLBAType>::UpdateBloomFilter(const FLBA* values,
+                                                        int64_t num_values) {
+  if (bloom_filter_) {
+    for (int64_t i = 0; i < num_values; ++i) {
+      bloom_filter_->InsertHash(bloom_filter_->Hash(values + i, 
descr_->type_length()));
+    }
+  }
+}
+
+template <>
+void TypedColumnWriterImpl<BooleanType>::UpdateBloomFilter(const bool*, 
int64_t) {
+  DCHECK(bloom_filter_ == nullptr);
+}
+
+template <typename DType>
+void TypedColumnWriterImpl<DType>::UpdateBloomFilterSpaced(const T* values,
+                                                           int64_t num_values,
+                                                           const uint8_t* 
valid_bits,
+                                                           int64_t 
valid_bits_offset) {
+  if (bloom_filter_) {
+    ::arrow::internal::VisitSetBitRunsVoid(
+        valid_bits, valid_bits_offset, num_values, [&](int64_t position, 
int64_t length) {
+          for (int64_t i = 0; i < length; i++) {

Review Comment:
   Also use a `std::array` here and below?



##########
cpp/src/parquet/column_writer.h:
##########
@@ -44,6 +44,7 @@ class CodecOptions;
 namespace parquet {
 
 struct ArrowWriteContext;
+class BloomFilter;

Review Comment:
   Here as well, put the forward declaration in `parquet/type_fwd.h` instead.



##########
cpp/src/parquet/bloom_filter_parquet_test.cc:
##########
@@ -0,0 +1,143 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <arrow/testing/gtest_util.h>
+#include <gtest/gtest.h>
+
+#include "parquet/bloom_filter.h"
+#include "parquet/bloom_filter_builder.h"
+#include "parquet/bloom_filter_reader.h"
+#include "parquet/file_reader.h"
+#include "parquet/test_util.h"
+
+namespace parquet::test {
+
+TEST(BloomFilterReader, ReadBloomFilter) {
+  std::string dir_string(parquet::test::get_data_dir());
+  std::string path = dir_string + "/data_index_bloom_encoding_stats.parquet";
+  auto reader = ParquetFileReader::OpenFile(path, false);
+  auto file_metadata = reader->metadata();
+  EXPECT_FALSE(file_metadata->is_encryption_algorithm_set());
+  auto& bloom_filter_reader = reader->GetBloomFilterReader();
+  auto row_group_0 = bloom_filter_reader.RowGroup(0);
+  ASSERT_NE(nullptr, row_group_0);
+  EXPECT_THROW(bloom_filter_reader.RowGroup(1), ParquetException);
+  auto bloom_filter = row_group_0->GetColumnBloomFilter(0);
+  ASSERT_NE(nullptr, bloom_filter);
+  EXPECT_THROW(row_group_0->GetColumnBloomFilter(1), ParquetException);
+
+  // assert exists
+  {
+    std::string_view sv = "Hello";
+    ByteArray ba{sv};
+    EXPECT_TRUE(bloom_filter->FindHash(bloom_filter->Hash(&ba)));
+  }
+
+  // no exists
+  {
+    std::string_view sv = "NOT_EXISTS";
+    ByteArray ba{sv};
+    EXPECT_FALSE(bloom_filter->FindHash(bloom_filter->Hash(&ba)));
+  }
+}
+
+TEST(BloomFilterReader, FileNotHaveBloomFilter) {
+  // Can still get a BloomFilterReader and a RowGroupBloomFilter
+  // reader, but cannot get a non-null BloomFilter.
+  std::string dir_string(parquet::test::get_data_dir());
+  std::string path = dir_string + "/alltypes_plain.parquet";
+  auto reader = ParquetFileReader::OpenFile(path, false);
+  auto file_metadata = reader->metadata();
+  EXPECT_FALSE(file_metadata->is_encryption_algorithm_set());
+  auto& bloom_filter_reader = reader->GetBloomFilterReader();
+  auto row_group_0 = bloom_filter_reader.RowGroup(0);
+  ASSERT_NE(nullptr, row_group_0);
+  EXPECT_THROW(bloom_filter_reader.RowGroup(1), ParquetException);
+  auto bloom_filter = row_group_0->GetColumnBloomFilter(0);
+  ASSERT_EQ(nullptr, bloom_filter);
+}
+
+// <c1:BYTE_ARRAY, c2:BYTE_ARRAY>, c1 has bloom filter.
+TEST(BloomFilterBuilderTest, BasicRoundTrip) {
+  SchemaDescriptor schema;
+  schema::NodePtr root = schema::GroupNode::Make(
+      "schema", Repetition::REPEATED, {schema::ByteArray("c1"), 
schema::ByteArray("c2")});
+  schema.Init(root);
+  auto writer_properties = default_writer_properties();
+  auto builder = BloomFilterBuilder::Make(&schema, *writer_properties);
+  builder->AppendRowGroup();
+  BloomFilterOptions bloom_filter_options;
+  bloom_filter_options.ndv = 100;
+  auto bloom_filter = builder->GetOrCreateBloomFilter(0, bloom_filter_options);

Review Comment:
   Why does `GetOrCreateBloomFilter` take a `BloomFilterOptions` if 
`BloomFilterBuilder::Make` also takes a `WriterProperties`?



##########
cpp/src/parquet/column_writer.cc:
##########
@@ -2319,12 +2343,119 @@ Status 
TypedColumnWriterImpl<FLBAType>::WriteArrowDense(
   return Status::OK();
 }
 
+template <typename DType>
+void TypedColumnWriterImpl<DType>::UpdateBloomFilter(const T* values,
+                                                     int64_t num_values) {
+  if (bloom_filter_) {
+    // TODO(mwish): Would it allocate too much memory? Would an 
std::array<uint64_t, 64>
+    //  better here?
+    std::vector<uint64_t> hashes(num_values);
+    bloom_filter_->Hashes(values, static_cast<int>(num_values), hashes.data());
+    bloom_filter_->InsertHashes(hashes.data(), static_cast<int>(num_values));
+  }
+}
+
+template <>
+void TypedColumnWriterImpl<FLBAType>::UpdateBloomFilter(const FLBA* values,
+                                                        int64_t num_values) {
+  if (bloom_filter_) {
+    for (int64_t i = 0; i < num_values; ++i) {
+      bloom_filter_->InsertHash(bloom_filter_->Hash(values + i, 
descr_->type_length()));
+    }
+  }
+}
+
+template <>
+void TypedColumnWriterImpl<BooleanType>::UpdateBloomFilter(const bool*, 
int64_t) {
+  DCHECK(bloom_filter_ == nullptr);
+}
+
+template <typename DType>
+void TypedColumnWriterImpl<DType>::UpdateBloomFilterSpaced(const T* values,
+                                                           int64_t num_values,
+                                                           const uint8_t* 
valid_bits,
+                                                           int64_t 
valid_bits_offset) {
+  if (bloom_filter_) {
+    ::arrow::internal::VisitSetBitRunsVoid(
+        valid_bits, valid_bits_offset, num_values, [&](int64_t position, 
int64_t length) {
+          for (int64_t i = 0; i < length; i++) {
+            bloom_filter_->InsertHash(bloom_filter_->Hash(values + i + 
position));
+          }
+        });
+  }
+}
+
+template <>
+void TypedColumnWriterImpl<BooleanType>::UpdateBloomFilterSpaced(const bool*, 
int64_t,
+                                                                 const 
uint8_t*,
+                                                                 int64_t) {
+  DCHECK(bloom_filter_ == nullptr);
+}
+
+template <>
+void TypedColumnWriterImpl<FLBAType>::UpdateBloomFilterSpaced(const FLBA* 
values,
+                                                              int64_t 
num_values,
+                                                              const uint8_t* 
valid_bits,
+                                                              int64_t 
valid_bits_offset) {
+  if (bloom_filter_) {
+    ::arrow::internal::VisitSetBitRunsVoid(
+        valid_bits, valid_bits_offset, num_values, [&](int64_t position, 
int64_t length) {
+          for (int64_t i = 0; i < length; i++) {
+            bloom_filter_->InsertHash(
+                bloom_filter_->Hash(values + i + position, 
descr_->type_length()));
+          }
+        });
+  }
+}
+
+template <typename ArrayType>
+void UpdateBinaryBloomFilter(BloomFilter* bloom_filter, const ArrayType& 
array) {
+  PARQUET_THROW_NOT_OK(::arrow::VisitArraySpanInline<typename 
ArrayType::TypeClass>(
+      *array.data(),
+      [&](const std::string_view& view) {
+        const ByteArray value{view};
+        bloom_filter->InsertHash(bloom_filter->Hash(&value));
+        return Status::OK();
+      },
+      []() { return Status::OK(); }));
+}
+
+template <typename DType>
+void TypedColumnWriterImpl<DType>::UpdateBloomFilterArray(const 
::arrow::Array& values) {
+  // Only ByteArray type would write ::arrow::Array directly.
+  ParquetException::NYI("Unreachable");
+}
+
+template <>
+void TypedColumnWriterImpl<ByteArrayType>::UpdateBloomFilterArray(
+    const ::arrow::Array& values) {
+  if (bloom_filter_) {
+    if (!::arrow::is_base_binary_like(values.type_id())) {
+      throw ParquetException("Only BaseBinaryArray and subclasses supported");
+    }
+    if (::arrow::is_dictionary(values.type_id())) {

Review Comment:
   This `is_dictionary` check is redundant: dictionary types are already 
   excluded by the `is_base_binary_like` check just above.



##########
cpp/src/parquet/column_writer_test.cc:
##########
@@ -1591,5 +1592,88 @@ TEST(TestColumnWriter, WriteDataPageV2HeaderNullCount) {
   }
 }
 
+template <typename TestType>
+class TestBloomFilterWriter : public TestPrimitiveWriter<TestType> {
+ public:
+  void SetUp() override {
+    TestPrimitiveWriter<TestType>::SetUp();
+    builder_ = nullptr;
+    bloom_filter_ = nullptr;
+  }
+
+  std::shared_ptr<TypedColumnWriter<TestType>> BuildWriterWithBloomFilter(
+      int64_t output_size = SMALL_SIZE,
+      const ColumnProperties& column_properties = ColumnProperties());
+
+  std::unique_ptr<BloomFilterBuilder> builder_;
+  BloomFilter* bloom_filter_;
+};
+
+template <typename TestType>
+std::shared_ptr<TypedColumnWriter<TestType>>
+TestBloomFilterWriter<TestType>::BuildWriterWithBloomFilter(
+    int64_t output_size, const ColumnProperties& column_properties) {
+  this->sink_ = CreateOutputStream();
+  WriterProperties::Builder wp_builder;
+  if (column_properties.encoding() == Encoding::PLAIN_DICTIONARY ||
+      column_properties.encoding() == Encoding::RLE_DICTIONARY) {
+    wp_builder.enable_dictionary();
+    wp_builder.dictionary_pagesize_limit(DICTIONARY_PAGE_SIZE);
+  } else {
+    wp_builder.disable_dictionary();
+    wp_builder.encoding(column_properties.encoding());
+  }
+  wp_builder.max_statistics_size(column_properties.max_statistics_size());
+  this->writer_properties_ = wp_builder.build();
+
+  this->metadata_ =
+      ColumnChunkMetaDataBuilder::Make(this->writer_properties_, this->descr_);
+  std::unique_ptr<PageWriter> pager = PageWriter::Open(
+      this->sink_, column_properties.compression(), this->metadata_.get());
+  builder_ = BloomFilterBuilder::Make(&this->schema_, 
*this->writer_properties_);
+  // Initial RowGroup
+  builder_->AppendRowGroup();
+  BloomFilterOptions options;
+  options.ndv = static_cast<int32_t>(output_size);
+  bloom_filter_ = builder_->GetOrCreateBloomFilter(0, options);
+  std::shared_ptr<ColumnWriter> writer =
+      ColumnWriter::Make(this->metadata_.get(), std::move(pager),
+                         this->writer_properties_.get(), bloom_filter_);
+  return std::static_pointer_cast<TypedColumnWriter<TestType>>(writer);
+}
+
+// Note: BooleanType is Excluded.
+using TestBloomFilterTypes = ::testing::Types<Int32Type, Int64Type, Int96Type, 
FloatType,
+                                              DoubleType, ByteArrayType, 
FLBAType>;
+
+TYPED_TEST_SUITE(TestBloomFilterWriter, TestBloomFilterTypes);
+
+TYPED_TEST(TestBloomFilterWriter, Basic) {
+  this->GenerateData(SMALL_SIZE);
+  ColumnProperties column_properties;
+  column_properties.set_bloom_filter_enabled(true);
+
+  auto writer = this->BuildWriterWithBloomFilter(SMALL_SIZE, 
column_properties);
+  writer->WriteBatch(this->values_.size(), nullptr, nullptr, 
this->values_ptr_);
+  writer->Close();
+
+  // Read all rows so we are sure that also the non-dictionary pages are read 
correctly
+  this->SetupValuesOut(SMALL_SIZE);
+  this->ReadColumnFully();
+  ASSERT_EQ(SMALL_SIZE, this->values_read_);
+  this->values_.resize(SMALL_SIZE);

Review Comment:
   Hmm, why is this `resize` call needed?



##########
cpp/src/parquet/bloom_filter_builder.h:
##########
@@ -0,0 +1,72 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// This module defines an abstract interface for iterating through pages in a
+// Parquet column chunk within a row group. It could be extended in the future
+// to iterate through all data pages in all chunks in a file.

Review Comment:
   This comment is off-topic for this file, right?



##########
cpp/src/parquet/bloom_filter_builder.h:
##########
@@ -0,0 +1,72 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// This module defines an abstract interface for iterating through pages in a
+// Parquet column chunk within a row group. It could be extended in the future
+// to iterate through all data pages in all chunks in a file.
+
+#pragma once
+
+#include "arrow/io/interfaces.h"
+#include "parquet/types.h"
+
+namespace parquet {
+
+class BloomFilter;
+class SchemaDescriptor;
+struct BloomFilterOptions;
+struct BloomFilterLocation;
+
+namespace schema {
+class ColumnPath;
+}
+
+/// \brief Interface for collecting bloom filter of a parquet file.
+class PARQUET_EXPORT BloomFilterBuilder {
+ public:
+  /// \brief API convenience to create a BloomFilterBuilder.
+  static std::unique_ptr<BloomFilterBuilder> Make(const SchemaDescriptor* 
schema,
+                                                  const WriterProperties& 
properties);
+
+  /// Append a new row group to host all incoming bloom filters.
+  virtual void AppendRowGroup() = 0;
+
+  /// \brief Get the BloomFilter from column ordinal.
+  ///
+  /// \param column_ordinal Column ordinal in schema, which is only for leaf 
columns.
+  /// \param bloom_filter_options The options(like num distinct values and 
false positive
+  /// rate) to create a BloomFilter.
+  ///
+  /// \return BloomFilter for the column and its memory ownership belongs to 
the
+  /// BloomFilterBuilder.
+  virtual BloomFilter* GetOrCreateBloomFilter(
+      int32_t column_ordinal, const BloomFilterOptions& bloom_filter_options) 
= 0;
+
+  /// \brief Write the bloom filter to sink.
+  ///
+  /// \param[out] sink The output stream to write the bloom filter.
+  /// \param[out] location The location of all bloom filter to the start of 
sink.
+  virtual void WriteTo(::arrow::io::OutputStream* sink,
+                       BloomFilterLocation* location) = 0;
+
+  /// \brief Complete the bloom filter builder and no more write is allowed.

Review Comment:
   This is confusing: `WriteTo` must be called after `Finish`, but this 
   docstring says that no more writes are allowed once the builder is finished.



##########
cpp/src/parquet/bloom_filter_builder.h:
##########
@@ -0,0 +1,72 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// This module defines an abstract interface for iterating through pages in a
+// Parquet column chunk within a row group. It could be extended in the future
+// to iterate through all data pages in all chunks in a file.
+
+#pragma once
+
+#include "arrow/io/interfaces.h"

Review Comment:
   ```suggestion
   #include "arrow/io/type_fwd.h"
   ```



##########
cpp/src/parquet/bloom_filter_builder.cc:
##########
@@ -0,0 +1,142 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// This module defines an abstract interface for iterating through pages in a
+// Parquet column chunk within a row group. It could be extended in the future
+// to iterate through all data pages in all chunks in a file.
+
+#include "parquet/bloom_filter_builder.h"
+
+#include <map>
+#include <utility>
+#include <vector>
+
+#include "arrow/io/interfaces.h"
+
+#include "parquet/bloom_filter.h"
+#include "parquet/exception.h"
+#include "parquet/metadata.h"
+#include "parquet/properties.h"
+
+namespace parquet {
+
+class BloomFilterBuilderImpl : public BloomFilterBuilder {
+ public:
+  explicit BloomFilterBuilderImpl(const SchemaDescriptor* schema,
+                                  WriterProperties properties)
+      : schema_(schema), properties_(std::move(properties)) {}
+  /// Append a new row group to host all incoming bloom filters.
+  void AppendRowGroup() override;
+
+  BloomFilter* GetOrCreateBloomFilter(
+      int32_t column_ordinal, const BloomFilterOptions& bloom_filter_options) 
override;
+
+  /// Serialize all bloom filters with header and bitset in the order of row 
group and
+  /// column id. Column encryption is not implemented yet. The side effect is 
that it
+  /// deletes all bloom filters after they have been flushed.
+  void WriteTo(::arrow::io::OutputStream* sink, BloomFilterLocation* location) 
override;
+
+  void Finish() override { finished_ = true; }
+
+ private:
+  /// Make sure column ordinal is not out of bound and the builder is in good 
state.
+  void CheckState(int32_t column_ordinal) const {
+    if (finished_) {
+      throw ParquetException("BloomFilterBuilder is already finished.");
+    }
+    if (column_ordinal < 0 || column_ordinal >= schema_->num_columns()) {
+      throw ParquetException("Invalid column ordinal: ", column_ordinal);
+    }
+    if (row_group_bloom_filters_.empty()) {
+      throw ParquetException("No row group appended to BloomFilterBuilder.");
+    }
+    if (schema_->Column(column_ordinal)->physical_type() == Type::BOOLEAN) {
+      throw ParquetException("BloomFilterBuilder not supports Boolean.");
+    }
+  }
+
+  const SchemaDescriptor* schema_;
+  WriterProperties properties_;
+  bool finished_ = false;
+
+  // vector: row_group_ordinal
+  // map: column_ordinal -> bloom filter
+  std::vector<std::map<int32_t, std::unique_ptr<BloomFilter>>> 
row_group_bloom_filters_;

Review Comment:
   Why is this using `vector<map<...>>` while `FileIndexLocation` prefers to 
use `map<vector<...>>`? Is there a particular reason or is it just random?



##########
cpp/src/parquet/bloom_filter_builder.cc:
##########
@@ -0,0 +1,142 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// This module defines an abstract interface for iterating through pages in a
+// Parquet column chunk within a row group. It could be extended in the future
+// to iterate through all data pages in all chunks in a file.
+
+#include "parquet/bloom_filter_builder.h"
+
+#include <map>
+#include <utility>
+#include <vector>
+
+#include "arrow/io/interfaces.h"
+
+#include "parquet/bloom_filter.h"
+#include "parquet/exception.h"
+#include "parquet/metadata.h"
+#include "parquet/properties.h"
+
+namespace parquet {
+
+class BloomFilterBuilderImpl : public BloomFilterBuilder {
+ public:
+  explicit BloomFilterBuilderImpl(const SchemaDescriptor* schema,
+                                  WriterProperties properties)
+      : schema_(schema), properties_(std::move(properties)) {}
+  /// Append a new row group to host all incoming bloom filters.
+  void AppendRowGroup() override;
+
+  BloomFilter* GetOrCreateBloomFilter(
+      int32_t column_ordinal, const BloomFilterOptions& bloom_filter_options) 
override;
+
+  /// Serialize all bloom filters with header and bitset in the order of row 
group and
+  /// column id. Column encryption is not implemented yet. The side effect is 
that it
+  /// deletes all bloom filters after they have been flushed.
+  void WriteTo(::arrow::io::OutputStream* sink, BloomFilterLocation* location) 
override;
+
+  void Finish() override { finished_ = true; }
+
+ private:
+  /// Make sure column ordinal is not out of bound and the builder is in good 
state.
+  void CheckState(int32_t column_ordinal) const {
+    if (finished_) {
+      throw ParquetException("BloomFilterBuilder is already finished.");
+    }
+    if (column_ordinal < 0 || column_ordinal >= schema_->num_columns()) {
+      throw ParquetException("Invalid column ordinal: ", column_ordinal);
+    }
+    if (row_group_bloom_filters_.empty()) {
+      throw ParquetException("No row group appended to BloomFilterBuilder.");
+    }
+    if (schema_->Column(column_ordinal)->physical_type() == Type::BOOLEAN) {
+      throw ParquetException("BloomFilterBuilder not supports Boolean.");
+    }
+  }
+
+  const SchemaDescriptor* schema_;
+  WriterProperties properties_;
+  bool finished_ = false;
+
+  // vector: row_group_ordinal
+  // map: column_ordinal -> bloom filter
+  std::vector<std::map<int32_t, std::unique_ptr<BloomFilter>>> 
row_group_bloom_filters_;
+};
+
+std::unique_ptr<BloomFilterBuilder> BloomFilterBuilder::Make(
+    const SchemaDescriptor* schema, const WriterProperties& properties) {
+  return std::unique_ptr<BloomFilterBuilder>(
+      new BloomFilterBuilderImpl(schema, properties));
+}
+
+void BloomFilterBuilderImpl::AppendRowGroup() { 
row_group_bloom_filters_.emplace_back(); }
+
+BloomFilter* BloomFilterBuilderImpl::GetOrCreateBloomFilter(
+    int32_t column_ordinal, const BloomFilterOptions& bloom_filter_options) {
+  CheckState(column_ordinal);
+  std::unique_ptr<BloomFilter>& bloom_filter =
+      row_group_bloom_filters_.back()[column_ordinal];
+  if (bloom_filter == nullptr) {
+    auto block_split_bloom_filter =
+        std::make_unique<BlockSplitBloomFilter>(properties_.memory_pool());
+    block_split_bloom_filter->Init(BlockSplitBloomFilter::OptimalNumOfBytes(
+        bloom_filter_options.ndv, bloom_filter_options.fpp));
+    bloom_filter = std::move(block_split_bloom_filter);
+  }
+  return bloom_filter.get();
+}
+
+void BloomFilterBuilderImpl::WriteTo(::arrow::io::OutputStream* sink,
+                                     BloomFilterLocation* location) {
+  if (!finished_) {
+    throw ParquetException("Cannot call WriteTo() to unfinished 
PageIndexBuilder.");
+  }
+  if (row_group_bloom_filters_.empty()) {
+    // Return quickly if there is no bloom filter
+    return;
+  }
+
+  for (size_t row_group_ordinal = 0; row_group_ordinal < 
row_group_bloom_filters_.size();
+       ++row_group_ordinal) {
+    const auto& row_group_bloom_filters = 
row_group_bloom_filters_[row_group_ordinal];
+    // the whole row group has no bloom filter
+    if (row_group_bloom_filters.empty()) {
+      continue;
+    }
+    bool has_valid_bloom_filter = false;
+    int num_columns = schema_->num_columns();
+    std::vector<std::optional<IndexLocation>> locations(num_columns, 
std::nullopt);
+
+    // serialize bloom filter by ascending order of column id
+    for (int32_t column_id = 0; column_id < num_columns; ++column_id) {

Review Comment:
   This is wasteful. Iterating the map will give you the keys (column ordinals) 
in sorted order.



##########
cpp/src/parquet/bloom_filter_builder.cc:
##########
@@ -0,0 +1,142 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// This module defines an abstract interface for iterating through pages in a
+// Parquet column chunk within a row group. It could be extended in the future
+// to iterate through all data pages in all chunks in a file.
+
+#include "parquet/bloom_filter_builder.h"
+
+#include <map>
+#include <utility>
+#include <vector>
+
+#include "arrow/io/interfaces.h"
+
+#include "parquet/bloom_filter.h"
+#include "parquet/exception.h"
+#include "parquet/metadata.h"
+#include "parquet/properties.h"
+
+namespace parquet {
+
+class BloomFilterBuilderImpl : public BloomFilterBuilder {
+ public:
+  explicit BloomFilterBuilderImpl(const SchemaDescriptor* schema,
+                                  WriterProperties properties)
+      : schema_(schema), properties_(std::move(properties)) {}
+  /// Append a new row group to host all incoming bloom filters.
+  void AppendRowGroup() override;
+
+  BloomFilter* GetOrCreateBloomFilter(
+      int32_t column_ordinal, const BloomFilterOptions& bloom_filter_options) 
override;
+
+  /// Serialize all bloom filters with header and bitset in the order of row 
group and
+  /// column id. Column encryption is not implemented yet. The side effect is 
that it
+  /// deletes all bloom filters after they have been flushed.
+  void WriteTo(::arrow::io::OutputStream* sink, BloomFilterLocation* location) 
override;
+
+  void Finish() override { finished_ = true; }
+
+ private:
+  /// Make sure column ordinal is not out of bound and the builder is in good 
state.
+  void CheckState(int32_t column_ordinal) const {
+    if (finished_) {
+      throw ParquetException("BloomFilterBuilder is already finished.");
+    }
+    if (column_ordinal < 0 || column_ordinal >= schema_->num_columns()) {
+      throw ParquetException("Invalid column ordinal: ", column_ordinal);
+    }
+    if (row_group_bloom_filters_.empty()) {
+      throw ParquetException("No row group appended to BloomFilterBuilder.");
+    }
+    if (schema_->Column(column_ordinal)->physical_type() == Type::BOOLEAN) {
+      throw ParquetException("BloomFilterBuilder not supports Boolean.");
+    }
+  }
+
+  const SchemaDescriptor* schema_;
+  WriterProperties properties_;
+  bool finished_ = false;
+
+  // vector: row_group_ordinal
+  // map: column_ordinal -> bloom filter
+  std::vector<std::map<int32_t, std::unique_ptr<BloomFilter>>> 
row_group_bloom_filters_;
+};
+
+std::unique_ptr<BloomFilterBuilder> BloomFilterBuilder::Make(
+    const SchemaDescriptor* schema, const WriterProperties& properties) {
+  return std::unique_ptr<BloomFilterBuilder>(

Review Comment:
   Can probably use `std::make_unique<BloomFilterBuilderImpl>` here?



##########
cpp/src/parquet/metadata.h:
##########
@@ -508,21 +507,32 @@ class PARQUET_EXPORT RowGroupMetaDataBuilder {
   std::unique_ptr<RowGroupMetaDataBuilderImpl> impl_;
 };
 
+/// Alias type of page index and bloom filter location of a row group. The 
index
+/// location is located by column ordinal. If the column does not have the 
index,
+/// its value is set to std::nullopt.
+using RowGroupIndexLocation = std::vector<std::optional<IndexLocation>>;
+
+/// Alias type of page index and bloom filter location of a parquet file. The
+/// index location is located by the row group ordinal.
+using FileIndexLocation = std::map<size_t, RowGroupIndexLocation>;
+
 /// \brief Public struct for location to all page indexes in a parquet file.
 struct PageIndexLocation {
-  /// Alias type of page index location of a row group. The index location
-  /// is located by column ordinal. If the column does not have the page index,
-  /// its value is set to std::nullopt.
-  using RowGroupIndexLocation = std::vector<std::optional<IndexLocation>>;
-  /// Alias type of page index location of a parquet file. The index location
-  /// is located by the row group ordinal.
-  using FileIndexLocation = std::map<size_t, RowGroupIndexLocation>;
   /// Row group column index locations which uses row group ordinal as the key.
   FileIndexLocation column_index_location;
   /// Row group offset index locations which uses row group ordinal as the key.
   FileIndexLocation offset_index_location;
 };
 
+/// \brief Public struct for location to all bloom filters in a parquet file.
+struct BloomFilterLocation {
+  /// Row group bloom filter index locations which uses row group ordinal as 
the key.
+  ///
+  /// Note: Before Parquet 2.10, the bloom filter index only have "offset". 
But here
+  /// we use "IndexLocation" with length to support the future extension.
+  FileIndexLocation bloom_filter_location;

Review Comment:
   (why std::map instead of std::unordered_map btw?)



##########
cpp/src/parquet/bloom_filter.h:
##########
@@ -167,6 +167,12 @@ class PARQUET_EXPORT BloomFilter {
 
   virtual ~BloomFilter() = default;
 
+  // Variant of const pointer argument to facilitate template

Review Comment:
   +1 for the const-ref switch. We can keep the pointer versions for 
compatibility as well (but please don't add a gratuitous `type_len` argument).



##########
cpp/src/parquet/bloom_filter_builder.h:
##########
@@ -0,0 +1,72 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// This module defines an abstract interface for iterating through pages in a
+// Parquet column chunk within a row group. It could be extended in the future
+// to iterate through all data pages in all chunks in a file.
+
+#pragma once
+
+#include "arrow/io/interfaces.h"
+#include "parquet/types.h"
+
+namespace parquet {
+
+class BloomFilter;
+class SchemaDescriptor;
+struct BloomFilterOptions;
+struct BloomFilterLocation;
+
+namespace schema {
+class ColumnPath;
+}
+
+/// \brief Interface for collecting bloom filter of a parquet file.
+class PARQUET_EXPORT BloomFilterBuilder {
+ public:
+  /// \brief API convenience to create a BloomFilterBuilder.
+  static std::unique_ptr<BloomFilterBuilder> Make(const SchemaDescriptor* 
schema,
+                                                  const WriterProperties& 
properties);
+
+  /// Append a new row group to host all incoming bloom filters.
+  virtual void AppendRowGroup() = 0;
+
+  /// \brief Get the BloomFilter from column ordinal.
+  ///
+  /// \param column_ordinal Column ordinal in schema, which is only for leaf 
columns.
+  /// \param bloom_filter_options The options(like num distinct values and 
false positive
+  /// rate) to create a BloomFilter.
+  ///
+  /// \return BloomFilter for the column and its memory ownership belongs to 
the
+  /// BloomFilterBuilder.
+  virtual BloomFilter* GetOrCreateBloomFilter(
+      int32_t column_ordinal, const BloomFilterOptions& bloom_filter_options) 
= 0;
+
+  /// \brief Write the bloom filter to sink.
+  ///

Review Comment:
   ```suggestion
     /// The bloom filter must have been finished first.
     ///
   ```



##########
cpp/src/parquet/file_writer.cc:
##########
@@ -360,6 +384,10 @@ class FileSerializer : public ParquetFileWriter::Contents {
       }
       row_group_writer_.reset();
 
+      // In Parquet standard, the Bloom filter data can be stored before the 
page indexes
+      // after all row groups or stored between row groups. We choose to store 
it before
+      // the page indexes after all row groups.

Review Comment:
   Ok, but give a rationale for this choice?



##########
cpp/src/parquet/column_writer_test.cc:
##########
@@ -1591,5 +1592,88 @@ TEST(TestColumnWriter, WriteDataPageV2HeaderNullCount) {
   }
 }
 
+template <typename TestType>
+class TestBloomFilterWriter : public TestPrimitiveWriter<TestType> {
+ public:
+  void SetUp() override {
+    TestPrimitiveWriter<TestType>::SetUp();
+    builder_ = nullptr;
+    bloom_filter_ = nullptr;
+  }
+
+  std::shared_ptr<TypedColumnWriter<TestType>> BuildWriterWithBloomFilter(
+      int64_t output_size = SMALL_SIZE,
+      const ColumnProperties& column_properties = ColumnProperties());
+
+  std::unique_ptr<BloomFilterBuilder> builder_;
+  BloomFilter* bloom_filter_;
+};
+
+template <typename TestType>
+std::shared_ptr<TypedColumnWriter<TestType>>
+TestBloomFilterWriter<TestType>::BuildWriterWithBloomFilter(
+    int64_t output_size, const ColumnProperties& column_properties) {
+  this->sink_ = CreateOutputStream();
+  WriterProperties::Builder wp_builder;
+  if (column_properties.encoding() == Encoding::PLAIN_DICTIONARY ||
+      column_properties.encoding() == Encoding::RLE_DICTIONARY) {
+    wp_builder.enable_dictionary();
+    wp_builder.dictionary_pagesize_limit(DICTIONARY_PAGE_SIZE);
+  } else {
+    wp_builder.disable_dictionary();
+    wp_builder.encoding(column_properties.encoding());
+  }
+  wp_builder.max_statistics_size(column_properties.max_statistics_size());
+  this->writer_properties_ = wp_builder.build();
+
+  this->metadata_ =
+      ColumnChunkMetaDataBuilder::Make(this->writer_properties_, this->descr_);
+  std::unique_ptr<PageWriter> pager = PageWriter::Open(
+      this->sink_, column_properties.compression(), this->metadata_.get());
+  builder_ = BloomFilterBuilder::Make(&this->schema_, 
*this->writer_properties_);
+  // Initial RowGroup
+  builder_->AppendRowGroup();
+  BloomFilterOptions options;
+  options.ndv = static_cast<int32_t>(output_size);
+  bloom_filter_ = builder_->GetOrCreateBloomFilter(0, options);

Review Comment:
   Also I would expect `RowGroupWriter::column()` to create the `ColumnWriter` 
with the right bloom filter options given the writer properties.



##########
cpp/src/parquet/column_writer.cc:
##########
@@ -2319,12 +2343,113 @@ Status 
TypedColumnWriterImpl<FLBAType>::WriteArrowDense(
   return Status::OK();
 }
 
+template <typename DType>
+void TypedColumnWriterImpl<DType>::UpdateBloomFilter(const T* values,
+                                                     int64_t num_values) {
+  if (bloom_filter_) {
+    // TODO(mwish): Would it allocate too much memory? Would an 
std::array<uint64_t, 64>
+    //  better here?
+    std::vector<uint64_t> hashes(num_values);

Review Comment:
   Yes, that's a good idea. You can probably make it 256 values.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to