pitrou commented on code in PR #37400:
URL: https://github.com/apache/arrow/pull/37400#discussion_r2157348890


##########
cpp/src/parquet/CMakeLists.txt:
##########
@@ -372,8 +374,9 @@ set_source_files_properties(public_api_test.cc PROPERTIES 
SKIP_UNITY_BUILD_INCLU
 
 add_parquet_test(internals-test
                  SOURCES
-                 bloom_filter_reader_test.cc
                  bloom_filter_test.cc
+                 bloom_filter_reader_writer_test.cc
+                 encoding_test.cc

Review Comment:
   It seems like this line should be reverted.



##########
cpp/src/parquet/bloom_filter.h:
##########
@@ -106,6 +106,34 @@ class PARQUET_EXPORT BloomFilter {
   /// @return hash result.
   virtual uint64_t Hash(const FLBA* value, uint32_t len) const = 0;
 
+  // Variant of const reference argument to facilitate template
+
+  /// Compute hash for ByteArray value by using its plain encoding result.
+  ///
+  /// @param value the value to hash.
+  uint64_t Hash(const ByteArray& value) const { return Hash(&value); }
+
+  /// Compute hash for fixed byte array value by using its plain encoding 
result.
+  ///
+  /// @param value the value to hash.
+  /// @param type_len the value length.
+  uint64_t Hash(const FLBA& value, uint32_t type_len) const {
+    return Hash(&value, type_len);
+  }
+
+  /// Compute hash for Int96 value by using its plain encoding result.
+  ///
+  /// @param value the value to hash.
+  uint64_t Hash(const Int96& value) const { return Hash(&value); }
+
+  /// Compute hash for std::string_view value by using its plain encoding 
result.
+  ///
+  /// @param value the value to hash.
+  uint64_t Hash(std::string_view value) const {
+    ByteArray ba(value);
+    return Hash(&ba);

Review Comment:
   This will stupidly create a local variable and pass a pointer to it.
   Can we do the reverse:
   1) Have a core method `Hash(std::string_view)`
   2) Have other methods (FLBA, ByteArray...) that delegate to it?



##########
cpp/src/parquet/arrow/arrow_reader_writer_test.cc:
##########
@@ -5807,7 +5809,7 @@ auto encode_double = [](double value) {
 
 }  // namespace
 
-class ParquetPageIndexRoundTripTest : public ::testing::Test {
+class TestingWithPageIndex {

Review Comment:
   Can we move the page index and bloom filter tests to another test file? This 
file is already much too long (and takes ages to compile).



##########
cpp/src/parquet/bloom_filter_writer_internal.h:
##########
@@ -0,0 +1,51 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include "parquet/bloom_filter.h"
+
+namespace arrow {

Review Comment:
   Can you include the respective `type_fwd.h` headers?



##########
cpp/src/parquet/properties.h:
##########
@@ -658,6 +702,43 @@ class PARQUET_EXPORT WriterProperties {
       return this->disable_statistics(path->ToDotString());
     }
 
+    /// Disable bloom filter for the column specified by `path`.
+    /// Default disabled.
+    Builder* disable_bloom_filter(const std::string& path) {
+      bloom_filter_options_[path] = std::nullopt;

Review Comment:
   Why not simply remove the entry?



##########
cpp/src/parquet/bloom_filter_writer_internal.h:
##########
@@ -0,0 +1,51 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include "parquet/bloom_filter.h"
+
+namespace arrow {
+class Array;
+}
+
+namespace parquet {
+class ColumnDescriptor;
+}
+
+namespace parquet::internal {
+
+template <typename ParquetType>
+class PARQUET_EXPORT BloomFilterWriterImpl {
+ public:
+  using T = typename ParquetType::c_type;
+
+  BloomFilterWriterImpl(const ColumnDescriptor* descr, BloomFilter* 
bloom_filter);
+
+  void UpdateBloomFilter(const T* values, int64_t num_values);
+  void UpdateBloomFilterSpaced(const T* values, int64_t num_values,
+                               const uint8_t* valid_bits, int64_t 
valid_bits_offset);
+  void UpdateBloomFilterArray(const ::arrow::Array& values);
+
+  bool HasBloomFilter() const;

Review Comment:
   Can you please add a docstring? It is not obvious what the semantics are.



##########
cpp/src/parquet/CMakeLists.txt:
##########
@@ -159,6 +159,8 @@ set(PARQUET_SRCS
     arrow/variant_internal.cc
     arrow/writer.cc
     bloom_filter.cc
+    bloom_filter_builder_internal.cc
+    bloom_filter_writer_internal.cc

Review Comment:
   Could you address this @mapleFU ? It's confusing to have both "builder" and 
"writer" APIs, IMHO.



##########
cpp/src/parquet/bloom_filter_writer_internal.cc:
##########
@@ -0,0 +1,195 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "parquet/bloom_filter_writer_internal.h"
+
+#include "parquet/exception.h"
+#include "parquet/schema.h"
+
+#include "arrow/util/bit_run_reader.h"
+#include "arrow/util/checked_cast.h"
+#include "arrow/util/unreachable.h"
+#include "arrow/visit_data_inline.h"
+
+namespace parquet::internal {
+
+constexpr int64_t kHashBatchSize = 256;
+
+template <typename ParquetType>
+BloomFilterWriterImpl<ParquetType>::BloomFilterWriterImpl(const 
ColumnDescriptor* descr,
+                                                          BloomFilter* 
bloom_filter)
+    : descr_(descr), bloom_filter_(bloom_filter) {}
+
+template <typename ParquetType>
+bool BloomFilterWriterImpl<ParquetType>::HasBloomFilter() const {
+  return bloom_filter_ != nullptr;
+}
+
+template <typename ParquetType>
+void BloomFilterWriterImpl<ParquetType>::UpdateBloomFilter(const T* values,
+                                                           int64_t num_values) 
{
+  if (bloom_filter_ == nullptr) {
+    return;
+  }
+  std::array<uint64_t, kHashBatchSize> hashes;
+  for (int64_t i = 0; i < num_values; i += kHashBatchSize) {
+    int64_t current_hash_batch_size = std::min(kHashBatchSize, num_values - i);
+    bloom_filter_->Hashes(values, static_cast<int>(current_hash_batch_size),
+                          hashes.data());
+    bloom_filter_->InsertHashes(hashes.data(), 
static_cast<int>(current_hash_batch_size));
+  }
+}
+
+template <>
+void BloomFilterWriterImpl<FLBAType>::UpdateBloomFilter(const FLBA* values,
+                                                        int64_t num_values) {
+  if (bloom_filter_ == nullptr) {
+    return;
+  }
+  std::array<uint64_t, kHashBatchSize> hashes;
+  for (int64_t i = 0; i < num_values; i += kHashBatchSize) {
+    int64_t current_hash_batch_size = std::min(kHashBatchSize, num_values - i);
+    bloom_filter_->Hashes(values, descr_->type_length(),
+                          static_cast<int>(current_hash_batch_size), 
hashes.data());
+    bloom_filter_->InsertHashes(hashes.data(), 
static_cast<int>(current_hash_batch_size));
+  }
+}
+
+template <>
+void BloomFilterWriterImpl<BooleanType>::UpdateBloomFilter(const bool*, 
int64_t) {
+  if (ARROW_PREDICT_FALSE(bloom_filter_ != nullptr)) {
+    throw ParquetException("BooleanType does not support bloom filters");
+  }
+}
+
+template <typename ParquetType>
+void BloomFilterWriterImpl<ParquetType>::UpdateBloomFilterSpaced(
+    const T* values, int64_t num_values, const uint8_t* valid_bits,
+    int64_t valid_bits_offset) {
+  if (bloom_filter_ == nullptr) {
+    // No bloom filter to update
+    return;
+  }
+  std::array<uint64_t, kHashBatchSize> hashes;
+  ::arrow::internal::VisitSetBitRunsVoid(
+      valid_bits, valid_bits_offset, num_values, [&](int64_t position, int64_t 
length) {
+        for (int64_t i = 0; i < length; i += kHashBatchSize) {
+          auto current_hash_batch_size = std::min(kHashBatchSize, length - i);
+          bloom_filter_->Hashes(values + i + position,
+                                static_cast<int>(current_hash_batch_size), 
hashes.data());
+          bloom_filter_->InsertHashes(hashes.data(),
+                                      
static_cast<int>(current_hash_batch_size));
+        }
+      });
+}
+
+template <>
+void BloomFilterWriterImpl<BooleanType>::UpdateBloomFilterSpaced(const bool*, 
int64_t,
+                                                                 const 
uint8_t*,
+                                                                 int64_t) {}
+
+template <>
+void BloomFilterWriterImpl<FLBAType>::UpdateBloomFilterSpaced(const FLBA* 
values,
+                                                              int64_t 
num_values,
+                                                              const uint8_t* 
valid_bits,
+                                                              int64_t 
valid_bits_offset) {
+  if (bloom_filter_ == nullptr) {
+    return;
+  }
+  std::array<uint64_t, kHashBatchSize> hashes;
+  ::arrow::internal::VisitSetBitRunsVoid(
+      valid_bits, valid_bits_offset, num_values, [&](int64_t position, int64_t 
length) {
+        for (int64_t i = 0; i < length; i += kHashBatchSize) {
+          auto current_hash_batch_size = std::min(kHashBatchSize, length - i);
+          bloom_filter_->Hashes(values + i + position, descr_->type_length(),
+                                static_cast<int>(current_hash_batch_size), 
hashes.data());
+          bloom_filter_->InsertHashes(hashes.data(),
+                                      
static_cast<int>(current_hash_batch_size));
+        }
+      });
+}
+
+template <typename ArrayType>
+void UpdateBinaryBloomFilter(BloomFilter& bloom_filter, const ArrayType& 
array) {
+  // Using a smaller size because an extra `byte_arrays` is used.
+  constexpr int64_t kBinaryHashBatchSize = 64;
+  std::array<ByteArray, kBinaryHashBatchSize> byte_arrays;

Review Comment:
   The `ByteArray` indirection is horrible. Can you please find another way? 
Why not let `BloomFilter` take an Arrow array instead of implementing this in 
BloomFilterWriter?



##########
cpp/src/parquet/bloom_filter_writer_internal.cc:
##########
@@ -0,0 +1,195 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "parquet/bloom_filter_writer_internal.h"
+
+#include "parquet/exception.h"
+#include "parquet/schema.h"
+
+#include "arrow/util/bit_run_reader.h"
+#include "arrow/util/checked_cast.h"
+#include "arrow/util/unreachable.h"
+#include "arrow/visit_data_inline.h"
+
+namespace parquet::internal {
+
+constexpr int64_t kHashBatchSize = 256;
+
+template <typename ParquetType>
+BloomFilterWriterImpl<ParquetType>::BloomFilterWriterImpl(const 
ColumnDescriptor* descr,
+                                                          BloomFilter* 
bloom_filter)
+    : descr_(descr), bloom_filter_(bloom_filter) {}
+
+template <typename ParquetType>
+bool BloomFilterWriterImpl<ParquetType>::HasBloomFilter() const {
+  return bloom_filter_ != nullptr;
+}
+
+template <typename ParquetType>
+void BloomFilterWriterImpl<ParquetType>::UpdateBloomFilter(const T* values,
+                                                           int64_t num_values) 
{
+  if (bloom_filter_ == nullptr) {
+    return;
+  }
+  std::array<uint64_t, kHashBatchSize> hashes;
+  for (int64_t i = 0; i < num_values; i += kHashBatchSize) {
+    int64_t current_hash_batch_size = std::min(kHashBatchSize, num_values - i);
+    bloom_filter_->Hashes(values, static_cast<int>(current_hash_batch_size),
+                          hashes.data());
+    bloom_filter_->InsertHashes(hashes.data(), 
static_cast<int>(current_hash_batch_size));
+  }
+}
+
+template <>
+void BloomFilterWriterImpl<FLBAType>::UpdateBloomFilter(const FLBA* values,
+                                                        int64_t num_values) {
+  if (bloom_filter_ == nullptr) {
+    return;
+  }
+  std::array<uint64_t, kHashBatchSize> hashes;
+  for (int64_t i = 0; i < num_values; i += kHashBatchSize) {
+    int64_t current_hash_batch_size = std::min(kHashBatchSize, num_values - i);
+    bloom_filter_->Hashes(values, descr_->type_length(),
+                          static_cast<int>(current_hash_batch_size), 
hashes.data());
+    bloom_filter_->InsertHashes(hashes.data(), 
static_cast<int>(current_hash_batch_size));
+  }
+}
+
+template <>
+void BloomFilterWriterImpl<BooleanType>::UpdateBloomFilter(const bool*, 
int64_t) {
+  if (ARROW_PREDICT_FALSE(bloom_filter_ != nullptr)) {
+    throw ParquetException("BooleanType does not support bloom filters");
+  }
+}
+
+template <typename ParquetType>
+void BloomFilterWriterImpl<ParquetType>::UpdateBloomFilterSpaced(
+    const T* values, int64_t num_values, const uint8_t* valid_bits,
+    int64_t valid_bits_offset) {
+  if (bloom_filter_ == nullptr) {
+    // No bloom filter to update
+    return;
+  }
+  std::array<uint64_t, kHashBatchSize> hashes;
+  ::arrow::internal::VisitSetBitRunsVoid(
+      valid_bits, valid_bits_offset, num_values, [&](int64_t position, int64_t 
length) {
+        for (int64_t i = 0; i < length; i += kHashBatchSize) {
+          auto current_hash_batch_size = std::min(kHashBatchSize, length - i);
+          bloom_filter_->Hashes(values + i + position,
+                                static_cast<int>(current_hash_batch_size), 
hashes.data());
+          bloom_filter_->InsertHashes(hashes.data(),
+                                      
static_cast<int>(current_hash_batch_size));
+        }
+      });
+}
+
+template <>
+void BloomFilterWriterImpl<BooleanType>::UpdateBloomFilterSpaced(const bool*, 
int64_t,
+                                                                 const 
uint8_t*,
+                                                                 int64_t) {}

Review Comment:
   Really?



##########
cpp/src/parquet/properties.h:
##########
@@ -909,6 +993,19 @@ class PARQUET_EXPORT WriterProperties {
     return false;
   }
 
+  bool bloom_filter_enabled() const {

Review Comment:
   Why is this useful?



##########
cpp/src/parquet/bloom_filter_builder_internal.h:
##########
@@ -0,0 +1,74 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include "arrow/io/type_fwd.h"
+#include "parquet/types.h"
+
+namespace parquet {
+
+/// @brief Interface for collecting bloom filter of a parquet file.
+class PARQUET_EXPORT BloomFilterBuilder {
+ public:
+  /// @brief API to create a BloomFilterBuilder.
+  ///
+  /// @param schema The schema of the file and it must outlive the created
+  /// BloomFilterBuilder.
+  /// @param properties The properties of the file with a set of 
`BloomFilterOption`s
+  /// for columns enabling bloom filters. It must outlive the created 
BloomFilterBuilder.
+  static std::unique_ptr<BloomFilterBuilder> Make(const SchemaDescriptor* 
schema,
+                                                  const WriterProperties* 
properties);
+
+  /// @brief Start a new row group to host all bloom filters belong to it.
+  ///
+  /// This method must be called before `GetOrCreateBloomFilter` for columns 
of a new row
+  /// group.
+  ///
+  /// @throws ParquetException if WriteTo() has been called to flush bloom 
filters.
+  virtual void AppendRowGroup() = 0;

Review Comment:
   Is it useful to have such a stateful API? Why not create an independent 
builder for each row group?



##########
cpp/src/parquet/bloom_filter_builder_internal.h:
##########
@@ -0,0 +1,84 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include "arrow/io/type_fwd.h"
+#include "parquet/types.h"
+
+namespace parquet {
+
+/// @brief Interface for collecting bloom filter of a parquet file.

Review Comment:
   @mapleFU You haven't addressed this comment, have you?



##########
cpp/src/parquet/properties.h:
##########
@@ -214,6 +240,17 @@ class PARQUET_EXPORT ColumnProperties {
     page_index_enabled_ = page_index_enabled;
   }
 
+  void set_bloom_filter_options(std::optional<BloomFilterOptions> 
bloom_filter_options) {
+    if (bloom_filter_options) {
+      if (bloom_filter_options->fpp > 1.0 || bloom_filter_options->fpp < 0.0) {

Review Comment:
   Are 0.0 and 1.0 valid values? It seems a bit surprising.



##########
cpp/src/parquet/metadata.h:
##########
@@ -505,21 +504,40 @@ class PARQUET_EXPORT RowGroupMetaDataBuilder {
   std::unique_ptr<RowGroupMetaDataBuilderImpl> impl_;
 };
 
+/// Alias type of page index location of a row group. The index location
+/// is located by column ordinal. If a column does not have a page index,
+/// its value is set to std::nullopt.
+using RowGroupIndexLocation = std::vector<std::optional<IndexLocation>>;
+
+/// Alias type of bloom filter location of a row group. The filter location
+/// is located by column ordinal.
+///
+/// Number of columns with a bloom filter to be relatively small compared to
+/// the number of overall columns, so map is used.
+using RowGroupBloomFilterLocation = std::map<int32_t, IndexLocation>;
+
+/// Alias type of page index and location of a parquet file. The
+/// index location is located by the row group ordinal.
+using FileIndexLocation = std::map<size_t, RowGroupIndexLocation>;
+
+/// Alias type of bloom filter and location of a parquet file. The
+/// index location is located by the row group ordinal.
+using FileBloomFilterLocation = std::map<size_t, RowGroupBloomFilterLocation>;

Review Comment:
   Why `std::map` instead of `std::unordered_map`? Also, does the two-level map 
really make sense? We should use the least wasteful option.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to