pitrou commented on code in PR #37400:
URL: https://github.com/apache/arrow/pull/37400#discussion_r2585255072


##########
cpp/src/parquet/metadata.h:
##########
@@ -531,8 +523,8 @@ class PARQUET_EXPORT FileMetaDataBuilder {
   // The prior RowGroupMetaDataBuilder (if any) is destroyed
   RowGroupMetaDataBuilder* AppendRowGroup();
 
-  // Update location to all page indexes in the parquet file
-  void SetPageIndexLocation(const PageIndexLocation& location);

Review Comment:
   This is removing an existing API without even deprecating it. Is nobody 
using it?



##########
cpp/src/parquet/metadata.h:
##########
@@ -505,19 +504,12 @@ class PARQUET_EXPORT RowGroupMetaDataBuilder {
   std::unique_ptr<RowGroupMetaDataBuilderImpl> impl_;
 };
 
-/// \brief Public struct for location to all page indexes in a parquet file.
-struct PageIndexLocation {
-  /// Alias type of page index location of a row group. The index location
-  /// is located by column ordinal. If the column does not have the page index,
-  /// its value is set to std::nullopt.
-  using RowGroupIndexLocation = std::vector<std::optional<IndexLocation>>;
-  /// Alias type of page index location of a parquet file. The index location
-  /// is located by the row group ordinal.
-  using FileIndexLocation = std::map<size_t, RowGroupIndexLocation>;
-  /// Row group column index locations which uses row group ordinal as the key.
-  FileIndexLocation column_index_location;
-  /// Row group offset index locations which uses row group ordinal as the key.
-  FileIndexLocation offset_index_location;
+/// \brief Locations of indexes for all row groups and columns.
+struct PARQUET_EXPORT IndexLocations {
+  enum class IndexType : uint8_t { kColumnIndex, kOffsetIndex, kBloomFilter };
+
+  IndexType type;
+  std::map</*RowGroupId=*/size_t, std::map</*ColumnId=*/size_t, IndexLocation>> locations;

Review Comment:
   Can we please remove the horrible `std::map` from these APIs?



##########
cpp/src/parquet/CMakeLists.txt:
##########
@@ -408,6 +409,8 @@ add_parquet_test(arrow-reader-writer-test
                  arrow/arrow_statistics_test.cc
                  arrow/variant_test.cc)
 
+add_parquet_test(arrow-parquet-index-test SOURCES arrow/arrow_parquet_index_test.cc)

Review Comment:
   Can we just call the source file `index_test.cc`? It's annoying to have 
these `arrow` and `parquet` prefixes.



##########
cpp/src/parquet/column_writer.cc:
##########
@@ -1293,6 +1297,9 @@ class TypedColumnWriterImpl : public ColumnWriterImpl,
     current_dict_encoder_ =
         dynamic_cast<DictEncoder<ParquetType>*>(current_encoder_.get());
 
+    bloom_filter_writer_ =
+        std::make_unique<TypedBloomFilterWriter>(metadata->descr(), bloom_filter);

Review Comment:
   Why not pass `descr_` here? Is it different from `metadata->descr()`?



##########
cpp/src/parquet/column_writer.cc:
##########
@@ -1976,17 +1990,18 @@ Status TypedColumnWriterImpl<ParquetType>::WriteArrowDictionary(
                                  &exec_ctx));
       referenced_dictionary = referenced_dictionary_datum.make_array();
     }
-
-    int64_t non_null_count = chunk_indices->length() - chunk_indices->null_count();
-    page_statistics_->IncrementNullCount(num_chunk_levels - non_null_count);
-    page_statistics_->IncrementNumValues(non_null_count);
-    page_statistics_->Update(*referenced_dictionary, /*update_counts=*/false);
-
+    if (page_statistics_ != nullptr) {

Review Comment:
   Why this change? You've just `DCHECK`ed that the pointer was not null.



##########
cpp/src/parquet/bloom_filter_writer.cc:
##########
@@ -0,0 +1,305 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "parquet/bloom_filter_writer.h"
+
+#include <map>
+#include <utility>
+
+#include "arrow/array.h"
+#include "arrow/io/interfaces.h"
+#include "arrow/type_traits.h"
+#include "arrow/util/bit_run_reader.h"
+#include "arrow/util/checked_cast.h"
+#include "arrow/util/unreachable.h"
+
+#include "parquet/exception.h"
+#include "parquet/metadata.h"
+#include "parquet/properties.h"
+#include "parquet/schema.h"
+#include "parquet/types.h"
+
+namespace parquet {
+
+constexpr int64_t kHashBatchSize = 256;
+
+template <typename ParquetType>
+BloomFilterWriter<ParquetType>::BloomFilterWriter(const ColumnDescriptor* 
descr,
+                                                  BloomFilter* bloom_filter)
+    : descr_(descr), bloom_filter_(bloom_filter) {}
+
+template <typename ParquetType>
+bool BloomFilterWriter<ParquetType>::bloom_filter_enabled() const {
+  return bloom_filter_ != nullptr;
+}
+
+template <typename ParquetType>
+void BloomFilterWriter<ParquetType>::Update(const T* values, int64_t 
num_values) {
+  if (!bloom_filter_enabled()) {
+    return;
+  }
+
+  if constexpr (std::is_same_v<ParquetType, BooleanType>) {
+    throw ParquetException("Bloom filter is not supported for boolean type");
+  }
+
+  std::array<uint64_t, kHashBatchSize> hashes;
+  for (int64_t i = 0; i < num_values; i += kHashBatchSize) {
+    auto batch_size = static_cast<int>(std::min(kHashBatchSize, num_values - 
i));
+    if constexpr (std::is_same_v<ParquetType, FLBAType>) {
+      bloom_filter_->Hashes(values, descr_->type_length(), batch_size, 
hashes.data());
+    } else {
+      bloom_filter_->Hashes(values, batch_size, hashes.data());
+    }
+    bloom_filter_->InsertHashes(hashes.data(), batch_size);
+  }
+}
+
+template <>
+void BloomFilterWriter<BooleanType>::Update(const bool*, int64_t) {
+  if (!bloom_filter_enabled()) {
+    return;
+  }
+  throw ParquetException("Bloom filter is not supported for boolean type");
+}
+
+template <typename ParquetType>
+void BloomFilterWriter<ParquetType>::UpdateSpaced(const T* values, int64_t 
num_values,
+                                                  const uint8_t* valid_bits,
+                                                  int64_t valid_bits_offset) {
+  if (!bloom_filter_enabled()) {
+    return;
+  }
+
+  std::array<uint64_t, kHashBatchSize> hashes;
+  ::arrow::internal::VisitSetBitRunsVoid(
+      valid_bits, valid_bits_offset, num_values, [&](int64_t position, int64_t 
length) {
+        for (int64_t i = 0; i < length; i += kHashBatchSize) {
+          auto batch_size = static_cast<int>(std::min(kHashBatchSize, length - 
i));
+          if constexpr (std::is_same_v<ParquetType, FLBAType>) {
+            bloom_filter_->Hashes(values + i + position, descr_->type_length(),
+                                  batch_size, hashes.data());
+          } else {
+            bloom_filter_->Hashes(values + i + position, batch_size, 
hashes.data());
+          }
+          bloom_filter_->InsertHashes(hashes.data(), batch_size);
+        }
+      });
+}
+
+template <>
+void BloomFilterWriter<BooleanType>::UpdateSpaced(const bool*, int64_t, const 
uint8_t*,
+                                                  int64_t) {
+  if (!bloom_filter_enabled()) {
+    return;
+  }
+  throw ParquetException("Bloom filter is not supported for boolean type");
+}
+
+template <typename ParquetType>
+void BloomFilterWriter<ParquetType>::Update(const ::arrow::Array& values) {
+  ::arrow::Unreachable("Update for non-ByteArray type should be unreachable");

Review Comment:
   Why? Can you please at least raise a proper exception?



##########
cpp/src/parquet/bloom_filter_writer.h:
##########
@@ -0,0 +1,105 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include "arrow/type_fwd.h"
+
+#include "parquet/bloom_filter.h"
+#include "parquet/type_fwd.h"
+
+namespace parquet {
+
+/// \brief Writer for updating a bloom filter with values of a specific 
Parquet type.
+template <typename ParquetType>
+class PARQUET_EXPORT BloomFilterWriter {
+ public:
+  using T = typename ParquetType::c_type;
+
+  /// \param descr The descriptor of the column to write. Must outlive this 
writer.
+  /// \param bloom_filter The bloom filter to update. If nullptr, this writer 
does not
+  /// enable bloom filter. Otherwise, the input bloom filter must outlive this 
writer.
+  BloomFilterWriter(const ColumnDescriptor* descr, BloomFilter* bloom_filter);
+
+  /// \brief Update the bloom filter with typed values.
+  ///
+  /// \param values The values to update the bloom filter with.
+  /// \param num_values The number of values to update the bloom filter with.
+  void Update(const T* values, int64_t num_values);
+
+  /// \brief Update the bloom filter with typed values that have spaces.
+  ///
+  /// \param values The values to update the bloom filter with.
+  /// \param num_values The number of values to update the bloom filter with.
+  /// \param valid_bits The validity bitmap of the values.
+  /// \param valid_bits_offset The offset of the validity bitmap.
+  void UpdateSpaced(const T* values, int64_t num_values, const uint8_t* 
valid_bits,
+                    int64_t valid_bits_offset);
+
+  /// \brief Update the bloom filter with an Arrow array.
+  ///
+  /// \param values The Arrow array to update the bloom filter with.
+  void Update(const ::arrow::Array& values);
+
+  /// \brief Check if this writer has enabled the bloom filter.
+  bool bloom_filter_enabled() const;
+
+ private:
+  const ColumnDescriptor* descr_;
+  BloomFilter* bloom_filter_;
+};
+
+/// \brief Interface for building bloom filters of a parquet file.
+class PARQUET_EXPORT BloomFilterBuilder {
+ public:
+  virtual ~BloomFilterBuilder() = default;
+
+  /// \brief API to create a BloomFilterBuilder.
+  ///
+  /// \param schema The schema of the file and it must outlive the created 
builder.
+  /// \param properties Properties to get bloom filter options. It must 
outlive the
+  /// created builder.
+  static std::unique_ptr<BloomFilterBuilder> Make(const SchemaDescriptor* 
schema,
+                                                  const WriterProperties* 
properties);
+
+  /// \brief Start a new row group to write bloom filters, meaning that next 
calls
+  /// to `GetOrCreateBloomFilter` will create bloom filters for the new row 
group.
+  ///
+  /// \throws ParquetException if `WriteTo()` has been called already.
+  virtual void AppendRowGroup() = 0;
+
+  /// \brief Get or create a BloomFilter from the column ordinal of the 
current row group.
+  ///
+  /// \param column_ordinal Column ordinal for the bloom filter.
+  /// \return designated BloomFilter whose ownership belongs to the builder.

Review Comment:
   Can it be null? If so, it should be mentioned in the API doc.



##########
cpp/src/parquet/page_index.h:
##########
@@ -378,9 +378,11 @@ class PARQUET_EXPORT PageIndexBuilder {
   /// are set.
   ///
   /// \param[out] sink The output stream to write the page index.
-  /// \param[out] location The location of all page index to the start of sink.
+  /// \param[out] column_index_location The location of all column indexes.
+  /// \param[out] offset_index_location The location of all offset indexes.

Review Comment:
   Can we return these instead of filling them by pointer?
   Something like:
   ```c++
     struct WriteResult {
       IndexLocations column_index_locations;
       IndexLocations offset_index_locations;
     };
   ```
   



##########
cpp/src/parquet/column_writer_test.cc:
##########
@@ -2380,5 +2382,86 @@ TYPED_TEST(TestColumnWriterMaxRowsPerPage, 
RequiredLargeChunk) {
   }
 }
 
+template <typename TestType>
+class TestBloomFilterWriter : public TestPrimitiveWriter<TestType> {
+ public:
+  void SetUp() override {
+    TestPrimitiveWriter<TestType>::SetUp();
+    builder_ = nullptr;
+    bloom_filter_ = nullptr;
+  }
+
+  std::shared_ptr<TypedColumnWriter<TestType>> BuildWriter(
+      int64_t output_size, const ColumnProperties& column_properties) {
+    this->sink_ = CreateOutputStream();
+
+    WriterProperties::Builder properties_builder;
+    if (column_properties.encoding() == Encoding::PLAIN_DICTIONARY ||
+        column_properties.encoding() == Encoding::RLE_DICTIONARY) {
+      properties_builder.enable_dictionary();
+    } else {
+      properties_builder.disable_dictionary();
+      properties_builder.encoding(column_properties.encoding());
+    }
+    auto path = this->schema_.Column(0)->path();
+    if (column_properties.bloom_filter_enabled()) {
+      
properties_builder.enable_bloom_filter(*column_properties.bloom_filter_options(),
+                                             path);
+    } else {
+      properties_builder.disable_bloom_filter(path);
+    }
+    this->writer_properties_ = properties_builder.build();
+
+    this->metadata_ =
+        ColumnChunkMetaDataBuilder::Make(this->writer_properties_, 
this->descr_);
+    auto pager = PageWriter::Open(this->sink_, column_properties.compression(),
+                                  this->metadata_.get());
+
+    builder_ = BloomFilterBuilder::Make(&this->schema_, 
this->writer_properties_.get());
+    builder_->AppendRowGroup();
+    bloom_filter_ = builder_->GetOrCreateBloomFilter(/*column_ordinal=*/0);
+
+    return std::static_pointer_cast<TypedColumnWriter<TestType>>(

Review Comment:
   Could we please use `checked_pointer_cast` or `dynamic_pointer_cast` here instead of `static_pointer_cast`?



##########
cpp/src/parquet/metadata.h:
##########
@@ -531,8 +523,8 @@ class PARQUET_EXPORT FileMetaDataBuilder {
   // The prior RowGroupMetaDataBuilder (if any) is destroyed
   RowGroupMetaDataBuilder* AppendRowGroup();
 
-  // Update location to all page indexes in the parquet file
-  void SetPageIndexLocation(const PageIndexLocation& location);
+  // Set locations of all row groups and columns for a specific index type.
+  void SetIndexLocations(const IndexLocations& locations);

Review Comment:
   I would instead recommend something like:
   ```c++
   struct ColumnChunkId {
     int32_t row_group_index;
     int32_t column_index;
   };
   
   using IndexLocations = std::vector<std::pair<ColumnChunkId, IndexLocation>>;
   ```
   
   And then:
   ```c++
     void SetIndexLocations(IndexType, const IndexLocations&);
   ```



##########
cpp/src/parquet/column_writer.cc:
##########
@@ -1867,6 +1876,7 @@ class TypedColumnWriterImpl : public ColumnWriterImpl,
         chunk_geospatial_statistics_->Update(values, num_values);
       }
     }
+    bloom_filter_writer_->Update(values, num_values);

Review Comment:
   Sorry, but why do we always write to the bloom filter? What if the bloom 
filter wasn't enabled in `WriterProperties`? Is it just a no-op in that case?



##########
cpp/src/parquet/column_writer_test.cc:
##########
@@ -2380,5 +2382,86 @@ TYPED_TEST(TestColumnWriterMaxRowsPerPage, 
RequiredLargeChunk) {
   }
 }
 
+template <typename TestType>
+class TestBloomFilterWriter : public TestPrimitiveWriter<TestType> {
+ public:
+  void SetUp() override {
+    TestPrimitiveWriter<TestType>::SetUp();
+    builder_ = nullptr;
+    bloom_filter_ = nullptr;
+  }
+
+  std::shared_ptr<TypedColumnWriter<TestType>> BuildWriter(
+      int64_t output_size, const ColumnProperties& column_properties) {
+    this->sink_ = CreateOutputStream();
+
+    WriterProperties::Builder properties_builder;
+    if (column_properties.encoding() == Encoding::PLAIN_DICTIONARY ||
+        column_properties.encoding() == Encoding::RLE_DICTIONARY) {
+      properties_builder.enable_dictionary();
+    } else {
+      properties_builder.disable_dictionary();
+      properties_builder.encoding(column_properties.encoding());
+    }
+    auto path = this->schema_.Column(0)->path();
+    if (column_properties.bloom_filter_enabled()) {
+      
properties_builder.enable_bloom_filter(*column_properties.bloom_filter_options(),
+                                             path);
+    } else {
+      properties_builder.disable_bloom_filter(path);
+    }
+    this->writer_properties_ = properties_builder.build();
+
+    this->metadata_ =
+        ColumnChunkMetaDataBuilder::Make(this->writer_properties_, 
this->descr_);
+    auto pager = PageWriter::Open(this->sink_, column_properties.compression(),
+                                  this->metadata_.get());
+
+    builder_ = BloomFilterBuilder::Make(&this->schema_, 
this->writer_properties_.get());
+    builder_->AppendRowGroup();
+    bloom_filter_ = builder_->GetOrCreateBloomFilter(/*column_ordinal=*/0);
+
+    return std::static_pointer_cast<TypedColumnWriter<TestType>>(
+        ColumnWriter::Make(this->metadata_.get(), std::move(pager),
+                           this->writer_properties_.get(), bloom_filter_));
+  }
+
+  std::unique_ptr<BloomFilterBuilder> builder_;
+  BloomFilter* bloom_filter_{nullptr};

Review Comment:
   This is a raw pointer. Is it owned by the `BloomFilterBuilder`?



##########
cpp/src/parquet/properties.cc:
##########
@@ -108,6 +108,10 @@ void 
WriterProperties::Builder::CopyColumnSpecificProperties(
     if (col_props.encoding() != default_column_properties_.encoding()) {
       this->encoding(col_path, col_props.encoding());
     }
+
+    if (col_props.bloom_filter_options().has_value()) {
+      this->enable_bloom_filter(col_props.bloom_filter_options().value(), col_path);
+    }

Review Comment:
   **Please**, can we be careful when designing APIs?
   
   Other APIs take the `col_path` as first parameter, why does this one take it 
as second parameter?



##########
cpp/src/parquet/column_writer.cc:
##########
@@ -2003,7 +2018,7 @@ Status 
TypedColumnWriterImpl<ParquetType>::WriteArrowDictionary(
                       AddIfNotNull(rep_levels, offset));
     std::shared_ptr<Array> writeable_indices =
         indices->Slice(value_offset, batch_num_spaced_values);
-    if (page_statistics_) {
+    if (page_statistics_ || bloom_filter_writer_->bloom_filter_enabled()) {

Review Comment:
   Why this? Bloom filter does not reuse statistics, does it?



##########
cpp/src/parquet/bloom_filter_writer.h:
##########
@@ -0,0 +1,105 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include "arrow/type_fwd.h"
+
+#include "parquet/bloom_filter.h"
+#include "parquet/type_fwd.h"
+
+namespace parquet {
+
+/// \brief Writer for updating a bloom filter with values of a specific 
Parquet type.
+template <typename ParquetType>
+class PARQUET_EXPORT BloomFilterWriter {

Review Comment:
   Can we call this `TypedBloomFilterWriter` like other similar classes?



##########
cpp/src/parquet/bloom_filter_writer.h:
##########
@@ -0,0 +1,105 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include "arrow/type_fwd.h"
+
+#include "parquet/bloom_filter.h"
+#include "parquet/type_fwd.h"
+
+namespace parquet {
+
+/// \brief Writer for updating a bloom filter with values of a specific 
Parquet type.
+template <typename ParquetType>
+class PARQUET_EXPORT BloomFilterWriter {
+ public:
+  using T = typename ParquetType::c_type;
+
+  /// \param descr The descriptor of the column to write. Must outlive this 
writer.
+  /// \param bloom_filter The bloom filter to update. If nullptr, this writer 
does not
+  /// enable bloom filter. Otherwise, the input bloom filter must outlive this 
writer.

Review Comment:
   I don't think it makes sense to allow a `nullptr` here. If the user doesn't 
want a bloom filter, they shouldn't create a `BloomFilterWriter`.



##########
cpp/src/parquet/bloom_filter_writer.h:
##########
@@ -0,0 +1,105 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include "arrow/type_fwd.h"
+
+#include "parquet/bloom_filter.h"
+#include "parquet/type_fwd.h"
+
+namespace parquet {
+
+/// \brief Writer for updating a bloom filter with values of a specific 
Parquet type.
+template <typename ParquetType>
+class PARQUET_EXPORT BloomFilterWriter {
+ public:
+  using T = typename ParquetType::c_type;
+
+  /// \param descr The descriptor of the column to write. Must outlive this 
writer.
+  /// \param bloom_filter The bloom filter to update. If nullptr, this writer 
does not
+  /// enable bloom filter. Otherwise, the input bloom filter must outlive this 
writer.
+  BloomFilterWriter(const ColumnDescriptor* descr, BloomFilter* bloom_filter);
+
+  /// \brief Update the bloom filter with typed values.
+  ///
+  /// \param values The values to update the bloom filter with.
+  /// \param num_values The number of values to update the bloom filter with.
+  void Update(const T* values, int64_t num_values);
+
+  /// \brief Update the bloom filter with typed values that have spaces.
+  ///
+  /// \param values The values to update the bloom filter with.
+  /// \param num_values The number of values to update the bloom filter with.
+  /// \param valid_bits The validity bitmap of the values.
+  /// \param valid_bits_offset The offset of the validity bitmap.
+  void UpdateSpaced(const T* values, int64_t num_values, const uint8_t* 
valid_bits,
+                    int64_t valid_bits_offset);
+
+  /// \brief Update the bloom filter with an Arrow array.
+  ///
+  /// \param values The Arrow array to update the bloom filter with.
+  void Update(const ::arrow::Array& values);
+
+  /// \brief Check if this writer has enabled the bloom filter.
+  bool bloom_filter_enabled() const;
+
+ private:
+  const ColumnDescriptor* descr_;
+  BloomFilter* bloom_filter_;
+};
+
+/// \brief Interface for building bloom filters of a parquet file.
+class PARQUET_EXPORT BloomFilterBuilder {
+ public:
+  virtual ~BloomFilterBuilder() = default;
+
+  /// \brief API to create a BloomFilterBuilder.
+  ///
+  /// \param schema The schema of the file and it must outlive the created 
builder.
+  /// \param properties Properties to get bloom filter options. It must 
outlive the
+  /// created builder.
+  static std::unique_ptr<BloomFilterBuilder> Make(const SchemaDescriptor* 
schema,
+                                                  const WriterProperties* 
properties);
+
+  /// \brief Start a new row group to write bloom filters, meaning that next 
calls
+  /// to `GetOrCreateBloomFilter` will create bloom filters for the new row 
group.
+  ///
+  /// \throws ParquetException if `WriteTo()` has been called already.
+  virtual void AppendRowGroup() = 0;
+
+  /// \brief Get or create a BloomFilter from the column ordinal of the 
current row group.

Review Comment:
   Why "get or create"? I don't understand why this API is desirable. This is 
adding implementation complexity for no obvious benefit.



##########
cpp/src/parquet/bloom_filter_writer.cc:
##########
@@ -0,0 +1,305 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "parquet/bloom_filter_writer.h"
+
+#include <map>
+#include <utility>
+
+#include "arrow/array.h"
+#include "arrow/io/interfaces.h"
+#include "arrow/type_traits.h"
+#include "arrow/util/bit_run_reader.h"
+#include "arrow/util/checked_cast.h"
+#include "arrow/util/unreachable.h"
+
+#include "parquet/exception.h"
+#include "parquet/metadata.h"
+#include "parquet/properties.h"
+#include "parquet/schema.h"
+#include "parquet/types.h"
+
+namespace parquet {
+
+constexpr int64_t kHashBatchSize = 256;
+
+template <typename ParquetType>
+BloomFilterWriter<ParquetType>::BloomFilterWriter(const ColumnDescriptor* 
descr,
+                                                  BloomFilter* bloom_filter)
+    : descr_(descr), bloom_filter_(bloom_filter) {}
+
+template <typename ParquetType>
+bool BloomFilterWriter<ParquetType>::bloom_filter_enabled() const {
+  return bloom_filter_ != nullptr;
+}
+
+template <typename ParquetType>
+void BloomFilterWriter<ParquetType>::Update(const T* values, int64_t 
num_values) {
+  if (!bloom_filter_enabled()) {
+    return;
+  }
+
+  if constexpr (std::is_same_v<ParquetType, BooleanType>) {
+    throw ParquetException("Bloom filter is not supported for boolean type");
+  }
+
+  std::array<uint64_t, kHashBatchSize> hashes;
+  for (int64_t i = 0; i < num_values; i += kHashBatchSize) {
+    auto batch_size = static_cast<int>(std::min(kHashBatchSize, num_values - 
i));
+    if constexpr (std::is_same_v<ParquetType, FLBAType>) {
+      bloom_filter_->Hashes(values, descr_->type_length(), batch_size, 
hashes.data());
+    } else {
+      bloom_filter_->Hashes(values, batch_size, hashes.data());
+    }
+    bloom_filter_->InsertHashes(hashes.data(), batch_size);
+  }
+}
+
+template <>
+void BloomFilterWriter<BooleanType>::Update(const bool*, int64_t) {
+  if (!bloom_filter_enabled()) {
+    return;
+  }
+  throw ParquetException("Bloom filter is not supported for boolean type");
+}
+
+template <typename ParquetType>
+void BloomFilterWriter<ParquetType>::UpdateSpaced(const T* values, int64_t 
num_values,
+                                                  const uint8_t* valid_bits,
+                                                  int64_t valid_bits_offset) {
+  if (!bloom_filter_enabled()) {
+    return;
+  }
+
+  std::array<uint64_t, kHashBatchSize> hashes;
+  ::arrow::internal::VisitSetBitRunsVoid(
+      valid_bits, valid_bits_offset, num_values, [&](int64_t position, int64_t 
length) {
+        for (int64_t i = 0; i < length; i += kHashBatchSize) {
+          auto batch_size = static_cast<int>(std::min(kHashBatchSize, length - 
i));
+          if constexpr (std::is_same_v<ParquetType, FLBAType>) {
+            bloom_filter_->Hashes(values + i + position, descr_->type_length(),
+                                  batch_size, hashes.data());
+          } else {
+            bloom_filter_->Hashes(values + i + position, batch_size, 
hashes.data());
+          }
+          bloom_filter_->InsertHashes(hashes.data(), batch_size);
+        }
+      });
+}
+
+template <>
+void BloomFilterWriter<BooleanType>::UpdateSpaced(const bool*, int64_t, const 
uint8_t*,
+                                                  int64_t) {
+  if (!bloom_filter_enabled()) {
+    return;
+  }
+  throw ParquetException("Bloom filter is not supported for boolean type");
+}
+
+template <typename ParquetType>
+void BloomFilterWriter<ParquetType>::Update(const ::arrow::Array& values) {
+  ::arrow::Unreachable("Update for non-ByteArray type should be unreachable");
+}
+
+namespace {
+
+template <typename ArrayType>
+void UpdateBinaryBloomFilter(BloomFilter& bloom_filter, const ArrayType& 
array) {
+  // Using a small batch size because an extra `byte_arrays` is used.
+  constexpr int64_t kBinaryHashBatchSize = 64;
+  std::array<ByteArray, kBinaryHashBatchSize> byte_arrays;
+  std::array<uint64_t, kBinaryHashBatchSize> hashes;
+
+  auto batch_insert_hashes = [&](int count) {
+    if (count > 0) {
+      bloom_filter.Hashes(byte_arrays.data(), count, hashes.data());
+      bloom_filter.InsertHashes(hashes.data(), count);
+    }
+  };
+
+  int batch_idx = 0;
+  ::arrow::internal::VisitSetBitRunsVoid(
+      array.null_bitmap_data(), array.offset(), array.length(),
+      [&](int64_t position, int64_t run_length) {
+        for (int64_t i = 0; i < run_length; ++i) {
+          byte_arrays[batch_idx++] = array.GetView(position + i);
+          if (batch_idx == kBinaryHashBatchSize) {
+            batch_insert_hashes(batch_idx);
+            batch_idx = 0;
+          }
+        }
+      });
+  batch_insert_hashes(batch_idx);
+}
+
+}  // namespace
+
+template <>
+void BloomFilterWriter<ByteArrayType>::Update(const ::arrow::Array& values) {
+  if (!bloom_filter_enabled()) {
+    return;
+  }
+
+  if (::arrow::is_binary_view_like(values.type_id())) {
+    UpdateBinaryBloomFilter(
+        *bloom_filter_,
+        ::arrow::internal::checked_cast<const 
::arrow::BinaryViewArray&>(values));
+  } else if (::arrow::is_binary_like(values.type_id())) {
+    UpdateBinaryBloomFilter(
+        *bloom_filter_,
+        ::arrow::internal::checked_cast<const ::arrow::BinaryArray&>(values));
+  } else if (::arrow::is_large_binary_like(values.type_id())) {
+    UpdateBinaryBloomFilter(
+        *bloom_filter_,
+        ::arrow::internal::checked_cast<const 
::arrow::LargeBinaryArray&>(values));
+  } else {
+    ParquetException::NYI("Bloom filter is not supported for this Arrow type: 
" +
+                          values.type()->ToString());
+  }
+}
+
+template class BloomFilterWriter<BooleanType>;

Review Comment:
   It's weird to instantiate a class that should not be used.



##########
cpp/src/parquet/bloom_filter_writer.cc:
##########
@@ -0,0 +1,305 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "parquet/bloom_filter_writer.h"
+
+#include <map>
+#include <utility>
+
+#include "arrow/array.h"
+#include "arrow/io/interfaces.h"
+#include "arrow/type_traits.h"
+#include "arrow/util/bit_run_reader.h"
+#include "arrow/util/checked_cast.h"
+#include "arrow/util/unreachable.h"
+
+#include "parquet/exception.h"
+#include "parquet/metadata.h"
+#include "parquet/properties.h"
+#include "parquet/schema.h"
+#include "parquet/types.h"
+
+namespace parquet {
+
+constexpr int64_t kHashBatchSize = 256;
+
+template <typename ParquetType>
+BloomFilterWriter<ParquetType>::BloomFilterWriter(const ColumnDescriptor* 
descr,
+                                                  BloomFilter* bloom_filter)
+    : descr_(descr), bloom_filter_(bloom_filter) {}
+
+template <typename ParquetType>
+bool BloomFilterWriter<ParquetType>::bloom_filter_enabled() const {
+  return bloom_filter_ != nullptr;
+}
+
+template <typename ParquetType>
+void BloomFilterWriter<ParquetType>::Update(const T* values, int64_t 
num_values) {
+  if (!bloom_filter_enabled()) {
+    return;
+  }
+
+  if constexpr (std::is_same_v<ParquetType, BooleanType>) {
+    throw ParquetException("Bloom filter is not supported for boolean type");
+  }
+
+  std::array<uint64_t, kHashBatchSize> hashes;
+  for (int64_t i = 0; i < num_values; i += kHashBatchSize) {
+    auto batch_size = static_cast<int>(std::min(kHashBatchSize, num_values - 
i));
+    if constexpr (std::is_same_v<ParquetType, FLBAType>) {
+      bloom_filter_->Hashes(values, descr_->type_length(), batch_size, 
hashes.data());
+    } else {
+      bloom_filter_->Hashes(values, batch_size, hashes.data());
+    }
+    bloom_filter_->InsertHashes(hashes.data(), batch_size);
+  }
+}
+
+template <>
+void BloomFilterWriter<BooleanType>::Update(const bool*, int64_t) {
+  if (!bloom_filter_enabled()) {
+    return;
+  }
+  throw ParquetException("Bloom filter is not supported for boolean type");
+}
+
+template <typename ParquetType>
+void BloomFilterWriter<ParquetType>::UpdateSpaced(const T* values, int64_t 
num_values,
+                                                  const uint8_t* valid_bits,
+                                                  int64_t valid_bits_offset) {
+  if (!bloom_filter_enabled()) {
+    return;
+  }
+
+  std::array<uint64_t, kHashBatchSize> hashes;
+  ::arrow::internal::VisitSetBitRunsVoid(
+      valid_bits, valid_bits_offset, num_values, [&](int64_t position, int64_t 
length) {
+        for (int64_t i = 0; i < length; i += kHashBatchSize) {
+          auto batch_size = static_cast<int>(std::min(kHashBatchSize, length - 
i));
+          if constexpr (std::is_same_v<ParquetType, FLBAType>) {
+            bloom_filter_->Hashes(values + i + position, descr_->type_length(),
+                                  batch_size, hashes.data());
+          } else {
+            bloom_filter_->Hashes(values + i + position, batch_size, 
hashes.data());
+          }
+          bloom_filter_->InsertHashes(hashes.data(), batch_size);
+        }
+      });
+}
+
+template <>
+void BloomFilterWriter<BooleanType>::UpdateSpaced(const bool*, int64_t, const 
uint8_t*,
+                                                  int64_t) {
+  if (!bloom_filter_enabled()) {
+    return;
+  }
+  throw ParquetException("Bloom filter is not supported for boolean type");
+}
+
+template <typename ParquetType>
+void BloomFilterWriter<ParquetType>::Update(const ::arrow::Array& values) {
+  ::arrow::Unreachable("Update for non-ByteArray type should be unreachable");
+}
+
+namespace {
+
+template <typename ArrayType>
+void UpdateBinaryBloomFilter(BloomFilter& bloom_filter, const ArrayType& 
array) {
+  // Using a small batch size because an extra `byte_arrays` is used.
+  constexpr int64_t kBinaryHashBatchSize = 64;
+  std::array<ByteArray, kBinaryHashBatchSize> byte_arrays;
+  std::array<uint64_t, kBinaryHashBatchSize> hashes;
+
+  auto batch_insert_hashes = [&](int count) {
+    if (count > 0) {
+      bloom_filter.Hashes(byte_arrays.data(), count, hashes.data());
+      bloom_filter.InsertHashes(hashes.data(), count);
+    }
+  };
+
+  int batch_idx = 0;
+  ::arrow::internal::VisitSetBitRunsVoid(
+      array.null_bitmap_data(), array.offset(), array.length(),
+      [&](int64_t position, int64_t run_length) {
+        for (int64_t i = 0; i < run_length; ++i) {

Review Comment:
   Why is this using a different approach from `UpdateSpaced`? In
   `UpdateSpaced` you insert each batch even if it's smaller than
   `kHashBatchSize`. Here you are using an additional counter (`batch_idx`) to
   only commit once `kBinaryHashBatchSize` values have accumulated.



##########
cpp/src/parquet/properties.h:
##########
@@ -909,6 +993,19 @@ class PARQUET_EXPORT WriterProperties {
     return false;
   }
 
+  bool bloom_filter_enabled() const {

Review Comment:
   Looks like @wgtmac's comment above should be addressed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to