westonpace commented on a change in pull request #12067:
URL: https://github.com/apache/arrow/pull/12067#discussion_r826402410



##########
File path: cpp/src/arrow/compute/exec/bloom_filter_test.cc
##########
@@ -0,0 +1,484 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <gmock/gmock-matchers.h>
+
+#include <algorithm>
+#include <chrono>
+#include <condition_variable>
+#include <set>
+#include <thread>
+#include "arrow/compute/exec/bloom_filter.h"
+#include "arrow/compute/exec/key_hash.h"
+#include "arrow/compute/exec/test_util.h"
+#include "arrow/compute/exec/util.h"
+#include "arrow/util/bitmap_ops.h"
+#include "arrow/util/cpu_info.h"
+
+namespace arrow {
+namespace compute {
+
+Status BuildBloomFilter(BloomFilterBuildStrategy strategy, size_t num_threads,
+                        int64_t hardware_flags, MemoryPool* pool, int64_t 
num_rows,
+                        std::function<void(int64_t, int, uint32_t*)> 
get_hash32_impl,
+                        std::function<void(int64_t, int, uint64_t*)> 
get_hash64_impl,
+                        BlockedBloomFilter* target, float* build_cost) {
+  constexpr int batch_size_max = 32 * 1024;
+  int64_t num_batches = bit_util::CeilDiv(num_rows, batch_size_max);
+
+  auto builder = BloomFilterBuilder::Make(strategy);
+
+  std::vector<std::vector<uint32_t>> thread_local_hashes32;
+  std::vector<std::vector<uint64_t>> thread_local_hashes64;
+  thread_local_hashes32.resize(num_threads);
+  thread_local_hashes64.resize(num_threads);
+  for (size_t i = 0; i < num_threads; ++i) {
+    thread_local_hashes32[i].resize(batch_size_max);
+    thread_local_hashes64[i].resize(batch_size_max);
+  }
+
+  // Repeate the entire test in a loop multiple times in order to get 
meaningful time
+  // measurements. Time measurements in debug do not provide useful 
information and would
+  // make test take unreasonably long, so there are no repeats in debug.
+  //
+  std::vector<float> build_cost_vector;
+  int64_t num_repeats =
+      std::max(static_cast<int64_t>(1), bit_util::CeilDiv(1LL << 27, 
num_rows));
+#ifndef NDEBUG
+  num_repeats = 1LL;
+#endif
+  build_cost_vector.resize(num_repeats);
+
+  for (int64_t irepeat = 0; irepeat < num_repeats; ++irepeat) {
+    auto time0 = std::chrono::high_resolution_clock::now();
+
+    RETURN_NOT_OK(builder->Begin(num_threads, hardware_flags, pool, num_rows,
+                                 bit_util::CeilDiv(num_rows, batch_size_max), 
target));
+
+    for (int64_t i = 0; i < num_batches; ++i) {
+      size_t thread_index = 0;
+      int batch_size = static_cast<int>(
+          std::min(num_rows - i * batch_size_max, 
static_cast<int64_t>(batch_size_max)));
+      if (target->NumHashBitsUsed() > 32) {
+        uint64_t* hashes = thread_local_hashes64[thread_index].data();
+        get_hash64_impl(i * batch_size_max, batch_size, hashes);
+        Status status = builder->PushNextBatch(thread_index, batch_size, 
hashes);
+        ARROW_DCHECK(status.ok());
+      } else {
+        uint32_t* hashes = thread_local_hashes32[thread_index].data();
+        get_hash32_impl(i * batch_size_max, batch_size, hashes);
+        Status status = builder->PushNextBatch(thread_index, batch_size, 
hashes);
+        ARROW_DCHECK(status.ok());
+      }
+    }
+
+    auto time1 = std::chrono::high_resolution_clock::now();
+    auto ns = std::chrono::duration_cast<std::chrono::nanoseconds>(time1 - 
time0).count();
+
+    builder->CleanUp();
+
+    build_cost_vector[irepeat] = static_cast<float>(ns) / 
static_cast<float>(num_rows);
+  }
+
+  std::sort(build_cost_vector.begin(), build_cost_vector.end());
+  *build_cost = build_cost_vector[build_cost_vector.size() / 2];
+
+  return Status::OK();
+}
+
+// FPR (false positives rate) - fraction of false positives relative to the sum
+// of false positives and true negatives.
+//
+// Output FPR and build and probe cost.
+//
+Status TestBloomSmall(BloomFilterBuildStrategy strategy, int64_t num_build,
+                      int num_build_copies, int dop, bool use_simd,
+                      bool enable_prefetch) {
+  int64_t hardware_flags = use_simd ? ::arrow::internal::CpuInfo::AVX2 : 0;
+
+  // Generate input keys
+  //
+  int64_t num_probe = 4 * num_build;
+  Random64Bit rnd(/*seed=*/0);
+  std::vector<uint64_t> unique_keys;
+  {
+    std::set<uint64_t> unique_keys_set;
+    for (int64_t i = 0; i < num_build + num_probe; ++i) {
+      uint64_t value;
+      for (;;) {
+        value = rnd.next();
+        if (unique_keys_set.find(value) == unique_keys_set.end()) {
+          break;
+        }
+      }
+      unique_keys.push_back(value);
+      unique_keys_set.insert(value);
+    }

Review comment:
       ```suggestion
   ```

##########
File path: cpp/src/arrow/compute/exec/bloom_filter_test.cc
##########
@@ -0,0 +1,484 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <gmock/gmock-matchers.h>
+
+#include <algorithm>
+#include <chrono>
+#include <condition_variable>
+#include <set>
+#include <thread>
+#include "arrow/compute/exec/bloom_filter.h"
+#include "arrow/compute/exec/key_hash.h"
+#include "arrow/compute/exec/test_util.h"
+#include "arrow/compute/exec/util.h"
+#include "arrow/util/bitmap_ops.h"
+#include "arrow/util/cpu_info.h"
+
+namespace arrow {
+namespace compute {
+
+Status BuildBloomFilter(BloomFilterBuildStrategy strategy, size_t num_threads,
+                        int64_t hardware_flags, MemoryPool* pool, int64_t 
num_rows,
+                        std::function<void(int64_t, int, uint32_t*)> 
get_hash32_impl,
+                        std::function<void(int64_t, int, uint64_t*)> 
get_hash64_impl,
+                        BlockedBloomFilter* target, float* build_cost) {
+  constexpr int batch_size_max = 32 * 1024;
+  int64_t num_batches = bit_util::CeilDiv(num_rows, batch_size_max);
+
+  auto builder = BloomFilterBuilder::Make(strategy);
+
+  std::vector<std::vector<uint32_t>> thread_local_hashes32;
+  std::vector<std::vector<uint64_t>> thread_local_hashes64;
+  thread_local_hashes32.resize(num_threads);
+  thread_local_hashes64.resize(num_threads);
+  for (size_t i = 0; i < num_threads; ++i) {
+    thread_local_hashes32[i].resize(batch_size_max);
+    thread_local_hashes64[i].resize(batch_size_max);
+  }
+
+  // Repeate the entire test in a loop multiple times in order to get 
meaningful time
+  // measurements. Time measurements in debug do not provide useful 
information and would
+  // make test take unreasonably long, so there are no repeats in debug.
+  //
+  std::vector<float> build_cost_vector;
+  int64_t num_repeats =
+      std::max(static_cast<int64_t>(1), bit_util::CeilDiv(1LL << 27, 
num_rows));
+#ifndef NDEBUG
+  num_repeats = 1LL;
+#endif
+  build_cost_vector.resize(num_repeats);
+
+  for (int64_t irepeat = 0; irepeat < num_repeats; ++irepeat) {
+    auto time0 = std::chrono::high_resolution_clock::now();
+
+    RETURN_NOT_OK(builder->Begin(num_threads, hardware_flags, pool, num_rows,
+                                 bit_util::CeilDiv(num_rows, batch_size_max), 
target));
+
+    for (int64_t i = 0; i < num_batches; ++i) {
+      size_t thread_index = 0;
+      int batch_size = static_cast<int>(
+          std::min(num_rows - i * batch_size_max, 
static_cast<int64_t>(batch_size_max)));
+      if (target->NumHashBitsUsed() > 32) {
+        uint64_t* hashes = thread_local_hashes64[thread_index].data();
+        get_hash64_impl(i * batch_size_max, batch_size, hashes);
+        Status status = builder->PushNextBatch(thread_index, batch_size, 
hashes);
+        ARROW_DCHECK(status.ok());
+      } else {
+        uint32_t* hashes = thread_local_hashes32[thread_index].data();
+        get_hash32_impl(i * batch_size_max, batch_size, hashes);
+        Status status = builder->PushNextBatch(thread_index, batch_size, 
hashes);
+        ARROW_DCHECK(status.ok());
+      }
+    }
+
+    auto time1 = std::chrono::high_resolution_clock::now();
+    auto ns = std::chrono::duration_cast<std::chrono::nanoseconds>(time1 - 
time0).count();
+
+    builder->CleanUp();
+
+    build_cost_vector[irepeat] = static_cast<float>(ns) / 
static_cast<float>(num_rows);
+  }
+
+  std::sort(build_cost_vector.begin(), build_cost_vector.end());
+  *build_cost = build_cost_vector[build_cost_vector.size() / 2];
+
+  return Status::OK();
+}
+
+// FPR (false positives rate) - fraction of false positives relative to the sum
+// of false positives and true negatives.
+//
+// Output FPR and build and probe cost.
+//
+Status TestBloomSmall(BloomFilterBuildStrategy strategy, int64_t num_build,
+                      int num_build_copies, int dop, bool use_simd,
+                      bool enable_prefetch) {
+  int64_t hardware_flags = use_simd ? ::arrow::internal::CpuInfo::AVX2 : 0;
+
+  // Generate input keys
+  //
+  int64_t num_probe = 4 * num_build;
+  Random64Bit rnd(/*seed=*/0);
+  std::vector<uint64_t> unique_keys;
+  {

Review comment:
       ```suggestion
   ```

##########
File path: cpp/src/arrow/compute/exec/bloom_filter_test.cc
##########
@@ -0,0 +1,633 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <gmock/gmock-matchers.h>
+
+#include <algorithm>
+#include <chrono>
+#include <condition_variable>
+#include <set>
+#include <thread>
+#include "arrow/compute/exec/bloom_filter.h"
+#include "arrow/compute/exec/key_hash.h"
+#include "arrow/compute/exec/test_util.h"
+#include "arrow/compute/exec/util.h"
+#include "arrow/util/bitmap_ops.h"
+#include "arrow/util/cpu_info.h"
+
+namespace arrow {
+namespace compute {
+
+TEST(BloomFilter, Masks) {
+  BloomFilterMasks masks;
+  std::vector<int> 
sum_num_masks_with_same_n_bits(BloomFilterMasks::kMaxBitsSet + 1);
+  std::vector<int> num_masks_with_same_n_bits(BloomFilterMasks::kMaxBitsSet + 
1);
+
+  for (bool with_rotation : {false, true}) {
+    printf("With bit rotation: %s\n", with_rotation ? "ON" : "OFF");
+
+    for (int i = 0; i <= BloomFilterMasks::kMaxBitsSet; ++i) {
+      sum_num_masks_with_same_n_bits[i] = 0;
+    }
+
+    for (int imask = 0; imask < BloomFilterMasks::kNumMasks; ++imask) {
+      uint64_t mask = masks.mask(imask);
+      // Verify that the number of bits set is in the required range
+      //
+      ARROW_DCHECK(ARROW_POPCOUNT64(mask) >= BloomFilterMasks::kMinBitsSet &&
+                   ARROW_POPCOUNT64(mask) <= BloomFilterMasks::kMaxBitsSet);
+
+      for (int i = 0; i <= BloomFilterMasks::kMaxBitsSet; ++i) {
+        num_masks_with_same_n_bits[i] = 0;
+      }
+      for (int imask2nd = 0; imask2nd < BloomFilterMasks::kNumMasks; 
++imask2nd) {
+        if (imask == imask2nd) {
+          continue;
+        }
+        uint64_t mask_to_compare_to = masks.mask(imask2nd);
+        for (int bits_to_rotate = 0; bits_to_rotate < (with_rotation ? 64 : 1);
+             ++bits_to_rotate) {
+          uint64_t mask_rotated = bits_to_rotate == 0
+                                      ? mask_to_compare_to
+                                      : ROTL64(mask_to_compare_to, 
bits_to_rotate);
+          ++num_masks_with_same_n_bits[ARROW_POPCOUNT64(mask & mask_rotated)];
+        }
+      }
+      for (int i = 0; i <= BloomFilterMasks::kMaxBitsSet; ++i) {
+        sum_num_masks_with_same_n_bits[i] += num_masks_with_same_n_bits[i];
+      }
+    }
+
+    printf(
+        "Expected fraction of masks with the same N bits as any random "
+        "mask:\n");
+    for (int i = 0; i <= BloomFilterMasks::kMaxBitsSet; ++i) {
+      printf("%d. %.2f \n", i,
+             static_cast<float>(sum_num_masks_with_same_n_bits[i]) /
+                 (static_cast<float>(BloomFilterMasks::kNumMasks *
+                                     BloomFilterMasks::kNumMasks) *
+                  (with_rotation ? 64 : 1)));
+    }
+    printf("\n");
+  }
+}
+
+Status BuildBloomFilter(BloomFilterBuildStrategy strategy, size_t num_threads,

Review comment:
       Thank you for removing the benchmarking parts.  However, there are still 
a lot of vestiges left over (e.g. `build_cost`, `num_threads`, etc.) that 
should probably be removed.
   
   In addition, it seems there is no longer any testing of probing.  So either 
we can add in assertion based testing (e.g. false positive rate should be less 
than 5%) or get rid of all the logic that creates keys for probing.
   
   As with the hash testing I took a stab at simplifying things as I was going 
through the review.  Feel free to use any or none of my attempt here: 
https://github.com/westonpace/arrow/commit/2e47f9c46688f8a43cffd40ed2182e3c229bfbbb

##########
File path: cpp/src/arrow/compute/exec/bloom_filter_test.cc
##########
@@ -0,0 +1,484 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <gmock/gmock-matchers.h>
+
+#include <algorithm>
+#include <chrono>
+#include <condition_variable>
+#include <set>
+#include <thread>
+#include "arrow/compute/exec/bloom_filter.h"
+#include "arrow/compute/exec/key_hash.h"
+#include "arrow/compute/exec/test_util.h"
+#include "arrow/compute/exec/util.h"
+#include "arrow/util/bitmap_ops.h"
+#include "arrow/util/cpu_info.h"
+
+namespace arrow {
+namespace compute {
+
+Status BuildBloomFilter(BloomFilterBuildStrategy strategy, size_t num_threads,
+                        int64_t hardware_flags, MemoryPool* pool, int64_t 
num_rows,
+                        std::function<void(int64_t, int, uint32_t*)> 
get_hash32_impl,
+                        std::function<void(int64_t, int, uint64_t*)> 
get_hash64_impl,
+                        BlockedBloomFilter* target, float* build_cost) {
+  constexpr int batch_size_max = 32 * 1024;
+  int64_t num_batches = bit_util::CeilDiv(num_rows, batch_size_max);
+
+  auto builder = BloomFilterBuilder::Make(strategy);
+
+  std::vector<std::vector<uint32_t>> thread_local_hashes32;
+  std::vector<std::vector<uint64_t>> thread_local_hashes64;

Review comment:
       Since we aren't running this in parallel we can change these to a 1d 
vector.

##########
File path: cpp/src/arrow/compute/exec/bloom_filter_test.cc
##########
@@ -0,0 +1,484 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <gmock/gmock-matchers.h>
+
+#include <algorithm>
+#include <chrono>
+#include <condition_variable>
+#include <set>
+#include <thread>
+#include "arrow/compute/exec/bloom_filter.h"
+#include "arrow/compute/exec/key_hash.h"
+#include "arrow/compute/exec/test_util.h"
+#include "arrow/compute/exec/util.h"
+#include "arrow/util/bitmap_ops.h"
+#include "arrow/util/cpu_info.h"
+
+namespace arrow {
+namespace compute {
+
+Status BuildBloomFilter(BloomFilterBuildStrategy strategy, size_t num_threads,
+                        int64_t hardware_flags, MemoryPool* pool, int64_t 
num_rows,
+                        std::function<void(int64_t, int, uint32_t*)> 
get_hash32_impl,
+                        std::function<void(int64_t, int, uint64_t*)> 
get_hash64_impl,
+                        BlockedBloomFilter* target, float* build_cost) {
+  constexpr int batch_size_max = 32 * 1024;
+  int64_t num_batches = bit_util::CeilDiv(num_rows, batch_size_max);
+
+  auto builder = BloomFilterBuilder::Make(strategy);
+
+  std::vector<std::vector<uint32_t>> thread_local_hashes32;
+  std::vector<std::vector<uint64_t>> thread_local_hashes64;
+  thread_local_hashes32.resize(num_threads);
+  thread_local_hashes64.resize(num_threads);
+  for (size_t i = 0; i < num_threads; ++i) {
+    thread_local_hashes32[i].resize(batch_size_max);
+    thread_local_hashes64[i].resize(batch_size_max);
+  }
+
+  // Repeate the entire test in a loop multiple times in order to get 
meaningful time
+  // measurements. Time measurements in debug do not provide useful 
information and would
+  // make test take unreasonably long, so there are no repeats in debug.
+  //
+  std::vector<float> build_cost_vector;
+  int64_t num_repeats =
+      std::max(static_cast<int64_t>(1), bit_util::CeilDiv(1LL << 27, 
num_rows));
+#ifndef NDEBUG
+  num_repeats = 1LL;
+#endif
+  build_cost_vector.resize(num_repeats);
+
+  for (int64_t irepeat = 0; irepeat < num_repeats; ++irepeat) {
+    auto time0 = std::chrono::high_resolution_clock::now();
+
+    RETURN_NOT_OK(builder->Begin(num_threads, hardware_flags, pool, num_rows,
+                                 bit_util::CeilDiv(num_rows, batch_size_max), 
target));
+
+    for (int64_t i = 0; i < num_batches; ++i) {
+      size_t thread_index = 0;
+      int batch_size = static_cast<int>(
+          std::min(num_rows - i * batch_size_max, 
static_cast<int64_t>(batch_size_max)));
+      if (target->NumHashBitsUsed() > 32) {
+        uint64_t* hashes = thread_local_hashes64[thread_index].data();
+        get_hash64_impl(i * batch_size_max, batch_size, hashes);
+        Status status = builder->PushNextBatch(thread_index, batch_size, 
hashes);
+        ARROW_DCHECK(status.ok());
+      } else {
+        uint32_t* hashes = thread_local_hashes32[thread_index].data();
+        get_hash32_impl(i * batch_size_max, batch_size, hashes);
+        Status status = builder->PushNextBatch(thread_index, batch_size, 
hashes);
+        ARROW_DCHECK(status.ok());
+      }
+    }
+
+    auto time1 = std::chrono::high_resolution_clock::now();
+    auto ns = std::chrono::duration_cast<std::chrono::nanoseconds>(time1 - 
time0).count();
+
+    builder->CleanUp();
+
+    build_cost_vector[irepeat] = static_cast<float>(ns) / 
static_cast<float>(num_rows);
+  }
+
+  std::sort(build_cost_vector.begin(), build_cost_vector.end());
+  *build_cost = build_cost_vector[build_cost_vector.size() / 2];
+
+  return Status::OK();
+}
+
+// FPR (false positives rate) - fraction of false positives relative to the sum
+// of false positives and true negatives.
+//
+// Output FPR and build and probe cost.
+//
+Status TestBloomSmall(BloomFilterBuildStrategy strategy, int64_t num_build,
+                      int num_build_copies, int dop, bool use_simd,
+                      bool enable_prefetch) {
+  int64_t hardware_flags = use_simd ? ::arrow::internal::CpuInfo::AVX2 : 0;
+
+  // Generate input keys
+  //
+  int64_t num_probe = 4 * num_build;
+  Random64Bit rnd(/*seed=*/0);
+  std::vector<uint64_t> unique_keys;
+  {
+    std::set<uint64_t> unique_keys_set;
+    for (int64_t i = 0; i < num_build + num_probe; ++i) {
+      uint64_t value;
+      for (;;) {
+        value = rnd.next();
+        if (unique_keys_set.find(value) == unique_keys_set.end()) {
+          break;
+        }
+      }
+      unique_keys.push_back(value);
+      unique_keys_set.insert(value);
+    }
+  }
+
+  // Generate input hashes
+  //
+  std::vector<uint32_t> hashes32;
+  std::vector<uint64_t> hashes64;
+  hashes32.resize(unique_keys.size());
+  hashes64.resize(unique_keys.size());
+  int batch_size_max = 1024;
+  for (size_t i = 0; i < unique_keys.size(); i += batch_size_max) {
+    int batch_size = static_cast<int>(
+        std::min(unique_keys.size() - i, static_cast<size_t>(batch_size_max)));
+    constexpr int key_length = sizeof(uint64_t);
+    Hashing32::hash_fixed(hardware_flags, /*combine_hashes=*/false, batch_size,
+                          key_length,
+                          reinterpret_cast<const uint8_t*>(unique_keys.data() 
+ i),
+                          hashes32.data() + i, nullptr);
+    Hashing64::hash_fixed(
+        /*combine_hashes=*/false, batch_size, key_length,
+        reinterpret_cast<const uint8_t*>(unique_keys.data() + i), 
hashes64.data() + i);
+  }
+
+  MemoryPool* pool = default_memory_pool();
+
+  // Build the filter
+  //
+  BlockedBloomFilter reference;
+  BlockedBloomFilter bloom;
+  float build_cost_single_threaded;
+  float build_cost;
+
+  RETURN_NOT_OK(BuildBloomFilter(
+      BloomFilterBuildStrategy::SINGLE_THREADED, dop, hardware_flags, pool, 
num_build,
+      [hashes32](int64_t first_row, int num_rows, uint32_t* output_hashes) {
+        memcpy(output_hashes, hashes32.data() + first_row, num_rows * 
sizeof(uint32_t));
+      },
+      [hashes64](int64_t first_row, int num_rows, uint64_t* output_hashes) {
+        memcpy(output_hashes, hashes64.data() + first_row, num_rows * 
sizeof(uint64_t));
+      },
+      &reference, &build_cost_single_threaded));
+
+  RETURN_NOT_OK(BuildBloomFilter(
+      strategy, dop, hardware_flags, pool, num_build * num_build_copies,
+      [hashes32, num_build](int64_t first_row, int num_rows, uint32_t* 
output_hashes) {

Review comment:
       These are pretty large lambdas to include inline and there is a lot in 
common between the two.  It might be simpler to make a `test_Bloom_small_hash` 
similar to `test_Bloom_large_hash` below.

##########
File path: cpp/src/arrow/compute/exec/bloom_filter_test.cc
##########
@@ -0,0 +1,484 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <gmock/gmock-matchers.h>
+
+#include <algorithm>
+#include <chrono>
+#include <condition_variable>
+#include <set>
+#include <thread>
+#include "arrow/compute/exec/bloom_filter.h"
+#include "arrow/compute/exec/key_hash.h"
+#include "arrow/compute/exec/test_util.h"
+#include "arrow/compute/exec/util.h"
+#include "arrow/util/bitmap_ops.h"
+#include "arrow/util/cpu_info.h"
+
+namespace arrow {
+namespace compute {
+
+Status BuildBloomFilter(BloomFilterBuildStrategy strategy, size_t num_threads,
+                        int64_t hardware_flags, MemoryPool* pool, int64_t 
num_rows,
+                        std::function<void(int64_t, int, uint32_t*)> 
get_hash32_impl,
+                        std::function<void(int64_t, int, uint64_t*)> 
get_hash64_impl,
+                        BlockedBloomFilter* target, float* build_cost) {
+  constexpr int batch_size_max = 32 * 1024;
+  int64_t num_batches = bit_util::CeilDiv(num_rows, batch_size_max);
+
+  auto builder = BloomFilterBuilder::Make(strategy);
+
+  std::vector<std::vector<uint32_t>> thread_local_hashes32;
+  std::vector<std::vector<uint64_t>> thread_local_hashes64;
+  thread_local_hashes32.resize(num_threads);
+  thread_local_hashes64.resize(num_threads);
+  for (size_t i = 0; i < num_threads; ++i) {
+    thread_local_hashes32[i].resize(batch_size_max);
+    thread_local_hashes64[i].resize(batch_size_max);
+  }
+
+  // Repeate the entire test in a loop multiple times in order to get 
meaningful time
+  // measurements. Time measurements in debug do not provide useful 
information and would
+  // make test take unreasonably long, so there are no repeats in debug.
+  //

Review comment:
       We can remove this comment (and the looping) now I think.

##########
File path: cpp/src/arrow/compute/exec/bloom_filter_test.cc
##########
@@ -0,0 +1,484 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <gmock/gmock-matchers.h>
+
+#include <algorithm>
+#include <chrono>
+#include <condition_variable>
+#include <set>
+#include <thread>
+#include "arrow/compute/exec/bloom_filter.h"
+#include "arrow/compute/exec/key_hash.h"
+#include "arrow/compute/exec/test_util.h"
+#include "arrow/compute/exec/util.h"
+#include "arrow/util/bitmap_ops.h"
+#include "arrow/util/cpu_info.h"
+
+namespace arrow {
+namespace compute {
+
+Status BuildBloomFilter(BloomFilterBuildStrategy strategy, size_t num_threads,
+                        int64_t hardware_flags, MemoryPool* pool, int64_t 
num_rows,
+                        std::function<void(int64_t, int, uint32_t*)> 
get_hash32_impl,
+                        std::function<void(int64_t, int, uint64_t*)> 
get_hash64_impl,
+                        BlockedBloomFilter* target, float* build_cost) {

Review comment:
       `build_cost` is benchmarking only and can probably be removed.

##########
File path: cpp/src/arrow/compute/exec/bloom_filter.h
##########
@@ -0,0 +1,308 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#if defined(ARROW_HAVE_AVX2)
+#include <immintrin.h>
+#endif
+
+#include <atomic>
+#include <cstdint>
+#include <memory>
+#include "arrow/compute/exec/partition_util.h"
+#include "arrow/compute/exec/util.h"
+#include "arrow/memory_pool.h"
+#include "arrow/result.h"
+#include "arrow/status.h"
+
+namespace arrow {
+namespace compute {
+
+// A set of pre-generated bit masks from a 64-bit word.
+//
+// It is used to map selected bits of hash to a bit mask that will be used in
+// a Bloom filter.
+//
+// These bit masks need to look random and need to have a similar fractions of
+// bits set in order for a Bloom filter to have a low false positives rate.
+//
+struct ARROW_EXPORT BloomFilterMasks {
+  // Generate all masks as a single bit vector. Each bit offset in this bit
+  // vector corresponds to a single mask.
+  // In each consecutive kBitsPerMask bits, there must be between
+  // kMinBitsSet and kMaxBitsSet bits set.
+  //
+  BloomFilterMasks();
+
+  inline uint64_t mask(int bit_offset) {
+#if ARROW_LITTLE_ENDIAN
+    return (util::SafeLoadAs<uint64_t>(masks_ + bit_offset / 8) >> (bit_offset 
% 8)) &
+           kFullMask;
+#else
+    return (BYTESWAP(util::SafeLoadAs<uint64_t>(masks_ + bit_offset / 8)) >>
+            (bit_offset % 8)) &
+           kFullMask;
+#endif
+  }
+
+  // Masks are 57 bits long because then they can be accessed at an
+  // arbitrary bit offset using a single unaligned 64-bit load instruction.
+  //
+  static constexpr int kBitsPerMask = 57;
+  static constexpr uint64_t kFullMask = (1ULL << kBitsPerMask) - 1;
+
+  // Minimum and maximum number of bits set in each mask.
+  // This constraint is enforced when generating the bit masks.
+  // Values should be close to each other and chosen as to minimize a Bloom
+  // filter false positives rate.
+  //
+  static constexpr int kMinBitsSet = 4;
+  static constexpr int kMaxBitsSet = 5;
+
+  // Number of generated masks.
+  // Having more masks to choose will improve false positives rate of Bloom
+  // filter but will also use more memory, which may lead to more CPU cache
+  // misses.
+  // The chosen value results in using only a few cache-lines for mask lookups,
+  // while providing a good variety of available bit masks.
+  //
+  static constexpr int kLogNumMasks = 10;
+  static constexpr int kNumMasks = 1 << kLogNumMasks;
+
+  // Data of masks. Masks are stored in a single bit vector. Nth mask is
+  // kBitsPerMask bits starting at bit offset N.
+  //
+  static constexpr int kTotalBytes = (kNumMasks + 64) / 8;
+  uint8_t masks_[kTotalBytes];
+};
+
+// A variant of a blocked Bloom filter implementation.
+// A Bloom filter is a data structure that provides approximate membership test
+// functionality based only on the hash of the key. Membership test may return
+// false positives but not false negatives. Approximation of the result allows
+// in general case (for arbitrary data types of keys) to save on both memory 
and
+// lookup cost compared to the accurate membership test.
+// The accurate test may sometimes still be cheaper for a specific data types
+// and inputs, e.g. integers from a small range.
+//
+// This blocked Bloom filter is optimized for use in hash joins, to achieve a
+// good balance between the size of the filter, the cost of its building and
+// querying and the rate of false positives.
+//
+class ARROW_EXPORT BlockedBloomFilter {
+ public:
+  Status CreateEmpty(int64_t num_rows_to_insert, MemoryPool* pool);
+
+  inline void Insert(uint64_t hash) {
+    uint64_t m = mask(hash);
+    uint64_t& b = blocks_[block_id(hash)];
+    b |= m;
+  }
+
+  inline bool Find(uint64_t hash) const {
+    uint64_t m = mask(hash);
+    uint64_t b = blocks_[block_id(hash)];
+    return (b & m) == m;
+  }
+
+  void Insert(int64_t hardware_flags, int64_t num_rows, const uint32_t* 
hashes);
+  void Insert(int64_t hardware_flags, int64_t num_rows, const uint64_t* 
hashes);
+
+  // Uses SIMD if available for smaller Bloom filters.
+  // Uses memory prefetching for larger Bloom filters.
+  //
+  void Find(int64_t hardware_flags, int64_t num_rows, const uint32_t* hashes,
+            uint8_t* result_bit_vector, bool enable_prefetch = true) const;
+  void Find(int64_t hardware_flags, int64_t num_rows, const uint64_t* hashes,
+            uint8_t* result_bit_vector, bool enable_prefetch = true) const;
+
+  // Folding of a block Bloom filter after the initial version
+  // has been built.
+  //
+  // One of the parameters for creation of Bloom filter is the number
+  // of bits allocated for it. The more bits allocated, the lower the
+  // probability of false positives. A good heuristic is to aim for
+  // half of the bits set in the constructed Bloom filter. This should
+  // result in a good trade off between size (and following cost of
+  // memory accesses) and false positives rate.
+  //
+  // There might have been many duplicate keys in the input provided
+  // to Bloom filter builder. In that case the resulting bit vector
+  // would be more sparse then originally intended. It is possible to
+  // easily correct that and cut in half the size of Bloom filter
+  // after it has already been constructed. The process to do that is
+  // approximately equal to OR-ing bits from upper and lower half (the
+  // way we address these bits when inserting or querying a hash makes
+  // such folding in half possible).
+  //
+  // We will keep folding as long as the fraction of bits set is less
+  // than 1/4. The resulting bit vector density should be in the [1/4,
+  // 1/2) range.
+  //
+  void Fold();
+
+  int log_num_blocks() const { return log_num_blocks_; }
+
+  int NumHashBitsUsed() const;
+
+  bool IsSameAs(const BlockedBloomFilter* other) const;
+
+  int64_t NumBitsSet() const;
+
+ private:
+  inline uint64_t mask(uint64_t hash) const {
+    // The lowest bits of hash are used to pick mask index.
+    //
+    int mask_id = static_cast<int>(hash & (BloomFilterMasks::kNumMasks - 1));
+    uint64_t result = masks_.mask(mask_id);
+
+    // The next set of hash bits is used to pick the amount of bit
+    // rotation of the mask.
+    //
+    int rotation = (hash >> BloomFilterMasks::kLogNumMasks) & 63;
+    result = ROTL64(result, rotation);
+
+    return result;
+  }
+
+  inline int64_t block_id(uint64_t hash) const {
+    // The next set of hash bits following the bits used to select a
+    // mask is used to pick block id (index of 64-bit word in a bit
+    // vector).
+    //
+    return (hash >> (BloomFilterMasks::kLogNumMasks + 6)) & (num_blocks_ - 1);
+  }
+
+  template <typename T>
+  inline void InsertImp(int64_t num_rows, const T* hashes);
+
+  template <typename T>
+  inline void FindImp(int64_t num_rows, const T* hashes, uint8_t* 
result_bit_vector,
+                      bool enable_prefetch) const;
+
+  void SingleFold(int num_folds);
+
+#if defined(ARROW_HAVE_AVX2)
+  inline __m256i mask_avx2(__m256i hash) const;
+  inline __m256i block_id_avx2(__m256i hash) const;
+  int64_t Insert_avx2(int64_t num_rows, const uint32_t* hashes);
+  int64_t Insert_avx2(int64_t num_rows, const uint64_t* hashes);
+  template <typename T>
+  int64_t InsertImp_avx2(int64_t num_rows, const T* hashes);
+  int64_t Find_avx2(int64_t num_rows, const uint32_t* hashes,
+                    uint8_t* result_bit_vector) const;
+  int64_t Find_avx2(int64_t num_rows, const uint64_t* hashes,
+                    uint8_t* result_bit_vector) const;
+  template <typename T>
+  int64_t FindImp_avx2(int64_t num_rows, const T* hashes,
+                       uint8_t* result_bit_vector) const;
+#endif
+
+  bool UsePrefetch() const {
+    return num_blocks_ * sizeof(uint64_t) > kPrefetchLimitBytes;
+  }
+
+  static constexpr int64_t kPrefetchLimitBytes = 256 * 1024;
+
+  static BloomFilterMasks masks_;
+
+  // Total number of bits used by block Bloom filter must be a power
+  // of 2.
+  //
+  int log_num_blocks_;
+  int64_t num_blocks_;
+
+  // Buffer allocated to store an array of power of 2 64-bit blocks.
+  //
+  std::shared_ptr<Buffer> buf_;
+  // Pointer to mutable data owned by Buffer
+  //
+  uint64_t* blocks_;
+};
+
+enum class ARROW_EXPORT BloomFilterBuildStrategy {

Review comment:
       Can you add a comment here to guide the user in selecting this value?  
Why would someone choose one or the other?

##########
File path: cpp/src/arrow/compute/exec/bloom_filter_test.cc
##########
@@ -0,0 +1,484 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <gmock/gmock-matchers.h>
+
+#include <algorithm>
+#include <chrono>
+#include <condition_variable>
+#include <set>
+#include <thread>
+#include "arrow/compute/exec/bloom_filter.h"
+#include "arrow/compute/exec/key_hash.h"
+#include "arrow/compute/exec/test_util.h"
+#include "arrow/compute/exec/util.h"
+#include "arrow/util/bitmap_ops.h"
+#include "arrow/util/cpu_info.h"
+
+namespace arrow {
+namespace compute {
+
+Status BuildBloomFilter(BloomFilterBuildStrategy strategy, size_t num_threads,
+                        int64_t hardware_flags, MemoryPool* pool, int64_t 
num_rows,
+                        std::function<void(int64_t, int, uint32_t*)> 
get_hash32_impl,
+                        std::function<void(int64_t, int, uint64_t*)> 
get_hash64_impl,
+                        BlockedBloomFilter* target, float* build_cost) {
+  constexpr int batch_size_max = 32 * 1024;
+  int64_t num_batches = bit_util::CeilDiv(num_rows, batch_size_max);
+
+  auto builder = BloomFilterBuilder::Make(strategy);
+
+  std::vector<std::vector<uint32_t>> thread_local_hashes32;
+  std::vector<std::vector<uint64_t>> thread_local_hashes64;
+  thread_local_hashes32.resize(num_threads);
+  thread_local_hashes64.resize(num_threads);
+  for (size_t i = 0; i < num_threads; ++i) {
+    thread_local_hashes32[i].resize(batch_size_max);
+    thread_local_hashes64[i].resize(batch_size_max);
+  }
+
+  // Repeate the entire test in a loop multiple times in order to get 
meaningful time
+  // measurements. Time measurements in debug do not provide useful 
information and would
+  // make test take unreasonably long, so there are no repeats in debug.
+  //
+  std::vector<float> build_cost_vector;
+  int64_t num_repeats =
+      std::max(static_cast<int64_t>(1), bit_util::CeilDiv(1LL << 27, 
num_rows));
+#ifndef NDEBUG
+  num_repeats = 1LL;
+#endif
+  build_cost_vector.resize(num_repeats);
+
+  for (int64_t irepeat = 0; irepeat < num_repeats; ++irepeat) {
+    auto time0 = std::chrono::high_resolution_clock::now();
+
+    RETURN_NOT_OK(builder->Begin(num_threads, hardware_flags, pool, num_rows,
+                                 bit_util::CeilDiv(num_rows, batch_size_max), 
target));
+
+    for (int64_t i = 0; i < num_batches; ++i) {
+      size_t thread_index = 0;
+      int batch_size = static_cast<int>(
+          std::min(num_rows - i * batch_size_max, 
static_cast<int64_t>(batch_size_max)));
+      if (target->NumHashBitsUsed() > 32) {
+        uint64_t* hashes = thread_local_hashes64[thread_index].data();
+        get_hash64_impl(i * batch_size_max, batch_size, hashes);
+        Status status = builder->PushNextBatch(thread_index, batch_size, 
hashes);
+        ARROW_DCHECK(status.ok());
+      } else {
+        uint32_t* hashes = thread_local_hashes32[thread_index].data();
+        get_hash32_impl(i * batch_size_max, batch_size, hashes);
+        Status status = builder->PushNextBatch(thread_index, batch_size, 
hashes);
+        ARROW_DCHECK(status.ok());
+      }
+    }
+
+    auto time1 = std::chrono::high_resolution_clock::now();
+    auto ns = std::chrono::duration_cast<std::chrono::nanoseconds>(time1 - 
time0).count();

Review comment:
       There are still some vestiges of timing that can be removed.

##########
File path: cpp/src/arrow/compute/exec/partition_util.h
##########
@@ -0,0 +1,95 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <atomic>
+#include <cassert>
+#include <cstdint>
+#include <functional>
+#include <random>
+#include "arrow/buffer.h"
+#include "arrow/compute/exec/util.h"
+
+namespace arrow {
+namespace compute {
+
+class PartitionSort {
+ public:
+  // Bucket sort rows on partition ids.
+  // Include in the output exclusive cummulative sum of bucket sizes.
+  // This corresponds to ranges in the sorted array containing all row ids for
+  // each of the partitions.
+  //
+  template <class INPUT_PRTN_ID_FN, class OUTPUT_POS_FN>
+  static void Eval(int num_rows, int num_prtns, uint16_t* prtn_ranges,
+                   INPUT_PRTN_ID_FN prtn_id_impl, OUTPUT_POS_FN 
output_pos_impl) {
+    ARROW_DCHECK(num_rows > 0 && num_rows <= (1 << 15));
+    ARROW_DCHECK(num_prtns >= 1 && num_prtns <= (1 << 15));
+
+    memset(prtn_ranges, 0, (num_prtns + 1) * sizeof(uint16_t));
+
+    for (int i = 0; i < num_rows; ++i) {
+      int prtn_id = static_cast<int>(prtn_id_impl(i));
+      ++prtn_ranges[prtn_id + 1];
+    }
+
+    uint16_t sum = 0;
+    for (int i = 0; i < num_prtns; ++i) {
+      uint16_t sum_next = sum + prtn_ranges[i + 1];
+      prtn_ranges[i + 1] = sum;
+      sum = sum_next;
+    }
+
+    for (int i = 0; i < num_rows; ++i) {
+      int prtn_id = static_cast<int>(prtn_id_impl(i));
+      int pos = prtn_ranges[prtn_id + 1]++;
+      output_pos_impl(i, pos);
+    }
+  }
+};
+
+class PartitionLocks {
+ public:
+  PartitionLocks();
+  ~PartitionLocks();
+  void Init(int num_prtns);
+  void CleanUp();
+  bool AcquirePartitionLock(int num_prtns, const int* prtns_to_try, bool 
limit_retries,
+                            int max_retries, int* locked_prtn_id,
+                            int* locked_prtn_id_pos);
+  void ReleasePartitionLock(int prtn_id);
+
+ private:
+  std::atomic<bool>* lock_ptr(int prtn_id);
+  int random_int(int num_values);
+
+  struct PartitionLock {
+    static constexpr int kCacheLineBytes = 64;
+    std::atomic<bool> lock;
+    uint8_t padding[kCacheLineBytes];
+  };
+  int num_prtns_;
+  PartitionLock* locks_;

Review comment:
       Even though you can't use a vector here you can still use a `unique_ptr` 
with an array.  That would allow you to get rid of the cleanup function.  Also, 
since the cleanup function is fast and called from the destructor I'm not 
entirely sure it's needed.

##########
File path: cpp/src/arrow/compute/exec/partition_util.h
##########
@@ -0,0 +1,95 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <atomic>
+#include <cassert>
+#include <cstdint>
+#include <functional>
+#include <random>
+#include "arrow/buffer.h"
+#include "arrow/compute/exec/util.h"
+
+namespace arrow {
+namespace compute {
+
+class PartitionSort {
+ public:
+  // Bucket sort rows on partition ids.
+  // Include in the output exclusive cummulative sum of bucket sizes.
+  // This corresponds to ranges in the sorted array containing all row ids for
+  // each of the partitions.
+  //
+  template <class INPUT_PRTN_ID_FN, class OUTPUT_POS_FN>

Review comment:
       I added a stab at documentation in 
https://github.com/westonpace/arrow/commit/a49ac28427e173f29982f1884f7a752ecdc69e84
   
   Feel free to use these if you want.

##########
File path: cpp/src/arrow/compute/exec/bloom_filter_test.cc
##########
@@ -0,0 +1,484 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <gmock/gmock-matchers.h>
+
+#include <algorithm>
+#include <chrono>
+#include <condition_variable>
+#include <set>
+#include <thread>
+#include "arrow/compute/exec/bloom_filter.h"
+#include "arrow/compute/exec/key_hash.h"
+#include "arrow/compute/exec/test_util.h"
+#include "arrow/compute/exec/util.h"
+#include "arrow/util/bitmap_ops.h"
+#include "arrow/util/cpu_info.h"
+
+namespace arrow {
+namespace compute {
+
+Status BuildBloomFilter(BloomFilterBuildStrategy strategy, size_t num_threads,
+                        int64_t hardware_flags, MemoryPool* pool, int64_t 
num_rows,
+                        std::function<void(int64_t, int, uint32_t*)> 
get_hash32_impl,
+                        std::function<void(int64_t, int, uint64_t*)> 
get_hash64_impl,
+                        BlockedBloomFilter* target, float* build_cost) {
+  constexpr int batch_size_max = 32 * 1024;
+  int64_t num_batches = bit_util::CeilDiv(num_rows, batch_size_max);
+
+  auto builder = BloomFilterBuilder::Make(strategy);
+
+  std::vector<std::vector<uint32_t>> thread_local_hashes32;
+  std::vector<std::vector<uint64_t>> thread_local_hashes64;
+  thread_local_hashes32.resize(num_threads);
+  thread_local_hashes64.resize(num_threads);
+  for (size_t i = 0; i < num_threads; ++i) {
+    thread_local_hashes32[i].resize(batch_size_max);
+    thread_local_hashes64[i].resize(batch_size_max);
+  }
+
+  // Repeate the entire test in a loop multiple times in order to get 
meaningful time
+  // measurements. Time measurements in debug do not provide useful 
information and would
+  // make test take unreasonably long, so there are no repeats in debug.
+  //
+  std::vector<float> build_cost_vector;
+  int64_t num_repeats =
+      std::max(static_cast<int64_t>(1), bit_util::CeilDiv(1LL << 27, 
num_rows));
+#ifndef NDEBUG
+  num_repeats = 1LL;
+#endif
+  build_cost_vector.resize(num_repeats);
+
+  for (int64_t irepeat = 0; irepeat < num_repeats; ++irepeat) {
+    auto time0 = std::chrono::high_resolution_clock::now();
+
+    RETURN_NOT_OK(builder->Begin(num_threads, hardware_flags, pool, num_rows,
+                                 bit_util::CeilDiv(num_rows, batch_size_max), 
target));
+
+    for (int64_t i = 0; i < num_batches; ++i) {
+      size_t thread_index = 0;
+      int batch_size = static_cast<int>(
+          std::min(num_rows - i * batch_size_max, 
static_cast<int64_t>(batch_size_max)));
+      if (target->NumHashBitsUsed() > 32) {
+        uint64_t* hashes = thread_local_hashes64[thread_index].data();
+        get_hash64_impl(i * batch_size_max, batch_size, hashes);
+        Status status = builder->PushNextBatch(thread_index, batch_size, 
hashes);
+        ARROW_DCHECK(status.ok());
+      } else {
+        uint32_t* hashes = thread_local_hashes32[thread_index].data();
+        get_hash32_impl(i * batch_size_max, batch_size, hashes);
+        Status status = builder->PushNextBatch(thread_index, batch_size, 
hashes);
+        ARROW_DCHECK(status.ok());
+      }
+    }
+
+    auto time1 = std::chrono::high_resolution_clock::now();
+    auto ns = std::chrono::duration_cast<std::chrono::nanoseconds>(time1 - 
time0).count();
+
+    builder->CleanUp();
+
+    build_cost_vector[irepeat] = static_cast<float>(ns) / 
static_cast<float>(num_rows);
+  }
+
+  std::sort(build_cost_vector.begin(), build_cost_vector.end());
+  *build_cost = build_cost_vector[build_cost_vector.size() / 2];
+
+  return Status::OK();
+}
+
+// FPR (false positives rate) - fraction of false positives relative to the sum
+// of false positives and true negatives.
+//
+// Output FPR and build and probe cost.
+//
+Status TestBloomSmall(BloomFilterBuildStrategy strategy, int64_t num_build,
+                      int num_build_copies, int dop, bool use_simd,
+                      bool enable_prefetch) {
+  int64_t hardware_flags = use_simd ? ::arrow::internal::CpuInfo::AVX2 : 0;
+
+  // Generate input keys
+  //
+  int64_t num_probe = 4 * num_build;
+  Random64Bit rnd(/*seed=*/0);
+  std::vector<uint64_t> unique_keys;
+  {
+    std::set<uint64_t> unique_keys_set;

Review comment:
       Can use `unordered_set`

##########
File path: cpp/src/arrow/compute/exec/bloom_filter_test.cc
##########
@@ -0,0 +1,484 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <gmock/gmock-matchers.h>
+
+#include <algorithm>
+#include <chrono>
+#include <condition_variable>
+#include <set>
+#include <thread>
+#include "arrow/compute/exec/bloom_filter.h"
+#include "arrow/compute/exec/key_hash.h"
+#include "arrow/compute/exec/test_util.h"
+#include "arrow/compute/exec/util.h"
+#include "arrow/util/bitmap_ops.h"
+#include "arrow/util/cpu_info.h"
+
+namespace arrow {
+namespace compute {
+
+Status BuildBloomFilter(BloomFilterBuildStrategy strategy, size_t num_threads,
+                        int64_t hardware_flags, MemoryPool* pool, int64_t 
num_rows,
+                        std::function<void(int64_t, int, uint32_t*)> 
get_hash32_impl,
+                        std::function<void(int64_t, int, uint64_t*)> 
get_hash64_impl,
+                        BlockedBloomFilter* target, float* build_cost) {
+  constexpr int batch_size_max = 32 * 1024;
+  int64_t num_batches = bit_util::CeilDiv(num_rows, batch_size_max);
+
+  auto builder = BloomFilterBuilder::Make(strategy);
+
+  std::vector<std::vector<uint32_t>> thread_local_hashes32;
+  std::vector<std::vector<uint64_t>> thread_local_hashes64;
+  thread_local_hashes32.resize(num_threads);
+  thread_local_hashes64.resize(num_threads);
+  for (size_t i = 0; i < num_threads; ++i) {
+    thread_local_hashes32[i].resize(batch_size_max);
+    thread_local_hashes64[i].resize(batch_size_max);
+  }
+
+  // Repeate the entire test in a loop multiple times in order to get 
meaningful time
+  // measurements. Time measurements in debug do not provide useful 
information and would
+  // make test take unreasonably long, so there are no repeats in debug.
+  //
+  std::vector<float> build_cost_vector;
+  int64_t num_repeats =
+      std::max(static_cast<int64_t>(1), bit_util::CeilDiv(1LL << 27, 
num_rows));
+#ifndef NDEBUG
+  num_repeats = 1LL;
+#endif
+  build_cost_vector.resize(num_repeats);
+
+  for (int64_t irepeat = 0; irepeat < num_repeats; ++irepeat) {
+    auto time0 = std::chrono::high_resolution_clock::now();
+
+    RETURN_NOT_OK(builder->Begin(num_threads, hardware_flags, pool, num_rows,
+                                 bit_util::CeilDiv(num_rows, batch_size_max), 
target));
+
+    for (int64_t i = 0; i < num_batches; ++i) {
+      size_t thread_index = 0;
+      int batch_size = static_cast<int>(
+          std::min(num_rows - i * batch_size_max, 
static_cast<int64_t>(batch_size_max)));
+      if (target->NumHashBitsUsed() > 32) {
+        uint64_t* hashes = thread_local_hashes64[thread_index].data();
+        get_hash64_impl(i * batch_size_max, batch_size, hashes);
+        Status status = builder->PushNextBatch(thread_index, batch_size, 
hashes);
+        ARROW_DCHECK(status.ok());
+      } else {
+        uint32_t* hashes = thread_local_hashes32[thread_index].data();
+        get_hash32_impl(i * batch_size_max, batch_size, hashes);
+        Status status = builder->PushNextBatch(thread_index, batch_size, 
hashes);
+        ARROW_DCHECK(status.ok());
+      }
+    }
+
+    auto time1 = std::chrono::high_resolution_clock::now();
+    auto ns = std::chrono::duration_cast<std::chrono::nanoseconds>(time1 - 
time0).count();
+
+    builder->CleanUp();
+
+    build_cost_vector[irepeat] = static_cast<float>(ns) / 
static_cast<float>(num_rows);
+  }
+
+  std::sort(build_cost_vector.begin(), build_cost_vector.end());
+  *build_cost = build_cost_vector[build_cost_vector.size() / 2];
+
+  return Status::OK();
+}
+
+// FPR (false positives rate) - fraction of false positives relative to the sum
+// of false positives and true negatives.
+//
+// Output FPR and build and probe cost.
+//
+Status TestBloomSmall(BloomFilterBuildStrategy strategy, int64_t num_build,
+                      int num_build_copies, int dop, bool use_simd,
+                      bool enable_prefetch) {
+  int64_t hardware_flags = use_simd ? ::arrow::internal::CpuInfo::AVX2 : 0;
+
+  // Generate input keys
+  //
+  int64_t num_probe = 4 * num_build;

Review comment:
       When removing the benchmarking it appears you removed all probing.  We 
could still do some probing (and verify the FPR is within some threshold) or we 
could remove probing entirely (e.g. don't generate a bunch of probe data if we 
aren't going to use it).




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to