This is an automated email from the ASF dual-hosted git repository.

westonpace pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new 8737123019 ARROW-16713: [C++] Pull join accumulation outside of 
HashJoinImpl (#13332)
8737123019 is described below

commit 8737123019ecb49e1670ea7b94b02404dc3709b5
Author: Sasha Krassovsky <[email protected]>
AuthorDate: Thu Jun 16 18:27:01 2022 -0400

    ARROW-16713: [C++] Pull join accumulation outside of HashJoinImpl (#13332)
    
    Authored-by: Sasha Krassovsky <[email protected]>
    Signed-off-by: Weston Pace <[email protected]>
---
 cpp/src/arrow/CMakeLists.txt                      |   1 +
 cpp/src/arrow/compute/exec/accumulation_queue.cc  |  58 ++
 cpp/src/arrow/compute/exec/accumulation_queue.h   |  57 ++
 cpp/src/arrow/compute/exec/hash_join.cc           | 453 ++------------
 cpp/src/arrow/compute/exec/hash_join.h            |  27 +-
 cpp/src/arrow/compute/exec/hash_join_benchmark.cc | 111 ++--
 cpp/src/arrow/compute/exec/hash_join_node.cc      | 691 ++++++++++++++++++----
 7 files changed, 774 insertions(+), 624 deletions(-)

diff --git a/cpp/src/arrow/CMakeLists.txt b/cpp/src/arrow/CMakeLists.txt
index ccf643bbc5..5d547b8eab 100644
--- a/cpp/src/arrow/CMakeLists.txt
+++ b/cpp/src/arrow/CMakeLists.txt
@@ -385,6 +385,7 @@ if(ARROW_COMPUTE)
        compute/cast.cc
        compute/exec.cc
        compute/exec/aggregate.cc
+       compute/exec/accumulation_queue.cc
        compute/exec/aggregate_node.cc
        compute/exec/bloom_filter.cc
        compute/exec/exec_plan.cc
diff --git a/cpp/src/arrow/compute/exec/accumulation_queue.cc 
b/cpp/src/arrow/compute/exec/accumulation_queue.cc
new file mode 100644
index 0000000000..192db52942
--- /dev/null
+++ b/cpp/src/arrow/compute/exec/accumulation_queue.cc
@@ -0,0 +1,58 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/compute/exec/accumulation_queue.h"
+
+#include <iterator>
+
+namespace arrow {
+namespace util {
+using arrow::compute::ExecBatch;
+AccumulationQueue::AccumulationQueue(AccumulationQueue&& that) {
+  this->batches_ = std::move(that.batches_);
+  this->row_count_ = that.row_count_;
+  that.Clear();
+}
+
+AccumulationQueue& AccumulationQueue::operator=(AccumulationQueue&& that) {
+  this->batches_ = std::move(that.batches_);
+  this->row_count_ = that.row_count_;
+  that.Clear();
+  return *this;
+}
+
+void AccumulationQueue::Concatenate(AccumulationQueue&& that) {
+  this->batches_.reserve(this->batches_.size() + that.batches_.size());
+  std::move(that.batches_.begin(), that.batches_.end(),
+            std::back_inserter(this->batches_));
+  this->row_count_ += that.row_count_;
+  that.Clear();
+}
+
+void AccumulationQueue::InsertBatch(ExecBatch batch) {
+  row_count_ += batch.length;
+  batches_.emplace_back(std::move(batch));
+}
+
+void AccumulationQueue::Clear() {
+  row_count_ = 0;
+  batches_.clear();
+}
+
+ExecBatch& AccumulationQueue::operator[](size_t i) { return batches_[i]; }
+}  // namespace util
+}  // namespace arrow
diff --git a/cpp/src/arrow/compute/exec/accumulation_queue.h 
b/cpp/src/arrow/compute/exec/accumulation_queue.h
new file mode 100644
index 0000000000..4b23e5ffca
--- /dev/null
+++ b/cpp/src/arrow/compute/exec/accumulation_queue.h
@@ -0,0 +1,57 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <cstdint>
+#include <vector>
+
+#include "arrow/compute/exec.h"
+
+namespace arrow {
+namespace util {
+using arrow::compute::ExecBatch;
+
+/// \brief A container that accumulates batches until they are ready to
+///        be processed.
+class AccumulationQueue {
+ public:
+  AccumulationQueue() : row_count_(0) {}
+  ~AccumulationQueue() = default;
+
+  // We should never be copying ExecBatch around
+  AccumulationQueue(const AccumulationQueue&) = delete;
+  AccumulationQueue& operator=(const AccumulationQueue&) = delete;
+
+  AccumulationQueue(AccumulationQueue&& that);
+  AccumulationQueue& operator=(AccumulationQueue&& that);
+
+  void Concatenate(AccumulationQueue&& that);
+  void InsertBatch(ExecBatch batch);
+  int64_t row_count() { return row_count_; }
+  size_t batch_count() { return batches_.size(); }
+  bool empty() const { return batches_.empty(); }
+  void Clear();
+  ExecBatch& operator[](size_t i);
+
+ private:
+  int64_t row_count_;
+  std::vector<ExecBatch> batches_;
+};
+
+}  // namespace util
+}  // namespace arrow
diff --git a/cpp/src/arrow/compute/exec/hash_join.cc 
b/cpp/src/arrow/compute/exec/hash_join.cc
index 9c124674d7..1ebe11e704 100644
--- a/cpp/src/arrow/compute/exec/hash_join.cc
+++ b/cpp/src/arrow/compute/exec/hash_join.cc
@@ -40,70 +40,16 @@ class HashJoinBasicImpl : public HashJoinImpl {
   struct ThreadLocalState;
 
  public:
-  HashJoinBasicImpl() : num_expected_bloom_filters_(0) {}
-
-  Status InputReceived(size_t thread_index, int side, ExecBatch batch) 
override {
-    if (cancelled_) {
-      return Status::Cancelled("Hash join cancelled");
-    }
-    EVENT(span_, "InputReceived");
-
-    ARROW_ASSIGN_OR_RAISE(bool queued, QueueBatchIfNeeded(thread_index, side, 
batch));
-    if (queued) {
-      return Status::OK();
-    } else {
-      ARROW_DCHECK(side == 0);
-      return ProbeBatch(thread_index, batch);
-    }
-  }
-
-  Status InputFinished(size_t thread_index, int side) override {
-    if (cancelled_) {
-      return Status::Cancelled("Hash join cancelled");
-    }
-    EVENT(span_, "InputFinished", {{"side", side}});
-    if (side == 0) {
-      bool proceed;
-      {
-        std::lock_guard<std::mutex> lock(finished_mutex_);
-        proceed = !left_side_finished_ && left_queue_probe_finished_;
-        left_side_finished_ = true;
-      }
-      if (proceed) {
-        RETURN_NOT_OK(OnLeftSideAndQueueFinished(thread_index));
-      }
-    } else {
-      bool proceed;
-      {
-        std::lock_guard<std::mutex> lock(finished_mutex_);
-        proceed = !right_side_finished_;
-        right_side_finished_ = true;
-      }
-      if (proceed) {
-        RETURN_NOT_OK(OnRightSideFinished(thread_index));
-      }
-    }
-    return Status::OK();
-  }
-
-  Status Init(ExecContext* ctx, JoinType join_type, bool use_sync_execution,
-              size_t /*num_threads*/, HashJoinSchema* schema_mgr,
-              std::vector<JoinKeyCmp> key_cmp, Expression filter,
-              OutputBatchCallback output_batch_callback,
-              FinishedCallback finished_callback,
-              TaskScheduler::ScheduleImpl schedule_task_callback,
-              HashJoinImpl* pushdown_target, std::vector<int> column_map) 
override {
-    // TODO(ARROW-15732)
-    // Each side of join might have an IO thread being called from.
-    // As of right now, we ignore the `num_threads` argument, so later we will 
have to
-    // re-add `num_threads_ = num_threads;`
-    num_threads_ = GetCpuThreadPoolCapacity() + io::GetIOThreadPoolCapacity() 
+ 1;
-
+  Status Init(ExecContext* ctx, JoinType join_type, size_t num_threads,
+              HashJoinSchema* schema_mgr, std::vector<JoinKeyCmp> key_cmp,
+              Expression filter, OutputBatchCallback output_batch_callback,
+              FinishedCallback finished_callback, TaskScheduler* scheduler) 
override {
     START_COMPUTE_SPAN(span_, "HashJoinBasicImpl",
                        {{"detail", filter.ToString()},
                         {"join.kind", ToString(join_type)},
-                        {"join.threads", 
static_cast<uint32_t>(num_threads_)}});
+                        {"join.threads", static_cast<uint32_t>(num_threads)}});
 
+    num_threads_ = num_threads;
     ctx_ = ctx;
     join_type_ = join_type;
     schema_mgr_ = schema_mgr;
@@ -111,6 +57,7 @@ class HashJoinBasicImpl : public HashJoinImpl {
     filter_ = std::move(filter);
     output_batch_callback_ = std::move(output_batch_callback);
     finished_callback_ = std::move(finished_callback);
+    scheduler_ = scheduler;
     local_states_.resize(num_threads_);
     for (size_t i = 0; i < local_states_.size(); ++i) {
       local_states_[i].is_initialized = false;
@@ -119,39 +66,12 @@ class HashJoinBasicImpl : public HashJoinImpl {
 
     dict_probe_.Init(num_threads_);
 
-    pushdown_target_ = pushdown_target;
-    column_map_ = std::move(column_map);
-    if (pushdown_target_) pushdown_target_->ExpectBloomFilter();
-
-    right_input_row_count_ = 0;
     has_hash_table_ = false;
     num_batches_produced_.store(0);
     cancelled_ = false;
-    right_side_finished_ = false;
-    left_side_finished_ = false;
-    bloom_filters_ready_ = false;
-    left_queue_bloom_finished_ = false;
-    left_queue_probe_finished_ = false;
-
-    scheduler_ = TaskScheduler::Make();
-    if (pushdown_target_) {
-      bloom_filter_ = arrow::internal::make_unique<BlockedBloomFilter>();
-      bloom_filter_builder_ = BloomFilterBuilder::Make(
-          use_sync_execution ? BloomFilterBuildStrategy::SINGLE_THREADED
-                             : BloomFilterBuildStrategy::PARALLEL);
-    }
 
-    RegisterBuildBloomFilter();
     RegisterBuildHashTable();
-    RegisterBloomFilterQueuedBatches();
-    RegisterProbeQueuedBatches();
     RegisterScanHashTable();
-    scheduler_->RegisterEnd();
-
-    RETURN_NOT_OK(scheduler_->StartScheduling(
-        0 /*thread index*/, std::move(schedule_task_callback),
-        static_cast<int>(2 * num_threads_) /*concurrent tasks*/, 
use_sync_execution));
-
     return Status::OK();
   }
 
@@ -162,32 +82,6 @@ class HashJoinBasicImpl : public HashJoinImpl {
     scheduler_->Abort(std::move(pos_abort_callback));
   }
 
-  // Called by a downstream node after they have constructed a bloom filter
-  // that this node can use to filter inputs.
-  Status PushBloomFilter(size_t thread_index, 
std::unique_ptr<BlockedBloomFilter> filter,
-                         std::vector<int> column_map) override {
-    bool proceed;
-    {
-      std::lock_guard<std::mutex> lock_bloom(bloom_filters_mutex_);
-      pushed_bloom_filters_.emplace_back(std::move(filter));
-      bloom_filter_column_maps_.emplace_back(std::move(column_map));
-      proceed = pushed_bloom_filters_.size() == num_expected_bloom_filters_;
-      ARROW_DCHECK(pushed_bloom_filters_.size() <= 
num_expected_bloom_filters_);
-    }
-    if (proceed) {
-      size_t num_batches;
-      {
-        std::lock_guard<std::mutex> lock(left_batches_mutex_);
-        num_batches = left_batches_.size();
-        bloom_filters_ready_ = true;
-      }
-      RETURN_NOT_OK(BloomFilterQueuedBatches(thread_index, num_batches));
-    }
-    return Status::OK();
-  }
-
-  void ExpectBloomFilter() override { num_expected_bloom_filters_ += 1; }
-
  private:
   void InitEncoder(int side, HashJoinProjection projection_handle, RowEncoder* 
encoder) {
     std::vector<ValueDescr> data_types;
@@ -212,8 +106,6 @@ class HashJoinBasicImpl : public HashJoinImpl {
       if (has_payload) {
         InitEncoder(0, HashJoinProjection::PAYLOAD, 
&local_state.exec_batch_payloads);
       }
-      RETURN_NOT_OK(local_state.temp_stack.Init(
-          ctx_->memory_pool(), 4 * util::MiniBatch::kMiniBatchLength * 
sizeof(uint32_t)));
       local_state.is_initialized = true;
     }
     return Status::OK();
@@ -594,7 +486,7 @@ class HashJoinBasicImpl : public HashJoinImpl {
     }
   }
 
-  Status ProbeBatch(size_t thread_index, const ExecBatch& batch) {
+  Status ProbeSingleBatch(size_t thread_index, ExecBatch batch) override {
     ThreadLocalState& local_state = local_states_[thread_index];
     RETURN_NOT_OK(InitLocalStateIfNeeded(thread_index));
 
@@ -656,96 +548,18 @@ class HashJoinBasicImpl : public HashJoinImpl {
     return Status::OK();
   }
 
-  Status ApplyBloomFiltersToBatch(size_t thread_index, ExecBatch& batch) {
-    if (batch.length == 0) return Status::OK();
-    int64_t bit_vector_bytes = bit_util::BytesForBits(batch.length);
-    std::vector<uint8_t> selected(bit_vector_bytes);
-    std::vector<uint32_t> hashes(batch.length);
-    std::vector<uint8_t> bv(bit_vector_bytes);
-
-    RETURN_NOT_OK(InitLocalStateIfNeeded(thread_index));
-    // Start with full selection for the current batch
-    memset(selected.data(), 0xff, bit_vector_bytes);
-    for (size_t ifilter = 0; ifilter < num_expected_bloom_filters_; ifilter++) 
{
-      std::vector<Datum> keys(bloom_filter_column_maps_[ifilter].size());
-      for (size_t i = 0; i < keys.size(); i++) {
-        int input_idx = bloom_filter_column_maps_[ifilter][i];
-        keys[i] = batch[input_idx];
-        if (keys[i].is_scalar()) {
-          ARROW_ASSIGN_OR_RAISE(
-              keys[i],
-              MakeArrayFromScalar(*keys[i].scalar(), batch.length, 
ctx_->memory_pool()));
-        }
-      }
-      ARROW_ASSIGN_OR_RAISE(ExecBatch key_batch, 
ExecBatch::Make(std::move(keys)));
-      RETURN_NOT_OK(Hashing32::HashBatch(
-          key_batch, hashes.data(), ctx_->cpu_info()->hardware_flags(),
-          &local_states_[thread_index].temp_stack, 0, key_batch.length));
-
-      pushed_bloom_filters_[ifilter]->Find(ctx_->cpu_info()->hardware_flags(),
-                                           key_batch.length, hashes.data(), 
bv.data());
-      arrow::internal::BitmapAnd(bv.data(), 0, selected.data(), 0, 
key_batch.length, 0,
-                                 selected.data());
-    }
-    auto selected_buffer =
-        arrow::internal::make_unique<Buffer>(selected.data(), 
bit_vector_bytes);
-    ArrayData selected_arraydata(boolean(), batch.length,
-                                 {nullptr, std::move(selected_buffer)});
-    Datum selected_datum(selected_arraydata);
-    FilterOptions options;
-    size_t first_nonscalar = batch.values.size();
-    for (size_t i = 0; i < batch.values.size(); i++) {
-      if (!batch.values[i].is_scalar()) {
-        ARROW_ASSIGN_OR_RAISE(batch.values[i],
-                              Filter(batch.values[i], selected_datum, options, 
ctx_));
-        first_nonscalar = std::min(first_nonscalar, i);
-        ARROW_DCHECK_EQ(batch.values[i].length(), 
batch.values[first_nonscalar].length());
-      }
-    }
-    // If they're all Scalar, then the length of the batch is the number of 
set bits
-    if (first_nonscalar == batch.values.size())
-      batch.length = arrow::internal::CountSetBits(selected.data(), 0, 
batch.length);
-    else
-      batch.length = batch.values[first_nonscalar].length();
-    return Status::OK();
-  }
-
-  int64_t BuildHashTable_num_tasks() { return 1; }
-
-  Status BuildBloomFilter_exec_task(size_t thread_index, int64_t task_id) {
-    const ExecBatch& input_batch = right_batches_[task_id];
-    SchemaProjectionMap key_to_in =
-        schema_mgr_->proj_maps[1].map(HashJoinProjection::KEY, 
HashJoinProjection::INPUT);
-    std::vector<Datum> key_columns(key_to_in.num_cols);
-    for (size_t i = 0; i < key_columns.size(); i++) {
-      int input_idx = key_to_in.get(static_cast<int>(i));
-      key_columns[i] = input_batch[input_idx];
-      if (key_columns[i].is_scalar()) {
-        ARROW_ASSIGN_OR_RAISE(
-            key_columns[i], MakeArrayFromScalar(*key_columns[i].scalar(),
-                                                input_batch.length, 
ctx_->memory_pool()));
-      }
-    }
-    ARROW_ASSIGN_OR_RAISE(ExecBatch key_batch, 
ExecBatch::Make(std::move(key_columns)));
-
-    RETURN_NOT_OK(InitLocalStateIfNeeded(thread_index));
-    ThreadLocalState& tls = local_states_[thread_index];
-    util::TempVectorHolder<uint32_t> hash_holder(&tls.temp_stack,
-                                                 
util::MiniBatch::kMiniBatchLength);
-    uint32_t* hashes = hash_holder.mutable_data();
-    for (int64_t i = 0; i < key_batch.length; i += 
util::MiniBatch::kMiniBatchLength) {
-      int64_t length = std::min(static_cast<int64_t>(key_batch.length - i),
-                                
static_cast<int64_t>(util::MiniBatch::kMiniBatchLength));
-      RETURN_NOT_OK(Hashing32::HashBatch(key_batch, hashes,
-                                         ctx_->cpu_info()->hardware_flags(),
-                                         &tls.temp_stack, i, length));
-      RETURN_NOT_OK(bloom_filter_builder_->PushNextBatch(thread_index, length, 
hashes));
-    }
-    return Status::OK();
+  void RegisterBuildHashTable() {
+    task_group_build_ = scheduler_->RegisterTaskGroup(
+        [this](size_t thread_index, int64_t task_id) -> Status {
+          return BuildHashTable_exec_task(thread_index, task_id);
+        },
+        [this](size_t thread_index) -> Status {
+          return BuildHashTable_on_finished(thread_index);
+        });
   }
 
   Status BuildHashTable_exec_task(size_t thread_index, int64_t /*task_id*/) {
-    const std::vector<ExecBatch>& batches = right_batches_;
+    AccumulationQueue batches = std::move(build_batches_);
     if (batches.empty()) {
       hash_table_empty_ = true;
     } else {
@@ -756,7 +570,7 @@ class HashJoinBasicImpl : public HashJoinImpl {
         InitEncoder(1, HashJoinProjection::PAYLOAD, &hash_table_payloads_);
       }
       hash_table_empty_ = true;
-      for (size_t ibatch = 0; ibatch < batches.size(); ++ibatch) {
+      for (size_t ibatch = 0; ibatch < batches.batch_count(); ++ibatch) {
         if (cancelled_) {
           return Status::Cancelled("Hash join cancelled");
         }
@@ -789,159 +603,30 @@ class HashJoinBasicImpl : public HashJoinImpl {
     return Status::OK();
   }
 
-  Status BuildBloomFilter_on_finished(size_t thread_index) {
-    if (cancelled_) return Status::Cancelled("Hash join cancelled");
-    ARROW_DCHECK(pushdown_target_);
-    RETURN_NOT_OK(pushdown_target_->PushBloomFilter(
-        thread_index, std::move(bloom_filter_), std::move(column_map_)));
-    return BuildHashTable(thread_index);
-  }
-
   Status BuildHashTable_on_finished(size_t thread_index) {
-    if (cancelled_) {
-      return Status::Cancelled("Hash join cancelled");
-    }
-
-    right_batches_.clear();
-
-    bool proceed;
-    {
-      std::lock_guard<std::mutex> lock(left_batches_mutex_);
-      std::lock_guard<std::mutex> lock_finish(finished_mutex_);
-      left_queue_bloom_finished_ =
-          left_queue_bloom_finished_ || num_expected_bloom_filters_ == 0;
-      proceed = !has_hash_table_ && left_queue_bloom_finished_;
-      has_hash_table_ = true;
-    }
-    if (proceed) RETURN_NOT_OK(ProbeQueuedBatches(thread_index));
-
-    return Status::OK();
-  }
-
-  void RegisterBuildBloomFilter() {
-    task_group_bloom_ = scheduler_->RegisterTaskGroup(
-        [this](size_t thread_index, int64_t task_id) -> Status {
-          return BuildBloomFilter_exec_task(thread_index, task_id);
-        },
-        [this](size_t thread_index) -> Status {
-          return BuildBloomFilter_on_finished(thread_index);
-        });
-  }
-
-  void RegisterBuildHashTable() {
-    task_group_build_ = scheduler_->RegisterTaskGroup(
-        [this](size_t thread_index, int64_t task_id) -> Status {
-          return BuildHashTable_exec_task(thread_index, task_id);
-        },
-        [this](size_t thread_index) -> Status {
-          return BuildHashTable_on_finished(thread_index);
-        });
+    ARROW_DCHECK_EQ(build_batches_.batch_count(), 0);
+    has_hash_table_ = true;
+    return build_finished_callback_(thread_index);
   }
 
-  Status BuildBloomFilter(size_t thread_index) {
-    RETURN_NOT_OK(bloom_filter_builder_->Begin(
-        num_threads_, ctx_->cpu_info()->hardware_flags(), ctx_->memory_pool(),
-        right_input_row_count_, right_batches_.size(), bloom_filter_.get()));
-
-    return scheduler_->StartTaskGroup(thread_index, task_group_bloom_,
-                                      right_batches_.size());
-  }
-
-  Status BuildHashTable(size_t thread_index) {
+  Status BuildHashTable(size_t thread_index, AccumulationQueue batches,
+                        BuildFinishedCallback on_finished) override {
+    build_finished_callback_ = std::move(on_finished);
+    build_batches_ = std::move(batches);
     return scheduler_->StartTaskGroup(thread_index, task_group_build_,
-                                      BuildHashTable_num_tasks());
-  }
-
-  Status BloomFilterQueuedBatches_exec_task(size_t thread_index, int64_t 
task_id) {
-    if (cancelled_) return Status::Cancelled("Hash join cancelled");
-    ExecBatch batch;
-    {
-      std::lock_guard<std::mutex> lock(left_batches_mutex_);
-      batch = std::move(left_batches_[task_id]);
-      ARROW_DCHECK(!batch.values.empty());
-    }
-    RETURN_NOT_OK(ApplyBloomFiltersToBatch(thread_index, batch));
-    {
-      std::lock_guard<std::mutex> lock(left_batches_mutex_);
-      left_batches_[task_id] = std::move(batch);
-    }
-    return Status::OK();
-  }
-
-  Status BloomFilterQueuedBatches_on_finished(size_t thread_index) {
-    if (cancelled_) return Status::Cancelled("Hash join cancelled");
-    bool proceed;
-    {
-      std::lock_guard<std::mutex> lock(finished_mutex_);
-      proceed = !left_queue_bloom_finished_ && has_hash_table_;
-      left_queue_bloom_finished_ = true;
-    }
-    if (proceed) return ProbeQueuedBatches(thread_index);
-    return Status::OK();
-  }
-
-  void RegisterBloomFilterQueuedBatches() {
-    task_group_bloom_filter_queued_ = scheduler_->RegisterTaskGroup(
-        [this](size_t thread_index, int64_t task_id) -> Status {
-          return BloomFilterQueuedBatches_exec_task(thread_index, task_id);
-        },
-        [this](size_t thread_index) -> Status {
-          return BloomFilterQueuedBatches_on_finished(thread_index);
-        });
-  }
-
-  Status BloomFilterQueuedBatches(size_t thread_index, size_t num_batches) {
-    return scheduler_->StartTaskGroup(thread_index, 
task_group_bloom_filter_queued_,
-                                      num_batches);
-  }
-
-  int64_t ProbeQueuedBatches_num_tasks() {
-    return static_cast<int64_t>(left_batches_.size());
-  }
-
-  Status ProbeQueuedBatches_exec_task(size_t thread_index, int64_t task_id) {
-    if (cancelled_) {
-      return Status::Cancelled("Hash join cancelled");
-    }
-    return ProbeBatch(thread_index, std::move(left_batches_[task_id]));
-  }
-
-  Status ProbeQueuedBatches_on_finished(size_t thread_index) {
-    if (cancelled_) {
-      return Status::Cancelled("Hash join cancelled");
-    }
-
-    left_batches_.clear();
-
-    bool proceed;
-    {
-      std::lock_guard<std::mutex> lock(finished_mutex_);
-      ARROW_DCHECK(left_queue_bloom_finished_);
-      proceed = left_side_finished_ && !left_queue_probe_finished_;
-      left_queue_probe_finished_ = true;
-    }
-    if (proceed) {
-      RETURN_NOT_OK(OnLeftSideAndQueueFinished(thread_index));
-    }
-
-    return Status::OK();
+                                      /*num_tasks=*/1);
   }
 
-  void RegisterProbeQueuedBatches() {
-    task_group_queued_ = scheduler_->RegisterTaskGroup(
+  void RegisterScanHashTable() {
+    task_group_scan_ = scheduler_->RegisterTaskGroup(
         [this](size_t thread_index, int64_t task_id) -> Status {
-          return ProbeQueuedBatches_exec_task(thread_index, task_id);
+          return ScanHashTable_exec_task(thread_index, task_id);
         },
         [this](size_t thread_index) -> Status {
-          return ProbeQueuedBatches_on_finished(thread_index);
+          return ScanHashTable_on_finished(thread_index);
         });
   }
 
-  Status ProbeQueuedBatches(size_t thread_index) {
-    return scheduler_->StartTaskGroup(thread_index, task_group_queued_,
-                                      ProbeQueuedBatches_num_tasks());
-  }
-
   int64_t ScanHashTable_num_tasks() {
     if (!has_hash_table_ || hash_table_empty_) {
       return 0;
@@ -1006,58 +691,13 @@ class HashJoinBasicImpl : public HashJoinImpl {
     return Status::OK();
   }
 
-  void RegisterScanHashTable() {
-    task_group_scan_ = scheduler_->RegisterTaskGroup(
-        [this](size_t thread_index, int64_t task_id) -> Status {
-          return ScanHashTable_exec_task(thread_index, task_id);
-        },
-        [this](size_t thread_index) -> Status {
-          return ScanHashTable_on_finished(thread_index);
-        });
-  }
-
   Status ScanHashTable(size_t thread_index) {
     MergeHasMatch();
     return scheduler_->StartTaskGroup(thread_index, task_group_scan_,
                                       ScanHashTable_num_tasks());
   }
 
-  Result<bool> QueueBatchIfNeeded(size_t thread_index, int side, ExecBatch& 
batch) {
-    if (side == 0) {
-      // We don't want to do the filtering while holding the lock, since that 
can get
-      // expensive.
-      bool needs_filtering;
-      {
-        std::lock_guard<std::mutex> lock(left_batches_mutex_);
-        bloom_filters_ready_ = bloom_filters_ready_ || 
num_expected_bloom_filters_ == 0;
-        needs_filtering = bloom_filters_ready_ && num_expected_bloom_filters_ 
!= 0;
-      }
-      if (needs_filtering) 
RETURN_NOT_OK(ApplyBloomFiltersToBatch(thread_index, batch));
-
-      bool queued;
-      {
-        std::lock_guard<std::mutex> lock(left_batches_mutex_);
-        queued = !bloom_filters_ready_ || !has_hash_table_;
-        if (queued) left_batches_.emplace_back(std::move(batch));
-      }
-      return queued;
-    } else {
-      std::lock_guard<std::mutex> lock(right_batches_mutex_);
-      right_input_row_count_ += batch.length;
-      right_batches_.emplace_back(std::move(batch));
-      return true;
-    }
-  }
-
-  Status OnRightSideFinished(size_t thread_index) {
-    if (pushdown_target_ == nullptr) {
-      return BuildHashTable(thread_index);
-    } else {
-      return BuildBloomFilter(thread_index);
-    }
-  }
-
-  Status OnLeftSideAndQueueFinished(size_t thread_index) {
+  Status ProbingFinished(size_t thread_index) override {
     return ScanHashTable(thread_index);
   }
 
@@ -1105,16 +745,14 @@ class HashJoinBasicImpl : public HashJoinImpl {
   HashJoinSchema* schema_mgr_;
   std::vector<JoinKeyCmp> key_cmp_;
   Expression filter_;
-  std::unique_ptr<TaskScheduler> scheduler_;
-  int task_group_bloom_;
+  TaskScheduler* scheduler_;
   int task_group_build_;
-  int task_group_bloom_filter_queued_;
-  int task_group_queued_;
   int task_group_scan_;
 
   // Callbacks
   //
   OutputBatchCallback output_batch_callback_;
+  BuildFinishedCallback build_finished_callback_;
   FinishedCallback finished_callback_;
 
   // Thread local runtime state
@@ -1129,7 +767,6 @@ class HashJoinBasicImpl : public HashJoinImpl {
     std::vector<int32_t> match_right;
     bool is_has_match_initialized;
     std::vector<uint8_t> has_match;
-    util::TempVectorStack temp_stack;
   };
   std::vector<ThreadLocalState> local_states_;
 
@@ -1146,34 +783,12 @@ class HashJoinBasicImpl : public HashJoinImpl {
   HashJoinDictBuildMulti dict_build_;
   HashJoinDictProbeMulti dict_probe_;
 
-  std::vector<ExecBatch> left_batches_;
   bool has_hash_table_;
-  std::mutex left_batches_mutex_;
-
-  size_t right_input_row_count_;  // Sum of the lengths of ExecBatches in 
right_batches_
-  std::vector<ExecBatch> right_batches_;
-  std::mutex right_batches_mutex_;
 
-  // Bloom filter stuff
-  //
-  std::unique_ptr<BloomFilterBuilder> bloom_filter_builder_;
-  std::unique_ptr<BlockedBloomFilter> bloom_filter_;
-  std::vector<int> column_map_;
-  std::vector<std::unique_ptr<BlockedBloomFilter>> pushed_bloom_filters_;
-  std::vector<std::vector<int>> bloom_filter_column_maps_;
-  std::mutex bloom_filters_mutex_;
-  size_t num_expected_bloom_filters_;
-  HashJoinImpl* pushdown_target_;
+  AccumulationQueue build_batches_;
 
   std::atomic<int64_t> num_batches_produced_;
   bool cancelled_;
-
-  bool bloom_filters_ready_;
-  bool right_side_finished_;
-  bool left_side_finished_;
-  bool left_queue_bloom_finished_;
-  bool left_queue_probe_finished_;
-  std::mutex finished_mutex_;
 };
 
 Result<std::unique_ptr<HashJoinImpl>> HashJoinImpl::MakeBasic() {
diff --git a/cpp/src/arrow/compute/exec/hash_join.h 
b/cpp/src/arrow/compute/exec/hash_join.h
index 2b3bef0ead..97bdf166a0 100644
--- a/cpp/src/arrow/compute/exec/hash_join.h
+++ b/cpp/src/arrow/compute/exec/hash_join.h
@@ -21,6 +21,7 @@
 #include <memory>
 #include <vector>
 
+#include "arrow/compute/exec/accumulation_queue.h"
 #include "arrow/compute/exec/bloom_filter.h"
 #include "arrow/compute/exec/options.h"
 #include "arrow/compute/exec/schema_util.h"
@@ -33,6 +34,8 @@
 namespace arrow {
 namespace compute {
 
+using arrow::util::AccumulationQueue;
+
 class ARROW_EXPORT HashJoinSchema {
  public:
   Status Init(JoinType join_type, const Schema& left_schema,
@@ -100,22 +103,20 @@ class ARROW_EXPORT HashJoinSchema {
 class HashJoinImpl {
  public:
   using OutputBatchCallback = std::function<void(ExecBatch)>;
+  using BuildFinishedCallback = std::function<Status(size_t)>;
+  using ProbeFinishedCallback = std::function<Status(size_t)>;
   using FinishedCallback = std::function<void(int64_t)>;
 
   virtual ~HashJoinImpl() = default;
-  virtual Status Init(ExecContext* ctx, JoinType join_type, bool 
use_sync_execution,
-                      size_t num_threads, HashJoinSchema* schema_mgr,
-                      std::vector<JoinKeyCmp> key_cmp, Expression filter,
-                      OutputBatchCallback output_batch_callback,
-                      FinishedCallback finished_callback,
-                      TaskScheduler::ScheduleImpl schedule_task_callback,
-                      HashJoinImpl* pushdown_target, std::vector<int> 
column_map) = 0;
-  virtual void ExpectBloomFilter() = 0;
-  virtual Status PushBloomFilter(size_t thread_index,
-                                 std::unique_ptr<BlockedBloomFilter> filter,
-                                 std::vector<int> column_map) = 0;
-  virtual Status InputReceived(size_t thread_index, int side, ExecBatch batch) 
= 0;
-  virtual Status InputFinished(size_t thread_index, int side) = 0;
+  virtual Status Init(ExecContext* ctx, JoinType join_type, size_t num_threads,
+                      HashJoinSchema* schema_mgr, std::vector<JoinKeyCmp> 
key_cmp,
+                      Expression filter, OutputBatchCallback 
output_batch_callback,
+                      FinishedCallback finished_callback, TaskScheduler* 
scheduler) = 0;
+
+  virtual Status BuildHashTable(size_t thread_index, AccumulationQueue batches,
+                                BuildFinishedCallback on_finished) = 0;
+  virtual Status ProbeSingleBatch(size_t thread_index, ExecBatch batch) = 0;
+  virtual Status ProbingFinished(size_t thread_index) = 0;
   virtual void Abort(TaskScheduler::AbortContinuationImpl pos_abort_callback) 
= 0;
 
   static Result<std::unique_ptr<HashJoinImpl>> MakeBasic();
diff --git a/cpp/src/arrow/compute/exec/hash_join_benchmark.cc 
b/cpp/src/arrow/compute/exec/hash_join_benchmark.cc
index 8d8be7f904..0786071f99 100644
--- a/cpp/src/arrow/compute/exec/hash_join_benchmark.cc
+++ b/cpp/src/arrow/compute/exec/hash_join_benchmark.cc
@@ -36,7 +36,6 @@
 namespace arrow {
 namespace compute {
 struct BenchmarkSettings {
-  bool bloom_filter = false;
   int num_threads = 1;
   JoinType join_type = JoinType::INNER;
   int batch_size = 1024;
@@ -111,11 +110,16 @@ class JoinBenchmark {
     auto l_schema = *l_schema_builder.Finish();
     auto r_schema = *r_schema_builder.Finish();
 
-    l_batches_ =
+    BatchesWithSchema l_batches_with_schema =
         MakeRandomBatches(l_schema, settings.num_probe_batches, 
settings.batch_size);
-    r_batches_ =
+    BatchesWithSchema r_batches_with_schema =
         MakeRandomBatches(r_schema, settings.num_build_batches, 
settings.batch_size);
 
+    for (ExecBatch& batch : l_batches_with_schema.batches)
+      l_batches_.InsertBatch(std::move(batch));
+    for (ExecBatch& batch : r_batches_with_schema.batches)
+      r_batches_.InsertBatch(std::move(batch));
+
     stats_.num_probe_rows = settings.num_probe_batches * settings.batch_size;
 
     ctx_ = arrow::internal::make_unique<ExecContext>(
@@ -124,27 +128,12 @@ class JoinBenchmark {
 
     schema_mgr_ = arrow::internal::make_unique<HashJoinSchema>();
     Expression filter = literal(true);
-    DCHECK_OK(schema_mgr_->Init(settings.join_type, *l_batches_.schema, 
left_keys,
-                                *r_batches_.schema, right_keys, filter, "l_", 
"r_"));
+    DCHECK_OK(schema_mgr_->Init(settings.join_type, 
*l_batches_with_schema.schema,
+                                left_keys, *r_batches_with_schema.schema, 
right_keys,
+                                filter, "l_", "r_"));
 
     join_ = *HashJoinImpl::MakeBasic();
 
-    HashJoinImpl* bloom_filter_pushdown_target = nullptr;
-    std::vector<int> key_input_map;
-
-    bool bloom_filter_does_not_apply_to_join =
-        settings.join_type == JoinType::LEFT_ANTI ||
-        settings.join_type == JoinType::LEFT_OUTER ||
-        settings.join_type == JoinType::FULL_OUTER;
-    if (settings.bloom_filter && !bloom_filter_does_not_apply_to_join) {
-      bloom_filter_pushdown_target = join_.get();
-      SchemaProjectionMap probe_key_to_input = schema_mgr_->proj_maps[0].map(
-          HashJoinProjection::KEY, HashJoinProjection::INPUT);
-      int num_keys = probe_key_to_input.num_cols;
-      for (int i = 0; i < num_keys; i++)
-        key_input_map.push_back(probe_key_to_input.get(i));
-    }
-
     omp_set_num_threads(settings.num_threads);
     auto schedule_callback = [](std::function<Status(size_t)> func) -> Status {
 #pragma omp task
@@ -152,39 +141,47 @@ class JoinBenchmark {
       return Status::OK();
     };
 
+    scheduler_ = TaskScheduler::Make();
     DCHECK_OK(join_->Init(
-        ctx_.get(), settings.join_type, !is_parallel, settings.num_threads,
-        schema_mgr_.get(), std::move(key_cmp), std::move(filter), 
[](ExecBatch) {},
-        [](int64_t x) {}, schedule_callback, bloom_filter_pushdown_target,
-        std::move(key_input_map)));
+        ctx_.get(), settings.join_type, settings.num_threads, 
schema_mgr_.get(),
+        std::move(key_cmp), std::move(filter), [](ExecBatch) {}, [](int64_t x) 
{},
+        scheduler_.get()));
+
+    task_group_probe_ = scheduler_->RegisterTaskGroup(
+        [this](size_t thread_index, int64_t task_id) -> Status {
+          return join_->ProbeSingleBatch(thread_index, 
std::move(l_batches_[task_id]));
+        },
+        [this](size_t thread_index) -> Status {
+          return join_->ProbingFinished(thread_index);
+        });
+
+    scheduler_->RegisterEnd();
+
+    DCHECK_OK(scheduler_->StartScheduling(
+        0 /*thread index*/, std::move(schedule_callback),
+        static_cast<int>(2 * settings.num_threads) /*concurrent tasks*/, 
!is_parallel));
   }
 
   void RunJoin() {
 #pragma omp parallel
     {
       int tid = omp_get_thread_num();
-#pragma omp for nowait
-      for (auto it = r_batches_.batches.begin(); it != 
r_batches_.batches.end(); ++it)
-        DCHECK_OK(join_->InputReceived(tid, /*side=*/1, *it));
-#pragma omp for nowait
-      for (auto it = l_batches_.batches.begin(); it != 
l_batches_.batches.end(); ++it)
-        DCHECK_OK(join_->InputReceived(tid, /*side=*/0, *it));
-
-#pragma omp barrier
-
-#pragma omp single nowait
-      { DCHECK_OK(join_->InputFinished(tid, /*side=*/1)); }
-
-#pragma omp single nowait
-      { DCHECK_OK(join_->InputFinished(tid, /*side=*/0)); }
+#pragma omp single
+      DCHECK_OK(
+          join_->BuildHashTable(tid, std::move(r_batches_), [this](size_t 
thread_index) {
+            return scheduler_->StartTaskGroup(thread_index, task_group_probe_,
+                                              l_batches_.batch_count());
+          }));
     }
   }
 
-  BatchesWithSchema l_batches_;
-  BatchesWithSchema r_batches_;
+  std::unique_ptr<TaskScheduler> scheduler_;
+  AccumulationQueue l_batches_;
+  AccumulationQueue r_batches_;
   std::unique_ptr<HashJoinSchema> schema_mgr_;
   std::unique_ptr<HashJoinImpl> join_;
   std::unique_ptr<ExecContext> ctx_;
+  int task_group_probe_;
 
   struct {
     uint64_t num_probe_rows;
@@ -193,11 +190,17 @@ class JoinBenchmark {
 
 static void HashJoinBasicBenchmarkImpl(benchmark::State& st,
                                        BenchmarkSettings& settings) {
-  JoinBenchmark bm(settings);
   uint64_t total_rows = 0;
   for (auto _ : st) {
-    bm.RunJoin();
-    total_rows += bm.stats_.num_probe_rows;
+    st.PauseTiming();
+    {
+      JoinBenchmark bm(settings);
+      st.ResumeTiming();
+      bm.RunJoin();
+      st.PauseTiming();
+      total_rows += bm.stats_.num_probe_rows;
+    }
+    st.ResumeTiming();
   }
   st.counters["rows/sec"] = benchmark::Counter(total_rows, 
benchmark::Counter::kIsRate);
 }
@@ -288,16 +291,6 @@ static void 
BM_HashJoinBasic_NullPercentage(benchmark::State& st) {
 
   HashJoinBasicBenchmarkImpl(st, settings);
 }
-
-static void BM_HashJoinBasic_BloomFilter(benchmark::State& st, bool 
bloom_filter) {
-  BenchmarkSettings settings;
-  settings.bloom_filter = bloom_filter;
-  settings.selectivity = static_cast<double>(st.range(0)) / 100.0;
-  settings.num_build_batches = static_cast<int>(st.range(1));
-  settings.num_probe_batches = settings.num_build_batches;
-
-  HashJoinBasicBenchmarkImpl(st, settings);
-}
 #endif
 
 std::vector<int64_t> hashtable_krows = benchmark::CreateRange(1, 4096, 8);
@@ -425,16 +418,6 @@ BENCHMARK(BM_HashJoinBasic_BuildParallelism)
 BENCHMARK(BM_HashJoinBasic_NullPercentage)
     ->ArgNames({"Null Percentage"})
     ->DenseRange(0, 100, 10);
-
-std::vector<std::string> bloomfilter_argnames = {"Selectivity", "HashTable 
krows"};
-std::vector<std::vector<int64_t>> bloomfilter_args = {
-    benchmark::CreateDenseRange(0, 100, 10), hashtable_krows};
-BENCHMARK_CAPTURE(BM_HashJoinBasic_BloomFilter, "Bloom Filter", true)
-    ->ArgNames(bloomfilter_argnames)
-    ->ArgsProduct(selectivity_args);
-BENCHMARK_CAPTURE(BM_HashJoinBasic_BloomFilter, "No Bloom Filter", false)
-    ->ArgNames(bloomfilter_argnames)
-    ->ArgsProduct(selectivity_args);
 #else
 
 BENCHMARK_CAPTURE(BM_HashJoinBasic_KeyTypes, "{int32}", {int32()})
diff --git a/cpp/src/arrow/compute/exec/hash_join_node.cc 
b/cpp/src/arrow/compute/exec/hash_join_node.cc
index 60ad75228a..baa8267125 100644
--- a/cpp/src/arrow/compute/exec/hash_join_node.cc
+++ b/cpp/src/arrow/compute/exec/hash_join_node.cc
@@ -15,12 +15,14 @@
 // specific language governing permissions and limitations
 // under the License.
 
+#include <mutex>
 #include <unordered_set>
 #include <utility>
 
 #include "arrow/compute/exec/exec_plan.h"
 #include "arrow/compute/exec/hash_join.h"
 #include "arrow/compute/exec/hash_join_dict.h"
+#include "arrow/compute/exec/key_hash.h"
 #include "arrow/compute/exec/options.h"
 #include "arrow/compute/exec/schema_util.h"
 #include "arrow/compute/exec/util.h"
@@ -470,6 +472,189 @@ Status ValidateHashJoinNodeOptions(const 
HashJoinNodeOptions& join_options) {
   return Status::OK();
 }
 
+class HashJoinNode;
+
+// This is a struct encapsulating things related to Bloom filters and pushing 
them around
+// between HashJoinNodes. The general strategy is to notify other joins at 
plan-creation
+// time for that join to expect a Bloom filter. Once the full build side has 
been
+// accumulated for a given join, it will build the Bloom filter and push it to 
its
+// pushdown target. Once a join has received all of its Bloom filters, it will 
evaluate it
+// on every batch that has been queued so far as well as any new probe-side 
batch that
+// comes in.
+struct BloomFilterPushdownContext {
+  using BuildFinishedCallback = std::function<Status(size_t, 
AccumulationQueue)>;
+  using FiltersReceivedCallback = std::function<Status()>;
+  using FilterFinishedCallback = std::function<Status(size_t, 
AccumulationQueue)>;
+  void Init(HashJoinNode* owner, size_t num_threads, TaskScheduler* scheduler,
+            FiltersReceivedCallback on_bloom_filters_received, bool 
disable_bloom_filter,
+            bool use_sync_execution);
+
+  Status StartProducing();
+
+  void ExpectBloomFilter() { eval_.num_expected_bloom_filters_ += 1; }
+
+  // Builds the Bloom filter, taking ownership of the batches until the build
+  // is done.
+  Status BuildBloomFilter(size_t thread_index, AccumulationQueue batches,
+                          BuildFinishedCallback on_finished);
+
+  // Sends the Bloom filter to the pushdown target.
+  Status PushBloomFilter();
+
+  // Receives a Bloom filter and its associated column map.
+  Status ReceiveBloomFilter(std::unique_ptr<BlockedBloomFilter> filter,
+                            std::vector<int> column_map) {
+    bool proceed;
+    {
+      std::lock_guard<std::mutex> guard(eval_.receive_mutex_);
+      eval_.received_filters_.emplace_back(std::move(filter));
+      eval_.received_maps_.emplace_back(std::move(column_map));
+      proceed = eval_.num_expected_bloom_filters_ == 
eval_.received_filters_.size();
+
+      ARROW_DCHECK_EQ(eval_.received_filters_.size(), 
eval_.received_maps_.size());
+      ARROW_DCHECK_LE(eval_.received_filters_.size(), 
eval_.num_expected_bloom_filters_);
+    }
+    if (proceed) {
+      return eval_.all_received_callback_();
+    }
+    return Status::OK();
+  }
+
+  // Evaluates the Bloom filter on a group of batches, taking ownership of them
+  // until the whole filtering process is complete.
+  Status FilterBatches(size_t thread_index, AccumulationQueue batches,
+                       FilterFinishedCallback on_finished) {
+    eval_.batches_ = std::move(batches);
+    eval_.on_finished_ = std::move(on_finished);
+
+    if (eval_.num_expected_bloom_filters_ == 0)
+      return eval_.on_finished_(thread_index, std::move(eval_.batches_));
+
+    return scheduler_->StartTaskGroup(thread_index, eval_.task_id_,
+                                      
/*num_tasks=*/eval_.batches_.batch_count());
+  }
+
+  // Applies all Bloom filters on the input batch.
+  Status FilterSingleBatch(size_t thread_index, ExecBatch* batch_ptr) {
+    ExecBatch& batch = *batch_ptr;
+    if (eval_.num_expected_bloom_filters_ == 0 || batch.length == 0) return 
Status::OK();
+
+    int64_t bit_vector_bytes = bit_util::BytesForBits(batch.length);
+    std::vector<uint8_t> selected(bit_vector_bytes);
+    std::vector<uint32_t> hashes(batch.length);
+    std::vector<uint8_t> bv(bit_vector_bytes);
+
+    ARROW_ASSIGN_OR_RAISE(util::TempVectorStack * stack, 
GetStack(thread_index));
+
+    // Start with full selection for the current batch
+    memset(selected.data(), 0xff, bit_vector_bytes);
+    for (size_t ifilter = 0; ifilter < eval_.num_expected_bloom_filters_; 
ifilter++) {
+      std::vector<Datum> keys(eval_.received_maps_[ifilter].size());
+      for (size_t i = 0; i < keys.size(); i++) {
+        int input_idx = eval_.received_maps_[ifilter][i];
+        keys[i] = batch[input_idx];
+        if (keys[i].is_scalar()) {
+          ARROW_ASSIGN_OR_RAISE(
+              keys[i],
+              MakeArrayFromScalar(*keys[i].scalar(), batch.length, 
ctx_->memory_pool()));
+        }
+      }
+      ARROW_ASSIGN_OR_RAISE(ExecBatch key_batch, 
ExecBatch::Make(std::move(keys)));
+      RETURN_NOT_OK(Hashing32::HashBatch(key_batch, hashes.data(),
+                                         ctx_->cpu_info()->hardware_flags(), 
stack, 0,
+                                         key_batch.length));
+
+      
eval_.received_filters_[ifilter]->Find(ctx_->cpu_info()->hardware_flags(),
+                                             key_batch.length, hashes.data(), 
bv.data());
+      arrow::internal::BitmapAnd(bv.data(), 0, selected.data(), 0, 
key_batch.length, 0,
+                                 selected.data());
+    }
+    auto selected_buffer =
+        arrow::internal::make_unique<Buffer>(selected.data(), 
bit_vector_bytes);
+    ArrayData selected_arraydata(boolean(), batch.length,
+                                 {nullptr, std::move(selected_buffer)});
+    Datum selected_datum(selected_arraydata);
+    FilterOptions options;
+    size_t first_nonscalar = batch.values.size();
+    for (size_t i = 0; i < batch.values.size(); i++) {
+      if (!batch.values[i].is_scalar()) {
+        ARROW_ASSIGN_OR_RAISE(batch.values[i],
+                              Filter(batch.values[i], selected_datum, options, 
ctx_));
+        first_nonscalar = std::min(first_nonscalar, i);
+        ARROW_DCHECK_EQ(batch.values[i].length(), 
batch.values[first_nonscalar].length());
+      }
+    }
+    // If they're all Scalar, then the length of the batch is the number of 
set bits
+    if (first_nonscalar == batch.values.size())
+      batch.length = arrow::internal::CountSetBits(selected.data(), 0, 
batch.length);
+    else
+      batch.length = batch.values[first_nonscalar].length();
+    return Status::OK();
+  }
+
+ private:
+  Status BuildBloomFilter_exec_task(size_t thread_index, int64_t task_id);
+
+  Status BuildBloomFilter_on_finished(size_t thread_index) {
+    return build_.on_finished_(thread_index, std::move(build_.batches_));
+  }
+
+  // The Bloom filter is built on the build side of some upstream join. For a 
join to
+  // evaluate the Bloom filter on its input columns, it has to rearrange its 
input columns
+  // to match the column order of the Bloom filter.
+  //
+  // The first part of the pair is the HashJoin to actually perform the 
pushdown into.
+  // The second part is a mapping such that column_map[i] is the index of key 
i in
+  // the first part's input.
+  // If we should disable Bloom filter, returns nullptr and an empty vector, 
and sets
+  // the disable_bloom_filter_ flag.
+  std::pair<HashJoinNode*, std::vector<int>> GetPushdownTarget(HashJoinNode* 
start);
+
+  Result<util::TempVectorStack*> GetStack(size_t thread_index) {
+    if (!tld_[thread_index].is_init) {
+      RETURN_NOT_OK(tld_[thread_index].stack.Init(
+          ctx_->memory_pool(), 4 * util::MiniBatch::kMiniBatchLength * 
sizeof(uint32_t)));
+      tld_[thread_index].is_init = true;
+    }
+    return &tld_[thread_index].stack;
+  }
+
+  bool disable_bloom_filter_;
+  HashJoinSchema* schema_mgr_;
+  ExecContext* ctx_;
+  TaskScheduler* scheduler_;
+
+  struct ThreadLocalData {
+    bool is_init = false;
+    util::TempVectorStack stack;
+  };
+  std::vector<ThreadLocalData> tld_;
+
+  struct {
+    int task_id_;
+    std::unique_ptr<BloomFilterBuilder> builder_;
+    AccumulationQueue batches_;
+    BuildFinishedCallback on_finished_;
+  } build_;
+
+  struct {
+    std::unique_ptr<BlockedBloomFilter> bloom_filter_;
+    HashJoinNode* pushdown_target_;
+    std::vector<int> column_map_;
+  } push_;
+
+  struct {
+    int task_id_;
+    size_t num_expected_bloom_filters_ = 0;
+    std::mutex receive_mutex_;
+    std::vector<std::unique_ptr<BlockedBloomFilter>> received_filters_;
+    std::vector<std::vector<int>> received_maps_;
+    AccumulationQueue batches_;
+    FiltersReceivedCallback all_received_callback_;
+    FilterFinishedCallback on_finished_;
+  } eval_;
+};
+
 class HashJoinNode : public ExecNode {
  public:
   HashJoinNode(ExecPlan* plan, NodeVector inputs, const HashJoinNodeOptions& 
join_options,
@@ -533,6 +718,120 @@ class HashJoinNode : public ExecNode {
 
   const char* kind_name() const override { return "HashJoinNode"; }
 
+  Status OnBuildSideBatch(size_t thread_index, ExecBatch batch) {
+    std::lock_guard<std::mutex> guard(build_side_mutex_);
+    build_accumulator_.InsertBatch(std::move(batch));
+    return Status::OK();
+  }
+
+  Status OnBuildSideFinished(size_t thread_index) {
+    return pushdown_context_.BuildBloomFilter(
+        thread_index, std::move(build_accumulator_),
+        [this](size_t thread_index, AccumulationQueue batches) {
+          return OnBloomFilterFinished(thread_index, std::move(batches));
+        });
+  }
+
+  Status OnBloomFilterFinished(size_t thread_index, AccumulationQueue batches) 
{
+    RETURN_NOT_OK(pushdown_context_.PushBloomFilter());
+    return impl_->BuildHashTable(
+        thread_index, std::move(batches),
+        [this](size_t thread_index) { return 
OnHashTableFinished(thread_index); });
+  }
+
+  Status OnHashTableFinished(size_t thread_index) {
+    bool should_probe;
+    {
+      std::lock_guard<std::mutex> guard(probe_side_mutex_);
+      should_probe = queued_batches_filtered_ && !hash_table_ready_;
+      hash_table_ready_ = true;
+    }
+    if (should_probe) {
+      return ProbeQueuedBatches(thread_index);
+    }
+    return Status::OK();
+  }
+
+  Status OnProbeSideBatch(size_t thread_index, ExecBatch batch) {
+    {
+      std::lock_guard<std::mutex> guard(probe_side_mutex_);
+      if (!bloom_filters_ready_) {
+        probe_accumulator_.InsertBatch(std::move(batch));
+        return Status::OK();
+      }
+    }
+    RETURN_NOT_OK(pushdown_context_.FilterSingleBatch(thread_index, &batch));
+
+    {
+      std::lock_guard<std::mutex> guard(probe_side_mutex_);
+      if (!hash_table_ready_) {
+        probe_accumulator_.InsertBatch(std::move(batch));
+        return Status::OK();
+      }
+    }
+    RETURN_NOT_OK(impl_->ProbeSingleBatch(thread_index, std::move(batch)));
+    return Status::OK();
+  }
+
+  Status OnProbeSideFinished(size_t thread_index) {
+    bool probing_finished;
+    {
+      std::lock_guard<std::mutex> guard(probe_side_mutex_);
+      probing_finished = queued_batches_probed_ && !probe_side_finished_;
+      probe_side_finished_ = true;
+    }
+    if (probing_finished) return impl_->ProbingFinished(thread_index);
+    return Status::OK();
+  }
+
+  Status OnFiltersReceived() {
+    std::unique_lock<std::mutex> guard(probe_side_mutex_);
+    bloom_filters_ready_ = true;
+    size_t thread_index = thread_indexer_();
+    AccumulationQueue batches = std::move(probe_accumulator_);
+    guard.unlock();
+    return pushdown_context_.FilterBatches(
+        thread_index, std::move(batches),
+        [this](size_t thread_index, AccumulationQueue batches) {
+          return OnQueuedBatchesFiltered(thread_index, std::move(batches));
+        });
+  }
+
+  Status OnQueuedBatchesFiltered(size_t thread_index, AccumulationQueue 
batches) {
+    bool should_probe;
+    {
+      std::lock_guard<std::mutex> guard(probe_side_mutex_);
+      probe_accumulator_.Concatenate(std::move(batches));
+      should_probe = !queued_batches_filtered_ && hash_table_ready_;
+      queued_batches_filtered_ = true;
+    }
+    if (should_probe) {
+      return ProbeQueuedBatches(thread_index);
+    }
+    return Status::OK();
+  }
+
+  Status ProbeQueuedBatches(size_t thread_index) {
+    {
+      std::lock_guard<std::mutex> guard(probe_side_mutex_);
+      queued_batches_to_probe_ = std::move(probe_accumulator_);
+    }
+    return scheduler_->StartTaskGroup(thread_index, task_group_probe_,
+                                      queued_batches_to_probe_.batch_count());
+  }
+
+  Status OnQueuedBatchesProbed(size_t thread_index) {
+    queued_batches_to_probe_.Clear();
+    bool probing_finished;
+    {
+      std::lock_guard<std::mutex> guard(probe_side_mutex_);
+      probing_finished = !queued_batches_probed_ && probe_side_finished_;
+      queued_batches_probed_ = true;
+    }
+    if (probing_finished) return impl_->ProbingFinished(thread_index);
+    return Status::OK();
+  }
+
   void InputReceived(ExecNode* input, ExecBatch batch) override {
     ARROW_DCHECK(std::find(inputs_.begin(), inputs_.end(), input) != 
inputs_.end());
     if (complete_.load()) {
@@ -547,16 +846,19 @@ class HashJoinNode : public ExecNode {
     START_COMPUTE_SPAN_WITH_PARENT(span, span_, "InputReceived",
                                    {{"batch.length", batch.length}});
 
-    {
-      Status status = impl_->InputReceived(thread_index, side, 
std::move(batch));
-      if (!status.ok()) {
-        StopProducing();
-        ErrorIfNotOk(status);
-        return;
-      }
+    Status status = side == 0 ? OnProbeSideBatch(thread_index, 
std::move(batch))
+                              : OnBuildSideBatch(thread_index, 
std::move(batch));
+
+    if (!status.ok()) {
+      StopProducing();
+      ErrorIfNotOk(status);
+      return;
     }
+
     if (batch_count_[side].Increment()) {
-      Status status = impl_->InputFinished(thread_index, side);
+      status = side == 0 ? OnProbeSideFinished(thread_index)
+                         : OnBuildSideFinished(thread_index);
+
       if (!status.ok()) {
         StopProducing();
         ErrorIfNotOk(status);
@@ -581,7 +883,9 @@ class HashJoinNode : public ExecNode {
     EVENT(span_, "InputFinished", {{"side", side}, {"batches.length", 
total_batches}});
 
     if (batch_count_[side].SetTotal(total_batches)) {
-      Status status = impl_->InputFinished(thread_index, side);
+      Status status = side == 0 ? OnProbeSideFinished(thread_index)
+                                : OnBuildSideFinished(thread_index);
+
       if (!status.ok()) {
         StopProducing();
         ErrorIfNotOk(status);
@@ -590,131 +894,42 @@ class HashJoinNode : public ExecNode {
     }
   }
 
-  // The Bloom filter is built on the build side of some upstream join. For a 
join to
-  // evaluate the Bloom filter on its input columns, it has to rearrange its 
input columns
-  // to match the column order of the Bloom filter.
-  //
-  // The first part of the pair is the HashJoin to actually perform the 
pushdown into.
-  // The second part is a mapping such that column_map[i] is the index of key 
i in
-  // the first part's input.
-  // If we should disable Bloom filter, returns nullptr and an empty vector, 
and sets
-  // the disable_bloom_filter_ flag.
-  std::pair<HashJoinImpl*, std::vector<int>> GetPushdownTarget() {
-#if !ARROW_LITTLE_ENDIAN
-    // TODO (ARROW-16591): Debug bloom_filter.cc to enable on Big endian. It 
probably just
-    // needs a few byte swaps in the proper spots.
-    disable_bloom_filter_ = true;
-    return {nullptr, {}};
-#else
-    // A build-side Bloom filter tells us if a row is definitely not in the 
build side.
-    // This allows us to early-eliminate rows or early-accept rows depending 
on the type
-    // of join. Left Outer Join and Full Outer Join output all rows, so a 
build-side Bloom
-    // filter would only allow us to early-output. Left Antijoin outputs only 
if there is
-    // no match, so again early output. We don't implement early output for 
now, so we
-    // must disallow these types of joins.
-    bool bloom_filter_does_not_apply_to_join = join_type_ == 
JoinType::LEFT_ANTI ||
-                                               join_type_ == 
JoinType::LEFT_OUTER ||
-                                               join_type_ == 
JoinType::FULL_OUTER;
-    disable_bloom_filter_ = disable_bloom_filter_ || 
bloom_filter_does_not_apply_to_join;
-
-    for (int side = 0; side <= 1 && !disable_bloom_filter_; side++) {
-      SchemaProjectionMap keys_to_input = schema_mgr_->proj_maps[side].map(
-          HashJoinProjection::KEY, HashJoinProjection::INPUT);
-      // Bloom filter currently doesn't support dictionaries.
-      for (int i = 0; i < keys_to_input.num_cols; i++) {
-        int idx = keys_to_input.get(i);
-        bool is_dict =
-            inputs_[side]->output_schema()->field(idx)->type()->id() == 
Type::DICTIONARY;
-        if (is_dict) {
-          disable_bloom_filter_ = true;
-          break;
-        }
-      }
-    }
-
-    bool all_comparisons_is = true;
-    for (JoinKeyCmp cmp : key_cmp_) all_comparisons_is &= (cmp == 
JoinKeyCmp::IS);
-
-    if ((join_type_ == JoinType::RIGHT_OUTER || join_type_ == 
JoinType::FULL_OUTER) &&
-        all_comparisons_is)
-      disable_bloom_filter_ = true;
-
-    if (disable_bloom_filter_) return {nullptr, {}};
-
-    // We currently only push Bloom filters on the probe side, and only if 
that input is
-    // also a join.
-    SchemaProjectionMap probe_key_to_input =
-        schema_mgr_->proj_maps[0].map(HashJoinProjection::KEY, 
HashJoinProjection::INPUT);
-    int num_keys = probe_key_to_input.num_cols;
-
-    // A mapping such that bloom_to_target[i] is the index of key i in the 
pushdown
-    // target's input
-    std::vector<int> bloom_to_target(num_keys);
-    HashJoinNode* pushdown_target = this;
-    for (int i = 0; i < num_keys; i++) bloom_to_target[i] = 
probe_key_to_input.get(i);
-
-    for (ExecNode* candidate = inputs()[0]; candidate->kind_name() == 
this->kind_name();
-         candidate = candidate->inputs()[0]) {
-      auto* candidate_as_join = checked_cast<HashJoinNode*>(candidate);
-      SchemaProjectionMap candidate_output_to_input =
-          
candidate_as_join->schema_mgr_->proj_maps[0].map(HashJoinProjection::OUTPUT,
-                                                           
HashJoinProjection::INPUT);
-
-      // Check if any of the keys are missing, if they are, break
-      bool break_outer = false;
-      for (int i = 0; i < num_keys; i++) {
-        // Since all of the probe side columns are before the build side 
columns,
-        // if the index of an output is greater than the number of probe-side 
input
-        // columns, it must have come from the candidate's build side.
-        if (bloom_to_target[i] >= candidate_output_to_input.num_cols) {
-          break_outer = true;
-          break;
-        }
-        int candidate_input_idx = 
candidate_output_to_input.get(bloom_to_target[i]);
-        // The output column has to have come from somewhere...
-        ARROW_DCHECK_NE(candidate_input_idx, schema_mgr_->kMissingField());
-      }
-      if (break_outer) break;
-
-      // The Bloom filter will filter out nulls, which may cause a Right/Full 
Outer Join
-      // to incorrectly output some rows with nulls padding the probe-side 
rows. This may
-      // cause a row with all null keys to be emitted. This is normally not an 
issue
-      // with EQ, but if all comparisons are IS (i.e. all-null is accepted), 
this could
-      // produce incorrect rows.
-      bool can_produce_build_side_nulls =
-          candidate_as_join->join_type_ == JoinType::RIGHT_OUTER ||
-          candidate_as_join->join_type_ == JoinType::FULL_OUTER;
-
-      if (all_comparisons_is || can_produce_build_side_nulls) break;
-
-      // All keys are present, we can update the mapping
-      for (int i = 0; i < num_keys; i++) {
-        int candidate_input_idx = 
candidate_output_to_input.get(bloom_to_target[i]);
-        bloom_to_target[i] = candidate_input_idx;
-      }
-      pushdown_target = candidate_as_join;
-    }
-    return std::make_pair(pushdown_target->impl_.get(), 
std::move(bloom_to_target));
-#endif  // ARROW_LITTLE_ENDIAN
-  }
-
   Status PrepareToProduce() override {
     bool use_sync_execution = !(plan_->exec_context()->executor());
-    size_t num_threads = use_sync_execution ? 1 : thread_indexer_.Capacity();
+    // TODO(ARROW-15732)
+    // Each side of join might have an IO thread being called from. Once this 
is fixed
+    // we will change it back to just the CPU's thread pool capacity.
+    size_t num_threads = (GetCpuThreadPoolCapacity() + 
io::GetIOThreadPoolCapacity() + 1);
+
+    scheduler_ = TaskScheduler::Make();
+    pushdown_context_.Init(
+        this, num_threads, scheduler_.get(), [this]() { return 
OnFiltersReceived(); },
+        disable_bloom_filter_, use_sync_execution);
+
+    RETURN_NOT_OK(impl_->Init(
+        plan_->exec_context(), join_type_, num_threads, schema_mgr_.get(), 
key_cmp_,
+        filter_, [this](ExecBatch batch) { this->OutputBatchCallback(batch); },
+        [this](int64_t total_num_batches) { 
this->FinishedCallback(total_num_batches); },
+        scheduler_.get()));
 
-    HashJoinImpl* pushdown_target = nullptr;
-    std::vector<int> column_map;
-    std::tie(pushdown_target, column_map) = GetPushdownTarget();
+    task_group_probe_ = scheduler_->RegisterTaskGroup(
+        [this](size_t thread_index, int64_t task_id) -> Status {
+          return impl_->ProbeSingleBatch(thread_index,
+                                         
std::move(queued_batches_to_probe_[task_id]));
+        },
+        [this](size_t thread_index) -> Status {
+          return OnQueuedBatchesProbed(thread_index);
+        });
 
-    return impl_->Init(
-        plan_->exec_context(), join_type_, use_sync_execution, num_threads,
-        schema_mgr_.get(), key_cmp_, filter_,
-        [this](ExecBatch batch) { this->OutputBatchCallback(batch); },
-        [this](int64_t total_num_batches) { 
this->FinishedCallback(total_num_batches); },
+    scheduler_->RegisterEnd();
+
+    RETURN_NOT_OK(scheduler_->StartScheduling(
+        0 /*thread index*/,
         [this](std::function<Status(size_t)> func) -> Status {
           return this->ScheduleTaskCallback(std::move(func));
         },
-        pushdown_target, std::move(column_map));
+        static_cast<int>(2 * num_threads) /*concurrent tasks*/, 
use_sync_execution));
+    return Status::OK();
   }
 
   Status StartProducing() override {
@@ -723,7 +938,7 @@ class HashJoinNode : public ExecNode {
                         {"node.detail", ToString()},
                         {"node.kind", kind_name()}});
     END_SPAN_ON_FUTURE_COMPLETION(span_, finished(), this);
-
+    RETURN_NOT_OK(pushdown_context_.StartProducing());
     return Status::OK();
   }
 
@@ -797,9 +1012,229 @@ class HashJoinNode : public ExecNode {
   std::unique_ptr<HashJoinSchema> schema_mgr_;
   std::unique_ptr<HashJoinImpl> impl_;
   util::AsyncTaskGroup task_group_;
+  std::unique_ptr<TaskScheduler> scheduler_;
+  util::AccumulationQueue build_accumulator_;
+  util::AccumulationQueue probe_accumulator_;
+  util::AccumulationQueue queued_batches_to_probe_;
+
+  std::mutex build_side_mutex_;
+  std::mutex probe_side_mutex_;
+
+  int task_group_probe_;
+  bool bloom_filters_ready_ = false;
+  bool hash_table_ready_ = false;
+  bool queued_batches_filtered_ = false;
+  bool queued_batches_probed_ = false;
+  bool probe_side_finished_ = false;
+
+  friend struct BloomFilterPushdownContext;
   bool disable_bloom_filter_;
+  BloomFilterPushdownContext pushdown_context_;
 };
 
+void BloomFilterPushdownContext::Init(HashJoinNode* owner, size_t num_threads,
+                                      TaskScheduler* scheduler,
+                                      FiltersReceivedCallback 
on_bloom_filters_received,
+                                      bool disable_bloom_filter,
+                                      bool use_sync_execution) {
+  schema_mgr_ = owner->schema_mgr_.get();
+  ctx_ = owner->plan_->exec_context();
+  scheduler_ = scheduler;
+  tld_.resize(num_threads);
+  disable_bloom_filter_ = disable_bloom_filter;
+  std::tie(push_.pushdown_target_, push_.column_map_) = 
GetPushdownTarget(owner);
+  eval_.all_received_callback_ = std::move(on_bloom_filters_received);
+  if (!disable_bloom_filter_) {
+    ARROW_CHECK(push_.pushdown_target_);
+    push_.bloom_filter_ = arrow::internal::make_unique<BlockedBloomFilter>();
+    push_.pushdown_target_->pushdown_context_.ExpectBloomFilter();
+
+    build_.builder_ = BloomFilterBuilder::Make(
+        use_sync_execution ? BloomFilterBuildStrategy::SINGLE_THREADED
+                           : BloomFilterBuildStrategy::PARALLEL);
+
+    build_.task_id_ = scheduler_->RegisterTaskGroup(
+        [this](size_t thread_index, int64_t task_id) {
+          return BuildBloomFilter_exec_task(thread_index, task_id);
+        },
+        [this](size_t thread_index) {
+          return BuildBloomFilter_on_finished(thread_index);
+        });
+  }
+
+  eval_.task_id_ = scheduler_->RegisterTaskGroup(
+      [this](size_t thread_index, int64_t task_id) {
+        return FilterSingleBatch(thread_index, &eval_.batches_[task_id]);
+      },
+      [this](size_t thread_index) {
+        return eval_.on_finished_(thread_index, std::move(eval_.batches_));
+      });
+}
+
+Status BloomFilterPushdownContext::StartProducing() {
+  if (eval_.num_expected_bloom_filters_ == 0) return 
eval_.all_received_callback_();
+  return Status::OK();
+}
+
+Status BloomFilterPushdownContext::BuildBloomFilter(size_t thread_index,
+                                                    AccumulationQueue batches,
+                                                    BuildFinishedCallback 
on_finished) {
+  build_.batches_ = std::move(batches);
+  build_.on_finished_ = std::move(on_finished);
+
+  if (disable_bloom_filter_)
+    return build_.on_finished_(thread_index, std::move(build_.batches_));
+
+  RETURN_NOT_OK(build_.builder_->Begin(
+      /*num_threads=*/tld_.size(), ctx_->cpu_info()->hardware_flags(),
+      ctx_->memory_pool(), build_.batches_.row_count(), 
build_.batches_.batch_count(),
+      push_.bloom_filter_.get()));
+
+  return scheduler_->StartTaskGroup(thread_index, build_.task_id_,
+                                    
/*num_tasks=*/build_.batches_.batch_count());
+}
+
+Status BloomFilterPushdownContext::PushBloomFilter() {
+  if (!disable_bloom_filter_)
+    return push_.pushdown_target_->pushdown_context_.ReceiveBloomFilter(
+        std::move(push_.bloom_filter_), std::move(push_.column_map_));
+  return Status::OK();
+}
+
+Status BloomFilterPushdownContext::BuildBloomFilter_exec_task(size_t 
thread_index,
+                                                              int64_t task_id) 
{
+  const ExecBatch& input_batch = build_.batches_[task_id];
+  SchemaProjectionMap key_to_in =
+      schema_mgr_->proj_maps[1].map(HashJoinProjection::KEY, 
HashJoinProjection::INPUT);
+  std::vector<Datum> key_columns(key_to_in.num_cols);
+  for (size_t i = 0; i < key_columns.size(); i++) {
+    int input_idx = key_to_in.get(static_cast<int>(i));
+    key_columns[i] = input_batch[input_idx];
+    if (key_columns[i].is_scalar()) {
+      ARROW_ASSIGN_OR_RAISE(key_columns[i],
+                            MakeArrayFromScalar(*key_columns[i].scalar(),
+                                                input_batch.length, 
ctx_->memory_pool()));
+    }
+  }
+  ARROW_ASSIGN_OR_RAISE(ExecBatch key_batch, 
ExecBatch::Make(std::move(key_columns)));
+
+  ARROW_ASSIGN_OR_RAISE(util::TempVectorStack * stack, GetStack(thread_index));
+  util::TempVectorHolder<uint32_t> hash_holder(stack, 
util::MiniBatch::kMiniBatchLength);
+  uint32_t* hashes = hash_holder.mutable_data();
+  for (int64_t i = 0; i < key_batch.length; i += 
util::MiniBatch::kMiniBatchLength) {
+    int64_t length = std::min(static_cast<int64_t>(key_batch.length - i),
+                              
static_cast<int64_t>(util::MiniBatch::kMiniBatchLength));
+    RETURN_NOT_OK(Hashing32::HashBatch(
+        key_batch, hashes, ctx_->cpu_info()->hardware_flags(), stack, i, 
length));
+    RETURN_NOT_OK(build_.builder_->PushNextBatch(thread_index, length, 
hashes));
+  }
+  return Status::OK();
+}
+
+std::pair<HashJoinNode*, std::vector<int>> 
BloomFilterPushdownContext::GetPushdownTarget(
+    HashJoinNode* start) {
+#if !ARROW_LITTLE_ENDIAN
+  // TODO (ARROW-16591): Debug bloom_filter.cc to enable on Big endian. It 
probably just
+  // needs a few byte swaps in the proper spots.
+  disable_bloom_filter_ = true;
+  return {nullptr, {}};
+#else
+  if (disable_bloom_filter_) return {nullptr, {}};
+  JoinType join_type = start->join_type_;
+
+  // A build-side Bloom filter tells us if a row is definitely not in the 
build side.
+  // This allows us to early-eliminate rows or early-accept rows depending on 
the type
+  // of join. Left Outer Join and Full Outer Join output all rows, so a 
build-side Bloom
+  // filter would only allow us to early-output. Left Antijoin outputs only if 
there is
+  // no match, so again early output. We don't implement early output for now, 
so we
+  // must disallow these types of joins.
+  bool bloom_filter_does_not_apply_to_join = join_type == JoinType::LEFT_ANTI 
||
+                                             join_type == JoinType::LEFT_OUTER 
||
+                                             join_type == JoinType::FULL_OUTER;
+  disable_bloom_filter_ = disable_bloom_filter_ || 
bloom_filter_does_not_apply_to_join;
+
+  // Bloom filter currently doesn't support dictionaries.
+  for (int side = 0; side <= 1 && !disable_bloom_filter_; side++) {
+    SchemaProjectionMap keys_to_input = 
start->schema_mgr_->proj_maps[side].map(
+        HashJoinProjection::KEY, HashJoinProjection::INPUT);
+    // Bloom filter currently doesn't support dictionaries.
+    for (int i = 0; i < keys_to_input.num_cols; i++) {
+      int idx = keys_to_input.get(i);
+      bool is_dict = 
start->inputs_[side]->output_schema()->field(idx)->type()->id() ==
+                     Type::DICTIONARY;
+      if (is_dict) {
+        disable_bloom_filter_ = true;
+        break;
+      }
+    }
+  }
+
+  bool all_comparisons_is = true;
+  for (JoinKeyCmp cmp : start->key_cmp_) all_comparisons_is &= (cmp == 
JoinKeyCmp::IS);
+
+  if ((join_type == JoinType::RIGHT_OUTER || join_type == 
JoinType::FULL_OUTER) &&
+      all_comparisons_is)
+    disable_bloom_filter_ = true;
+
+  if (disable_bloom_filter_) return {nullptr, {}};
+
+  // We currently only push Bloom filters on the probe side, and only if that 
input is
+  // also a join.
+  SchemaProjectionMap probe_key_to_input = 
start->schema_mgr_->proj_maps[0].map(
+      HashJoinProjection::KEY, HashJoinProjection::INPUT);
+  int num_keys = probe_key_to_input.num_cols;
+
+  // A mapping such that bloom_to_target[i] is the index of key i in the 
pushdown
+  // target's input
+  std::vector<int> bloom_to_target(num_keys);
+  HashJoinNode* pushdown_target = start;
+  for (int i = 0; i < num_keys; i++) bloom_to_target[i] = 
probe_key_to_input.get(i);
+
+  for (ExecNode* candidate = start->inputs()[0];
+       candidate->kind_name() == start->kind_name(); candidate = 
candidate->inputs()[0]) {
+    auto* candidate_as_join = checked_cast<HashJoinNode*>(candidate);
+    SchemaProjectionMap candidate_output_to_input =
+        
candidate_as_join->schema_mgr_->proj_maps[0].map(HashJoinProjection::OUTPUT,
+                                                         
HashJoinProjection::INPUT);
+
+    // Check if any of the keys are missing, if they are, break
+    bool break_outer = false;
+    for (int i = 0; i < num_keys; i++) {
+      // Since all of the probe side columns are before the build side columns,
+      // if the index of an output is greater than the number of probe-side 
input
+      // columns, it must have come from the candidate's build side.
+      if (bloom_to_target[i] >= candidate_output_to_input.num_cols) {
+        break_outer = true;
+        break;
+      }
+      int candidate_input_idx = 
candidate_output_to_input.get(bloom_to_target[i]);
+      // The output column has to have come from somewhere...
+      ARROW_DCHECK_NE(candidate_input_idx, 
start->schema_mgr_->kMissingField());
+    }
+    if (break_outer) break;
+
+    // The Bloom filter will filter out nulls, which may cause a Right/Full 
Outer Join
+    // to incorrectly output some rows with nulls padding the probe-side rows. 
This may
+    // cause a row with all null keys to be emitted. This is normally not an 
issue
+    // with EQ, but if all comparisons are IS (i.e. all-null is accepted), 
this could
+    // produce incorrect rows.
+    bool can_produce_build_side_nulls =
+        candidate_as_join->join_type_ == JoinType::RIGHT_OUTER ||
+        candidate_as_join->join_type_ == JoinType::FULL_OUTER;
+
+    if (all_comparisons_is || can_produce_build_side_nulls) break;
+
+    // All keys are present, we can update the mapping
+    for (int i = 0; i < num_keys; i++) {
+      int candidate_input_idx = 
candidate_output_to_input.get(bloom_to_target[i]);
+      bloom_to_target[i] = candidate_input_idx;
+    }
+    pushdown_target = candidate_as_join;
+  }
+  return std::make_pair(pushdown_target, std::move(bloom_to_target));
+#endif  // ARROW_LITTLE_ENDIAN
+}
+
 namespace internal {
 void RegisterHashJoinNode(ExecFactoryRegistry* registry) {
   DCHECK_OK(registry->AddFactory("hashjoin", HashJoinNode::Make));

Reply via email to