michalursa commented on a change in pull request #12067:
URL: https://github.com/apache/arrow/pull/12067#discussion_r821419293



##########
File path: cpp/src/arrow/compute/exec/bloom_filter_test.cc
##########
@@ -0,0 +1,633 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <gmock/gmock-matchers.h>
+
+#include <algorithm>
+#include <chrono>
+#include <condition_variable>
+#include <set>
+#include <thread>
+#include "arrow/compute/exec/bloom_filter.h"
+#include "arrow/compute/exec/key_hash.h"
+#include "arrow/compute/exec/test_util.h"
+#include "arrow/compute/exec/util.h"
+#include "arrow/util/bitmap_ops.h"
+#include "arrow/util/cpu_info.h"
+
+namespace arrow {
+namespace compute {
+
+// Sanity-checks the Bloom filter masks and prints mask overlap statistics.
+//
+// For every mask the test asserts that the number of set bits lies within
+// [kMinBitsSet, kMaxBitsSet]. It then counts, for every ordered pair of distinct
+// masks, how many set bits the two masks have in common and prints the resulting
+// distribution. This is done twice: once comparing the masks as they are, and
+// once comparing each mask against all 64 bit rotations of the other mask.
+TEST(BloomFilter, Masks) {
+  BloomFilterMasks masks;
+  // Histogram indexed by the number of set bits shared by a pair of masks.
+  // 64-bit counters are used because one pass performs
+  // kNumMasks * (kNumMasks - 1) * 64 comparisons when rotation is enabled.
+  std::vector<int64_t> sum_num_masks_with_same_n_bits(BloomFilterMasks::kMaxBitsSet + 1);
+
+  for (bool with_rotation : {false, true}) {
+    printf("With bit rotation: %s\n", with_rotation ? "ON" : "OFF");
+    const int num_rotations = with_rotation ? 64 : 1;
+
+    std::fill(sum_num_masks_with_same_n_bits.begin(),
+              sum_num_masks_with_same_n_bits.end(), 0);
+
+    for (int imask = 0; imask < BloomFilterMasks::kNumMasks; ++imask) {
+      const uint64_t mask = masks.mask(imask);
+      // Verify that the number of bits set is in the required range.
+      //
+      // A gtest assertion is used instead of the debug-only ARROW_DCHECK so that
+      // the check also runs in release builds and a violation is reported as a
+      // test failure. It also guarantees that the histogram index computed below
+      // (popcount of mask AND another value) cannot exceed kMaxBitsSet.
+      const int num_bits_set = static_cast<int>(ARROW_POPCOUNT64(mask));
+      ASSERT_TRUE(num_bits_set >= BloomFilterMasks::kMinBitsSet &&
+                  num_bits_set <= BloomFilterMasks::kMaxBitsSet)
+          << "mask " << imask << " has " << num_bits_set << " bits set";
+
+      for (int imask2nd = 0; imask2nd < BloomFilterMasks::kNumMasks; ++imask2nd) {
+        // A mask is never compared with itself.
+        if (imask == imask2nd) {
+          continue;
+        }
+        const uint64_t mask_to_compare_to = masks.mask(imask2nd);
+        for (int bits_to_rotate = 0; bits_to_rotate < num_rotations; ++bits_to_rotate) {
+          // Rotation by zero bits is special-cased rather than passed to ROTL64.
+          const uint64_t mask_rotated =
+              bits_to_rotate == 0 ? mask_to_compare_to
+                                  : ROTL64(mask_to_compare_to, bits_to_rotate);
+          ++sum_num_masks_with_same_n_bits[ARROW_POPCOUNT64(mask & mask_rotated)];
+        }
+      }
+    }
+
+    // Normalize by the number of comparisons actually performed. Pairs with
+    // imask == imask2nd are skipped above, hence kNumMasks * (kNumMasks - 1).
+    const float num_comparisons = static_cast<float>(BloomFilterMasks::kNumMasks) *
+                                  static_cast<float>(BloomFilterMasks::kNumMasks - 1) *
+                                  static_cast<float>(num_rotations);
+
+    printf(
+        "Expected fraction of masks with the same N bits as any random "
+        "mask:\n");
+    for (int i = 0; i <= BloomFilterMasks::kMaxBitsSet; ++i) {
+      printf("%d. %.2f \n", i,
+             static_cast<float>(sum_num_masks_with_same_n_bits[i]) / num_comparisons);
+    }
+    printf("\n");
+  }
+}
+
+// Builds `target` from `num_rows` hashes using the given build strategy and
+// reports the median build cost.
+//
+// The input is processed in batches of at most 32K rows. For each batch the
+// hashes are fetched through get_hash64_impl when target->NumHashBitsUsed() > 32
+// and through get_hash32_impl otherwise. Both callbacks are invoked as
+// (index of first row in batch, number of rows in batch, output buffer).
+//
+// The complete build is repeated several times (exactly once in debug builds)
+// and the median cost, in nanoseconds per input row, is stored in *build_cost.
+//
+// All tasks and batches are executed serially on the calling thread with thread
+// index 0; the OpenMP annotations are left commented out. num_threads is still
+// passed to the builder and determines the number of per-thread hash buffers.
+//
+// Returns an error if num_rows is not positive, if num_threads is zero, or if
+// the builder reports a failure from Begin() or PushNextBatch().
+Status BuildBloomFilter(BloomFilterBuildStrategy strategy, size_t num_threads,
+                        int64_t hardware_flags, MemoryPool* pool, int64_t num_rows,
+                        std::function<void(int64_t, int, uint32_t*)> get_hash32_impl,
+                        std::function<void(int64_t, int, uint64_t*)> get_hash64_impl,
+                        BlockedBloomFilter* target, float* build_cost) {
+  // num_rows is used as a divisor (number of repeats, cost per row), so zero or
+  // negative values have to be rejected up front.
+  if (num_rows <= 0) {
+    return Status::Invalid("BuildBloomFilter requires a positive number of rows");
+  }
+  // The hash buffer of thread 0 is always used below, so it has to exist.
+  if (num_threads == 0) {
+    return Status::Invalid("BuildBloomFilter requires at least one thread");
+  }
+
+  constexpr int batch_size_max = 32 * 1024;
+  const int64_t num_batches = bit_util::CeilDiv(num_rows, batch_size_max);
+
+  // omp_set_num_threads(static_cast<int>(num_threads));
+
+  auto builder = BloomFilterBuilder::Make(strategy);
+
+  // One scratch buffer per thread for each hash width, each large enough to
+  // hold a full batch.
+  std::vector<std::vector<uint32_t>> thread_local_hashes32(
+      num_threads, std::vector<uint32_t>(batch_size_max));
+  std::vector<std::vector<uint64_t>> thread_local_hashes64(
+      num_threads, std::vector<uint64_t>(batch_size_max));
+
+  // Small inputs are built repeatedly so that roughly 2^27 rows are processed
+  // in total; debug builds only do a single pass.
+  int64_t num_repeats =
+      std::max(static_cast<int64_t>(1), bit_util::CeilDiv(1LL << 27, num_rows));
+#ifndef NDEBUG
+  num_repeats = 1LL;
+#endif
+  std::vector<float> build_cost_vector;
+  build_cost_vector.resize(num_repeats);
+
+  for (int64_t irepeat = 0; irepeat < num_repeats; ++irepeat) {
+    auto time0 = std::chrono::high_resolution_clock::now();
+
+    RETURN_NOT_OK(builder->Begin(num_threads, hardware_flags, pool, num_rows, num_batches,
+                                 target));
+
+    // #pragma omp parallel for
+    for (int64_t i = 0; i < builder->num_tasks(); ++i) {
+      builder->RunInitTask(i);
+    }
+
+    // #pragma omp parallel for
+    for (int64_t i = 0; i < num_batches; ++i) {
+      const size_t thread_index = 0;  // omp_get_thread_num();
+      const int64_t first_row = i * batch_size_max;
+      // Every batch is full except possibly the last one.
+      const int batch_size = static_cast<int>(
+          std::min(num_rows - first_row, static_cast<int64_t>(batch_size_max)));
+      // Builder failures are propagated to the caller instead of being checked
+      // with a debug-only assertion, which would ignore them in release builds.
+      if (target->NumHashBitsUsed() > 32) {
+        uint64_t* hashes = thread_local_hashes64[thread_index].data();
+        get_hash64_impl(first_row, batch_size, hashes);
+        RETURN_NOT_OK(builder->PushNextBatch(thread_index, batch_size, hashes));
+      } else {
+        uint32_t* hashes = thread_local_hashes32[thread_index].data();
+        get_hash32_impl(first_row, batch_size, hashes);
+        RETURN_NOT_OK(builder->PushNextBatch(thread_index, batch_size, hashes));
+      }
+    }
+
+    // #pragma omp parallel for
+    for (int64_t i = 0; i < builder->num_tasks(); ++i) {
+      builder->RunFinishTask(i);
+    }
+
+    auto time1 = std::chrono::high_resolution_clock::now();
+    auto ns = std::chrono::duration_cast<std::chrono::nanoseconds>(time1 - time0).count();
+
+    // Clean-up happens after the second timestamp, so it is not part of the
+    // measured cost.
+    builder->CleanUp();
+
+    build_cost_vector[irepeat] = static_cast<float>(ns) / static_cast<float>(num_rows);
+  }
+
+  // Report the median over all repeats.
+  std::sort(build_cost_vector.begin(), build_cost_vector.end());
+  *build_cost = build_cost_vector[build_cost_vector.size() / 2];
+
+  return Status::OK();
+}
+
+// FPR (false positive rate) - the fraction of false positives relative to the
+// sum of false positives and true negatives.
+//
+// Outputs the FPR together with the build cost and the probe cost.
+//
+Status TestBloomSmall(BloomFilterBuildStrategy strategy, int64_t num_build,
+                      int num_build_copies, int dop, bool use_simd, bool 
enable_prefetch,
+                      float* fpr, float* build_cost, float* probe_cost) {
+  int64_t hardware_flags = use_simd ? ::arrow::internal::CpuInfo::AVX2 : 0;
+
+  // Generate input keys
+  //
+  int64_t num_probe = 4 * num_build;
+  Random64Bit rnd(/*seed=*/0);
+  std::vector<uint64_t> unique_keys;
+  {
+    std::set<uint64_t> unique_keys_set;
+    for (int64_t i = 0; i < num_build + num_probe; ++i) {
+      uint64_t value;
+      for (;;) {
+        value = rnd.next();
+        if (unique_keys_set.find(value) == unique_keys_set.end()) {
+          break;
+        }
+      }
+      unique_keys.push_back(value);
+      unique_keys_set.insert(value);
+    }
+  }
+
+  // Generate input hashes
+  //
+  std::vector<uint32_t> hashes32;
+  std::vector<uint64_t> hashes64;
+  hashes32.resize(unique_keys.size());
+  hashes64.resize(unique_keys.size());
+  int batch_size_max = 1024;
+  for (size_t i = 0; i < unique_keys.size(); i += batch_size_max) {
+    int batch_size = static_cast<int>(
+        std::min(unique_keys.size() - i, static_cast<size_t>(batch_size_max)));
+    int key_length = sizeof(uint64_t);
+    Hashing32::hash_fixed(hardware_flags, /*combine_hashes=*/false, batch_size,
+                          key_length,
+                          reinterpret_cast<const uint8_t*>(unique_keys.data() 
+ i),
+                          hashes32.data() + i, nullptr);
+    Hashing64::hash_fixed(
+        /*combine_hashes=*/false, batch_size, key_length,
+        reinterpret_cast<const uint8_t*>(unique_keys.data() + i), 
hashes64.data() + i);
+  }
+
+  MemoryPool* pool = default_memory_pool();
+
+  // Build the filter
+  //
+  BlockedBloomFilter reference;
+  BlockedBloomFilter bloom;
+  float build_cost_single_threaded;
+
+  RETURN_NOT_OK(BuildBloomFilter(
+      BloomFilterBuildStrategy::SINGLE_THREADED, dop, hardware_flags, pool, 
num_build,
+      [hashes32](int64_t first_row, int num_rows, uint32_t* output_hashes) {
+        memcpy(output_hashes, hashes32.data() + first_row, num_rows * 
sizeof(uint32_t));
+      },
+      [hashes64](int64_t first_row, int num_rows, uint64_t* output_hashes) {
+        memcpy(output_hashes, hashes64.data() + first_row, num_rows * 
sizeof(uint64_t));
+      },
+      &reference, &build_cost_single_threaded));
+
+  RETURN_NOT_OK(BuildBloomFilter(
+      strategy, dop, hardware_flags, pool, num_build * num_build_copies,
+      [hashes32, num_build](int64_t first_row, int num_rows, uint32_t* 
output_hashes) {
+        int64_t first_row_clamped = first_row % num_build;
+        int64_t num_rows_processed = 0;
+        while (num_rows_processed < num_rows) {
+          int64_t num_rows_next =
+              std::min(static_cast<int64_t>(num_rows) - num_rows_processed,
+                       num_build - first_row_clamped);
+          memcpy(output_hashes + num_rows_processed, hashes32.data() + 
first_row_clamped,
+                 num_rows_next * sizeof(uint32_t));
+          first_row_clamped = 0;
+          num_rows_processed += num_rows_next;
+        }
+      },
+      [hashes64, num_build](int64_t first_row, int num_rows, uint64_t* 
output_hashes) {
+        int64_t first_row_clamped = first_row % num_build;
+        int64_t num_rows_processed = 0;
+        while (num_rows_processed < num_rows) {
+          int64_t num_rows_next =
+              std::min(static_cast<int64_t>(num_rows) - num_rows_processed,
+                       num_build - first_row_clamped);
+          memcpy(output_hashes + num_rows_processed, hashes64.data() + 
first_row_clamped,
+                 num_rows_next * sizeof(uint64_t));
+          first_row_clamped = 0;
+          num_rows_processed += num_rows_next;
+        }
+      },
+      &bloom, build_cost));
+
+  int log_before = bloom.log_num_blocks();
+
+  if (num_build_copies > 1) {
+    reference.Fold();
+    bloom.Fold();
+  } else {
+    if (strategy != BloomFilterBuildStrategy::SINGLE_THREADED) {
+      bool is_same = reference.IsSameAs(&bloom);
+      printf("%s ", is_same ? "BUILD_CORRECT" : "BUILD_WRONG");
+      ARROW_DCHECK(is_same);
+    }
+  }
+
+  int log_after = bloom.log_num_blocks();
+
+  float fraction_of_bits_set = static_cast<float>(bloom.NumBitsSet()) /
+                               static_cast<float>(64LL << 
bloom.log_num_blocks());
+
+  printf("log_before = %d log_after = %d percent_bits_set = %.1f ", 
log_before, log_after,
+         100.0f * fraction_of_bits_set);
+
+  // Verify no false negatives
+  //
+  bool ok = true;
+  for (int64_t i = 0; i < num_build; ++i) {
+    bool found;
+    if (bloom.NumHashBitsUsed() > 32) {
+      found = bloom.Find(hashes64[i]);
+    } else {
+      found = bloom.Find(hashes32[i]);
+    }
+    if (!found) {
+      ok = false;
+      ARROW_DCHECK(false);
+      break;
+    }
+  }
+  printf("%s\n", ok ? "success" : "failure");
+
+  // Measure FPR and performance
+  //
+  std::vector<uint8_t> result_bit_vector;
+  result_bit_vector.resize(bit_util::BytesForBits(batch_size_max));
+  std::atomic<int64_t> num_positives;
+  num_positives.store(0);
+
+  int64_t num_repeats = 1LL;
+#ifdef NDEBUG
+  num_repeats = std::max(1LL, bit_util::CeilDiv(1000000ULL, num_probe));
+#endif
+
+  auto time0 = std::chrono::high_resolution_clock::now();
+
+  for (int64_t irepeat = 0; irepeat < num_repeats; ++irepeat) {
+    for (int64_t i = num_build; i < num_build + num_probe;) {
+      int batch_size =
+          static_cast<int>(std::min(static_cast<size_t>(unique_keys.size() - 
i),
+                                    static_cast<size_t>(batch_size_max)));
+      if (bloom.NumHashBitsUsed() > 32) {
+        bloom.Find(hardware_flags, batch_size, hashes64.data() + i,
+                   result_bit_vector.data(), enable_prefetch);
+      } else {
+        bloom.Find(hardware_flags, batch_size, hashes32.data() + i,
+                   result_bit_vector.data(), enable_prefetch);
+      }
+      num_positives += arrow::internal::CountSetBits(result_bit_vector.data(),
+                                                     /*offset=*/0, batch_size);
+      i += batch_size;
+    }
+  }
+  auto time1 = std::chrono::high_resolution_clock::now();
+  auto ns = std::chrono::duration_cast<std::chrono::nanoseconds>(time1 - 
time0).count();
+  *probe_cost = static_cast<float>(ns) / static_cast<float>(num_probe * 
num_repeats);
+
+  *fpr = 100.0f * static_cast<float>(num_positives.load()) /
+         static_cast<float>(num_probe * num_repeats);
+
+  return Status::OK();

Review comment:
       removed




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to