wgtmac commented on code in PR #50008:
URL: https://github.com/apache/arrow/pull/50008#discussion_r3321757827


##########
cpp/src/parquet/properties.h:
##########
@@ -174,11 +175,14 @@ struct PARQUET_EXPORT BloomFilterOptions {
   /// Expected number of distinct values (NDV) in the bloom filter.
   ///
   /// Bloom filters are most effective for high-cardinality columns. A good 
default
-  /// is to set ndv equal to the number of rows. Lower values reduce disk 
usage but
-  /// may not be worthwhile for very small NDVs.
+  /// is to set ndv equal to the number of rows. If unset, the writer resolves 
ndv
+  /// to the max row group row count. Lower values reduce disk usage but may 
not
+  /// be worthwhile for very small NDVs.
   ///
-  /// Increasing ndv (without increasing fpp) increases disk and memory usage.
-  int32_t ndv = 1 << 20;
+  /// Increasing ndv (without increasing fpp) increases memory usage. The 
writer

Review Comment:
   > The writer may fold the filter before serialization
   
   It would be better to state explicitly when the writer will fold the filter, 
so users do not need to guess or read the code.



##########
cpp/src/parquet/properties.h:
##########
@@ -174,11 +175,14 @@ struct PARQUET_EXPORT BloomFilterOptions {
   /// Expected number of distinct values (NDV) in the bloom filter.
   ///
   /// Bloom filters are most effective for high-cardinality columns. A good 
default
-  /// is to set ndv equal to the number of rows. Lower values reduce disk 
usage but
-  /// may not be worthwhile for very small NDVs.
+  /// is to set ndv equal to the number of rows. If unset, the writer resolves 
ndv
+  /// to the max row group row count. Lower values reduce disk usage but may 
not
+  /// be worthwhile for very small NDVs.
   ///
-  /// Increasing ndv (without increasing fpp) increases disk and memory usage.
-  int32_t ndv = 1 << 20;
+  /// Increasing ndv (without increasing fpp) increases memory usage. The 
writer
+  /// may fold the filter before serialization, but will not grow an undersized
+  /// filter.
+  std::optional<int64_t> ndv = std::nullopt;

Review Comment:
   We need to mention in the PR description (if not done already) that this is a 
minor breaking change.



##########
cpp/src/parquet/bloom_filter_reader_writer_test.cc:
##########
@@ -150,6 +150,53 @@ TEST(BloomFilterBuilder, BasicRoundTrip) {
   }
 }
 
+TEST(BloomFilterBuilder, FoldsOverestimatedNdvBeforeWriting) {

Review Comment:
   Can we add more tests that check whether the filter is folded or left 
unfolded as expected? Perhaps we can reuse a single test with different numbers 
of inserted hashes.



##########
cpp/src/parquet/bloom_filter.h:
##########
@@ -243,7 +243,7 @@ class PARQUET_EXPORT BlockSplitBloomFilter : public 
BloomFilter {
   /// @param fpp The false positive probability.
   /// @return it always return a value between kMinimumBloomFilterBytes * 8 and
   /// kMaximumBloomFilterBytes * 8, and the return value is always a power of 
16
-  static uint32_t OptimalNumOfBits(uint32_t ndv, double fpp) {
+  static uint32_t OptimalNumOfBits(uint64_t ndv, double fpp) {

Review Comment:
   Do we want to expand the return type to `uint64_t`? We've encountered bloom 
filters larger than 512MB, so this type really limits its usage.



##########
cpp/src/parquet/properties.h:
##########
@@ -174,11 +175,14 @@ struct PARQUET_EXPORT BloomFilterOptions {
   /// Expected number of distinct values (NDV) in the bloom filter.
   ///
   /// Bloom filters are most effective for high-cardinality columns. A good 
default
-  /// is to set ndv equal to the number of rows. Lower values reduce disk 
usage but
-  /// may not be worthwhile for very small NDVs.
+  /// is to set ndv equal to the number of rows. If unset, the writer resolves 
ndv
+  /// to the max row group row count. Lower values reduce disk usage but may 
not
+  /// be worthwhile for very small NDVs.
   ///
-  /// Increasing ndv (without increasing fpp) increases disk and memory usage.
-  int32_t ndv = 1 << 20;
+  /// Increasing ndv (without increasing fpp) increases memory usage. The 
writer

Review Comment:
   Is it worth adding a bool flag to control whether this algorithm is applied? 
The main concern is that it uses the average fill rate to estimate the resulting 
fpp, which may still have bad worst cases, e.g. when keys are highly skewed 
toward only a few blocks.



##########
cpp/src/parquet/properties.h:
##########
@@ -256,6 +260,10 @@ class PARQUET_EXPORT ColumnProperties {
           "Bloom filter false positive probability must be in (0.0, 1.0), got 
" +
           std::to_string(bloom_filter_options.fpp));
     }
+    if (bloom_filter_options.ndv.has_value() && 
bloom_filter_options.ndv.value() < 0) {
+      throw ParquetException("Bloom filter number of distinct values must be 
>= 0, got " +

Review Comment:
   What is the expected behavior of 0?



##########
cpp/src/parquet/bloom_filter.cc:
##########
@@ -345,9 +347,75 @@ void BlockSplitBloomFilter::WriteTo(ArrowOutputStream* 
sink) const {
   PARQUET_THROW_NOT_OK(sink->Write(data_->data(), num_bytes_));
 }
 
+void BlockSplitBloomFilter::FoldToTargetFpp(double target_fpp) {
+  const uint32_t num_folds = NumFoldsForTargetFpp(target_fpp);
+  if (num_folds > 0) {
+    Fold(num_folds);
+  }
+}
+
+uint32_t BlockSplitBloomFilter::NumFoldsForTargetFpp(double target_fpp) const {
+  const uint32_t num_blocks = NumBlocks();
+  if (num_blocks < 2) {
+    return 0;
+  }
+  DCHECK_EQ(num_blocks & (num_blocks - 1), 0);
+
+  uint64_t total_set_bits = 0;
+  const auto* bitset32 = reinterpret_cast<const uint32_t*>(data_->data());
+  const uint32_t num_words = num_bytes_ / 
static_cast<uint32_t>(sizeof(uint32_t));
+  for (uint32_t i = 0; i < num_words; ++i) {
+    total_set_bits += static_cast<uint64_t>(std::popcount(bitset32[i]));
+  }
+
+  const double avg_fill =
+      static_cast<double>(total_set_bits) / (static_cast<double>(num_blocks) * 
256.0);
+  const auto max_folds = static_cast<uint32_t>(std::countr_zero(num_blocks));
+
+  if (avg_fill == 0.0) {
+    return max_folds;
+  }
+
+  uint32_t num_folds = 0;
+  double one_minus_fk = 1.0 - avg_fill;

Review Comment:
   BTW, can we add a brief description of this algorithm as a comment?



##########
cpp/src/parquet/bloom_filter.h:
##########
@@ -350,6 +353,9 @@ class PARQUET_EXPORT BlockSplitBloomFilter : public 
BloomFilter {
 
  private:
   inline void InsertHashImpl(uint64_t hash);
+  uint32_t NumBlocks() const { return num_bytes_ / kBytesPerFilterBlock; }

Review Comment:
   Should we add a DCHECK to verify that num_bytes_ is a multiple of 
kBytesPerFilterBlock?



##########
cpp/src/parquet/bloom_filter.cc:
##########
@@ -345,9 +347,75 @@ void BlockSplitBloomFilter::WriteTo(ArrowOutputStream* 
sink) const {
   PARQUET_THROW_NOT_OK(sink->Write(data_->data(), num_bytes_));
 }
 
+void BlockSplitBloomFilter::FoldToTargetFpp(double target_fpp) {
+  const uint32_t num_folds = NumFoldsForTargetFpp(target_fpp);
+  if (num_folds > 0) {
+    Fold(num_folds);
+  }
+}
+
+uint32_t BlockSplitBloomFilter::NumFoldsForTargetFpp(double target_fpp) const {
+  const uint32_t num_blocks = NumBlocks();
+  if (num_blocks < 2) {
+    return 0;
+  }
+  DCHECK_EQ(num_blocks & (num_blocks - 1), 0);
+
+  uint64_t total_set_bits = 0;
+  const auto* bitset32 = reinterpret_cast<const uint32_t*>(data_->data());
+  const uint32_t num_words = num_bytes_ / 
static_cast<uint32_t>(sizeof(uint32_t));
+  for (uint32_t i = 0; i < num_words; ++i) {
+    total_set_bits += static_cast<uint64_t>(std::popcount(bitset32[i]));
+  }
+
+  const double avg_fill =
+      static_cast<double>(total_set_bits) / (static_cast<double>(num_blocks) * 
256.0);
+  const auto max_folds = static_cast<uint32_t>(std::countr_zero(num_blocks));
+
+  if (avg_fill == 0.0) {
+    return max_folds;
+  }
+
+  uint32_t num_folds = 0;
+  double one_minus_fk = 1.0 - avg_fill;

Review Comment:
   Can we use a better name for `fk`? I don't really know what it stands for.



##########
cpp/src/parquet/bloom_filter.cc:
##########
@@ -345,9 +347,75 @@ void BlockSplitBloomFilter::WriteTo(ArrowOutputStream* 
sink) const {
   PARQUET_THROW_NOT_OK(sink->Write(data_->data(), num_bytes_));
 }
 
+void BlockSplitBloomFilter::FoldToTargetFpp(double target_fpp) {
+  const uint32_t num_folds = NumFoldsForTargetFpp(target_fpp);
+  if (num_folds > 0) {
+    Fold(num_folds);
+  }
+}
+
+uint32_t BlockSplitBloomFilter::NumFoldsForTargetFpp(double target_fpp) const {
+  const uint32_t num_blocks = NumBlocks();
+  if (num_blocks < 2) {
+    return 0;
+  }
+  DCHECK_EQ(num_blocks & (num_blocks - 1), 0);
+
+  uint64_t total_set_bits = 0;
+  const auto* bitset32 = reinterpret_cast<const uint32_t*>(data_->data());
+  const uint32_t num_words = num_bytes_ / 
static_cast<uint32_t>(sizeof(uint32_t));
+  for (uint32_t i = 0; i < num_words; ++i) {
+    total_set_bits += static_cast<uint64_t>(std::popcount(bitset32[i]));
+  }
+
+  const double avg_fill =
+      static_cast<double>(total_set_bits) / (static_cast<double>(num_blocks) * 
256.0);
+  const auto max_folds = static_cast<uint32_t>(std::countr_zero(num_blocks));
+
+  if (avg_fill == 0.0) {
+    return max_folds;
+  }
+
+  uint32_t num_folds = 0;
+  double one_minus_fk = 1.0 - avg_fill;
+  for (uint32_t i = 0; i < max_folds; ++i) {
+    one_minus_fk *= one_minus_fk;
+    const double fk = 1.0 - one_minus_fk;
+    const double estimated_fpp = std::pow(fk, kBitsSetPerBlock);
+    if (estimated_fpp > target_fpp) {
+      break;
+    }
+    ++num_folds;
+  }
+  return num_folds;
+}
+
+void BlockSplitBloomFilter::Fold(uint32_t num_folds) {

Review Comment:
   ```cpp
   void BlockSplitBloomFilter::Fold(uint32_t num_folds) {
     DCHECK_GT(num_folds, 0);
   
     const uint32_t num_blocks = NumBlocks();
     const uint32_t group_size = UINT32_C(1) << num_folds;
     DCHECK_LE(group_size, num_blocks);
   
     const uint32_t new_num_blocks = num_blocks / group_size;
     auto* bitset32 = reinterpret_cast<uint32_t*>(data_->mutable_data());
   
     for (uint32_t dst_block = 0; dst_block < new_num_blocks; ++dst_block) {
       const uint32_t src_block = dst_block * group_size;
       uint32_t merged[kBitsSetPerBlock];
   
       std::copy_n(bitset32 + src_block * kBitsSetPerBlock, kBitsSetPerBlock, 
merged);
   
       for (uint32_t fold_block = 1; fold_block < group_size; ++fold_block) {
         const uint32_t* src =
             bitset32 + (src_block + fold_block) * kBitsSetPerBlock;
         for (int word = 0; word < kBitsSetPerBlock; ++word) {
           merged[word] |= src[word];
         }
       }
   
       std::copy_n(merged, kBitsSetPerBlock,
                   bitset32 + dst_block * kBitsSetPerBlock);
     }
   
     num_bytes_ = new_num_blocks * kBytesPerFilterBlock;
   }
   ```
   
   The current loop reduces one word position at a time across all blocks in 
the fold group, so it reads the group with a 32-byte stride and revisits the 
same cache lines for each of the 8 words. A block-wise accumulation reads each 
32-byte block contiguously and consumes all 8 words while the cache line is 
hot. It also gives the compiler a much more natural contiguous 8-word OR 
pattern, which is more likely to be SLP-vectorized into 128/256-bit loads, ORs, 
and stores.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to