wgtmac commented on code in PR #37400:
URL: https://github.com/apache/arrow/pull/37400#discussion_r2015830158


##########
cpp/src/parquet/bloom_filter_builder.h:
##########
@@ -0,0 +1,79 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include "arrow/io/type_fwd.h"
+#include "parquet/types.h"
+
+namespace parquet::internal {
+
+/// \brief Interface for collecting bloom filter of a parquet file.
+///
+/// ```
+/// auto bloom_filter_builder = BloomFilterBuilder::Make(schema, properties);
+/// for (int i = 0; i < num_row_groups; i++) {
+///   bloom_filter_builder->AppendRowGroup();
+///   auto* bloom_filter =
+///   bloom_filter_builder->GetOrCreateBloomFilter(bloom_filter_column);
+///   // Add bloom filter entries in `bloom_filter`.
+///   // ...
+/// }
+/// bloom_filter_builder->WriteTo(sink, location);
+/// ```
+class PARQUET_EXPORT BloomFilterBuilder {
+ public:
+  /// \brief API to create a BloomFilterBuilder.
+  static std::unique_ptr<BloomFilterBuilder> Make(const SchemaDescriptor* 
schema,
+                                                  const WriterProperties* 
properties);
+
+  /// Append a new row group to host all incoming bloom filters.
+  ///
+  /// This method must be called before `GetOrCreateBloomFilter` for a new row 
group.
+  ///
+  /// \throws ParquetException if WriteTo() has been called to flush bloom 
filters.
+  virtual void AppendRowGroup() = 0;
+
+  /// \brief Get the BloomFilter from column ordinal.
+  ///
+  /// \param column_ordinal Column ordinal in schema, which is only for leaf 
columns.
+  ///
+  /// \return BloomFilter for the column and its memory ownership belongs to 
the
+  /// BloomFilterBuilder. It will return nullptr if bloom filter is not 
enabled for the
+  /// column.
+  ///
+  /// \throws ParquetException if any of following conditions applies:
+  /// 1) column_ordinal is out of bound.
+  /// 2) `WriteTo()` has been called already.
+  /// 3) `AppendRowGroup()` is not called before `GetOrCreateBloomFilter()`.
+  virtual BloomFilter* GetOrCreateBloomFilter(int32_t column_ordinal) = 0;
+
+  /// \brief Write the bloom filter to sink.
+  ///
+  /// The bloom filter cannot be modified after this method is called.
+  ///
+  /// \param[out] sink The output stream to write the bloom filter.

Review Comment:
   sink should be `param[in/out]` I guess?



##########
cpp/src/parquet/bloom_filter_builder.h:
##########
@@ -0,0 +1,79 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include "arrow/io/type_fwd.h"
+#include "parquet/types.h"
+
+namespace parquet::internal {

Review Comment:
   If this is an internal namespace, how about renaming these files to 
`bloom_filter_internal.h/cc` ?



##########
cpp/src/parquet/properties.h:
##########
@@ -235,6 +272,12 @@ class PARQUET_EXPORT ColumnProperties {
 
   bool page_index_enabled() const { return page_index_enabled_; }
 
+  std::optional<BloomFilterOptions> bloom_filter_options() const {
+    return bloom_filter_options_;
+  }
+
+  bool bloom_filter_enabled() const { return bloom_filter_options_ != 
std::nullopt; }

Review Comment:
   ```suggestion
     bool bloom_filter_enabled() const { return 
bloom_filter_options_.has_value(); }
   ```



##########
cpp/src/parquet/type_fwd.h:
##########
@@ -89,6 +89,10 @@ class EncodedStatistics;
 class Statistics;
 struct SizeStatistics;
 
+class BloomFilter;
+struct BloomFilterOptions;
+struct BloomFilterLocation;

Review Comment:
   ```suggestion
   struct BloomFilterLocation;
   struct BloomFilterOptions;
   ```
   
   Sort them alphabetically.



##########
cpp/src/parquet/bloom_filter_builder.h:
##########
@@ -0,0 +1,79 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include "arrow/io/type_fwd.h"
+#include "parquet/types.h"
+
+namespace parquet::internal {
+
+/// \brief Interface for collecting bloom filter of a parquet file.
+///
+/// ```
+/// auto bloom_filter_builder = BloomFilterBuilder::Make(schema, properties);
+/// for (int i = 0; i < num_row_groups; i++) {
+///   bloom_filter_builder->AppendRowGroup();
+///   auto* bloom_filter =
+///   bloom_filter_builder->GetOrCreateBloomFilter(bloom_filter_column);
+///   // Add bloom filter entries in `bloom_filter`.
+///   // ...
+/// }
+/// bloom_filter_builder->WriteTo(sink, location);
+/// ```
+class PARQUET_EXPORT BloomFilterBuilder {
+ public:
+  /// \brief API to create a BloomFilterBuilder.
+  static std::unique_ptr<BloomFilterBuilder> Make(const SchemaDescriptor* 
schema,
+                                                  const WriterProperties* 
properties);
+
+  /// Append a new row group to host all incoming bloom filters.

Review Comment:
   ```suggestion
     /// \brief Append a new row group to host all incoming bloom filters.
   ```



##########
cpp/src/parquet/bloom_filter_builder.cc:
##########
@@ -0,0 +1,170 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// This module defines an abstract interface for iterating through pages in a
+// Parquet column chunk within a row group. It could be extended in the future
+// to iterate through all data pages in all chunks in a file.
+
+#include "parquet/bloom_filter_builder.h"
+
+#include <map>
+#include <utility>
+#include <vector>
+
+#include "arrow/io/interfaces.h"
+
+#include "parquet/bloom_filter.h"
+#include "parquet/exception.h"
+#include "parquet/metadata.h"
+#include "parquet/properties.h"
+
+namespace parquet::internal {
+
+namespace {
+/// Column encryption for bloom filter is not implemented yet.
+class BloomFilterBuilderImpl : public BloomFilterBuilder {
+ public:
+  explicit BloomFilterBuilderImpl(const SchemaDescriptor* schema,
+                                  const WriterProperties* properties)
+      : schema_(schema), properties_(properties) {}
+  BloomFilterBuilderImpl(const BloomFilterBuilderImpl&) = delete;
+  BloomFilterBuilderImpl(BloomFilterBuilderImpl&&) = default;
+
+  /// Append a new row group to host all incoming bloom filters.
+  void AppendRowGroup() override;
+
+  BloomFilter* GetOrCreateBloomFilter(int32_t column_ordinal) override;
+
+  /// Serialize all bloom filters with header and bitset in the order of row 
group and
+  /// column id. The side effect is that it deletes all bloom filters after 
they have
+  /// been flushed.
+  void WriteTo(::arrow::io::OutputStream* sink, BloomFilterLocation* location) 
override;
+
+ private:
+  /// Make sure column ordinal is not out of bound and the builder is in good 
state.
+  void CheckState(int32_t column_ordinal) const {
+    if (finished_) {

Review Comment:
   ```suggestion
       if (finished_) [[unlikely]] {
   ```



##########
cpp/src/parquet/bloom_filter_builder.cc:
##########
@@ -0,0 +1,170 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// This module defines an abstract interface for iterating through pages in a
+// Parquet column chunk within a row group. It could be extended in the future
+// to iterate through all data pages in all chunks in a file.
+
+#include "parquet/bloom_filter_builder.h"
+
+#include <map>
+#include <utility>
+#include <vector>
+
+#include "arrow/io/interfaces.h"
+
+#include "parquet/bloom_filter.h"
+#include "parquet/exception.h"
+#include "parquet/metadata.h"
+#include "parquet/properties.h"
+
+namespace parquet::internal {
+
+namespace {
+/// Column encryption for bloom filter is not implemented yet.
+class BloomFilterBuilderImpl : public BloomFilterBuilder {
+ public:
+  explicit BloomFilterBuilderImpl(const SchemaDescriptor* schema,
+                                  const WriterProperties* properties)
+      : schema_(schema), properties_(properties) {}
+  BloomFilterBuilderImpl(const BloomFilterBuilderImpl&) = delete;
+  BloomFilterBuilderImpl(BloomFilterBuilderImpl&&) = default;
+
+  /// Append a new row group to host all incoming bloom filters.
+  void AppendRowGroup() override;
+
+  BloomFilter* GetOrCreateBloomFilter(int32_t column_ordinal) override;
+
+  /// Serialize all bloom filters with header and bitset in the order of row 
group and
+  /// column id. The side effect is that it deletes all bloom filters after 
they have
+  /// been flushed.
+  void WriteTo(::arrow::io::OutputStream* sink, BloomFilterLocation* location) 
override;
+
+ private:
+  /// Make sure column ordinal is not out of bound and the builder is in good 
state.
+  void CheckState(int32_t column_ordinal) const {
+    if (finished_) {
+      throw ParquetException("BloomFilterBuilder is already finished.");
+    }
+    if (column_ordinal < 0 || column_ordinal >= schema_->num_columns()) {

Review Comment:
   ```suggestion
       if (column_ordinal < 0 || column_ordinal >= schema_->num_columns()) 
[[unlikely]] {
   ```



##########
cpp/src/parquet/bloom_filter_builder.cc:
##########
@@ -0,0 +1,170 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// This module defines an abstract interface for iterating through pages in a
+// Parquet column chunk within a row group. It could be extended in the future
+// to iterate through all data pages in all chunks in a file.
+
+#include "parquet/bloom_filter_builder.h"
+
+#include <map>
+#include <utility>
+#include <vector>
+
+#include "arrow/io/interfaces.h"
+
+#include "parquet/bloom_filter.h"
+#include "parquet/exception.h"
+#include "parquet/metadata.h"
+#include "parquet/properties.h"
+
+namespace parquet::internal {
+
+namespace {
+/// Column encryption for bloom filter is not implemented yet.
+class BloomFilterBuilderImpl : public BloomFilterBuilder {
+ public:
+  explicit BloomFilterBuilderImpl(const SchemaDescriptor* schema,
+                                  const WriterProperties* properties)
+      : schema_(schema), properties_(properties) {}
+  BloomFilterBuilderImpl(const BloomFilterBuilderImpl&) = delete;
+  BloomFilterBuilderImpl(BloomFilterBuilderImpl&&) = default;
+
+  /// Append a new row group to host all incoming bloom filters.
+  void AppendRowGroup() override;
+
+  BloomFilter* GetOrCreateBloomFilter(int32_t column_ordinal) override;
+
+  /// Serialize all bloom filters with header and bitset in the order of row 
group and
+  /// column id. The side effect is that it deletes all bloom filters after 
they have
+  /// been flushed.
+  void WriteTo(::arrow::io::OutputStream* sink, BloomFilterLocation* location) 
override;
+
+ private:
+  /// Make sure column ordinal is not out of bound and the builder is in good 
state.
+  void CheckState(int32_t column_ordinal) const {
+    if (finished_) {
+      throw ParquetException("BloomFilterBuilder is already finished.");
+    }
+    if (column_ordinal < 0 || column_ordinal >= schema_->num_columns()) {
+      throw ParquetException("Invalid column ordinal: ", column_ordinal);
+    }
+    if (file_bloom_filters_.empty()) {
+      throw ParquetException("No row group appended to BloomFilterBuilder.");
+    }
+    if (schema_->Column(column_ordinal)->physical_type() == Type::BOOLEAN) {
+      throw ParquetException("BloomFilterBuilder does not support boolean 
type.");
+    }
+  }
+
+  const SchemaDescriptor* schema_;
+  const WriterProperties* properties_;
+  bool finished_ = false;
+
+  using RowGroupBloomFilters = std::map<int32_t, std::unique_ptr<BloomFilter>>;
+  // Using unique_ptr because the `std::unique_ptr<BloomFilter>` is not 
copyable.

Review Comment:
   ```suggestion
     // Using `std::map` because `std::unique_ptr<BloomFilter>` is not copyable.
   ```



##########
cpp/src/parquet/column_writer.cc:
##########
@@ -2363,12 +2390,153 @@ Status 
TypedColumnWriterImpl<FLBAType>::WriteArrowDense(
   return Status::OK();
 }
 
+template <typename DType>
+void TypedColumnWriterImpl<DType>::UpdateBloomFilter(const T* values,
+                                                     int64_t num_values) {
+  if (bloom_filter_) {
+    std::array<uint64_t, kHashBatchSize> hashes;
+    for (int64_t i = 0; i < num_values; i += kHashBatchSize) {
+      int64_t current_hash_batch_size = std::min(kHashBatchSize, num_values - 
i);
+      bloom_filter_->Hashes(values, static_cast<int>(current_hash_batch_size),
+                            hashes.data());
+      bloom_filter_->InsertHashes(hashes.data(),
+                                  static_cast<int>(current_hash_batch_size));
+    }
+  }
+}
+
+template <>
+void TypedColumnWriterImpl<FLBAType>::UpdateBloomFilter(const FLBA* values,
+                                                        int64_t num_values) {
+  if (bloom_filter_) {
+    std::array<uint64_t, kHashBatchSize> hashes;
+    for (int64_t i = 0; i < num_values; i += kHashBatchSize) {
+      int64_t current_hash_batch_size = std::min(kHashBatchSize, num_values - 
i);
+      bloom_filter_->Hashes(values, descr_->type_length(),
+                            static_cast<int>(current_hash_batch_size), 
hashes.data());
+      bloom_filter_->InsertHashes(hashes.data(),
+                                  static_cast<int>(current_hash_batch_size));
+    }
+  }
+}
+
+template <>
+void TypedColumnWriterImpl<BooleanType>::UpdateBloomFilter(const bool*, 
int64_t) {
+  DCHECK(bloom_filter_ == nullptr);
+}
+
+template <typename DType>
+void TypedColumnWriterImpl<DType>::UpdateBloomFilterSpaced(const T* values,
+                                                           int64_t num_values,
+                                                           const uint8_t* 
valid_bits,
+                                                           int64_t 
valid_bits_offset) {
+  if (bloom_filter_) {
+    std::array<uint64_t, kHashBatchSize> hashes;
+    ::arrow::internal::VisitSetBitRunsVoid(
+        valid_bits, valid_bits_offset, num_values, [&](int64_t position, 
int64_t length) {
+          for (int64_t i = 0; i < length; i += kHashBatchSize) {
+            auto current_hash_batch_size = std::min(kHashBatchSize, length - 
i);
+            bloom_filter_->Hashes(values + i + position,
+                                  static_cast<int>(current_hash_batch_size),
+                                  hashes.data());
+            bloom_filter_->InsertHashes(hashes.data(),
+                                        
static_cast<int>(current_hash_batch_size));
+          }
+        });
+  }
+}
+
+template <>
+void TypedColumnWriterImpl<BooleanType>::UpdateBloomFilterSpaced(const bool*, 
int64_t,
+                                                                 const 
uint8_t*,
+                                                                 int64_t) {
+  DCHECK(bloom_filter_ == nullptr);
+}
+
+template <>
+void TypedColumnWriterImpl<FLBAType>::UpdateBloomFilterSpaced(const FLBA* 
values,
+                                                              int64_t 
num_values,
+                                                              const uint8_t* 
valid_bits,
+                                                              int64_t 
valid_bits_offset) {
+  if (bloom_filter_) {
+    std::array<uint64_t, kHashBatchSize> hashes;
+    ::arrow::internal::VisitSetBitRunsVoid(

Review Comment:
   Same as my comment above, moving all `UpdateBloomFilterXXX` functions to the 
`BloomFilter` class can significantly reduce the changes in this file.



##########
cpp/src/parquet/file_writer.cc:
##########
@@ -91,7 +92,8 @@ class RowGroupSerializer : public RowGroupWriter::Contents {
                      RowGroupMetaDataBuilder* metadata, int16_t 
row_group_ordinal,
                      const WriterProperties* properties, bool 
buffered_row_group = false,
                      InternalFileEncryptor* file_encryptor = nullptr,
-                     PageIndexBuilder* page_index_builder = nullptr)
+                     PageIndexBuilder* page_index_builder = nullptr,
+                     internal::BloomFilterBuilder* bloom_filter_builder = 
nullptr)

Review Comment:
   The `internal::` prefix looks a little bit ugly. I suggest to rename 
`parquet/bloom_filter_builder.h` to `parquet/bloom_filter_internal.h` but keep 
the `namespace parquet`. WDYT?



##########
cpp/src/parquet/bloom_filter_builder.h:
##########
@@ -0,0 +1,79 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include "arrow/io/type_fwd.h"
+#include "parquet/types.h"
+
+namespace parquet::internal {
+
+/// \brief Interface for collecting bloom filter of a parquet file.
+///
+/// ```
+/// auto bloom_filter_builder = BloomFilterBuilder::Make(schema, properties);
+/// for (int i = 0; i < num_row_groups; i++) {
+///   bloom_filter_builder->AppendRowGroup();
+///   auto* bloom_filter =
+///   bloom_filter_builder->GetOrCreateBloomFilter(bloom_filter_column);
+///   // Add bloom filter entries in `bloom_filter`.
+///   // ...
+/// }
+/// bloom_filter_builder->WriteTo(sink, location);
+/// ```
+class PARQUET_EXPORT BloomFilterBuilder {
+ public:
+  /// \brief API to create a BloomFilterBuilder.
+  static std::unique_ptr<BloomFilterBuilder> Make(const SchemaDescriptor* 
schema,
+                                                  const WriterProperties* 
properties);
+
+  /// Append a new row group to host all incoming bloom filters.
+  ///
+  /// This method must be called before `GetOrCreateBloomFilter` for a new row 
group.
+  ///
+  /// \throws ParquetException if WriteTo() has been called to flush bloom 
filters.
+  virtual void AppendRowGroup() = 0;
+
+  /// \brief Get the BloomFilter from column ordinal.
+  ///
+  /// \param column_ordinal Column ordinal in schema, which is only for leaf 
columns.

Review Comment:
   Why some comments use `\param` while others use `@param`?



##########
cpp/src/parquet/column_writer.cc:
##########
@@ -2458,12 +2485,163 @@ Status 
TypedColumnWriterImpl<FLBAType>::WriteArrowDense(
   return Status::OK();
 }
 
+template <typename DType>
+void TypedColumnWriterImpl<DType>::UpdateBloomFilter(const T* values,
+                                                     int64_t num_values) {
+  if (bloom_filter_) {
+    std::array<uint64_t, kHashBatchSize> hashes;
+    for (int64_t i = 0; i < num_values; i += kHashBatchSize) {
+      int64_t current_hash_batch_size = std::min(kHashBatchSize, num_values - 
i);
+      bloom_filter_->Hashes(values, static_cast<int>(current_hash_batch_size),
+                            hashes.data());
+      bloom_filter_->InsertHashes(hashes.data(),
+                                  static_cast<int>(current_hash_batch_size));
+    }
+  }
+}
+
+template <>
+void TypedColumnWriterImpl<FLBAType>::UpdateBloomFilter(const FLBA* values,
+                                                        int64_t num_values) {
+  if (bloom_filter_) {
+    std::array<uint64_t, kHashBatchSize> hashes;
+    for (int64_t i = 0; i < num_values; i += kHashBatchSize) {
+      int64_t current_hash_batch_size = std::min(kHashBatchSize, num_values - 
i);
+      bloom_filter_->Hashes(values, descr_->type_length(),
+                            static_cast<int>(current_hash_batch_size), 
hashes.data());
+      bloom_filter_->InsertHashes(hashes.data(),
+                                  static_cast<int>(current_hash_batch_size));
+    }
+  }
+}
+
+template <>
+void TypedColumnWriterImpl<BooleanType>::UpdateBloomFilter(const bool*, 
int64_t) {
+  if (ARROW_PREDICT_FALSE(bloom_filter_ != nullptr)) {
+    throw ParquetException("BooleanType does not support bloom filters");
+  }
+}
+
+template <typename DType>
+void TypedColumnWriterImpl<DType>::UpdateBloomFilterSpaced(const T* values,
+                                                           int64_t num_values,
+                                                           const uint8_t* 
valid_bits,
+                                                           int64_t 
valid_bits_offset) {
+  if (bloom_filter_) {
+    std::array<uint64_t, kHashBatchSize> hashes;
+    ::arrow::internal::VisitSetBitRunsVoid(
+        valid_bits, valid_bits_offset, num_values, [&](int64_t position, 
int64_t length) {
+          for (int64_t i = 0; i < length; i += kHashBatchSize) {
+            auto current_hash_batch_size = std::min(kHashBatchSize, length - 
i);
+            bloom_filter_->Hashes(values + i + position,
+                                  static_cast<int>(current_hash_batch_size),
+                                  hashes.data());
+            bloom_filter_->InsertHashes(hashes.data(),
+                                        
static_cast<int>(current_hash_batch_size));
+          }
+        });
+  }
+}
+
+template <>
+void TypedColumnWriterImpl<BooleanType>::UpdateBloomFilterSpaced(const bool*, 
int64_t,
+                                                                 const 
uint8_t*,
+                                                                 int64_t) {
+  // BooleanType does not have a bloom filter currently,
+  // so bloom_filter_ should always be nullptr.
+  if (ARROW_PREDICT_FALSE(bloom_filter_ != nullptr)) {
+    throw ParquetException("BooleanType does not support bloom filters");
+  }
+}
+
+template <>
+void TypedColumnWriterImpl<FLBAType>::UpdateBloomFilterSpaced(const FLBA* 
values,
+                                                              int64_t 
num_values,
+                                                              const uint8_t* 
valid_bits,
+                                                              int64_t 
valid_bits_offset) {
+  if (bloom_filter_) {
+    std::array<uint64_t, kHashBatchSize> hashes;
+    ::arrow::internal::VisitSetBitRunsVoid(
+        valid_bits, valid_bits_offset, num_values, [&](int64_t position, 
int64_t length) {
+          for (int64_t i = 0; i < length; i += kHashBatchSize) {
+            auto current_hash_batch_size = std::min(kHashBatchSize, length - 
i);
+            bloom_filter_->Hashes(values + i + position, descr_->type_length(),
+                                  static_cast<int>(current_hash_batch_size),
+                                  hashes.data());
+            bloom_filter_->InsertHashes(hashes.data(),
+                                        
static_cast<int>(current_hash_batch_size));
+          }
+        });
+  }
+}
+
+template <typename ArrayType>
+void UpdateBinaryBloomFilter(BloomFilter* bloom_filter, const ArrayType& 
array) {
+  // Using a smaller size because an extra `byte_arrays` are used.

Review Comment:
   ```suggestion
     // Using a smaller size because an extra `byte_arrays` is used.
   ```



##########
cpp/src/parquet/bloom_filter_builder.h:
##########
@@ -0,0 +1,79 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include "arrow/io/type_fwd.h"
+#include "parquet/types.h"
+
+namespace parquet::internal {
+
+/// \brief Interface for collecting bloom filter of a parquet file.
+///
+/// ```
+/// auto bloom_filter_builder = BloomFilterBuilder::Make(schema, properties);
+/// for (int i = 0; i < num_row_groups; i++) {
+///   bloom_filter_builder->AppendRowGroup();
+///   auto* bloom_filter =
+///   bloom_filter_builder->GetOrCreateBloomFilter(bloom_filter_column);
+///   // Add bloom filter entries in `bloom_filter`.
+///   // ...
+/// }
+/// bloom_filter_builder->WriteTo(sink, location);
+/// ```
+class PARQUET_EXPORT BloomFilterBuilder {
+ public:
+  /// \brief API to create a BloomFilterBuilder.

Review Comment:
   nit: explain the expecting lifetime of `schema` and `properties`.



##########
cpp/src/parquet/column_writer_test.cc:
##########
@@ -1719,6 +1721,94 @@ TEST(TestColumnWriter, WriteDataPageV2HeaderNullCount) {
   }
 }
 
+template <typename TestType>
+class TestBloomFilterWriter : public TestPrimitiveWriter<TestType> {
+ public:
+  void SetUp() override {
+    TestPrimitiveWriter<TestType>::SetUp();
+    builder_ = nullptr;
+    bloom_filter_ = nullptr;
+  }
+
+  std::shared_ptr<TypedColumnWriter<TestType>> BuildWriterWithBloomFilter(
+      int64_t output_size, const ColumnProperties& column_properties);
+
+  std::unique_ptr<internal::BloomFilterBuilder> builder_;
+  BloomFilter* bloom_filter_{nullptr};
+};
+
+template <typename TestType>
+std::shared_ptr<TypedColumnWriter<TestType>>
+TestBloomFilterWriter<TestType>::BuildWriterWithBloomFilter(
+    int64_t output_size, const ColumnProperties& column_properties) {
+  this->sink_ = CreateOutputStream();
+  WriterProperties::Builder wp_builder;
+  if (column_properties.encoding() == Encoding::PLAIN_DICTIONARY ||
+      column_properties.encoding() == Encoding::RLE_DICTIONARY) {
+    wp_builder.enable_dictionary();
+    wp_builder.dictionary_pagesize_limit(DICTIONARY_PAGE_SIZE);

Review Comment:
   Why do we need to set this?



##########
cpp/src/parquet/column_writer_test.cc:
##########
@@ -1719,6 +1721,94 @@ TEST(TestColumnWriter, WriteDataPageV2HeaderNullCount) {
   }
 }
 
+template <typename TestType>
+class TestBloomFilterWriter : public TestPrimitiveWriter<TestType> {
+ public:
+  void SetUp() override {
+    TestPrimitiveWriter<TestType>::SetUp();
+    builder_ = nullptr;
+    bloom_filter_ = nullptr;
+  }
+
+  std::shared_ptr<TypedColumnWriter<TestType>> BuildWriterWithBloomFilter(
+      int64_t output_size, const ColumnProperties& column_properties);
+
+  std::unique_ptr<internal::BloomFilterBuilder> builder_;
+  BloomFilter* bloom_filter_{nullptr};
+};
+
+template <typename TestType>
+std::shared_ptr<TypedColumnWriter<TestType>>
+TestBloomFilterWriter<TestType>::BuildWriterWithBloomFilter(
+    int64_t output_size, const ColumnProperties& column_properties) {

Review Comment:
   It looks weird to use `column_properties` as the input parameter but most of 
them are default or unused actually. What about changing it to the below one:
   ```
   TestBloomFilterWriter<TestType>::BuildWriterWithBloomFilter(
       int64_t output_size, bool enable_dictionary, int64_t ndv)
   ```



##########
cpp/src/parquet/bloom_filter_builder.cc:
##########
@@ -0,0 +1,170 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// This module defines an abstract interface for iterating through pages in a
+// Parquet column chunk within a row group. It could be extended in the future
+// to iterate through all data pages in all chunks in a file.
+
+#include "parquet/bloom_filter_builder.h"
+
+#include <map>
+#include <utility>
+#include <vector>
+
+#include "arrow/io/interfaces.h"
+
+#include "parquet/bloom_filter.h"
+#include "parquet/exception.h"
+#include "parquet/metadata.h"
+#include "parquet/properties.h"
+
+namespace parquet::internal {
+
+namespace {
+/// Column encryption for bloom filter is not implemented yet.

Review Comment:
   ```suggestion
   /// Note: column encryption for bloom filter is not implemented yet.
   ```



##########
cpp/src/parquet/bloom_filter_builder.cc:
##########
@@ -0,0 +1,170 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// This module defines an abstract interface for iterating through pages in a
+// Parquet column chunk within a row group. It could be extended in the future
+// to iterate through all data pages in all chunks in a file.
+
+#include "parquet/bloom_filter_builder.h"
+
+#include <map>
+#include <utility>
+#include <vector>
+
+#include "arrow/io/interfaces.h"
+
+#include "parquet/bloom_filter.h"
+#include "parquet/exception.h"
+#include "parquet/metadata.h"
+#include "parquet/properties.h"
+
+namespace parquet::internal {
+
+namespace {
+/// Column encryption for bloom filter is not implemented yet.
+class BloomFilterBuilderImpl : public BloomFilterBuilder {
+ public:
+  explicit BloomFilterBuilderImpl(const SchemaDescriptor* schema,
+                                  const WriterProperties* properties)
+      : schema_(schema), properties_(properties) {}
+  BloomFilterBuilderImpl(const BloomFilterBuilderImpl&) = delete;
+  BloomFilterBuilderImpl(BloomFilterBuilderImpl&&) = default;
+
+  /// Append a new row group to host all incoming bloom filters.
+  void AppendRowGroup() override;
+
+  BloomFilter* GetOrCreateBloomFilter(int32_t column_ordinal) override;
+
+  /// Serialize all bloom filters with header and bitset in the order of row 
group and
+  /// column id. The side effect is that it deletes all bloom filters after 
they have
+  /// been flushed.
+  void WriteTo(::arrow::io::OutputStream* sink, BloomFilterLocation* location) 
override;
+
+ private:
+  /// Make sure column ordinal is not out of bound and the builder is in good 
state.
+  void CheckState(int32_t column_ordinal) const {
+    if (finished_) {
+      throw ParquetException("BloomFilterBuilder is already finished.");
+    }
+    if (column_ordinal < 0 || column_ordinal >= schema_->num_columns()) {
+      throw ParquetException("Invalid column ordinal: ", column_ordinal);
+    }
+    if (file_bloom_filters_.empty()) {

Review Comment:
   ```suggestion
       if (file_bloom_filters_.empty()) [[unlikely]] {
   ```



##########
cpp/src/parquet/column_writer.cc:
##########
@@ -1741,15 +1752,22 @@ class TypedColumnWriterImpl : public ColumnWriterImpl,
     if (num_values != num_spaced_values) {
       current_value_encoder_->PutSpaced(values, 
static_cast<int>(num_spaced_values),
                                         valid_bits, valid_bits_offset);
+      UpdateBloomFilterSpaced(values, num_spaced_values, valid_bits, 
valid_bits_offset);
     } else {
       current_value_encoder_->Put(values, static_cast<int>(num_values));
+      UpdateBloomFilter(values, num_values);
     }
     if (page_statistics_ != nullptr) {
       page_statistics_->UpdateSpaced(values, valid_bits, valid_bits_offset,
                                      num_spaced_values, num_values, num_nulls);
     }
     UpdateUnencodedDataBytes();
   }
+
+  void UpdateBloomFilter(const T* values, int64_t num_values);

Review Comment:
   Does it make sense to move all these `UpdateXXX` to the `BloomFilter` class? 
These functions look more like the detail of bloom filter instead of a column 
writer.



##########
cpp/src/parquet/bloom_filter_builder.cc:
##########
@@ -0,0 +1,170 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// This module defines an abstract interface for iterating through pages in a
+// Parquet column chunk within a row group. It could be extended in the future
+// to iterate through all data pages in all chunks in a file.
+
+#include "parquet/bloom_filter_builder.h"
+
+#include <map>
+#include <utility>
+#include <vector>
+
+#include "arrow/io/interfaces.h"
+
+#include "parquet/bloom_filter.h"
+#include "parquet/exception.h"
+#include "parquet/metadata.h"
+#include "parquet/properties.h"
+
+namespace parquet::internal {
+
+namespace {
+/// Column encryption for bloom filter is not implemented yet.
+class BloomFilterBuilderImpl : public BloomFilterBuilder {
+ public:
+  explicit BloomFilterBuilderImpl(const SchemaDescriptor* schema,
+                                  const WriterProperties* properties)
+      : schema_(schema), properties_(properties) {}
+  BloomFilterBuilderImpl(const BloomFilterBuilderImpl&) = delete;
+  BloomFilterBuilderImpl(BloomFilterBuilderImpl&&) = default;
+
+  /// Append a new row group to host all incoming bloom filters.
+  void AppendRowGroup() override;
+
+  BloomFilter* GetOrCreateBloomFilter(int32_t column_ordinal) override;
+
+  /// Serialize all bloom filters with header and bitset in the order of row 
group and
+  /// column id. The side effect is that it deletes all bloom filters after 
they have
+  /// been flushed.
+  void WriteTo(::arrow::io::OutputStream* sink, BloomFilterLocation* location) 
override;
+
+ private:
+  /// Make sure column ordinal is not out of bound and the builder is in good 
state.
+  void CheckState(int32_t column_ordinal) const {
+    if (finished_) {
+      throw ParquetException("BloomFilterBuilder is already finished.");
+    }
+    if (column_ordinal < 0 || column_ordinal >= schema_->num_columns()) {
+      throw ParquetException("Invalid column ordinal: ", column_ordinal);
+    }
+    if (file_bloom_filters_.empty()) {
+      throw ParquetException("No row group appended to BloomFilterBuilder.");
+    }
+    if (schema_->Column(column_ordinal)->physical_type() == Type::BOOLEAN) {

Review Comment:
   ```suggestion
       if (schema_->Column(column_ordinal)->physical_type() == Type::BOOLEAN) 
[[unlikely]] {
   ```



##########
cpp/src/parquet/bloom_filter_builder.cc:
##########
@@ -0,0 +1,170 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// This module defines an abstract interface for iterating through pages in a
+// Parquet column chunk within a row group. It could be extended in the future
+// to iterate through all data pages in all chunks in a file.
+
+#include "parquet/bloom_filter_builder.h"
+
+#include <map>
+#include <utility>
+#include <vector>
+
+#include "arrow/io/interfaces.h"
+
+#include "parquet/bloom_filter.h"
+#include "parquet/exception.h"
+#include "parquet/metadata.h"
+#include "parquet/properties.h"
+
+namespace parquet::internal {
+
+namespace {
+/// Column encryption for bloom filter is not implemented yet.
+class BloomFilterBuilderImpl : public BloomFilterBuilder {
+ public:
+  explicit BloomFilterBuilderImpl(const SchemaDescriptor* schema,
+                                  const WriterProperties* properties)
+      : schema_(schema), properties_(properties) {}
+  BloomFilterBuilderImpl(const BloomFilterBuilderImpl&) = delete;
+  BloomFilterBuilderImpl(BloomFilterBuilderImpl&&) = default;
+
+  /// Append a new row group to host all incoming bloom filters.
+  void AppendRowGroup() override;
+
+  BloomFilter* GetOrCreateBloomFilter(int32_t column_ordinal) override;
+
+  /// Serialize all bloom filters with header and bitset in the order of row 
group and
+  /// column id. The side effect is that it deletes all bloom filters after 
they have
+  /// been flushed.
+  void WriteTo(::arrow::io::OutputStream* sink, BloomFilterLocation* location) 
override;
+
+ private:
+  /// Make sure column ordinal is not out of bound and the builder is in good 
state.
+  void CheckState(int32_t column_ordinal) const {
+    if (finished_) {
+      throw ParquetException("BloomFilterBuilder is already finished.");
+    }
+    if (column_ordinal < 0 || column_ordinal >= schema_->num_columns()) {
+      throw ParquetException("Invalid column ordinal: ", column_ordinal);
+    }
+    if (file_bloom_filters_.empty()) {
+      throw ParquetException("No row group appended to BloomFilterBuilder.");
+    }
+    if (schema_->Column(column_ordinal)->physical_type() == Type::BOOLEAN) {
+      throw ParquetException("BloomFilterBuilder does not support boolean 
type.");
+    }
+  }
+
+  const SchemaDescriptor* schema_;
+  const WriterProperties* properties_;
+  bool finished_ = false;
+
+  using RowGroupBloomFilters = std::map<int32_t, std::unique_ptr<BloomFilter>>;
+  // Using unique_ptr because the `std::unique_ptr<BloomFilter>` is not 
copyable.
+  // MSVC has the issue below: https://github.com/microsoft/STL/issues/1036
+  // So we use `std::unique_ptr<std::map<>>` to avoid the issue.
+  std::vector<std::unique_ptr<RowGroupBloomFilters>> file_bloom_filters_;
+};
+
+void BloomFilterBuilderImpl::AppendRowGroup() {
+  if (finished_) {
+    throw ParquetException("Cannot append to a finished BloomFilterBuilder");
+  }
+  file_bloom_filters_.emplace_back(std::make_unique<RowGroupBloomFilters>());
+}
+
+BloomFilter* BloomFilterBuilderImpl::GetOrCreateBloomFilter(int32_t 
column_ordinal) {
+  CheckState(column_ordinal);
+  const ColumnDescriptor* column_descr = schema_->Column(column_ordinal);
+  // Bloom filter does not support boolean type, and this should be checked in
+  // CheckState() already.
+  DCHECK_NE(column_descr->physical_type(), Type::BOOLEAN);
+  auto bloom_filter_options_opt = 
properties_->bloom_filter_options(column_descr->path());
+  if (bloom_filter_options_opt == std::nullopt) {
+    return nullptr;
+  }
+  const BloomFilterOptions& bloom_filter_options = *bloom_filter_options_opt;
+  // CheckState() should have checked that file_bloom_filters_ is not empty.

Review Comment:
   I still think that we don't need to be fool-proof here especially when 
`CheckState()` has done duplicate work. Removing these `DCHECK` improves 
readability.



##########
cpp/src/parquet/bloom_filter_builder.cc:
##########
@@ -0,0 +1,170 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// This module defines an abstract interface for iterating through pages in a
+// Parquet column chunk within a row group. It could be extended in the future
+// to iterate through all data pages in all chunks in a file.
+
+#include "parquet/bloom_filter_builder.h"
+
+#include <map>
+#include <utility>
+#include <vector>
+
+#include "arrow/io/interfaces.h"
+
+#include "parquet/bloom_filter.h"
+#include "parquet/exception.h"
+#include "parquet/metadata.h"
+#include "parquet/properties.h"
+
+namespace parquet::internal {
+
+namespace {
+/// Column encryption for bloom filter is not implemented yet.
+class BloomFilterBuilderImpl : public BloomFilterBuilder {
+ public:
+  explicit BloomFilterBuilderImpl(const SchemaDescriptor* schema,
+                                  const WriterProperties* properties)
+      : schema_(schema), properties_(properties) {}
+  BloomFilterBuilderImpl(const BloomFilterBuilderImpl&) = delete;
+  BloomFilterBuilderImpl(BloomFilterBuilderImpl&&) = default;
+
+  /// Append a new row group to host all incoming bloom filters.
+  void AppendRowGroup() override;
+
+  BloomFilter* GetOrCreateBloomFilter(int32_t column_ordinal) override;
+
+  /// Serialize all bloom filters with header and bitset in the order of row 
group and
+  /// column id. The side effect is that it deletes all bloom filters after 
they have
+  /// been flushed.
+  void WriteTo(::arrow::io::OutputStream* sink, BloomFilterLocation* location) 
override;
+
+ private:
+  /// Make sure column ordinal is not out of bound and the builder is in good 
state.
+  void CheckState(int32_t column_ordinal) const {
+    if (finished_) {

Review Comment:
   `CheckState` might be called for multiple times so adding `[[unlikely]]` is 
something worth doing?



##########
cpp/src/parquet/column_writer.cc:
##########
@@ -2363,12 +2390,153 @@ Status 
TypedColumnWriterImpl<FLBAType>::WriteArrowDense(
   return Status::OK();
 }
 
+template <typename DType>
+void TypedColumnWriterImpl<DType>::UpdateBloomFilter(const T* values,
+                                                     int64_t num_values) {
+  if (bloom_filter_) {
+    std::array<uint64_t, kHashBatchSize> hashes;
+    for (int64_t i = 0; i < num_values; i += kHashBatchSize) {
+      int64_t current_hash_batch_size = std::min(kHashBatchSize, num_values - 
i);
+      bloom_filter_->Hashes(values, static_cast<int>(current_hash_batch_size),

Review Comment:
   I just found that parquet-cpp has a lot of `int num_values` used in the 
function signature. Is it worth changing all of them to `int32_t`? cc 
@emkornfield @pitrou 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to