westonpace commented on code in PR #13669:
URL: https://github.com/apache/arrow/pull/13669#discussion_r1064813860


##########
cpp/src/arrow/compute/exec/accumulation_queue.cc:
##########
@@ -39,20 +37,180 @@ void AccumulationQueue::Concatenate(AccumulationQueue&& 
that) {
   this->batches_.reserve(this->batches_.size() + that.batches_.size());
   std::move(that.batches_.begin(), that.batches_.end(),
             std::back_inserter(this->batches_));
-  this->row_count_ += that.row_count_;
   that.Clear();
 }
 
 void AccumulationQueue::InsertBatch(ExecBatch batch) {
-  row_count_ += batch.length;
   batches_.emplace_back(std::move(batch));
 }
 
+void AccumulationQueue::SetBatch(size_t idx, ExecBatch batch) {
+  ARROW_DCHECK(idx < batches_.size());
+  batches_[idx] = std::move(batch);
+}
+
+size_t AccumulationQueue::CalculateRowCount() const {
+  size_t count = 0;
+  for (const ExecBatch& b : batches_) count += static_cast<size_t>(b.length);
+  return count;
+}
+
 void AccumulationQueue::Clear() {
-  row_count_ = 0;
   batches_.clear();
 }
 
-ExecBatch& AccumulationQueue::operator[](size_t i) { return batches_[i]; }
-}  // namespace util
+Status SpillingAccumulationQueue::Init(QueryContext* ctx) {
+  ctx_ = ctx;
+  partition_locks_.Init(ctx_->max_concurrency(), kNumPartitions);
+  for (size_t ipart = 0; ipart < kNumPartitions; ipart++) {
+    task_group_read_[ipart] = ctx_->RegisterTaskGroup(
+        [this, ipart](size_t thread_index, int64_t batch_index) {
+          return read_back_fn_[ipart](thread_index, 
static_cast<size_t>(batch_index),
+                                      std::move(queues_[ipart][batch_index]));
+        },
+        [this, ipart](size_t thread_index) { return 
on_finished_[ipart](thread_index); });
+  }
+  return Status::OK();
+}
+
+Status SpillingAccumulationQueue::InsertBatch(size_t thread_index, ExecBatch 
batch) {
+  Datum& hash_datum = batch.values.back();
+  const uint64_t* hashes =
+      reinterpret_cast<const 
uint64_t*>(hash_datum.array()->buffers[1]->data());
+  // `permutation` stores the indices of rows in the input batch sorted by 
partition.
+  std::vector<uint16_t> permutation(batch.length);
+  uint16_t part_starts[kNumPartitions + 1];
+  PartitionSort::Eval(
+      batch.length, kNumPartitions, part_starts,
+      /*partition_id=*/[&](int64_t i) { return partition_id(hashes[i]); },
+      /*output_fn=*/
+      [&permutation](int64_t input_pos, int64_t output_pos) {
+        permutation[output_pos] = static_cast<uint16_t>(input_pos);
+      });
+
+  int unprocessed_partition_ids[kNumPartitions];
+  RETURN_NOT_OK(partition_locks_.ForEachPartition(
+      thread_index, unprocessed_partition_ids,
+      /*is_prtn_empty=*/
+      [&](int part_id) { return part_starts[part_id + 1] == 
part_starts[part_id]; },
+      /*partition=*/

Review Comment:
   ```suggestion
         /*process_prtn_fn=*/
   ```



##########
cpp/src/arrow/compute/exec/accumulation_queue.h:
##########
@@ -42,16 +45,88 @@ class AccumulationQueue {
 
   void Concatenate(AccumulationQueue&& that);
   void InsertBatch(ExecBatch batch);
-  int64_t row_count() { return row_count_; }
-  size_t batch_count() { return batches_.size(); }
+  void SetBatch(size_t idx, ExecBatch batch);
+  size_t batch_count() const { return batches_.size(); }
   bool empty() const { return batches_.empty(); }
+  size_t CalculateRowCount() const;
+
+  // Resizes the accumulation queue to contain size batches. The
+  // new batches will be empty and have length 0, but they will be
+  // usable (useful for concurrent modification of the AccumulationQueue
+  // of separate elements).
+  void Resize(size_t size) { batches_.resize(size); }
   void Clear();
-  ExecBatch& operator[](size_t i);
+  ExecBatch& operator[](size_t i) { return batches_[i]; };
+  const ExecBatch& operator[](size_t i) const { return batches_[i]; };
 
  private:
-  int64_t row_count_;
   std::vector<ExecBatch> batches_;
 };
 
-}  // namespace util
+class SpillingAccumulationQueue {

Review Comment:
   ```suggestion
   /// Accumulates batches in a queue that can be spilled to disk if needed
   ///
   /// Each batch is partitioned by the lower bits of the hash column (which 
must be present) and
   /// rows are initially accumulated in batch builders (one per partition).  
As a batch builder fills
   /// up the completed batch is put into an in-memory accumulation queue (per 
partition).
   ///
   /// When memory pressure is encountered the spilling queue's "spill cursor" 
can be advanced.  This
   /// will cause a partition to be spilled to disk.  Any future data arriving 
for that partition will go immediately
   /// to disk (after accumulating a full batch in the batch builder).
   ///
   /// Later, data is retrieved one partition at a time.  Partitions that are 
in-memory will be delivered immediately
   /// in new thread tasks.  Partitions that are on disk will be read from disk 
and delivered as they arrive.
   class SpillingAccumulationQueue {
   ```
   
   Other things that might be nice to mention:
   
    * This is a "write then read" API (as opposed to "write then read then 
write some more then read some more"
    * Add some details about the fact that hashes are spilled separately (and 
why)



##########
cpp/src/arrow/compute/exec/accumulation_queue.h:
##########
@@ -42,16 +45,88 @@ class AccumulationQueue {
 
   void Concatenate(AccumulationQueue&& that);
   void InsertBatch(ExecBatch batch);
-  int64_t row_count() { return row_count_; }
-  size_t batch_count() { return batches_.size(); }
+  void SetBatch(size_t idx, ExecBatch batch);
+  size_t batch_count() const { return batches_.size(); }
   bool empty() const { return batches_.empty(); }
+  size_t CalculateRowCount() const;
+
+  // Resizes the accumulation queue to contain size batches. The
+  // new batches will be empty and have length 0, but they will be
+  // usable (useful for concurrent modification of the AccumulationQueue
+  // of separate elements).
+  void Resize(size_t size) { batches_.resize(size); }
   void Clear();
-  ExecBatch& operator[](size_t i);
+  ExecBatch& operator[](size_t i) { return batches_[i]; }
+  const ExecBatch& operator[](size_t i) const { return batches_[i]; }
 
  private:
-  int64_t row_count_;
   std::vector<ExecBatch> batches_;
 };
 
-}  // namespace util
+class SpillingAccumulationQueue {
+ public:
+  // Number of partitions must be a power of two, since we assign partitions by
+  // looking at bottom few bits.
+  static constexpr int kLogNumPartitions = 6;
+  static constexpr int kNumPartitions = 1 << kLogNumPartitions;
+  Status Init(QueryContext* ctx);
+  // Assumes that the final column in batch contains 64-bit hashes of the 
columns.
+  Status InsertBatch(size_t thread_index, ExecBatch batch);
+  Status GetPartition(size_t thread_index, size_t partition,
+                      std::function<Status(size_t, size_t, ExecBatch)>
+                          on_batch,  // thread_index, batch_index, batch
+                      std::function<Status(size_t)> on_finished);
+
+  // Returns hashes of the given partition and batch index.
+  // partition MUST be at least hash_cursor, as if partition < hash_cursor,
+  // these hashes will have been deleted.
+  const uint64_t* GetHashes(size_t partition, size_t batch_idx);
+  inline size_t batch_count(size_t partition) const {
+    size_t num_full_batches = partition >= spilling_cursor_
+                                  ? queues_[partition].batch_count()
+                                  : files_[partition].num_batches();
+
+    return num_full_batches + (builders_[partition].num_rows() > 0);
+  }
+  inline size_t row_count(size_t partition, size_t batch_idx) const {
+    if (batch_idx < hash_queues_[partition].batch_count())
+      return hash_queues_[partition][batch_idx].length;
+    else
+      return builders_[partition].num_rows();
+  }
+
+  static inline constexpr size_t partition_id(uint64_t hash) {
+    // Hash Table uses the top bits of the hash, so we really really
+    // need to use the bottom bits of the hash for spilling to avoid
+    // a huge number of hash collisions per partition.
+    return static_cast<size_t>(hash & (kNumPartitions - 1));
+  }
+
+  size_t CalculatePartitionRowCount(size_t partition) const;

Review Comment:
   Would be nice to have a comment here explaining that `partition` must not 
have been spilled yet, or otherwise indicating this is "rows in memory"



##########
cpp/src/arrow/compute/exec/accumulation_queue.cc:
##########
@@ -39,20 +37,180 @@ void AccumulationQueue::Concatenate(AccumulationQueue&& 
that) {
   this->batches_.reserve(this->batches_.size() + that.batches_.size());
   std::move(that.batches_.begin(), that.batches_.end(),
             std::back_inserter(this->batches_));
-  this->row_count_ += that.row_count_;
   that.Clear();
 }
 
 void AccumulationQueue::InsertBatch(ExecBatch batch) {
-  row_count_ += batch.length;
   batches_.emplace_back(std::move(batch));
 }
 
+void AccumulationQueue::SetBatch(size_t idx, ExecBatch batch) {
+  ARROW_DCHECK(idx < batches_.size());
+  batches_[idx] = std::move(batch);
+}
+
+size_t AccumulationQueue::CalculateRowCount() const {
+  size_t count = 0;
+  for (const ExecBatch& b : batches_) count += static_cast<size_t>(b.length);
+  return count;
+}
+
 void AccumulationQueue::Clear() {
-  row_count_ = 0;
   batches_.clear();
 }
 
-ExecBatch& AccumulationQueue::operator[](size_t i) { return batches_[i]; }
-}  // namespace util
+Status SpillingAccumulationQueue::Init(QueryContext* ctx) {
+  ctx_ = ctx;
+  partition_locks_.Init(ctx_->max_concurrency(), kNumPartitions);
+  for (size_t ipart = 0; ipart < kNumPartitions; ipart++) {
+    task_group_read_[ipart] = ctx_->RegisterTaskGroup(
+        [this, ipart](size_t thread_index, int64_t batch_index) {
+          return read_back_fn_[ipart](thread_index, 
static_cast<size_t>(batch_index),
+                                      std::move(queues_[ipart][batch_index]));
+        },
+        [this, ipart](size_t thread_index) { return 
on_finished_[ipart](thread_index); });
+  }
+  return Status::OK();
+}
+
+Status SpillingAccumulationQueue::InsertBatch(size_t thread_index, ExecBatch 
batch) {
+  Datum& hash_datum = batch.values.back();
+  const uint64_t* hashes =
+      reinterpret_cast<const 
uint64_t*>(hash_datum.array()->buffers[1]->data());
+  // `permutation` stores the indices of rows in the input batch sorted by 
partition.
+  std::vector<uint16_t> permutation(batch.length);
+  uint16_t part_starts[kNumPartitions + 1];
+  PartitionSort::Eval(
+      batch.length, kNumPartitions, part_starts,
+      /*partition_id=*/[&](int64_t i) { return partition_id(hashes[i]); },
+      /*output_fn=*/
+      [&permutation](int64_t input_pos, int64_t output_pos) {
+        permutation[output_pos] = static_cast<uint16_t>(input_pos);
+      });
+
+  int unprocessed_partition_ids[kNumPartitions];
+  RETURN_NOT_OK(partition_locks_.ForEachPartition(
+      thread_index, unprocessed_partition_ids,
+      /*is_prtn_empty=*/
+      [&](int part_id) { return part_starts[part_id + 1] == 
part_starts[part_id]; },
+      /*partition=*/
+      [&](int locked_part_id_int) {
+        size_t locked_part_id = static_cast<size_t>(locked_part_id_int);
+        uint64_t num_total_rows_to_append =
+            part_starts[locked_part_id + 1] - part_starts[locked_part_id];
+
+        size_t offset = static_cast<size_t>(part_starts[locked_part_id]);
+        while (num_total_rows_to_append > 0) {
+          int num_rows_to_append =
+              std::min(static_cast<int>(num_total_rows_to_append),
+                       static_cast<int>(ExecBatchBuilder::num_rows_max() -
+                                        builders_[locked_part_id].num_rows()));
+
+          RETURN_NOT_OK(builders_[locked_part_id].AppendSelected(
+              ctx_->memory_pool(), batch, num_rows_to_append, 
permutation.data() + offset,
+              batch.num_values()));
+
+          if (builders_[locked_part_id].is_full()) {
+            ExecBatch batch = builders_[locked_part_id].Flush();
+            Datum hash = std::move(batch.values.back());
+            batch.values.pop_back();
+            ExecBatch hash_batch({std::move(hash)}, batch.length);
+            if (locked_part_id < spilling_cursor_)
+              RETURN_NOT_OK(files_[locked_part_id].SpillBatch(ctx_, 
std::move(batch)));
+            else
+              queues_[locked_part_id].InsertBatch(std::move(batch));
+
+            if (locked_part_id >= hash_cursor_)
+              hash_queues_[locked_part_id].InsertBatch(std::move(hash_batch));
+          }
+          offset += num_rows_to_append;
+          num_total_rows_to_append -= num_rows_to_append;
+        }
+        return Status::OK();
+      }));
+  return Status::OK();
+}
+
+const uint64_t* SpillingAccumulationQueue::GetHashes(size_t partition, size_t 
batch_idx) {
+  ARROW_DCHECK(partition >= hash_cursor_.load());
+  if (batch_idx > hash_queues_[partition].batch_count()) {
+    const Datum& datum = hash_queues_[partition][batch_idx].values[0];
+    return reinterpret_cast<const 
uint64_t*>(datum.array()->buffers[1]->data());
+  } else {
+    size_t hash_idx = builders_[partition].num_cols();
+    KeyColumnArray kca = builders_[partition].column(hash_idx - 1);
+    return reinterpret_cast<const uint64_t*>(kca.data(1));
+  }
+}
+
+Status SpillingAccumulationQueue::GetPartition(
+    size_t thread_index, size_t partition,
+    std::function<Status(size_t, size_t, ExecBatch)> on_batch,
+    std::function<Status(size_t)> on_finished) {
+  bool is_in_memory = partition >= spilling_cursor_.load();
+  if (builders_[partition].num_rows() > 0) {

Review Comment:
   This check makes sense if our workflow is "accumulate until input exhausted" 
-> "consume", which I think is a safe assumption, but we should jot it down 
somewhere.  In other words, first we accumulate, then we start retrieving data. 
 And we don't accumulate more data after we've started retrieving data.



##########
cpp/src/arrow/compute/exec/accumulation_queue.h:
##########
@@ -42,16 +45,88 @@ class AccumulationQueue {
 
   void Concatenate(AccumulationQueue&& that);
   void InsertBatch(ExecBatch batch);
-  int64_t row_count() { return row_count_; }
-  size_t batch_count() { return batches_.size(); }
+  void SetBatch(size_t idx, ExecBatch batch);
+  size_t batch_count() const { return batches_.size(); }
   bool empty() const { return batches_.empty(); }
+  size_t CalculateRowCount() const;
+
+  // Resizes the accumulation queue to contain size batches. The
+  // new batches will be empty and have length 0, but they will be
+  // usable (useful for concurrent modification of the AccumulationQueue
+  // of separate elements).
+  void Resize(size_t size) { batches_.resize(size); }
   void Clear();
-  ExecBatch& operator[](size_t i);
+  ExecBatch& operator[](size_t i) { return batches_[i]; }
+  const ExecBatch& operator[](size_t i) const { return batches_[i]; }
 
  private:
-  int64_t row_count_;
   std::vector<ExecBatch> batches_;
 };
 
-}  // namespace util
+class SpillingAccumulationQueue {
+ public:
+  // Number of partitions must be a power of two, since we assign partitions by
+  // looking at bottom few bits.
+  static constexpr int kLogNumPartitions = 6;
+  static constexpr int kNumPartitions = 1 << kLogNumPartitions;
+  Status Init(QueryContext* ctx);
+  // Assumes that the final column in batch contains 64-bit hashes of the 
columns.
+  Status InsertBatch(size_t thread_index, ExecBatch batch);
+  Status GetPartition(size_t thread_index, size_t partition,

Review Comment:
   This method should be documented.  It would be good to point out that each 
batch of data will be given a new thread task and `on_batch` will be called on 
that thread task.



##########
cpp/src/arrow/compute/exec/accumulation_queue.h:
##########
@@ -42,16 +45,88 @@ class AccumulationQueue {
 
   void Concatenate(AccumulationQueue&& that);
   void InsertBatch(ExecBatch batch);
-  int64_t row_count() { return row_count_; }
-  size_t batch_count() { return batches_.size(); }
+  void SetBatch(size_t idx, ExecBatch batch);
+  size_t batch_count() const { return batches_.size(); }
   bool empty() const { return batches_.empty(); }
+  size_t CalculateRowCount() const;
+
+  // Resizes the accumulation queue to contain size batches. The
+  // new batches will be empty and have length 0, but they will be
+  // usable (useful for concurrent modification of the AccumulationQueue
+  // of separate elements).
+  void Resize(size_t size) { batches_.resize(size); }
   void Clear();
-  ExecBatch& operator[](size_t i);
+  ExecBatch& operator[](size_t i) { return batches_[i]; }
+  const ExecBatch& operator[](size_t i) const { return batches_[i]; }
 
  private:
-  int64_t row_count_;
   std::vector<ExecBatch> batches_;
 };
 
-}  // namespace util
+class SpillingAccumulationQueue {
+ public:
+  // Number of partitions must be a power of two, since we assign partitions by
+  // looking at bottom few bits.
+  static constexpr int kLogNumPartitions = 6;
+  static constexpr int kNumPartitions = 1 << kLogNumPartitions;
+  Status Init(QueryContext* ctx);
+  // Assumes that the final column in batch contains 64-bit hashes of the 
columns.
+  Status InsertBatch(size_t thread_index, ExecBatch batch);
+  Status GetPartition(size_t thread_index, size_t partition,
+                      std::function<Status(size_t, size_t, ExecBatch)>
+                          on_batch,  // thread_index, batch_index, batch
+                      std::function<Status(size_t)> on_finished);
+
+  // Returns hashes of the given partition and batch index.
+  // partition MUST be at least hash_cursor, as if partition < hash_cursor,
+  // these hashes will have been deleted.
+  const uint64_t* GetHashes(size_t partition, size_t batch_idx);
+  inline size_t batch_count(size_t partition) const {
+    size_t num_full_batches = partition >= spilling_cursor_
+                                  ? queues_[partition].batch_count()
+                                  : files_[partition].num_batches();
+
+    return num_full_batches + (builders_[partition].num_rows() > 0);
+  }
+  inline size_t row_count(size_t partition, size_t batch_idx) const {
+    if (batch_idx < hash_queues_[partition].batch_count())
+      return hash_queues_[partition][batch_idx].length;
+    else
+      return builders_[partition].num_rows();
+  }
+
+  static inline constexpr size_t partition_id(uint64_t hash) {
+    // Hash Table uses the top bits of the hash, so we really really

Review Comment:
   ```suggestion
       // Hash Table uses the top bits of the hash, so we really
   ```



##########
cpp/src/arrow/compute/light_array.h:
##########
@@ -377,12 +383,17 @@ class ARROW_EXPORT ExecBatchBuilder {
   ExecBatch Flush();
 
   int num_rows() const { return values_.empty() ? 0 : values_[0].num_rows(); }
+  bool is_full() const { return num_rows() == num_rows_max(); }
 
   static int num_rows_max() { return 1 << kLogNumRows; }
 
  private:
   static constexpr int kLogNumRows = 15;
 
+  // Align all buffers to 512 bytes so that we can spill them with
+  // DirectIO.
+  static constexpr int64_t kAlignment = 512;

Review Comment:
   Can we add a comment to `ExecBatchBuilder` somewhere to make it clear to 
users that it will build batches with 512 byte alignment?



##########
cpp/src/arrow/compute/exec/spilling_join.h:
##########
@@ -0,0 +1,129 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <bitset>
+
+#include "arrow/compute/exec/accumulation_queue.h"
+#include "arrow/compute/exec/hash_join.h"
+#include "arrow/compute/exec/query_context.h"
+
+namespace arrow {
+namespace compute {
+struct PartitionedBloomFilter {
+  std::unique_ptr<BlockedBloomFilter> in_memory;
+  std::unique_ptr<BlockedBloomFilter>
+      partitions[SpillingAccumulationQueue::kNumPartitions];
+
+  void Find(int64_t hardware_flags, int64_t num_rows, const uint64_t* hashes,
+            uint8_t* bv);
+};
+
+class SpillingHashJoin {
+ public:
+  using RegisterTaskGroupCallback = std::function<int(
+      std::function<Status(size_t, int64_t)>, std::function<Status(size_t)>)>;
+  using StartTaskGroupCallback = std::function<Status(int, int64_t)>;
+  using AddProbeSideHashColumn = std::function<Status(size_t, ExecBatch*)>;
+  using BloomFilterFinishedCallback = std::function<Status(size_t)>;
+  using ApplyBloomFilterCallback = std::function<Status(size_t, ExecBatch*)>;
+  using OutputBatchCallback = std::function<void(int64_t, ExecBatch)>;
+  using FinishedCallback = std::function<Status(int64_t)>;
+  using StartSpillingCallback = std::function<Status(size_t)>;
+  using PauseProbeSideCallback = std::function<void(int)>;
+  using ResumeProbeSideCallback = std::function<void(int)>;
+
+  struct CallbackRecord {
+    RegisterTaskGroupCallback register_task_group;
+    StartTaskGroupCallback start_task_group;
+    AddProbeSideHashColumn add_probe_side_hashes;

Review Comment:
   ```suggestion
       AddProbeSideHashColumnCallback add_probe_side_hashes;
   ```



##########
cpp/src/arrow/compute/exec/spilling_join.cc:
##########
@@ -0,0 +1,260 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <memory>
+
+#include "arrow/compute/exec/spilling_join.h"
+#include "arrow/util/atomic_util.h"
+
+namespace arrow {
+namespace compute {
+void PartitionedBloomFilter::Find(int64_t hardware_flags, int64_t num_rows,
+                                  const uint64_t* hashes, uint8_t* bv) {
+  if (in_memory) return in_memory->Find(hardware_flags, num_rows, hashes, bv);
+
+  for (int64_t i = 0; i < num_rows; i++) {
+    uint64_t hash = hashes[i];
+    size_t partition = SpillingAccumulationQueue::partition_id(hashes[i]);
+    bool found = partitions[partition] ? partitions[partition]->Find(hash) : 
true;
+    bit_util::SetBitTo(bv, i, found);
+  }
+}
+
+Status SpillingHashJoin::Init(QueryContext* ctx, JoinType join_type, size_t 
num_threads,
+                              SchemaProjectionMaps<HashJoinProjection>* 
proj_map_left,
+                              SchemaProjectionMaps<HashJoinProjection>* 
proj_map_right,
+                              std::vector<JoinKeyCmp>* key_cmp, Expression* 
filter,
+                              PartitionedBloomFilter* bloom_filter,
+                              CallbackRecord callback_record, bool is_swiss) {
+  ctx_ = ctx;
+  num_threads_ = num_threads;
+  callbacks_ = std::move(callback_record);
+  bloom_filter_ = bloom_filter;
+  is_swiss_ = is_swiss;
+
+  HashJoinImpl::CallbackRecord join_callbacks;
+  join_callbacks.register_task_group = callbacks_.register_task_group;
+  join_callbacks.start_task_group = callbacks_.start_task_group;
+  join_callbacks.output_batch = callbacks_.output_batch;
+  join_callbacks.finished = [this](int64_t num_total_batches) {
+    return this->OnCollocatedJoinFinished(num_total_batches);
+  };
+
+  builder_ = BloomFilterBuilder::Make(num_threads_ == 1
+                                          ? 
BloomFilterBuildStrategy::SINGLE_THREADED
+                                          : 
BloomFilterBuildStrategy::PARALLEL);
+  RETURN_NOT_OK(build_accumulator_.Init(ctx));
+  RETURN_NOT_OK(probe_accumulator_.Init(ctx));
+
+  for (size_t i = 0; i < SpillingAccumulationQueue::kNumPartitions; i++) {
+    ARROW_ASSIGN_OR_RAISE(
+        impls_[i], is_swiss_ ? HashJoinImpl::MakeSwiss() : 
HashJoinImpl::MakeBasic());
+    RETURN_NOT_OK(impls_[i]->Init(ctx_, join_type, num_threads, proj_map_left,
+                                  proj_map_right, key_cmp, filter, 
join_callbacks));
+
+    task_group_bloom_[i] = callbacks_.register_task_group(
+        [this](size_t thread_index, int64_t task_id) {
+          return PushBloomFilterBatch(thread_index, task_id);
+        },
+        [this](size_t thread_index) { return 
OnBloomFilterFinished(thread_index); });
+  }
+  return Status::OK();
+}
+
+Status SpillingHashJoin::CheckSpilling(size_t thread_index, ExecBatch& batch) {
+  size_t size_of_batch = static_cast<size_t>(batch.TotalBufferSize());
+  size_t max_batch_size = arrow::util::AtomicMax(max_batch_size_, 
size_of_batch);
+
+  // Spilling algorithm proven to not use more than
+  // (SpillThreshold + NumThreads * BatchSize) memory.
+  // Thus we want to spill when (SpillThreshold + NumThreads * BatchSize) = k 
* MaxMemory
+  // with some fuzz factor k (which is 0.8 here because that's what I decided).
+  // Thus SpillThreshold = k * MaxMemory - NumThreads * BatchSize.
+  constexpr float kFuzzFactor = 0.8f;
+  size_t max_memory = static_cast<size_t>(kFuzzFactor * 
ctx_->options().max_memory_bytes);
+  size_t spill_threshold = static_cast<size_t>(std::max(
+      static_cast<ssize_t>(kFuzzFactor * max_memory - num_threads_ * 
max_batch_size),
+      static_cast<ssize_t>(0)));
+  size_t bytes_allocated = 
static_cast<size_t>(ctx_->memory_pool()->bytes_allocated());
+  size_t bytes_inflight = ctx_->GetCurrentTempFileIO();
+
+  size_t backpressure_threshold = spill_threshold / 2;
+  if (bytes_allocated > backpressure_threshold) {
+    if (int32_t expected = 0; 
backpressure_counter_.compare_exchange_strong(expected, 1))
+      callbacks_.pause_probe_side(1);
+  }
+  if ((bytes_allocated - bytes_inflight) > spill_threshold) {
+    RETURN_NOT_OK(AdvanceSpillCursor(thread_index));
+  }
+  return Status::OK();
+}
+
+Status SpillingHashJoin::AdvanceSpillCursor(size_t thread_index) {
+  if (bool expected = false;
+      !spilling_.load() && spilling_.compare_exchange_strong(expected, true))
+    return callbacks_.start_spilling(thread_index);
+
+  ARROW_ASSIGN_OR_RAISE(bool probe_advanced, 
probe_accumulator_.AdvanceSpillCursor());
+  if (probe_advanced) return Status::OK();
+
+  ARROW_ASSIGN_OR_RAISE(bool build_advanced, 
build_accumulator_.AdvanceSpillCursor());
+  if (build_advanced) return Status::OK();
+
+  ARROW_ASSIGN_OR_RAISE(bool probe_hash_advanced, 
probe_accumulator_.AdvanceHashCursor());
+  if (probe_hash_advanced) return Status::OK();
+
+  ARROW_ASSIGN_OR_RAISE(bool build_hash_advanced, 
build_accumulator_.AdvanceHashCursor());
+  if (build_hash_advanced) return Status::OK();
+
+  // Pray we don't run out of memory
+  return Status::OK();

Review Comment:
   ```suggestion
     ARROW_LOG(WARNING) << "Memory limits for query exceeded but all data from 
hash join spilled to disk";
     return Status::OK();
   ```



##########
cpp/src/arrow/compute/exec/spilling_join.h:
##########
@@ -0,0 +1,129 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <bitset>
+
+#include "arrow/compute/exec/accumulation_queue.h"
+#include "arrow/compute/exec/hash_join.h"
+#include "arrow/compute/exec/query_context.h"
+
+namespace arrow {
+namespace compute {
+struct PartitionedBloomFilter {
+  std::unique_ptr<BlockedBloomFilter> in_memory;
+  std::unique_ptr<BlockedBloomFilter>
+      partitions[SpillingAccumulationQueue::kNumPartitions];
+
+  void Find(int64_t hardware_flags, int64_t num_rows, const uint64_t* hashes,
+            uint8_t* bv);
+};
+
+class SpillingHashJoin {
+ public:
+  using RegisterTaskGroupCallback = std::function<int(
+      std::function<Status(size_t, int64_t)>, std::function<Status(size_t)>)>;
+  using StartTaskGroupCallback = std::function<Status(int, int64_t)>;
+  using AddProbeSideHashColumn = std::function<Status(size_t, ExecBatch*)>;
+  using BloomFilterFinishedCallback = std::function<Status(size_t)>;
+  using ApplyBloomFilterCallback = std::function<Status(size_t, ExecBatch*)>;
+  using OutputBatchCallback = std::function<void(int64_t, ExecBatch)>;
+  using FinishedCallback = std::function<Status(int64_t)>;
+  using StartSpillingCallback = std::function<Status(size_t)>;
+  using PauseProbeSideCallback = std::function<void(int)>;
+  using ResumeProbeSideCallback = std::function<void(int)>;
+
+  struct CallbackRecord {

Review Comment:
   Can we just capitulate and use pure virtual classes at this point?



##########
cpp/src/arrow/compute/exec/spilling_join.cc:
##########
@@ -0,0 +1,260 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <memory>
+
+#include "arrow/compute/exec/spilling_join.h"
+#include "arrow/util/atomic_util.h"
+
+namespace arrow {
+namespace compute {
+void PartitionedBloomFilter::Find(int64_t hardware_flags, int64_t num_rows,
+                                  const uint64_t* hashes, uint8_t* bv) {
+  if (in_memory) return in_memory->Find(hardware_flags, num_rows, hashes, bv);
+
+  for (int64_t i = 0; i < num_rows; i++) {
+    uint64_t hash = hashes[i];
+    size_t partition = SpillingAccumulationQueue::partition_id(hashes[i]);
+    bool found = partitions[partition] ? partitions[partition]->Find(hash) : 
true;
+    bit_util::SetBitTo(bv, i, found);
+  }
+}
+
+Status SpillingHashJoin::Init(QueryContext* ctx, JoinType join_type, size_t 
num_threads,
+                              SchemaProjectionMaps<HashJoinProjection>* 
proj_map_left,
+                              SchemaProjectionMaps<HashJoinProjection>* 
proj_map_right,
+                              std::vector<JoinKeyCmp>* key_cmp, Expression* 
filter,
+                              PartitionedBloomFilter* bloom_filter,
+                              CallbackRecord callback_record, bool is_swiss) {
+  ctx_ = ctx;
+  num_threads_ = num_threads;
+  callbacks_ = std::move(callback_record);
+  bloom_filter_ = bloom_filter;
+  is_swiss_ = is_swiss;
+
+  HashJoinImpl::CallbackRecord join_callbacks;
+  join_callbacks.register_task_group = callbacks_.register_task_group;
+  join_callbacks.start_task_group = callbacks_.start_task_group;
+  join_callbacks.output_batch = callbacks_.output_batch;
+  join_callbacks.finished = [this](int64_t num_total_batches) {
+    return this->OnCollocatedJoinFinished(num_total_batches);
+  };
+
+  builder_ = BloomFilterBuilder::Make(num_threads_ == 1
+                                          ? 
BloomFilterBuildStrategy::SINGLE_THREADED
+                                          : 
BloomFilterBuildStrategy::PARALLEL);
+  RETURN_NOT_OK(build_accumulator_.Init(ctx));
+  RETURN_NOT_OK(probe_accumulator_.Init(ctx));
+
+  for (size_t i = 0; i < SpillingAccumulationQueue::kNumPartitions; i++) {
+    ARROW_ASSIGN_OR_RAISE(
+        impls_[i], is_swiss_ ? HashJoinImpl::MakeSwiss() : 
HashJoinImpl::MakeBasic());
+    RETURN_NOT_OK(impls_[i]->Init(ctx_, join_type, num_threads, proj_map_left,
+                                  proj_map_right, key_cmp, filter, 
join_callbacks));
+
+    task_group_bloom_[i] = callbacks_.register_task_group(
+        [this](size_t thread_index, int64_t task_id) {
+          return PushBloomFilterBatch(thread_index, task_id);
+        },
+        [this](size_t thread_index) { return 
OnBloomFilterFinished(thread_index); });
+  }
+  return Status::OK();
+}
+
+Status SpillingHashJoin::CheckSpilling(size_t thread_index, ExecBatch& batch) {
+  size_t size_of_batch = static_cast<size_t>(batch.TotalBufferSize());
+  size_t max_batch_size = arrow::util::AtomicMax(max_batch_size_, 
size_of_batch);
+
+  // Spilling algorithm proven to not use more than
+  // (SpillThreshold + NumThreads * BatchSize) memory.
+  // Thus we want to spill when (SpillThreshold + NumThreads * BatchSize) = k 
* MaxMemory
+  // with some fuzz factor k (which is 0.8 here because that's what I decided).
+  // Thus SpillThreshold = k * MaxMemory - NumThreads * BatchSize.
+  constexpr float kFuzzFactor = 0.8f;
+  size_t max_memory = static_cast<size_t>(kFuzzFactor * 
ctx_->options().max_memory_bytes);
+  size_t spill_threshold = static_cast<size_t>(std::max(
+      static_cast<ssize_t>(kFuzzFactor * max_memory - num_threads_ * 
max_batch_size),

Review Comment:
   What would happen if there were two spilling joins in a plan?  I see we are 
using `memory_pool()->bytes_allocated()`so I think this is "global" in some 
sense and would work.  Could we put the fuzz factor in the query context?



##########
cpp/src/arrow/compute/exec/accumulation_queue.cc:
##########
@@ -39,20 +37,180 @@ void AccumulationQueue::Concatenate(AccumulationQueue&& 
that) {
   this->batches_.reserve(this->batches_.size() + that.batches_.size());
   std::move(that.batches_.begin(), that.batches_.end(),
             std::back_inserter(this->batches_));
-  this->row_count_ += that.row_count_;
   that.Clear();
 }
 
 void AccumulationQueue::InsertBatch(ExecBatch batch) {
-  row_count_ += batch.length;
   batches_.emplace_back(std::move(batch));
 }
 
+void AccumulationQueue::SetBatch(size_t idx, ExecBatch batch) {
+  ARROW_DCHECK(idx < batches_.size());
+  batches_[idx] = std::move(batch);
+}
+
+size_t AccumulationQueue::CalculateRowCount() const {
+  size_t count = 0;
+  for (const ExecBatch& b : batches_) count += static_cast<size_t>(b.length);
+  return count;
+}
+
 void AccumulationQueue::Clear() {
-  row_count_ = 0;
   batches_.clear();
 }
 
-ExecBatch& AccumulationQueue::operator[](size_t i) { return batches_[i]; }
-}  // namespace util
+Status SpillingAccumulationQueue::Init(QueryContext* ctx) {
+  ctx_ = ctx;
+  partition_locks_.Init(ctx_->max_concurrency(), kNumPartitions);
+  for (size_t ipart = 0; ipart < kNumPartitions; ipart++) {
+    task_group_read_[ipart] = ctx_->RegisterTaskGroup(
+        [this, ipart](size_t thread_index, int64_t batch_index) {
+          return read_back_fn_[ipart](thread_index, 
static_cast<size_t>(batch_index),
+                                      std::move(queues_[ipart][batch_index]));
+        },
+        [this, ipart](size_t thread_index) { return 
on_finished_[ipart](thread_index); });
+  }
+  return Status::OK();
+}
+
+Status SpillingAccumulationQueue::InsertBatch(size_t thread_index, ExecBatch 
batch) {
+  Datum& hash_datum = batch.values.back();
+  const uint64_t* hashes =
+      reinterpret_cast<const 
uint64_t*>(hash_datum.array()->buffers[1]->data());
+  // `permutation` stores the indices of rows in the input batch sorted by 
partition.
+  std::vector<uint16_t> permutation(batch.length);
+  uint16_t part_starts[kNumPartitions + 1];
+  PartitionSort::Eval(
+      batch.length, kNumPartitions, part_starts,
+      /*partition_id=*/[&](int64_t i) { return partition_id(hashes[i]); },
+      /*output_fn=*/
+      [&permutation](int64_t input_pos, int64_t output_pos) {
+        permutation[output_pos] = static_cast<uint16_t>(input_pos);
+      });
+
+  int unprocessed_partition_ids[kNumPartitions];
+  RETURN_NOT_OK(partition_locks_.ForEachPartition(
+      thread_index, unprocessed_partition_ids,
+      /*is_prtn_empty=*/

Review Comment:
   ```suggestion
         /*is_prtn_empty_fn=*/
   ```



##########
cpp/src/arrow/compute/exec/accumulation_queue.h:
##########
@@ -42,16 +45,88 @@ class AccumulationQueue {
 
   void Concatenate(AccumulationQueue&& that);
   void InsertBatch(ExecBatch batch);
-  int64_t row_count() { return row_count_; }
-  size_t batch_count() { return batches_.size(); }
+  void SetBatch(size_t idx, ExecBatch batch);
+  size_t batch_count() const { return batches_.size(); }
   bool empty() const { return batches_.empty(); }
+  size_t CalculateRowCount() const;
+
+  // Resizes the accumulation queue to contain size batches. The
+  // new batches will be empty and have length 0, but they will be
+  // usable (useful for concurrent modification of the AccumulationQueue
+  // of separate elements).
+  void Resize(size_t size) { batches_.resize(size); }
   void Clear();
-  ExecBatch& operator[](size_t i);
+  ExecBatch& operator[](size_t i) { return batches_[i]; }
+  const ExecBatch& operator[](size_t i) const { return batches_[i]; }
 
  private:
-  int64_t row_count_;
   std::vector<ExecBatch> batches_;
 };
 
-}  // namespace util
+class SpillingAccumulationQueue {
+ public:
+  // Number of partitions must be a power of two, since we assign partitions by
+  // looking at bottom few bits.
+  static constexpr int kLogNumPartitions = 6;
+  static constexpr int kNumPartitions = 1 << kLogNumPartitions;
+  Status Init(QueryContext* ctx);
+  // Assumes that the final column in batch contains 64-bit hashes of the 
columns.
+  Status InsertBatch(size_t thread_index, ExecBatch batch);
+  Status GetPartition(size_t thread_index, size_t partition,
+                      std::function<Status(size_t, size_t, ExecBatch)>
+                          on_batch,  // thread_index, batch_index, batch
+                      std::function<Status(size_t)> on_finished);
+
+  // Returns hashes of the given partition and batch index.
+  // partition MUST be at least hash_cursor, as if partition < hash_cursor,
+  // these hashes will have been deleted.
+  const uint64_t* GetHashes(size_t partition, size_t batch_idx);
+  inline size_t batch_count(size_t partition) const {
+    size_t num_full_batches = partition >= spilling_cursor_
+                                  ? queues_[partition].batch_count()
+                                  : files_[partition].num_batches();
+
+    return num_full_batches + (builders_[partition].num_rows() > 0);
+  }
+  inline size_t row_count(size_t partition, size_t batch_idx) const {
+    if (batch_idx < hash_queues_[partition].batch_count())
+      return hash_queues_[partition][batch_idx].length;
+    else
+      return builders_[partition].num_rows();
+  }
+
+  static inline constexpr size_t partition_id(uint64_t hash) {
+    // Hash Table uses the top bits of the hash, so we really really
+    // need to use the bottom bits of the hash for spilling to avoid
+    // a huge number of hash collisions per partition.
+    return static_cast<size_t>(hash & (kNumPartitions - 1));
+  }
+
+  size_t CalculatePartitionRowCount(size_t partition) const;
+
+  Result<bool> AdvanceSpillCursor();
+  Result<bool> AdvanceHashCursor();

Review Comment:
   Document these methods.  It might be nice to point out that, by the time 
these methods have returned, the spilling file's "bytes in flight" should have 
increased (or, if it writes quickly, the amount of allocated memory may have 
decreased already)



##########
cpp/src/arrow/compute/exec/accumulation_queue.cc:
##########
@@ -39,20 +37,180 @@ void AccumulationQueue::Concatenate(AccumulationQueue&& 
that) {
   this->batches_.reserve(this->batches_.size() + that.batches_.size());
   std::move(that.batches_.begin(), that.batches_.end(),
             std::back_inserter(this->batches_));
-  this->row_count_ += that.row_count_;
   that.Clear();
 }
 
 void AccumulationQueue::InsertBatch(ExecBatch batch) {
-  row_count_ += batch.length;
   batches_.emplace_back(std::move(batch));
 }
 
+void AccumulationQueue::SetBatch(size_t idx, ExecBatch batch) {
+  ARROW_DCHECK(idx < batches_.size());
+  batches_[idx] = std::move(batch);
+}
+
+size_t AccumulationQueue::CalculateRowCount() const {
+  size_t count = 0;
+  for (const ExecBatch& b : batches_) count += static_cast<size_t>(b.length);
+  return count;
+}
+
 void AccumulationQueue::Clear() {
-  row_count_ = 0;
   batches_.clear();
 }
 
-ExecBatch& AccumulationQueue::operator[](size_t i) { return batches_[i]; }
-}  // namespace util
+Status SpillingAccumulationQueue::Init(QueryContext* ctx) {
+  ctx_ = ctx;
+  partition_locks_.Init(ctx_->max_concurrency(), kNumPartitions);
+  for (size_t ipart = 0; ipart < kNumPartitions; ipart++) {
+    task_group_read_[ipart] = ctx_->RegisterTaskGroup(
+        [this, ipart](size_t thread_index, int64_t batch_index) {
+          return read_back_fn_[ipart](thread_index, 
static_cast<size_t>(batch_index),
+                                      std::move(queues_[ipart][batch_index]));
+        },
+        [this, ipart](size_t thread_index) { return 
on_finished_[ipart](thread_index); });
+  }
+  return Status::OK();
+}
+
+Status SpillingAccumulationQueue::InsertBatch(size_t thread_index, ExecBatch 
batch) {
+  Datum& hash_datum = batch.values.back();
+  const uint64_t* hashes =
+      reinterpret_cast<const 
uint64_t*>(hash_datum.array()->buffers[1]->data());
+  // `permutation` stores the indices of rows in the input batch sorted by 
partition.
+  std::vector<uint16_t> permutation(batch.length);
+  uint16_t part_starts[kNumPartitions + 1];
+  PartitionSort::Eval(
+      batch.length, kNumPartitions, part_starts,
+      /*partition_id=*/[&](int64_t i) { return partition_id(hashes[i]); },
+      /*output_fn=*/
+      [&permutation](int64_t input_pos, int64_t output_pos) {
+        permutation[output_pos] = static_cast<uint16_t>(input_pos);
+      });
+
+  int unprocessed_partition_ids[kNumPartitions];
+  RETURN_NOT_OK(partition_locks_.ForEachPartition(
+      thread_index, unprocessed_partition_ids,
+      /*is_prtn_empty=*/
+      [&](int part_id) { return part_starts[part_id + 1] == 
part_starts[part_id]; },
+      /*partition=*/
+      [&](int locked_part_id_int) {
+        size_t locked_part_id = static_cast<size_t>(locked_part_id_int);
+        uint64_t num_total_rows_to_append =
+            part_starts[locked_part_id + 1] - part_starts[locked_part_id];
+
+        size_t offset = static_cast<size_t>(part_starts[locked_part_id]);
+        while (num_total_rows_to_append > 0) {
+          int num_rows_to_append =
+              std::min(static_cast<int>(num_total_rows_to_append),
+                       static_cast<int>(ExecBatchBuilder::num_rows_max() -
+                                        builders_[locked_part_id].num_rows()));
+
+          RETURN_NOT_OK(builders_[locked_part_id].AppendSelected(
+              ctx_->memory_pool(), batch, num_rows_to_append, 
permutation.data() + offset,
+              batch.num_values()));
+
+          if (builders_[locked_part_id].is_full()) {
+            ExecBatch batch = builders_[locked_part_id].Flush();
+            Datum hash = std::move(batch.values.back());
+            batch.values.pop_back();
+            ExecBatch hash_batch({std::move(hash)}, batch.length);
+            if (locked_part_id < spilling_cursor_)
+              RETURN_NOT_OK(files_[locked_part_id].SpillBatch(ctx_, 
std::move(batch)));
+            else
+              queues_[locked_part_id].InsertBatch(std::move(batch));
+
+            if (locked_part_id >= hash_cursor_)
+              hash_queues_[locked_part_id].InsertBatch(std::move(hash_batch));
+          }
+          offset += num_rows_to_append;
+          num_total_rows_to_append -= num_rows_to_append;
+        }
+        return Status::OK();
+      }));
+  return Status::OK();
+}
+
+const uint64_t* SpillingAccumulationQueue::GetHashes(size_t partition, size_t 
batch_idx) {
+  ARROW_DCHECK(partition >= hash_cursor_.load());
+  if (batch_idx > hash_queues_[partition].batch_count()) {
+    const Datum& datum = hash_queues_[partition][batch_idx].values[0];
+    return reinterpret_cast<const 
uint64_t*>(datum.array()->buffers[1]->data());
+  } else {
+    size_t hash_idx = builders_[partition].num_cols();
+    KeyColumnArray kca = builders_[partition].column(hash_idx - 1);
+    return reinterpret_cast<const uint64_t*>(kca.data(1));
+  }
+}
+
+Status SpillingAccumulationQueue::GetPartition(
+    size_t thread_index, size_t partition,
+    std::function<Status(size_t, size_t, ExecBatch)> on_batch,
+    std::function<Status(size_t)> on_finished) {
+  bool is_in_memory = partition >= spilling_cursor_.load();
+  if (builders_[partition].num_rows() > 0) {
+    ExecBatch batch = builders_[partition].Flush();
+    Datum hash = std::move(batch.values.back());
+    batch.values.pop_back();
+    if (is_in_memory) {
+      ExecBatch hash_batch({std::move(hash)}, batch.length);
+      hash_queues_[partition].InsertBatch(std::move(hash_batch));
+      queues_[partition].InsertBatch(std::move(batch));
+    } else {
+      RETURN_NOT_OK(on_batch(thread_index,
+                             /*batch_index=*/queues_[partition].batch_count(),
+                             std::move(batch)));
+    }
+  }
+
+  if (is_in_memory) {
+    ARROW_DCHECK(partition >= hash_cursor_.load());

Review Comment:
   In other words, we should always be advancing the spill cursor faster than 
we advance the hash cursor?  Is this constraint noted down as a comment 
anywhere?



##########
cpp/src/arrow/compute/exec/accumulation_queue.cc:
##########
@@ -39,20 +37,180 @@ void AccumulationQueue::Concatenate(AccumulationQueue&& 
that) {
   this->batches_.reserve(this->batches_.size() + that.batches_.size());
   std::move(that.batches_.begin(), that.batches_.end(),
             std::back_inserter(this->batches_));
-  this->row_count_ += that.row_count_;
   that.Clear();
 }
 
 void AccumulationQueue::InsertBatch(ExecBatch batch) {
-  row_count_ += batch.length;
   batches_.emplace_back(std::move(batch));
 }
 
+void AccumulationQueue::SetBatch(size_t idx, ExecBatch batch) {
+  ARROW_DCHECK(idx < batches_.size());
+  batches_[idx] = std::move(batch);
+}
+
+size_t AccumulationQueue::CalculateRowCount() const {
+  size_t count = 0;
+  for (const ExecBatch& b : batches_) count += static_cast<size_t>(b.length);
+  return count;
+}
+
 void AccumulationQueue::Clear() {
-  row_count_ = 0;
   batches_.clear();
 }
 
-ExecBatch& AccumulationQueue::operator[](size_t i) { return batches_[i]; }
-}  // namespace util
+Status SpillingAccumulationQueue::Init(QueryContext* ctx) {
+  ctx_ = ctx;
+  partition_locks_.Init(ctx_->max_concurrency(), kNumPartitions);
+  for (size_t ipart = 0; ipart < kNumPartitions; ipart++) {
+    task_group_read_[ipart] = ctx_->RegisterTaskGroup(
+        [this, ipart](size_t thread_index, int64_t batch_index) {
+          return read_back_fn_[ipart](thread_index, 
static_cast<size_t>(batch_index),
+                                      std::move(queues_[ipart][batch_index]));
+        },
+        [this, ipart](size_t thread_index) { return 
on_finished_[ipart](thread_index); });
+  }
+  return Status::OK();
+}
+
+Status SpillingAccumulationQueue::InsertBatch(size_t thread_index, ExecBatch 
batch) {
+  Datum& hash_datum = batch.values.back();
+  const uint64_t* hashes =
+      reinterpret_cast<const 
uint64_t*>(hash_datum.array()->buffers[1]->data());
+  // `permutation` stores the indices of rows in the input batch sorted by 
partition.
+  std::vector<uint16_t> permutation(batch.length);
+  uint16_t part_starts[kNumPartitions + 1];
+  PartitionSort::Eval(
+      batch.length, kNumPartitions, part_starts,
+      /*partition_id=*/[&](int64_t i) { return partition_id(hashes[i]); },
+      /*output_fn=*/
+      [&permutation](int64_t input_pos, int64_t output_pos) {
+        permutation[output_pos] = static_cast<uint16_t>(input_pos);
+      });
+
+  int unprocessed_partition_ids[kNumPartitions];
+  RETURN_NOT_OK(partition_locks_.ForEachPartition(
+      thread_index, unprocessed_partition_ids,
+      /*is_prtn_empty=*/
+      [&](int part_id) { return part_starts[part_id + 1] == 
part_starts[part_id]; },
+      /*partition=*/
+      [&](int locked_part_id_int) {
+        size_t locked_part_id = static_cast<size_t>(locked_part_id_int);
+        uint64_t num_total_rows_to_append =
+            part_starts[locked_part_id + 1] - part_starts[locked_part_id];
+
+        size_t offset = static_cast<size_t>(part_starts[locked_part_id]);
+        while (num_total_rows_to_append > 0) {
+          int num_rows_to_append =
+              std::min(static_cast<int>(num_total_rows_to_append),
+                       static_cast<int>(ExecBatchBuilder::num_rows_max() -
+                                        builders_[locked_part_id].num_rows()));
+
+          RETURN_NOT_OK(builders_[locked_part_id].AppendSelected(
+              ctx_->memory_pool(), batch, num_rows_to_append, 
permutation.data() + offset,
+              batch.num_values()));
+
+          if (builders_[locked_part_id].is_full()) {
+            ExecBatch batch = builders_[locked_part_id].Flush();
+            Datum hash = std::move(batch.values.back());
+            batch.values.pop_back();
+            ExecBatch hash_batch({std::move(hash)}, batch.length);
+            if (locked_part_id < spilling_cursor_)
+              RETURN_NOT_OK(files_[locked_part_id].SpillBatch(ctx_, 
std::move(batch)));
+            else
+              queues_[locked_part_id].InsertBatch(std::move(batch));
+
+            if (locked_part_id >= hash_cursor_)
+              hash_queues_[locked_part_id].InsertBatch(std::move(hash_batch));
+          }
+          offset += num_rows_to_append;
+          num_total_rows_to_append -= num_rows_to_append;
+        }
+        return Status::OK();
+      }));
+  return Status::OK();
+}
+
+const uint64_t* SpillingAccumulationQueue::GetHashes(size_t partition, size_t 
batch_idx) {
+  ARROW_DCHECK(partition >= hash_cursor_.load());
+  if (batch_idx > hash_queues_[partition].batch_count()) {
+    const Datum& datum = hash_queues_[partition][batch_idx].values[0];
+    return reinterpret_cast<const 
uint64_t*>(datum.array()->buffers[1]->data());
+  } else {
+    size_t hash_idx = builders_[partition].num_cols();
+    KeyColumnArray kca = builders_[partition].column(hash_idx - 1);
+    return reinterpret_cast<const uint64_t*>(kca.data(1));
+  }
+}
+
+Status SpillingAccumulationQueue::GetPartition(
+    size_t thread_index, size_t partition,
+    std::function<Status(size_t, size_t, ExecBatch)> on_batch,
+    std::function<Status(size_t)> on_finished) {
+  bool is_in_memory = partition >= spilling_cursor_.load();
+  if (builders_[partition].num_rows() > 0) {
+    ExecBatch batch = builders_[partition].Flush();
+    Datum hash = std::move(batch.values.back());
+    batch.values.pop_back();
+    if (is_in_memory) {

Review Comment:
   Why not just call `on_batch` regardless?  Is this some attempt to preserve 
order if we haven't had to do any spilling?



##########
cpp/src/arrow/compute/exec/accumulation_queue.h:
##########
@@ -42,16 +45,88 @@ class AccumulationQueue {
 
   void Concatenate(AccumulationQueue&& that);
   void InsertBatch(ExecBatch batch);
-  int64_t row_count() { return row_count_; }
-  size_t batch_count() { return batches_.size(); }
+  void SetBatch(size_t idx, ExecBatch batch);
+  size_t batch_count() const { return batches_.size(); }
   bool empty() const { return batches_.empty(); }
+  size_t CalculateRowCount() const;
+
+  // Resizes the accumulation queue to contain size batches. The
+  // new batches will be empty and have length 0, but they will be
+  // usable (useful for concurrent modification of the AccumulationQueue
+  // of separate elements).
+  void Resize(size_t size) { batches_.resize(size); }
   void Clear();
-  ExecBatch& operator[](size_t i);
+  ExecBatch& operator[](size_t i) { return batches_[i]; }
+  const ExecBatch& operator[](size_t i) const { return batches_[i]; }
 
  private:
-  int64_t row_count_;
   std::vector<ExecBatch> batches_;
 };
 
-}  // namespace util
+class SpillingAccumulationQueue {
+ public:
+  // Number of partitions must be a power of two, since we assign partitions by
+  // looking at bottom few bits.
+  static constexpr int kLogNumPartitions = 6;
+  static constexpr int kNumPartitions = 1 << kLogNumPartitions;
+  Status Init(QueryContext* ctx);
+  // Assumes that the final column in batch contains 64-bit hashes of the 
columns.
+  Status InsertBatch(size_t thread_index, ExecBatch batch);
+  Status GetPartition(size_t thread_index, size_t partition,
+                      std::function<Status(size_t, size_t, ExecBatch)>
+                          on_batch,  // thread_index, batch_index, batch
+                      std::function<Status(size_t)> on_finished);
+
+  // Returns hashes of the given partition and batch index.
+  // partition MUST be at least hash_cursor, as if partition < hash_cursor,
+  // these hashes will have been deleted.
+  const uint64_t* GetHashes(size_t partition, size_t batch_idx);
+  inline size_t batch_count(size_t partition) const {
+    size_t num_full_batches = partition >= spilling_cursor_
+                                  ? queues_[partition].batch_count()
+                                  : files_[partition].num_batches();
+
+    return num_full_batches + (builders_[partition].num_rows() > 0);
+  }
+  inline size_t row_count(size_t partition, size_t batch_idx) const {
+    if (batch_idx < hash_queues_[partition].batch_count())
+      return hash_queues_[partition][batch_idx].length;
+    else
+      return builders_[partition].num_rows();
+  }
+
+  static inline constexpr size_t partition_id(uint64_t hash) {
+    // Hash Table uses the top bits of the hash, so we really really

Review Comment:
   Or even delete both instances of `really`.



##########
cpp/src/arrow/compute/exec/accumulation_queue.h:
##########
@@ -42,16 +45,88 @@ class AccumulationQueue {
 
   void Concatenate(AccumulationQueue&& that);
   void InsertBatch(ExecBatch batch);
-  int64_t row_count() { return row_count_; }
-  size_t batch_count() { return batches_.size(); }
+  void SetBatch(size_t idx, ExecBatch batch);
+  size_t batch_count() const { return batches_.size(); }
   bool empty() const { return batches_.empty(); }
+  size_t CalculateRowCount() const;
+
+  // Resizes the accumulation queue to contain size batches. The
+  // new batches will be empty and have length 0, but they will be
+  // usable (useful for concurrent modification of the AccumulationQueue
+  // of separate elements).
+  void Resize(size_t size) { batches_.resize(size); }
   void Clear();
-  ExecBatch& operator[](size_t i);
+  ExecBatch& operator[](size_t i) { return batches_[i]; }
+  const ExecBatch& operator[](size_t i) const { return batches_[i]; }
 
  private:
-  int64_t row_count_;
   std::vector<ExecBatch> batches_;
 };
 
-}  // namespace util
+class SpillingAccumulationQueue {
+ public:
+  // Number of partitions must be a power of two, since we assign partitions by
+  // looking at bottom few bits.
+  static constexpr int kLogNumPartitions = 6;
+  static constexpr int kNumPartitions = 1 << kLogNumPartitions;
+  Status Init(QueryContext* ctx);
+  // Assumes that the final column in batch contains 64-bit hashes of the 
columns.
+  Status InsertBatch(size_t thread_index, ExecBatch batch);
+  Status GetPartition(size_t thread_index, size_t partition,
+                      std::function<Status(size_t, size_t, ExecBatch)>
+                          on_batch,  // thread_index, batch_index, batch
+                      std::function<Status(size_t)> on_finished);
+
+  // Returns hashes of the given partition and batch index.
+  // partition MUST be at least hash_cursor, as if partition < hash_cursor,
+  // these hashes will have been deleted.
+  const uint64_t* GetHashes(size_t partition, size_t batch_idx);
+  inline size_t batch_count(size_t partition) const {
+    size_t num_full_batches = partition >= spilling_cursor_
+                                  ? queues_[partition].batch_count()
+                                  : files_[partition].num_batches();
+
+    return num_full_batches + (builders_[partition].num_rows() > 0);
+  }
+  inline size_t row_count(size_t partition, size_t batch_idx) const {
+    if (batch_idx < hash_queues_[partition].batch_count())
+      return hash_queues_[partition][batch_idx].length;
+    else
+      return builders_[partition].num_rows();
+  }
+
+  static inline constexpr size_t partition_id(uint64_t hash) {
+    // Hash Table uses the top bits of the hash, so we really really
+    // need to use the bottom bits of the hash for spilling to avoid
+    // a huge number of hash collisions per partition.
+    return static_cast<size_t>(hash & (kNumPartitions - 1));
+  }
+
+  size_t CalculatePartitionRowCount(size_t partition) const;
+
+  Result<bool> AdvanceSpillCursor();
+  Result<bool> AdvanceHashCursor();
+  inline size_t spill_cursor() const { return spilling_cursor_.load(); }
+  inline size_t hash_cursor() const { return hash_cursor_.load(); }
+
+ private:
+  std::atomic<size_t> spilling_cursor_{0};  // denotes the first in-memory 
partition
+  std::atomic<size_t> hash_cursor_{0};
+
+  QueryContext* ctx_;
+  PartitionLocks partition_locks_;
+
+  AccumulationQueue queues_[kNumPartitions];
+  AccumulationQueue hash_queues_[kNumPartitions];
+
+  ExecBatchBuilder builders_[kNumPartitions];
+

Review Comment:
   Is there any value in having one array of `PartitionState` structs instead 
of per-field arrays?



##########
cpp/src/arrow/compute/exec/hash_join.h:
##########
@@ -35,33 +35,35 @@
 namespace arrow {
 namespace compute {
 
-using arrow::util::AccumulationQueue;
-
 class HashJoinImpl {
  public:
   using OutputBatchCallback = std::function<void(int64_t, ExecBatch)>;
   using BuildFinishedCallback = std::function<Status(size_t)>;
-  using FinishedCallback = std::function<void(int64_t)>;
+  using FinishedCallback = std::function<Status(int64_t)>;
   using RegisterTaskGroupCallback = std::function<int(
       std::function<Status(size_t, int64_t)>, std::function<Status(size_t)>)>;
   using StartTaskGroupCallback = std::function<Status(int, int64_t)>;
   using AbortContinuationImpl = std::function<void()>;
 
+  struct CallbackRecord {

Review Comment:
   Any particular reason for this change or just a cleanup?



##########
cpp/src/arrow/compute/exec/query_context.cc:
##########
@@ -22,7 +22,9 @@
 namespace arrow {
 using internal::CpuInfo;
 namespace compute {
-QueryOptions::QueryOptions() : use_legacy_batching(false) {}
+QueryOptions::QueryOptions()
+    : max_memory_bytes(::arrow::internal::GetTotalMemoryBytes()),

Review Comment:
   Maybe we should default to something like 50% of this?  If a user wants to 
live close to the edge they can always configure it higher.



##########
cpp/src/arrow/compute/exec/hash_join_benchmark.cc:
##########
@@ -208,13 +211,13 @@ static void HashJoinBasicBenchmarkImpl(benchmark::State& 
st,
                                        BenchmarkSettings& settings) {
   uint64_t total_rows = 0;
   for (auto _ : st) {
-    st.PauseTiming();
     {
       JoinBenchmark bm(settings);
       st.ResumeTiming();
       bm.RunJoin();
       st.PauseTiming();
       total_rows += bm.stats_.num_probe_rows;
+      st.PauseTiming();

Review Comment:
   Is this redundant with the line two lines above?



##########
cpp/src/arrow/compute/exec/spilling_join.h:
##########
@@ -0,0 +1,129 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <bitset>
+
+#include "arrow/compute/exec/accumulation_queue.h"
+#include "arrow/compute/exec/hash_join.h"
+#include "arrow/compute/exec/query_context.h"
+
+namespace arrow {
+namespace compute {
+struct PartitionedBloomFilter {
+  std::unique_ptr<BlockedBloomFilter> in_memory;
+  std::unique_ptr<BlockedBloomFilter>
+      partitions[SpillingAccumulationQueue::kNumPartitions];
+
+  void Find(int64_t hardware_flags, int64_t num_rows, const uint64_t* hashes,
+            uint8_t* bv);
+};
+
+class SpillingHashJoin {
+ public:
+  using RegisterTaskGroupCallback = std::function<int(
+      std::function<Status(size_t, int64_t)>, std::function<Status(size_t)>)>;
+  using StartTaskGroupCallback = std::function<Status(int, int64_t)>;
+  using AddProbeSideHashColumn = std::function<Status(size_t, ExecBatch*)>;
+  using BloomFilterFinishedCallback = std::function<Status(size_t)>;
+  using ApplyBloomFilterCallback = std::function<Status(size_t, ExecBatch*)>;
+  using OutputBatchCallback = std::function<void(int64_t, ExecBatch)>;
+  using FinishedCallback = std::function<Status(int64_t)>;
+  using StartSpillingCallback = std::function<Status(size_t)>;
+  using PauseProbeSideCallback = std::function<void(int)>;
+  using ResumeProbeSideCallback = std::function<void(int)>;
+
+  struct CallbackRecord {
+    RegisterTaskGroupCallback register_task_group;
+    StartTaskGroupCallback start_task_group;
+    AddProbeSideHashColumn add_probe_side_hashes;
+    BloomFilterFinishedCallback bloom_filter_finished;
+    ApplyBloomFilterCallback apply_bloom_filter;
+    OutputBatchCallback output_batch;
+    FinishedCallback finished;
+    StartSpillingCallback start_spilling;
+    PauseProbeSideCallback pause_probe_side;
+    ResumeProbeSideCallback resume_probe_side;
+  };
+
+  Status Init(QueryContext* ctx, JoinType join_type, size_t num_threads,
+              SchemaProjectionMaps<HashJoinProjection>* proj_map_left,
+              SchemaProjectionMaps<HashJoinProjection>* proj_map_right,
+              std::vector<JoinKeyCmp>* key_cmp, Expression* filter,
+              PartitionedBloomFilter* bloom_filter, CallbackRecord 
callback_record,
+              bool is_swiss);
+
+  Status CheckSpilling(size_t thread_index, ExecBatch& batch);
+
+  Status OnBuildSideBatch(size_t thread_index, ExecBatch batch);
+  Status OnBuildSideFinished(size_t thread_index);
+
+  Status OnProbeSideBatch(size_t thread_index, ExecBatch batch);
+  Status OnProbeSideFinished(size_t thread_index);
+
+  Status OnBloomFiltersReceived(size_t thread_index);
+
+ private:
+  Status AdvanceSpillCursor(size_t thread_index);
+
+  // Builds the entire bloom filter for all 64 partitions.
+  Status BuildPartitionedBloomFilter(size_t thread_index);
+  Status PushBloomFilterBatch(size_t thread_index, int64_t batch_id);
+  // Builds a bloom filter for a single partition.
+  Status BuildNextBloomFilter(size_t thread_index);
+  Status OnBloomFilterFinished(size_t thread_index);
+  Status OnPartitionedBloomFilterFinished(size_t thread_index);
+
+  Status StartCollocatedJoins(size_t thread_index);
+  Status BeginNextCollocatedJoin(size_t thread_index);
+  Status BuildHashTable(size_t thread_index);
+  Status OnHashTableFinished(size_t thread_index);
+  Status OnProbeSideBatchReadBack(size_t thread_index, size_t batch_idx, 
ExecBatch batch);
+  Status OnProbingFinished(size_t thread_index);
+  Status OnCollocatedJoinFinished(int64_t num_batches);
+
+  QueryContext* ctx_;
+  size_t num_threads_;
+  CallbackRecord callbacks_;
+  bool is_swiss_;
+  PartitionedBloomFilter* bloom_filter_;
+  std::unique_ptr<BloomFilterBuilder> builder_;
+
+  // Backpressure toggling happens at most twice during execution. A value of 
0 means
+  // we haven't toggled backpressure at all, value of 1 means we've paused, 
and value
+  // 2 means we've resumed.
+  std::atomic<int32_t> backpressure_counter_{0};

Review Comment:
   What if a downstream node asks this node to pause?  Would it be possible to 
pause reading from the spill file?  This can be done in a follow-up if needed.



##########
cpp/src/arrow/util/atomic_util.h:
##########
@@ -0,0 +1,112 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <atomic>
+#include <type_traits>
+
+namespace arrow {
+namespace util {
+template <typename T>
+inline T AtomicMax(std::atomic<T>& to_max, T val) {

Review Comment:
   ```suggestion
   /// update `to_max` to `val` unless it is already greater than `val`
   ///
   /// returns the greater of val and the current value of `to_max`
   inline T AtomicMax(std::atomic<T>& to_max, T val) {
   ```
   Are we assuming that `to_max` only increases?  In other words this variable 
is just a shared maximum and not used for anything else?



##########
cpp/src/arrow/compute/exec/spilling_join.h:
##########
@@ -0,0 +1,129 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <bitset>
+
+#include "arrow/compute/exec/accumulation_queue.h"
+#include "arrow/compute/exec/hash_join.h"
+#include "arrow/compute/exec/query_context.h"
+
+namespace arrow {
+namespace compute {
+struct PartitionedBloomFilter {
+  std::unique_ptr<BlockedBloomFilter> in_memory;
+  std::unique_ptr<BlockedBloomFilter>
+      partitions[SpillingAccumulationQueue::kNumPartitions];
+
+  void Find(int64_t hardware_flags, int64_t num_rows, const uint64_t* hashes,
+            uint8_t* bv);
+};
+
+class SpillingHashJoin {

Review Comment:
   There is a significant lack of comments in this file. At the very least we 
need to mention that this is a HashJoin implementation that accumulates input 
in a spilling queue and then delegates the actual hash-join behavior to an 
underlying implementation (either basic or swiss).  It would be good to mention 
a rough overview of the algorithm as well as how bloom filtering works with 
spilling.
   
   Other things we might consider:
   
    * How does it determine when to spill?
    * How and when is backpressure applied?
    * Prioritization of spilling (e.g. we spill probe side first, then build 
side, then probe hashes, then build hashes)



##########
cpp/src/arrow/compute/exec/spilling_join.h:
##########
@@ -0,0 +1,129 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <bitset>
+
+#include "arrow/compute/exec/accumulation_queue.h"
+#include "arrow/compute/exec/hash_join.h"
+#include "arrow/compute/exec/query_context.h"
+
+namespace arrow {
+namespace compute {
+struct PartitionedBloomFilter {

Review Comment:
   What's the relationship between the bloom filters here and the bloom filters 
in hash_join_node.cc?



##########
cpp/src/arrow/util/atomic_util.h:
##########
@@ -0,0 +1,112 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <atomic>
+#include <type_traits>
+
+namespace arrow {
+namespace util {
+template <typename T>
+inline T AtomicMax(std::atomic<T>& to_max, T val) {
+  static_assert(std::is_arithmetic<T>::value,
+                "Maximum only makes sense on numeric types!");
+  T local_to_max = to_max.load(std::memory_order_relaxed);
+  while (val > local_to_max &&
+         !to_max.compare_exchange_weak(local_to_max, val, 
std::memory_order_release,
+                                       std::memory_order_relaxed)) {
+  }
+  return to_max.load(std::memory_order_relaxed);
+}
+
+#if defined(__clang) || defined(__GNUC__)
+template <typename T>
+inline T AtomicLoad(T* addr,
+                    std::memory_order order = std::memory_order_seq_cst) 
noexcept {
+  T ret;
+  __atomic_load(addr, &ret, order);
+  return ret;
+}
+
+template <typename T>
+inline void AtomicStore(T* addr, T& val,
+                        std::memory_order order = std::memory_order_seq_cst) 
noexcept {
+  __atomic_store(addr, val, order);
+}
+
+template <typename T>
+inline T AtomicFetchAdd(T* addr, T& val,
+                        std::memory_order order = std::memory_order_seq_cst) 
noexcept {
+  static_assert(std::is_integral<T>::value,
+                "AtomicFetchAdd can only be used on integral types");
+  return __atomic_fetch_add(addr, val, order);
+}
+
+template <typename T>
+inline T AtomicFetchSub(T* addr, T& val,
+                        std::memory_order order = std::memory_order_seq_cst) 
noexcept {
+  static_assert(std::is_integral<T>::value,
+                "AtomicFetchSub can only be used on integral types");
+  return __atomic_fetch_sub(addr, val, order);
+}
+
+#elif defined(_MSC_VER)
+#include <intrin.h>
+template <typename T>
+inline T AtomicLoad(T* addr, std::memory_order /*order*/) noexcept {
+  T val = *addr;
+  _ReadWriteBarrier();
+  return val;
+}
+
+template <typename T>
+inline void AtomicStore(T* addr, T& val, std::memory_order /*order*/) noexcept 
{
+  _ReadWriteBarrier();
+  *addr = val;
+}
+
+template <typename T>
+inline T AtomicFetchAdd(T* addr, T& val, std::memory_order /*order*/) noexcept 
{
+  static_assert(std::is_integral<T>::value,
+                "AtomicFetchAdd can only be used on integral types");
+  if constexpr (sizeof(T) == 1) return _InterlockedExchangeAdd8(addr, val);
+  if constexpr (sizeof(T) == 2) return _InterlockedExchangeAdd16(addr, val);
+  if constexpr (sizeof(T) == 4) return _InterlockedExchangeAdd(addr, val);
+  if constexpr (sizeof(T) == 8) {
+#if _WIN64
+    return _InterlockedExchangeAdd64(addr, val);
+#else
+    _ReadWriteBarrier();
+    T expected = *addr;
+    for (;;) {
+      T new_val = expected + val;
+      T prev = _InterlockedCompareExchange64(addr, new_val, expected);
+      if (prev == expected) return prev;
+      expected = prev;
+    }
+  }
+#endif
+  }
+
+  template <typename T>
+  inline T AtomicFetchSub(T * addr, T & val, std::memory_order /*order*/) 
noexcept {
+    return AtomicFetchAdd(addr, -val);
+  }
+#endif

Review Comment:
   As best I can tell these methods are unused.  Let's leave them out until 
needed.



##########
cpp/src/arrow/compute/exec/spilling_benchmark.cc:
##########
@@ -0,0 +1,140 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <thread>
+#include "arrow/compute/exec/accumulation_queue.h"
+#include "arrow/compute/exec/spilling_util.h"
+#include "arrow/compute/exec/test_util.h"
+#include "arrow/util/checked_cast.h"
+#include "benchmark/benchmark.h"
+
+namespace arrow {
+namespace compute {
+struct SpillingBenchmarkSettings {
+  int64_t num_files = 4;
+  int64_t num_threads = -1;
+};
+
+static void SpillingWrite_Impl(benchmark::State& st,
+                               SpillingBenchmarkSettings& settings) {
+  constexpr int num_batches = 1024;
+  constexpr int batch_size = 32000;
+  int64_t num_files = settings.num_files;
+  std::shared_ptr<Schema> bm_schema =
+      schema({field("f1", int32()), field("f2", int32())});
+  Random64Bit rng(42);
+  for (auto _ : st) {
+    st.PauseTiming();
+    {
+      QueryContext ctx;
+      std::vector<SpillFile> file(num_files);
+      Future<> fut = 
util::AsyncTaskScheduler::Make([&](util::AsyncTaskScheduler* sched) {
+        RETURN_NOT_OK(ctx.Init(settings.num_threads, sched));
+        if (settings.num_threads != -1)
+          
RETURN_NOT_OK(arrow::internal::checked_cast<arrow::internal::ThreadPool*>(
+                            ctx.io_context()->executor())
+                            
->SetCapacity(static_cast<int>(settings.num_threads)));
+        BatchesWithSchema batches = MakeRandomBatches(
+            bm_schema, num_batches, batch_size, SpillFile::kAlignment, 
ctx.memory_pool());
+        st.ResumeTiming();
+
+        for (ExecBatch& b : batches.batches) {
+          int64_t idx = rng.from_range(static_cast<int64_t>(0), num_files - 1);
+          RETURN_NOT_OK(file[idx].SpillBatch(&ctx, std::move(b)));
+        }
+        return Status::OK();
+      });
+      fut.Wait();
+      st.PauseTiming();
+      for (SpillFile& f : file) DCHECK_OK(f.Cleanup());
+    }
+    st.ResumeTiming();
+  }
+  st.counters["BytesProcessed"] = benchmark::Counter(
+      num_batches * batch_size * sizeof(int32_t) * 2,
+      benchmark::Counter::kIsIterationInvariantRate, 
benchmark::Counter::OneK::kIs1024);
+}
+
+static void BM_SpillingWrite(benchmark::State& st) {
+  SpillingBenchmarkSettings settings;
+  settings.num_files = st.range(0);
+  SpillingWrite_Impl(st, settings);
+}
+
+static void BM_SpillingRead(benchmark::State& st) {
+  constexpr int num_batches = 1024;
+  constexpr int batch_size = 32000;
+  std::shared_ptr<Schema> bm_schema =
+      schema({field("f1", int32()), field("f2", int32())});
+  for (auto _ : st) {
+    st.PauseTiming();
+    {
+      SpillFile file;
+      QueryContext ctx;
+      Future<> fut = 
util::AsyncTaskScheduler::Make([&](util::AsyncTaskScheduler* sched) {
+        RETURN_NOT_OK(ctx.Init(std::thread::hardware_concurrency(), sched));
+        BatchesWithSchema batches = MakeRandomBatches(
+            bm_schema, num_batches, batch_size, SpillFile::kAlignment, 
ctx.memory_pool());
+
+        std::vector<ExecBatch> accum(num_batches);
+        for (ExecBatch& b : batches.batches)
+          DCHECK_OK(file.SpillBatch(&ctx, std::move(b)));
+
+        while (file.batches_written() < num_batches) std::this_thread::yield();
+
+        RETURN_NOT_OK(file.PreallocateBatches(ctx.memory_pool()));
+        st.ResumeTiming();
+
+        RETURN_NOT_OK(file.ReadBackBatches(
+            &ctx,
+            [&](size_t, size_t idx, ExecBatch batch) {
+              accum[idx] = std::move(batch);
+              return Status::OK();
+            },
+            [&](size_t) { return Status::OK(); }));
+        return Status::OK();
+      });
+      fut.Wait();
+      st.PauseTiming();
+      DCHECK_OK(file.Cleanup());
+    }
+    st.ResumeTiming();

Review Comment:
   Resuming timing only to immediately pause timing is a little odd.



##########
cpp/src/arrow/compute/exec/spilling_join.h:
##########
@@ -0,0 +1,129 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <bitset>
+
+#include "arrow/compute/exec/accumulation_queue.h"
+#include "arrow/compute/exec/hash_join.h"
+#include "arrow/compute/exec/query_context.h"
+
+namespace arrow {
+namespace compute {
+struct PartitionedBloomFilter {
+  std::unique_ptr<BlockedBloomFilter> in_memory;
+  std::unique_ptr<BlockedBloomFilter>
+      partitions[SpillingAccumulationQueue::kNumPartitions];
+
+  void Find(int64_t hardware_flags, int64_t num_rows, const uint64_t* hashes,
+            uint8_t* bv);
+};
+
+class SpillingHashJoin {
+ public:
+  using RegisterTaskGroupCallback = std::function<int(
+      std::function<Status(size_t, int64_t)>, std::function<Status(size_t)>)>;
+  using StartTaskGroupCallback = std::function<Status(int, int64_t)>;
+  using AddProbeSideHashColumn = std::function<Status(size_t, ExecBatch*)>;
+  using BloomFilterFinishedCallback = std::function<Status(size_t)>;
+  using ApplyBloomFilterCallback = std::function<Status(size_t, ExecBatch*)>;
+  using OutputBatchCallback = std::function<void(int64_t, ExecBatch)>;
+  using FinishedCallback = std::function<Status(int64_t)>;
+  using StartSpillingCallback = std::function<Status(size_t)>;
+  using PauseProbeSideCallback = std::function<void(int)>;
+  using ResumeProbeSideCallback = std::function<void(int)>;
+
+  struct CallbackRecord {

Review Comment:
   For each of these callbacks we should have a brief comment explaining what 
exactly is expected.  For example what is a `AddProbeSideHashColumn` or 
`StartSpillingCallback` implementation supposed to do?
   



##########
cpp/src/arrow/compute/exec/spilling_join.cc:
##########
@@ -0,0 +1,260 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <memory>
+
+#include "arrow/compute/exec/spilling_join.h"
+#include "arrow/util/atomic_util.h"
+
+namespace arrow {
+namespace compute {
+void PartitionedBloomFilter::Find(int64_t hardware_flags, int64_t num_rows,
+                                  const uint64_t* hashes, uint8_t* bv) {
+  if (in_memory) return in_memory->Find(hardware_flags, num_rows, hashes, bv);
+
+  for (int64_t i = 0; i < num_rows; i++) {
+    uint64_t hash = hashes[i];
+    size_t partition = SpillingAccumulationQueue::partition_id(hashes[i]);
+    bool found = partitions[partition] ? partitions[partition]->Find(hash) : 
true;
+    bit_util::SetBitTo(bv, i, found);
+  }
+}
+
+Status SpillingHashJoin::Init(QueryContext* ctx, JoinType join_type, size_t 
num_threads,
+                              SchemaProjectionMaps<HashJoinProjection>* 
proj_map_left,
+                              SchemaProjectionMaps<HashJoinProjection>* 
proj_map_right,
+                              std::vector<JoinKeyCmp>* key_cmp, Expression* 
filter,
+                              PartitionedBloomFilter* bloom_filter,
+                              CallbackRecord callback_record, bool is_swiss) {
+  ctx_ = ctx;
+  num_threads_ = num_threads;
+  callbacks_ = std::move(callback_record);
+  bloom_filter_ = bloom_filter;
+  is_swiss_ = is_swiss;
+
+  HashJoinImpl::CallbackRecord join_callbacks;
+  join_callbacks.register_task_group = callbacks_.register_task_group;
+  join_callbacks.start_task_group = callbacks_.start_task_group;
+  join_callbacks.output_batch = callbacks_.output_batch;
+  join_callbacks.finished = [this](int64_t num_total_batches) {
+    return this->OnCollocatedJoinFinished(num_total_batches);
+  };
+
+  builder_ = BloomFilterBuilder::Make(num_threads_ == 1
+                                          ? 
BloomFilterBuildStrategy::SINGLE_THREADED
+                                          : 
BloomFilterBuildStrategy::PARALLEL);
+  RETURN_NOT_OK(build_accumulator_.Init(ctx));
+  RETURN_NOT_OK(probe_accumulator_.Init(ctx));
+
+  for (size_t i = 0; i < SpillingAccumulationQueue::kNumPartitions; i++) {
+    ARROW_ASSIGN_OR_RAISE(
+        impls_[i], is_swiss_ ? HashJoinImpl::MakeSwiss() : 
HashJoinImpl::MakeBasic());
+    RETURN_NOT_OK(impls_[i]->Init(ctx_, join_type, num_threads, proj_map_left,
+                                  proj_map_right, key_cmp, filter, 
join_callbacks));
+
+    task_group_bloom_[i] = callbacks_.register_task_group(
+        [this](size_t thread_index, int64_t task_id) {
+          return PushBloomFilterBatch(thread_index, task_id);
+        },
+        [this](size_t thread_index) { return 
OnBloomFilterFinished(thread_index); });
+  }
+  return Status::OK();
+}
+
+Status SpillingHashJoin::CheckSpilling(size_t thread_index, ExecBatch& batch) {
+  size_t size_of_batch = static_cast<size_t>(batch.TotalBufferSize());
+  size_t max_batch_size = arrow::util::AtomicMax(max_batch_size_, 
size_of_batch);
+
+  // Spilling algorithm proven to not use more than
+  // (SpillThreshold + NumThreads * BatchSize) memory.
+  // Thus we want to spill when (SpillThreshold + NumThreads * BatchSize) = k 
* MaxMemory
+  // with some fuzz factor k (which is 0.8 here because that's what I decided).
+  // Thus SpillThreshold = k * MaxMemory - NumThreads * BatchSize.
+  constexpr float kFuzzFactor = 0.8f;
+  size_t max_memory = static_cast<size_t>(kFuzzFactor * 
ctx_->options().max_memory_bytes);
+  size_t spill_threshold = static_cast<size_t>(std::max(
+      static_cast<ssize_t>(kFuzzFactor * max_memory - num_threads_ * 
max_batch_size),
+      static_cast<ssize_t>(0)));
+  size_t bytes_allocated = 
static_cast<size_t>(ctx_->memory_pool()->bytes_allocated());
+  size_t bytes_inflight = ctx_->GetCurrentTempFileIO();
+
+  size_t backpressure_threshold = spill_threshold / 2;
+  if (bytes_allocated > backpressure_threshold) {
+    if (int32_t expected = 0; 
backpressure_counter_.compare_exchange_strong(expected, 1))
+      callbacks_.pause_probe_side(1);
+  }
+  if ((bytes_allocated - bytes_inflight) > spill_threshold) {
+    RETURN_NOT_OK(AdvanceSpillCursor(thread_index));
+  }
+  return Status::OK();
+}
+
+Status SpillingHashJoin::AdvanceSpillCursor(size_t thread_index) {
+  if (bool expected = false;
+      !spilling_.load() && spilling_.compare_exchange_strong(expected, true))
+    return callbacks_.start_spilling(thread_index);
+
+  ARROW_ASSIGN_OR_RAISE(bool probe_advanced, 
probe_accumulator_.AdvanceSpillCursor());
+  if (probe_advanced) return Status::OK();
+
+  ARROW_ASSIGN_OR_RAISE(bool build_advanced, 
build_accumulator_.AdvanceSpillCursor());
+  if (build_advanced) return Status::OK();
+
+  ARROW_ASSIGN_OR_RAISE(bool probe_hash_advanced, 
probe_accumulator_.AdvanceHashCursor());
+  if (probe_hash_advanced) return Status::OK();
+
+  ARROW_ASSIGN_OR_RAISE(bool build_hash_advanced, 
build_accumulator_.AdvanceHashCursor());
+  if (build_hash_advanced) return Status::OK();
+
+  // Pray we don't run out of memory
+  return Status::OK();

Review Comment:
   Or maybe we should send an open telemetry event instead?



##########
cpp/src/arrow/compute/exec/spilling_benchmark.cc:
##########
@@ -0,0 +1,140 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <thread>
+#include "arrow/compute/exec/accumulation_queue.h"
+#include "arrow/compute/exec/spilling_util.h"
+#include "arrow/compute/exec/test_util.h"
+#include "arrow/util/checked_cast.h"
+#include "benchmark/benchmark.h"
+
+namespace arrow {
+namespace compute {
+struct SpillingBenchmarkSettings {
+  int64_t num_files = 4;
+  int64_t num_threads = -1;

Review Comment:
   ```suggestion
     // number of I/O threads. If -1 then the default I/O capacity will be used.
     int64_t num_threads = -1;
   ```



##########
cpp/src/arrow/compute/exec/spilling_benchmark.cc:
##########
@@ -0,0 +1,140 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <thread>
+#include "arrow/compute/exec/accumulation_queue.h"
+#include "arrow/compute/exec/spilling_util.h"
+#include "arrow/compute/exec/test_util.h"
+#include "arrow/util/checked_cast.h"
+#include "benchmark/benchmark.h"
+
+namespace arrow {
+namespace compute {
+struct SpillingBenchmarkSettings {
+  int64_t num_files = 4;
+  int64_t num_threads = -1;
+};
+
+static void SpillingWrite_Impl(benchmark::State& st,
+                               SpillingBenchmarkSettings& settings) {
+  constexpr int num_batches = 1024;
+  constexpr int batch_size = 32000;
+  int64_t num_files = settings.num_files;
+  std::shared_ptr<Schema> bm_schema =
+      schema({field("f1", int32()), field("f2", int32())});
+  Random64Bit rng(42);
+  for (auto _ : st) {
+    st.PauseTiming();
+    {
+      QueryContext ctx;
+      std::vector<SpillFile> file(num_files);
+      Future<> fut = 
util::AsyncTaskScheduler::Make([&](util::AsyncTaskScheduler* sched) {
+        RETURN_NOT_OK(ctx.Init(settings.num_threads, sched));
+        if (settings.num_threads != -1)
+          
RETURN_NOT_OK(arrow::internal::checked_cast<arrow::internal::ThreadPool*>(
+                            ctx.io_context()->executor())
+                            
->SetCapacity(static_cast<int>(settings.num_threads)));

Review Comment:
   This is a global pool and you don't set the capacity back to what it used to 
be so will this cause a side effect in future benchmarks?  Maybe it would be 
easier to always set the I/O executor in these benchmarks and don't bother 
testing the default?



##########
cpp/src/arrow/compute/exec/spilling_join.cc:
##########
@@ -0,0 +1,260 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <memory>
+
+#include "arrow/compute/exec/spilling_join.h"
+#include "arrow/util/atomic_util.h"
+
+namespace arrow {
+namespace compute {
+void PartitionedBloomFilter::Find(int64_t hardware_flags, int64_t num_rows,
+                                  const uint64_t* hashes, uint8_t* bv) {
+  if (in_memory) return in_memory->Find(hardware_flags, num_rows, hashes, bv);
+
+  for (int64_t i = 0; i < num_rows; i++) {
+    uint64_t hash = hashes[i];
+    size_t partition = SpillingAccumulationQueue::partition_id(hashes[i]);
+    bool found = partitions[partition] ? partitions[partition]->Find(hash) : 
true;
+    bit_util::SetBitTo(bv, i, found);
+  }
+}
+
+Status SpillingHashJoin::Init(QueryContext* ctx, JoinType join_type, size_t 
num_threads,
+                              SchemaProjectionMaps<HashJoinProjection>* 
proj_map_left,
+                              SchemaProjectionMaps<HashJoinProjection>* 
proj_map_right,
+                              std::vector<JoinKeyCmp>* key_cmp, Expression* 
filter,
+                              PartitionedBloomFilter* bloom_filter,
+                              CallbackRecord callback_record, bool is_swiss) {
+  ctx_ = ctx;
+  num_threads_ = num_threads;
+  callbacks_ = std::move(callback_record);
+  bloom_filter_ = bloom_filter;
+  is_swiss_ = is_swiss;
+
+  HashJoinImpl::CallbackRecord join_callbacks;
+  join_callbacks.register_task_group = callbacks_.register_task_group;
+  join_callbacks.start_task_group = callbacks_.start_task_group;
+  join_callbacks.output_batch = callbacks_.output_batch;
+  join_callbacks.finished = [this](int64_t num_total_batches) {
+    return this->OnCollocatedJoinFinished(num_total_batches);
+  };
+
+  builder_ = BloomFilterBuilder::Make(num_threads_ == 1
+                                          ? 
BloomFilterBuildStrategy::SINGLE_THREADED
+                                          : 
BloomFilterBuildStrategy::PARALLEL);
+  RETURN_NOT_OK(build_accumulator_.Init(ctx));
+  RETURN_NOT_OK(probe_accumulator_.Init(ctx));
+
+  for (size_t i = 0; i < SpillingAccumulationQueue::kNumPartitions; i++) {
+    ARROW_ASSIGN_OR_RAISE(
+        impls_[i], is_swiss_ ? HashJoinImpl::MakeSwiss() : 
HashJoinImpl::MakeBasic());
+    RETURN_NOT_OK(impls_[i]->Init(ctx_, join_type, num_threads, proj_map_left,
+                                  proj_map_right, key_cmp, filter, 
join_callbacks));
+
+    task_group_bloom_[i] = callbacks_.register_task_group(
+        [this](size_t thread_index, int64_t task_id) {
+          return PushBloomFilterBatch(thread_index, task_id);
+        },
+        [this](size_t thread_index) { return 
OnBloomFilterFinished(thread_index); });
+  }
+  return Status::OK();
+}
+
+Status SpillingHashJoin::CheckSpilling(size_t thread_index, ExecBatch& batch) {
+  size_t size_of_batch = static_cast<size_t>(batch.TotalBufferSize());
+  size_t max_batch_size = arrow::util::AtomicMax(max_batch_size_, 
size_of_batch);
+
+  // Spilling algorithm proven to not use more than
+  // (SpillThreshold + NumThreads * BatchSize) memory.
+  // Thus we want to spill when (SpillThreshold + NumThreads * BatchSize) = k 
* MaxMemory
+  // with some fuzz factor k (which is 0.8 here because that's what I decided).
+  // Thus SpillThreshold = k * MaxMemory - NumThreads * BatchSize.
+  constexpr float kFuzzFactor = 0.8f;
+  size_t max_memory = static_cast<size_t>(kFuzzFactor * 
ctx_->options().max_memory_bytes);
+  size_t spill_threshold = static_cast<size_t>(std::max(
+      static_cast<ssize_t>(kFuzzFactor * max_memory - num_threads_ * 
max_batch_size),
+      static_cast<ssize_t>(0)));
+  size_t bytes_allocated = 
static_cast<size_t>(ctx_->memory_pool()->bytes_allocated());
+  size_t bytes_inflight = ctx_->GetCurrentTempFileIO();
+
+  size_t backpressure_threshold = spill_threshold / 2;
+  if (bytes_allocated > backpressure_threshold) {
+    if (int32_t expected = 0; 
backpressure_counter_.compare_exchange_strong(expected, 1))
+      callbacks_.pause_probe_side(1);
+  }
+  if ((bytes_allocated - bytes_inflight) > spill_threshold) {
+    RETURN_NOT_OK(AdvanceSpillCursor(thread_index));
+  }
+  return Status::OK();
+}
+
+Status SpillingHashJoin::AdvanceSpillCursor(size_t thread_index) {
+  if (bool expected = false;
+      !spilling_.load() && spilling_.compare_exchange_strong(expected, true))
+    return callbacks_.start_spilling(thread_index);
+
+  ARROW_ASSIGN_OR_RAISE(bool probe_advanced, 
probe_accumulator_.AdvanceSpillCursor());
+  if (probe_advanced) return Status::OK();
+
+  ARROW_ASSIGN_OR_RAISE(bool build_advanced, 
build_accumulator_.AdvanceSpillCursor());
+  if (build_advanced) return Status::OK();
+
+  ARROW_ASSIGN_OR_RAISE(bool probe_hash_advanced, 
probe_accumulator_.AdvanceHashCursor());
+  if (probe_hash_advanced) return Status::OK();
+
+  ARROW_ASSIGN_OR_RAISE(bool build_hash_advanced, 
build_accumulator_.AdvanceHashCursor());
+  if (build_hash_advanced) return Status::OK();
+
+  // Pray we don't run out of memory
+  return Status::OK();
+}
+
+Status SpillingHashJoin::OnBuildSideBatch(size_t thread_index, ExecBatch 
batch) {
+  return build_accumulator_.InsertBatch(thread_index, std::move(batch));
+}
+
+Status SpillingHashJoin::OnBuildSideFinished(size_t thread_index) {
+  return BuildPartitionedBloomFilter(thread_index);
+}
+
+// Note about Bloom filter implementation:
+// Currently, we disable a partition for a Bloom filter based on the size of
+// the hashes for that partition. Instead, we should be disabling based on
+// the size of the bloom filter itself, since a Bloom filter would use about
+// 8-16 bits per value instead of 64 bits per value.
+Status SpillingHashJoin::BuildPartitionedBloomFilter(size_t thread_index) {
+  // Disable Bloom filter if bloom_filter_ = nullptr by advancing to past
+  // the final Bloom filter
+  partition_idx_ = (bloom_filter_ == nullptr) ? 
SpillingAccumulationQueue::kNumPartitions
+                                              : 
build_accumulator_.hash_cursor();
+  return BuildNextBloomFilter(thread_index);
+}
+
+Status SpillingHashJoin::PushBloomFilterBatch(size_t thread_index, int64_t 
batch_id) {
+  const uint64_t* hashes =
+      build_accumulator_.GetHashes(partition_idx_, 
static_cast<size_t>(batch_id));
+  size_t num_rows =
+      build_accumulator_.row_count(partition_idx_, 
static_cast<size_t>(batch_id));
+  return builder_->PushNextBatch(thread_index, static_cast<int64_t>(num_rows), 
hashes);
+}
+
+Status SpillingHashJoin::BuildNextBloomFilter(size_t thread_index) {
+  size_t num_rows = 
build_accumulator_.CalculatePartitionRowCount(partition_idx_);
+  size_t num_batches = build_accumulator_.batch_count(partition_idx_);
+
+  // partition_idx_ is incremented in the callback for the taskgroup
+  bloom_filter_->partitions[partition_idx_] = 
std::make_unique<BlockedBloomFilter>();
+
+  RETURN_NOT_OK(builder_->Begin(num_threads_, 
ctx_->cpu_info()->hardware_flags(),
+                                ctx_->memory_pool(), num_rows, num_batches,
+                                
bloom_filter_->partitions[partition_idx_].get()));
+
+  return callbacks_.start_task_group(task_group_bloom_[partition_idx_],
+                                     
build_accumulator_.batch_count(partition_idx_));
+}
+
+Status SpillingHashJoin::OnBloomFilterFinished(size_t thread_index) {
+  if (++partition_idx_ >= SpillingAccumulationQueue::kNumPartitions)
+    return OnPartitionedBloomFilterFinished(thread_index);
+  return BuildNextBloomFilter(thread_index);
+}
+
+Status SpillingHashJoin::OnPartitionedBloomFilterFinished(size_t thread_index) 
{
+  RETURN_NOT_OK(callbacks_.bloom_filter_finished(thread_index));
+  backpressure_counter_.store(2);

Review Comment:
   I'm not sure this is needed.  Couldn't you make `backpressure_counter_` an 
atomic bool that flips from false to true at most once?



##########
cpp/src/arrow/compute/exec/spilling_benchmark.cc:
##########
@@ -0,0 +1,140 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <thread>
+#include "arrow/compute/exec/accumulation_queue.h"
+#include "arrow/compute/exec/spilling_util.h"
+#include "arrow/compute/exec/test_util.h"
+#include "arrow/util/checked_cast.h"
+#include "benchmark/benchmark.h"
+
+namespace arrow {
+namespace compute {
+struct SpillingBenchmarkSettings {
+  int64_t num_files = 4;
+  int64_t num_threads = -1;
+};
+
+static void SpillingWrite_Impl(benchmark::State& st,
+                               SpillingBenchmarkSettings& settings) {
+  constexpr int num_batches = 1024;
+  constexpr int batch_size = 32000;

Review Comment:
   ```suggestion
     constexpr int batch_size = 32000;
   ```
   Minor nit: I find `rows_per_batch` to be a little less ambiguous than 
`batch_size` (which might be referring to bytes).



##########
cpp/src/arrow/compute/exec/spilling_benchmark.cc:
##########
@@ -0,0 +1,140 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <thread>
+#include "arrow/compute/exec/accumulation_queue.h"
+#include "arrow/compute/exec/spilling_util.h"
+#include "arrow/compute/exec/test_util.h"
+#include "arrow/util/checked_cast.h"
+#include "benchmark/benchmark.h"
+
+namespace arrow {
+namespace compute {
+struct SpillingBenchmarkSettings {
+  int64_t num_files = 4;
+  int64_t num_threads = -1;
+};
+
+static void SpillingWrite_Impl(benchmark::State& st,
+                               SpillingBenchmarkSettings& settings) {
+  constexpr int num_batches = 1024;
+  constexpr int batch_size = 32000;
+  int64_t num_files = settings.num_files;
+  std::shared_ptr<Schema> bm_schema =
+      schema({field("f1", int32()), field("f2", int32())});
+  Random64Bit rng(42);
+  for (auto _ : st) {
+    st.PauseTiming();
+    {
+      QueryContext ctx;
+      std::vector<SpillFile> file(num_files);
+      Future<> fut = 
util::AsyncTaskScheduler::Make([&](util::AsyncTaskScheduler* sched) {
+        RETURN_NOT_OK(ctx.Init(settings.num_threads, sched));
+        if (settings.num_threads != -1)
+          
RETURN_NOT_OK(arrow::internal::checked_cast<arrow::internal::ThreadPool*>(
+                            ctx.io_context()->executor())
+                            
->SetCapacity(static_cast<int>(settings.num_threads)));
+        BatchesWithSchema batches = MakeRandomBatches(
+            bm_schema, num_batches, batch_size, SpillFile::kAlignment, 
ctx.memory_pool());
+        st.ResumeTiming();
+
+        for (ExecBatch& b : batches.batches) {
+          int64_t idx = rng.from_range(static_cast<int64_t>(0), num_files - 1);
+          RETURN_NOT_OK(file[idx].SpillBatch(&ctx, std::move(b)));
+        }
+        return Status::OK();
+      });
+      fut.Wait();
+      st.PauseTiming();
+      for (SpillFile& f : file) DCHECK_OK(f.Cleanup());
+    }
+    st.ResumeTiming();
+  }
+  st.counters["BytesProcessed"] = benchmark::Counter(
+      num_batches * batch_size * sizeof(int32_t) * 2,
+      benchmark::Counter::kIsIterationInvariantRate, 
benchmark::Counter::OneK::kIs1024);
+}
+
+static void BM_SpillingWrite(benchmark::State& st) {
+  SpillingBenchmarkSettings settings;
+  settings.num_files = st.range(0);
+  SpillingWrite_Impl(st, settings);
+}
+
+static void BM_SpillingRead(benchmark::State& st) {

Review Comment:
   Minor nit: It's a little bit inconsistent that you split the write benchmark 
into two methods but not the read benchmark.



##########
cpp/src/arrow/compute/exec/spilling_benchmark.cc:
##########
@@ -0,0 +1,140 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <thread>
+#include "arrow/compute/exec/accumulation_queue.h"
+#include "arrow/compute/exec/spilling_util.h"
+#include "arrow/compute/exec/test_util.h"
+#include "arrow/util/checked_cast.h"
+#include "benchmark/benchmark.h"
+
+namespace arrow {
+namespace compute {
+struct SpillingBenchmarkSettings {
+  int64_t num_files = 4;
+  int64_t num_threads = -1;
+};
+
+static void SpillingWrite_Impl(benchmark::State& st,
+                               SpillingBenchmarkSettings& settings) {
+  constexpr int num_batches = 1024;
+  constexpr int batch_size = 32000;
+  int64_t num_files = settings.num_files;
+  std::shared_ptr<Schema> bm_schema =
+      schema({field("f1", int32()), field("f2", int32())});
+  Random64Bit rng(42);
+  for (auto _ : st) {
+    st.PauseTiming();
+    {
+      QueryContext ctx;
+      std::vector<SpillFile> file(num_files);
+      Future<> fut = 
util::AsyncTaskScheduler::Make([&](util::AsyncTaskScheduler* sched) {
+        RETURN_NOT_OK(ctx.Init(settings.num_threads, sched));

Review Comment:
   I don't think this works if `num_threads == -1`



##########
cpp/src/arrow/compute/exec/spilling_test.cc:
##########
@@ -0,0 +1,274 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <gmock/gmock-matchers.h>
+
+#include <condition_variable>
+#include <mutex>
+
+#include "arrow/api.h"
+#include "arrow/compute/exec/accumulation_queue.h"
+#include "arrow/compute/exec/exec_plan.h"
+#include "arrow/compute/exec/spilling_util.h"
+#include "arrow/compute/exec/test_util.h"
+#include "arrow/compute/light_array.h"
+#include "arrow/testing/future_util.h"
+#include "arrow/testing/random.h"
+
+namespace arrow {
+namespace compute {
+namespace internal {
+
+enum class SpillingTestParam {
+  None,
+  Values,
+  ValuesAndHashes,
+};
+
+void TestSpillingAccumulationQueue(SpillingTestParam param) {
+  QueryContext ctx;
+  SpillingAccumulationQueue queue;
+
+  Future<> fut = util::AsyncTaskScheduler::Make([&](util::AsyncTaskScheduler* 
sched) {
+    RETURN_NOT_OK(ctx.Init(ctx.max_concurrency(), sched));
+    RETURN_NOT_OK(queue.Init(&ctx));
+    ctx.scheduler()->RegisterEnd();
+    RETURN_NOT_OK(ctx.scheduler()->StartScheduling(
+        /*thread_index=*/0,
+        [&ctx](std::function<Status(size_t)> fn) {
+          return ctx.ScheduleTask(std::move(fn));
+        },
+        /*concurrent_tasks=*/static_cast<int>(ctx.max_concurrency()), false));
+
+    size_t num_batches = 4 * SpillingAccumulationQueue::kNumPartitions;
+    size_t rows_per_batch = ExecBatchBuilder::num_rows_max();
+    std::vector<ExecBatch> batches;
+
+    size_t spill_every_n_batches = 0;
+    switch (param) {
+      case SpillingTestParam::None:
+        spill_every_n_batches = num_batches;
+        break;
+      case SpillingTestParam::Values:
+        spill_every_n_batches = 32;
+        break;
+      case SpillingTestParam::ValuesAndHashes:
+        spill_every_n_batches = 3;
+        break;
+      default:
+        DCHECK(false);
+    }
+
+    int num_vals_spilled = 0;
+    int num_hashes_spilled = 0;
+    for (size_t i = 0; i < num_batches; i++) {
+      if (i % spill_every_n_batches == 0) {
+        ARROW_ASSIGN_OR_RAISE(bool advanced, queue.AdvanceSpillCursor());
+        if (num_vals_spilled < SpillingAccumulationQueue::kNumPartitions) {
+          ARROW_CHECK(advanced);
+        }
+        num_vals_spilled++;
+
+        if (!advanced) {
+          ARROW_ASSIGN_OR_RAISE(bool advanced_hash, queue.AdvanceHashCursor());
+          if (num_hashes_spilled < SpillingAccumulationQueue::kNumPartitions) {
+            ARROW_CHECK(advanced_hash);
+          }
+          num_hashes_spilled++;
+        }
+      }
+
+      ARROW_ASSIGN_OR_RAISE(std::unique_ptr<Buffer> vals_buf,
+                            AllocateBuffer(sizeof(uint64_t) * rows_per_batch));
+      ARROW_ASSIGN_OR_RAISE(std::unique_ptr<Buffer> hashes_buf,
+                            AllocateBuffer(sizeof(uint64_t) * rows_per_batch));
+
+      uint64_t* vals = reinterpret_cast<uint64_t*>(vals_buf->mutable_data());
+      uint64_t* hashes = 
reinterpret_cast<uint64_t*>(hashes_buf->mutable_data());
+      for (size_t j = 0; j < rows_per_batch; j++) {
+        vals[j] = j;
+        hashes[j] = (j % SpillingAccumulationQueue::kNumPartitions);
+      }
+
+      ArrayData vals_data(uint64(), rows_per_batch, {nullptr, 
std::move(vals_buf)});
+      ArrayData hashes_data(uint64(), rows_per_batch, {nullptr, 
std::move(hashes_buf)});
+      ExecBatch batch({std::move(vals_data), std::move(hashes_data)}, 
rows_per_batch);
+      ARROW_CHECK_OK(queue.InsertBatch(/*thread_index=*/0, std::move(batch)));
+    }
+
+    for (size_t ipart = 0; ipart < SpillingAccumulationQueue::kNumPartitions; 
ipart++) {
+      Future<> fut = Future<>::Make();
+      AccumulationQueue ac;
+      ac.Resize(queue.batch_count(ipart));
+      ARROW_CHECK_OK(queue.GetPartition(
+          /*thread_index=*/0,
+          /*partition=*/ipart,
+          [&](size_t, size_t batch_idx, ExecBatch batch) {
+            ac[batch_idx] = std::move(batch);
+            return Status::OK();
+          },
+          [&](size_t) {
+            fut.MarkFinished();
+            return Status::OK();
+          }));
+      ARROW_CHECK_OK(fut.status());
+      ARROW_CHECK_EQ(ac.batch_count(),
+                     num_batches / SpillingAccumulationQueue::kNumPartitions);
+      for (size_t ibatch = 0; ibatch < ac.batch_count(); ibatch++) {
+        ARROW_CHECK_EQ(ac[ibatch].num_values(), 1);
+        ARROW_CHECK_EQ(ac[ibatch].length, ExecBatchBuilder::num_rows_max());
+        const uint64_t* vals =
+            reinterpret_cast<const 
uint64_t*>(ac[ibatch][0].array()->buffers[1]->data());
+        for (int64_t irow = 0; irow < ac[ibatch].length; irow++)
+          ARROW_CHECK_EQ(vals[irow] % 
SpillingAccumulationQueue::kNumPartitions, ipart);
+      }
+    }
+    return Status::OK();
+  });
+  ASSERT_FINISHES_OK(fut);
+}
+
+TEST(Spilling, SpillingAccumulationQueue_NoSpill) {
+  TestSpillingAccumulationQueue(SpillingTestParam::None);
+}
+
+TEST(Spilling, SpillingAccumulationQueue_SpillValues) {
+  TestSpillingAccumulationQueue(SpillingTestParam::Values);
+}
+
+TEST(Spilling, SpillingAccumulationQueue_SpillValuesAndHashes) {
+  TestSpillingAccumulationQueue(SpillingTestParam::ValuesAndHashes);
+}
+
+TEST(Spilling, ReadWriteBasicBatches) {
+  QueryContext ctx;
+  SpillFile file;
+  BatchesWithSchema batches = MakeBasicBatches();
+  std::vector<ExecBatch> read_batches(batches.batches.size());
+
+  Future<> fut = util::AsyncTaskScheduler::Make([&](util::AsyncTaskScheduler* 
sched) {
+    ARROW_CHECK_OK(ctx.Init(ctx.max_concurrency(), sched));
+    for (ExecBatch& b : batches.batches) {
+      ExecBatchBuilder builder;
+      std::vector<uint16_t> row_ids(b.length);
+      std::iota(row_ids.begin(), row_ids.end(), 0);
+      ARROW_CHECK_OK(builder.AppendSelected(ctx.memory_pool(), b,
+                                            static_cast<int>(b.length), 
row_ids.data(),
+                                            b.num_values()));
+      ARROW_CHECK_OK(file.SpillBatch(&ctx, builder.Flush()));
+    }
+
+    ARROW_CHECK_OK(file.ReadBackBatches(
+        &ctx,
+        [&read_batches](size_t, size_t batch_idx, ExecBatch batch) {
+          read_batches[batch_idx] = std::move(batch);
+          return Status::OK();
+        },
+        [&](size_t) {
+          AssertExecBatchesEqualIgnoringOrder(batches.schema, batches.batches,
+                                              read_batches);
+          return Status::OK();
+        }));
+    return Status::OK();
+  });
+  ASSERT_FINISHES_OK(fut);
+}
+
+TEST(Spilling, HashJoin) {
+  constexpr int kNumTests = 10;
+  Random64Bit rng(42);
+
+  // 50% chance to get a string column, 50% chance to get an integer
+  std::vector<std::shared_ptr<DataType>> possible_types = {
+      int8(), int16(), int32(), int64(), utf8(), utf8(), utf8(), utf8(),
+  };
+
+  std::unordered_map<std::string, std::string> key_metadata;
+  key_metadata["min"] = "0";
+  key_metadata["max"] = "1000";
+
+  for (int itest = 0; itest < kNumTests; itest++) {
+    int left_cols = rng.from_range(1, 4);
+    std::vector<std::shared_ptr<Field>> left_fields = {
+        field("l0", int32(), key_value_metadata(key_metadata))};
+    for (int i = 1; i < left_cols; i++) {
+      std::string name = std::string("l") + std::to_string(i);
+      size_t type = rng.from_range(static_cast<size_t>(0), 
possible_types.size() - 1);
+      left_fields.push_back(field(std::move(name), possible_types[type]));
+    }
+
+    int right_cols = rng.from_range(1, 4);
+    std::vector<std::shared_ptr<Field>> right_fields = {
+        field("r0", int32(), key_value_metadata(key_metadata))};
+    for (int i = 1; i < right_cols; i++) {
+      std::string name = std::string("r") + std::to_string(i);
+      size_t type = rng.from_range(static_cast<size_t>(0), 
possible_types.size() - 1);
+      right_fields.push_back(field(std::move(name), possible_types[type]));
+    }
+
+    std::vector<JoinKeyCmp> key_cmp = {JoinKeyCmp::EQ};
+    std::vector<FieldRef> left_keys = {FieldRef{0}};
+    std::vector<FieldRef> right_keys = {FieldRef{0}};
+
+    std::shared_ptr<Schema> l_schema = schema(std::move(left_fields));
+    std::shared_ptr<Schema> r_schema = schema(std::move(right_fields));
+
+    BatchesWithSchema l_batches = MakeRandomBatches(
+        l_schema, 10, 1024, kDefaultBufferAlignment, default_memory_pool());
+    BatchesWithSchema r_batches = MakeRandomBatches(
+        r_schema, 10, 1024, kDefaultBufferAlignment, default_memory_pool());
+
+    std::vector<ExecBatch> reference;
+    for (bool spilling : {false, true}) {
+      QueryOptions options;
+      if (spilling) options.max_memory_bytes = 1024;
+      ExecContext ctx(default_memory_pool(), 
::arrow::internal::GetCpuThreadPool());
+      ASSERT_OK_AND_ASSIGN(std::shared_ptr<ExecPlan> plan, 
ExecPlan::Make(options, ctx));
+      ASSERT_OK_AND_ASSIGN(
+          ExecNode * l_source,
+          MakeExecNode(
+              "source", plan.get(), {},
+              SourceNodeOptions{l_batches.schema, 
l_batches.gen(/*parallel=*/true,
+                                                                
/*slow=*/false)}));
+      ASSERT_OK_AND_ASSIGN(
+          ExecNode * r_source,
+          MakeExecNode(
+              "source", plan.get(), {},
+              SourceNodeOptions{r_batches.schema, 
r_batches.gen(/*parallel=*/true,
+                                                                
/*slow=*/false)}));
+
+      HashJoinNodeOptions join_options;
+      join_options.left_keys = left_keys;
+      join_options.right_keys = right_keys;
+      join_options.output_all = true;
+      join_options.key_cmp = key_cmp;
+      ASSERT_OK_AND_ASSIGN(
+          ExecNode * join,
+          MakeExecNode("hashjoin", plan.get(), {l_source, r_source}, 
join_options));
+      AsyncGenerator<std::optional<ExecBatch>> sink_gen;
+      ASSERT_OK(MakeExecNode("sink", plan.get(), {join}, 
SinkNodeOptions{&sink_gen}));
+      ASSERT_FINISHES_OK_AND_ASSIGN(auto result, StartAndCollect(plan.get(), 
sink_gen));

Review Comment:
   Can you use `DeclarationToExecBatches` instead of `StartAndCollect`?  I'm 
trying to migrate new test cases to use that as it is a bit more compact.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to