This is an automated email from the ASF dual-hosted git repository.
westonpace pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push:
new 8737123019 ARROW-16713: [C++] Pull join accumulation outside of HashJoinImpl (#13332)
8737123019 is described below
commit 8737123019ecb49e1670ea7b94b02404dc3709b5
Author: Sasha Krassovsky <[email protected]>
AuthorDate: Thu Jun 16 18:27:01 2022 -0400
ARROW-16713: [C++] Pull join accumulation outside of HashJoinImpl (#13332)
Authored-by: Sasha Krassovsky <[email protected]>
Signed-off-by: Weston Pace <[email protected]>
---
cpp/src/arrow/CMakeLists.txt | 1 +
cpp/src/arrow/compute/exec/accumulation_queue.cc | 58 ++
cpp/src/arrow/compute/exec/accumulation_queue.h | 57 ++
cpp/src/arrow/compute/exec/hash_join.cc | 453 ++------------
cpp/src/arrow/compute/exec/hash_join.h | 27 +-
cpp/src/arrow/compute/exec/hash_join_benchmark.cc | 111 ++--
cpp/src/arrow/compute/exec/hash_join_node.cc | 691 ++++++++++++++++++----
7 files changed, 774 insertions(+), 624 deletions(-)
diff --git a/cpp/src/arrow/CMakeLists.txt b/cpp/src/arrow/CMakeLists.txt
index ccf643bbc5..5d547b8eab 100644
--- a/cpp/src/arrow/CMakeLists.txt
+++ b/cpp/src/arrow/CMakeLists.txt
@@ -385,6 +385,7 @@ if(ARROW_COMPUTE)
compute/cast.cc
compute/exec.cc
compute/exec/aggregate.cc
+ compute/exec/accumulation_queue.cc
compute/exec/aggregate_node.cc
compute/exec/bloom_filter.cc
compute/exec/exec_plan.cc
diff --git a/cpp/src/arrow/compute/exec/accumulation_queue.cc b/cpp/src/arrow/compute/exec/accumulation_queue.cc
new file mode 100644
index 0000000000..192db52942
--- /dev/null
+++ b/cpp/src/arrow/compute/exec/accumulation_queue.cc
@@ -0,0 +1,58 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/compute/exec/accumulation_queue.h"
+
+#include <iterator>
+
+namespace arrow {
+namespace util {
+using arrow::compute::ExecBatch;
+AccumulationQueue::AccumulationQueue(AccumulationQueue&& that) {
+ this->batches_ = std::move(that.batches_);
+ this->row_count_ = that.row_count_;
+ that.Clear();
+}
+
+AccumulationQueue& AccumulationQueue::operator=(AccumulationQueue&& that) {
+ this->batches_ = std::move(that.batches_);
+ this->row_count_ = that.row_count_;
+ that.Clear();
+ return *this;
+}
+
+void AccumulationQueue::Concatenate(AccumulationQueue&& that) {
+ this->batches_.reserve(this->batches_.size() + that.batches_.size());
+ std::move(that.batches_.begin(), that.batches_.end(),
+ std::back_inserter(this->batches_));
+ this->row_count_ += that.row_count_;
+ that.Clear();
+}
+
+void AccumulationQueue::InsertBatch(ExecBatch batch) {
+ row_count_ += batch.length;
+ batches_.emplace_back(std::move(batch));
+}
+
+void AccumulationQueue::Clear() {
+ row_count_ = 0;
+ batches_.clear();
+}
+
+ExecBatch& AccumulationQueue::operator[](size_t i) { return batches_[i]; }
+} // namespace util
+} // namespace arrow
diff --git a/cpp/src/arrow/compute/exec/accumulation_queue.h b/cpp/src/arrow/compute/exec/accumulation_queue.h
new file mode 100644
index 0000000000..4b23e5ffca
--- /dev/null
+++ b/cpp/src/arrow/compute/exec/accumulation_queue.h
@@ -0,0 +1,57 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <cstdint>
+#include <vector>
+
+#include "arrow/compute/exec.h"
+
+namespace arrow {
+namespace util {
+using arrow::compute::ExecBatch;
+
+/// \brief A container that accumulates batches until they are ready to
+/// be processed.
+class AccumulationQueue {
+ public:
+ AccumulationQueue() : row_count_(0) {}
+ ~AccumulationQueue() = default;
+
+ // We should never be copying ExecBatch around
+ AccumulationQueue(const AccumulationQueue&) = delete;
+ AccumulationQueue& operator=(const AccumulationQueue&) = delete;
+
+ AccumulationQueue(AccumulationQueue&& that);
+ AccumulationQueue& operator=(AccumulationQueue&& that);
+
+ void Concatenate(AccumulationQueue&& that);
+ void InsertBatch(ExecBatch batch);
+ int64_t row_count() { return row_count_; }
+ size_t batch_count() { return batches_.size(); }
+ bool empty() const { return batches_.empty(); }
+ void Clear();
+ ExecBatch& operator[](size_t i);
+
+ private:
+ int64_t row_count_;
+ std::vector<ExecBatch> batches_;
+};
+
+} // namespace util
+} // namespace arrow
diff --git a/cpp/src/arrow/compute/exec/hash_join.cc b/cpp/src/arrow/compute/exec/hash_join.cc
index 9c124674d7..1ebe11e704 100644
--- a/cpp/src/arrow/compute/exec/hash_join.cc
+++ b/cpp/src/arrow/compute/exec/hash_join.cc
@@ -40,70 +40,16 @@ class HashJoinBasicImpl : public HashJoinImpl {
struct ThreadLocalState;
public:
- HashJoinBasicImpl() : num_expected_bloom_filters_(0) {}
-
- Status InputReceived(size_t thread_index, int side, ExecBatch batch)
override {
- if (cancelled_) {
- return Status::Cancelled("Hash join cancelled");
- }
- EVENT(span_, "InputReceived");
-
- ARROW_ASSIGN_OR_RAISE(bool queued, QueueBatchIfNeeded(thread_index, side,
batch));
- if (queued) {
- return Status::OK();
- } else {
- ARROW_DCHECK(side == 0);
- return ProbeBatch(thread_index, batch);
- }
- }
-
- Status InputFinished(size_t thread_index, int side) override {
- if (cancelled_) {
- return Status::Cancelled("Hash join cancelled");
- }
- EVENT(span_, "InputFinished", {{"side", side}});
- if (side == 0) {
- bool proceed;
- {
- std::lock_guard<std::mutex> lock(finished_mutex_);
- proceed = !left_side_finished_ && left_queue_probe_finished_;
- left_side_finished_ = true;
- }
- if (proceed) {
- RETURN_NOT_OK(OnLeftSideAndQueueFinished(thread_index));
- }
- } else {
- bool proceed;
- {
- std::lock_guard<std::mutex> lock(finished_mutex_);
- proceed = !right_side_finished_;
- right_side_finished_ = true;
- }
- if (proceed) {
- RETURN_NOT_OK(OnRightSideFinished(thread_index));
- }
- }
- return Status::OK();
- }
-
- Status Init(ExecContext* ctx, JoinType join_type, bool use_sync_execution,
- size_t /*num_threads*/, HashJoinSchema* schema_mgr,
- std::vector<JoinKeyCmp> key_cmp, Expression filter,
- OutputBatchCallback output_batch_callback,
- FinishedCallback finished_callback,
- TaskScheduler::ScheduleImpl schedule_task_callback,
- HashJoinImpl* pushdown_target, std::vector<int> column_map)
override {
- // TODO(ARROW-15732)
- // Each side of join might have an IO thread being called from.
- // As of right now, we ignore the `num_threads` argument, so later we will
have to
- // re-add `num_threads_ = num_threads;`
- num_threads_ = GetCpuThreadPoolCapacity() + io::GetIOThreadPoolCapacity()
+ 1;
-
+ Status Init(ExecContext* ctx, JoinType join_type, size_t num_threads,
+ HashJoinSchema* schema_mgr, std::vector<JoinKeyCmp> key_cmp,
+ Expression filter, OutputBatchCallback output_batch_callback,
+ FinishedCallback finished_callback, TaskScheduler* scheduler)
override {
START_COMPUTE_SPAN(span_, "HashJoinBasicImpl",
{{"detail", filter.ToString()},
{"join.kind", ToString(join_type)},
- {"join.threads",
static_cast<uint32_t>(num_threads_)}});
+ {"join.threads", static_cast<uint32_t>(num_threads)}});
+ num_threads_ = num_threads;
ctx_ = ctx;
join_type_ = join_type;
schema_mgr_ = schema_mgr;
@@ -111,6 +57,7 @@ class HashJoinBasicImpl : public HashJoinImpl {
filter_ = std::move(filter);
output_batch_callback_ = std::move(output_batch_callback);
finished_callback_ = std::move(finished_callback);
+ scheduler_ = scheduler;
local_states_.resize(num_threads_);
for (size_t i = 0; i < local_states_.size(); ++i) {
local_states_[i].is_initialized = false;
@@ -119,39 +66,12 @@ class HashJoinBasicImpl : public HashJoinImpl {
dict_probe_.Init(num_threads_);
- pushdown_target_ = pushdown_target;
- column_map_ = std::move(column_map);
- if (pushdown_target_) pushdown_target_->ExpectBloomFilter();
-
- right_input_row_count_ = 0;
has_hash_table_ = false;
num_batches_produced_.store(0);
cancelled_ = false;
- right_side_finished_ = false;
- left_side_finished_ = false;
- bloom_filters_ready_ = false;
- left_queue_bloom_finished_ = false;
- left_queue_probe_finished_ = false;
-
- scheduler_ = TaskScheduler::Make();
- if (pushdown_target_) {
- bloom_filter_ = arrow::internal::make_unique<BlockedBloomFilter>();
- bloom_filter_builder_ = BloomFilterBuilder::Make(
- use_sync_execution ? BloomFilterBuildStrategy::SINGLE_THREADED
- : BloomFilterBuildStrategy::PARALLEL);
- }
- RegisterBuildBloomFilter();
RegisterBuildHashTable();
- RegisterBloomFilterQueuedBatches();
- RegisterProbeQueuedBatches();
RegisterScanHashTable();
- scheduler_->RegisterEnd();
-
- RETURN_NOT_OK(scheduler_->StartScheduling(
- 0 /*thread index*/, std::move(schedule_task_callback),
- static_cast<int>(2 * num_threads_) /*concurrent tasks*/,
use_sync_execution));
-
return Status::OK();
}
@@ -162,32 +82,6 @@ class HashJoinBasicImpl : public HashJoinImpl {
scheduler_->Abort(std::move(pos_abort_callback));
}
- // Called by a downstream node after they have constructed a bloom filter
- // that this node can use to filter inputs.
- Status PushBloomFilter(size_t thread_index,
std::unique_ptr<BlockedBloomFilter> filter,
- std::vector<int> column_map) override {
- bool proceed;
- {
- std::lock_guard<std::mutex> lock_bloom(bloom_filters_mutex_);
- pushed_bloom_filters_.emplace_back(std::move(filter));
- bloom_filter_column_maps_.emplace_back(std::move(column_map));
- proceed = pushed_bloom_filters_.size() == num_expected_bloom_filters_;
- ARROW_DCHECK(pushed_bloom_filters_.size() <=
num_expected_bloom_filters_);
- }
- if (proceed) {
- size_t num_batches;
- {
- std::lock_guard<std::mutex> lock(left_batches_mutex_);
- num_batches = left_batches_.size();
- bloom_filters_ready_ = true;
- }
- RETURN_NOT_OK(BloomFilterQueuedBatches(thread_index, num_batches));
- }
- return Status::OK();
- }
-
- void ExpectBloomFilter() override { num_expected_bloom_filters_ += 1; }
-
private:
void InitEncoder(int side, HashJoinProjection projection_handle, RowEncoder*
encoder) {
std::vector<ValueDescr> data_types;
@@ -212,8 +106,6 @@ class HashJoinBasicImpl : public HashJoinImpl {
if (has_payload) {
InitEncoder(0, HashJoinProjection::PAYLOAD,
&local_state.exec_batch_payloads);
}
- RETURN_NOT_OK(local_state.temp_stack.Init(
- ctx_->memory_pool(), 4 * util::MiniBatch::kMiniBatchLength *
sizeof(uint32_t)));
local_state.is_initialized = true;
}
return Status::OK();
@@ -594,7 +486,7 @@ class HashJoinBasicImpl : public HashJoinImpl {
}
}
- Status ProbeBatch(size_t thread_index, const ExecBatch& batch) {
+ Status ProbeSingleBatch(size_t thread_index, ExecBatch batch) override {
ThreadLocalState& local_state = local_states_[thread_index];
RETURN_NOT_OK(InitLocalStateIfNeeded(thread_index));
@@ -656,96 +548,18 @@ class HashJoinBasicImpl : public HashJoinImpl {
return Status::OK();
}
- Status ApplyBloomFiltersToBatch(size_t thread_index, ExecBatch& batch) {
- if (batch.length == 0) return Status::OK();
- int64_t bit_vector_bytes = bit_util::BytesForBits(batch.length);
- std::vector<uint8_t> selected(bit_vector_bytes);
- std::vector<uint32_t> hashes(batch.length);
- std::vector<uint8_t> bv(bit_vector_bytes);
-
- RETURN_NOT_OK(InitLocalStateIfNeeded(thread_index));
- // Start with full selection for the current batch
- memset(selected.data(), 0xff, bit_vector_bytes);
- for (size_t ifilter = 0; ifilter < num_expected_bloom_filters_; ifilter++)
{
- std::vector<Datum> keys(bloom_filter_column_maps_[ifilter].size());
- for (size_t i = 0; i < keys.size(); i++) {
- int input_idx = bloom_filter_column_maps_[ifilter][i];
- keys[i] = batch[input_idx];
- if (keys[i].is_scalar()) {
- ARROW_ASSIGN_OR_RAISE(
- keys[i],
- MakeArrayFromScalar(*keys[i].scalar(), batch.length,
ctx_->memory_pool()));
- }
- }
- ARROW_ASSIGN_OR_RAISE(ExecBatch key_batch,
ExecBatch::Make(std::move(keys)));
- RETURN_NOT_OK(Hashing32::HashBatch(
- key_batch, hashes.data(), ctx_->cpu_info()->hardware_flags(),
- &local_states_[thread_index].temp_stack, 0, key_batch.length));
-
- pushed_bloom_filters_[ifilter]->Find(ctx_->cpu_info()->hardware_flags(),
- key_batch.length, hashes.data(),
bv.data());
- arrow::internal::BitmapAnd(bv.data(), 0, selected.data(), 0,
key_batch.length, 0,
- selected.data());
- }
- auto selected_buffer =
- arrow::internal::make_unique<Buffer>(selected.data(),
bit_vector_bytes);
- ArrayData selected_arraydata(boolean(), batch.length,
- {nullptr, std::move(selected_buffer)});
- Datum selected_datum(selected_arraydata);
- FilterOptions options;
- size_t first_nonscalar = batch.values.size();
- for (size_t i = 0; i < batch.values.size(); i++) {
- if (!batch.values[i].is_scalar()) {
- ARROW_ASSIGN_OR_RAISE(batch.values[i],
- Filter(batch.values[i], selected_datum, options,
ctx_));
- first_nonscalar = std::min(first_nonscalar, i);
- ARROW_DCHECK_EQ(batch.values[i].length(),
batch.values[first_nonscalar].length());
- }
- }
- // If they're all Scalar, then the length of the batch is the number of
set bits
- if (first_nonscalar == batch.values.size())
- batch.length = arrow::internal::CountSetBits(selected.data(), 0,
batch.length);
- else
- batch.length = batch.values[first_nonscalar].length();
- return Status::OK();
- }
-
- int64_t BuildHashTable_num_tasks() { return 1; }
-
- Status BuildBloomFilter_exec_task(size_t thread_index, int64_t task_id) {
- const ExecBatch& input_batch = right_batches_[task_id];
- SchemaProjectionMap key_to_in =
- schema_mgr_->proj_maps[1].map(HashJoinProjection::KEY,
HashJoinProjection::INPUT);
- std::vector<Datum> key_columns(key_to_in.num_cols);
- for (size_t i = 0; i < key_columns.size(); i++) {
- int input_idx = key_to_in.get(static_cast<int>(i));
- key_columns[i] = input_batch[input_idx];
- if (key_columns[i].is_scalar()) {
- ARROW_ASSIGN_OR_RAISE(
- key_columns[i], MakeArrayFromScalar(*key_columns[i].scalar(),
- input_batch.length,
ctx_->memory_pool()));
- }
- }
- ARROW_ASSIGN_OR_RAISE(ExecBatch key_batch,
ExecBatch::Make(std::move(key_columns)));
-
- RETURN_NOT_OK(InitLocalStateIfNeeded(thread_index));
- ThreadLocalState& tls = local_states_[thread_index];
- util::TempVectorHolder<uint32_t> hash_holder(&tls.temp_stack,
-
util::MiniBatch::kMiniBatchLength);
- uint32_t* hashes = hash_holder.mutable_data();
- for (int64_t i = 0; i < key_batch.length; i +=
util::MiniBatch::kMiniBatchLength) {
- int64_t length = std::min(static_cast<int64_t>(key_batch.length - i),
-
static_cast<int64_t>(util::MiniBatch::kMiniBatchLength));
- RETURN_NOT_OK(Hashing32::HashBatch(key_batch, hashes,
- ctx_->cpu_info()->hardware_flags(),
- &tls.temp_stack, i, length));
- RETURN_NOT_OK(bloom_filter_builder_->PushNextBatch(thread_index, length,
hashes));
- }
- return Status::OK();
+ void RegisterBuildHashTable() {
+ task_group_build_ = scheduler_->RegisterTaskGroup(
+ [this](size_t thread_index, int64_t task_id) -> Status {
+ return BuildHashTable_exec_task(thread_index, task_id);
+ },
+ [this](size_t thread_index) -> Status {
+ return BuildHashTable_on_finished(thread_index);
+ });
}
Status BuildHashTable_exec_task(size_t thread_index, int64_t /*task_id*/) {
- const std::vector<ExecBatch>& batches = right_batches_;
+ AccumulationQueue batches = std::move(build_batches_);
if (batches.empty()) {
hash_table_empty_ = true;
} else {
@@ -756,7 +570,7 @@ class HashJoinBasicImpl : public HashJoinImpl {
InitEncoder(1, HashJoinProjection::PAYLOAD, &hash_table_payloads_);
}
hash_table_empty_ = true;
- for (size_t ibatch = 0; ibatch < batches.size(); ++ibatch) {
+ for (size_t ibatch = 0; ibatch < batches.batch_count(); ++ibatch) {
if (cancelled_) {
return Status::Cancelled("Hash join cancelled");
}
@@ -789,159 +603,30 @@ class HashJoinBasicImpl : public HashJoinImpl {
return Status::OK();
}
- Status BuildBloomFilter_on_finished(size_t thread_index) {
- if (cancelled_) return Status::Cancelled("Hash join cancelled");
- ARROW_DCHECK(pushdown_target_);
- RETURN_NOT_OK(pushdown_target_->PushBloomFilter(
- thread_index, std::move(bloom_filter_), std::move(column_map_)));
- return BuildHashTable(thread_index);
- }
-
Status BuildHashTable_on_finished(size_t thread_index) {
- if (cancelled_) {
- return Status::Cancelled("Hash join cancelled");
- }
-
- right_batches_.clear();
-
- bool proceed;
- {
- std::lock_guard<std::mutex> lock(left_batches_mutex_);
- std::lock_guard<std::mutex> lock_finish(finished_mutex_);
- left_queue_bloom_finished_ =
- left_queue_bloom_finished_ || num_expected_bloom_filters_ == 0;
- proceed = !has_hash_table_ && left_queue_bloom_finished_;
- has_hash_table_ = true;
- }
- if (proceed) RETURN_NOT_OK(ProbeQueuedBatches(thread_index));
-
- return Status::OK();
- }
-
- void RegisterBuildBloomFilter() {
- task_group_bloom_ = scheduler_->RegisterTaskGroup(
- [this](size_t thread_index, int64_t task_id) -> Status {
- return BuildBloomFilter_exec_task(thread_index, task_id);
- },
- [this](size_t thread_index) -> Status {
- return BuildBloomFilter_on_finished(thread_index);
- });
- }
-
- void RegisterBuildHashTable() {
- task_group_build_ = scheduler_->RegisterTaskGroup(
- [this](size_t thread_index, int64_t task_id) -> Status {
- return BuildHashTable_exec_task(thread_index, task_id);
- },
- [this](size_t thread_index) -> Status {
- return BuildHashTable_on_finished(thread_index);
- });
+ ARROW_DCHECK_EQ(build_batches_.batch_count(), 0);
+ has_hash_table_ = true;
+ return build_finished_callback_(thread_index);
}
- Status BuildBloomFilter(size_t thread_index) {
- RETURN_NOT_OK(bloom_filter_builder_->Begin(
- num_threads_, ctx_->cpu_info()->hardware_flags(), ctx_->memory_pool(),
- right_input_row_count_, right_batches_.size(), bloom_filter_.get()));
-
- return scheduler_->StartTaskGroup(thread_index, task_group_bloom_,
- right_batches_.size());
- }
-
- Status BuildHashTable(size_t thread_index) {
+ Status BuildHashTable(size_t thread_index, AccumulationQueue batches,
+ BuildFinishedCallback on_finished) override {
+ build_finished_callback_ = std::move(on_finished);
+ build_batches_ = std::move(batches);
return scheduler_->StartTaskGroup(thread_index, task_group_build_,
- BuildHashTable_num_tasks());
- }
-
- Status BloomFilterQueuedBatches_exec_task(size_t thread_index, int64_t
task_id) {
- if (cancelled_) return Status::Cancelled("Hash join cancelled");
- ExecBatch batch;
- {
- std::lock_guard<std::mutex> lock(left_batches_mutex_);
- batch = std::move(left_batches_[task_id]);
- ARROW_DCHECK(!batch.values.empty());
- }
- RETURN_NOT_OK(ApplyBloomFiltersToBatch(thread_index, batch));
- {
- std::lock_guard<std::mutex> lock(left_batches_mutex_);
- left_batches_[task_id] = std::move(batch);
- }
- return Status::OK();
- }
-
- Status BloomFilterQueuedBatches_on_finished(size_t thread_index) {
- if (cancelled_) return Status::Cancelled("Hash join cancelled");
- bool proceed;
- {
- std::lock_guard<std::mutex> lock(finished_mutex_);
- proceed = !left_queue_bloom_finished_ && has_hash_table_;
- left_queue_bloom_finished_ = true;
- }
- if (proceed) return ProbeQueuedBatches(thread_index);
- return Status::OK();
- }
-
- void RegisterBloomFilterQueuedBatches() {
- task_group_bloom_filter_queued_ = scheduler_->RegisterTaskGroup(
- [this](size_t thread_index, int64_t task_id) -> Status {
- return BloomFilterQueuedBatches_exec_task(thread_index, task_id);
- },
- [this](size_t thread_index) -> Status {
- return BloomFilterQueuedBatches_on_finished(thread_index);
- });
- }
-
- Status BloomFilterQueuedBatches(size_t thread_index, size_t num_batches) {
- return scheduler_->StartTaskGroup(thread_index,
task_group_bloom_filter_queued_,
- num_batches);
- }
-
- int64_t ProbeQueuedBatches_num_tasks() {
- return static_cast<int64_t>(left_batches_.size());
- }
-
- Status ProbeQueuedBatches_exec_task(size_t thread_index, int64_t task_id) {
- if (cancelled_) {
- return Status::Cancelled("Hash join cancelled");
- }
- return ProbeBatch(thread_index, std::move(left_batches_[task_id]));
- }
-
- Status ProbeQueuedBatches_on_finished(size_t thread_index) {
- if (cancelled_) {
- return Status::Cancelled("Hash join cancelled");
- }
-
- left_batches_.clear();
-
- bool proceed;
- {
- std::lock_guard<std::mutex> lock(finished_mutex_);
- ARROW_DCHECK(left_queue_bloom_finished_);
- proceed = left_side_finished_ && !left_queue_probe_finished_;
- left_queue_probe_finished_ = true;
- }
- if (proceed) {
- RETURN_NOT_OK(OnLeftSideAndQueueFinished(thread_index));
- }
-
- return Status::OK();
+ /*num_tasks=*/1);
}
- void RegisterProbeQueuedBatches() {
- task_group_queued_ = scheduler_->RegisterTaskGroup(
+ void RegisterScanHashTable() {
+ task_group_scan_ = scheduler_->RegisterTaskGroup(
[this](size_t thread_index, int64_t task_id) -> Status {
- return ProbeQueuedBatches_exec_task(thread_index, task_id);
+ return ScanHashTable_exec_task(thread_index, task_id);
},
[this](size_t thread_index) -> Status {
- return ProbeQueuedBatches_on_finished(thread_index);
+ return ScanHashTable_on_finished(thread_index);
});
}
- Status ProbeQueuedBatches(size_t thread_index) {
- return scheduler_->StartTaskGroup(thread_index, task_group_queued_,
- ProbeQueuedBatches_num_tasks());
- }
-
int64_t ScanHashTable_num_tasks() {
if (!has_hash_table_ || hash_table_empty_) {
return 0;
@@ -1006,58 +691,13 @@ class HashJoinBasicImpl : public HashJoinImpl {
return Status::OK();
}
- void RegisterScanHashTable() {
- task_group_scan_ = scheduler_->RegisterTaskGroup(
- [this](size_t thread_index, int64_t task_id) -> Status {
- return ScanHashTable_exec_task(thread_index, task_id);
- },
- [this](size_t thread_index) -> Status {
- return ScanHashTable_on_finished(thread_index);
- });
- }
-
Status ScanHashTable(size_t thread_index) {
MergeHasMatch();
return scheduler_->StartTaskGroup(thread_index, task_group_scan_,
ScanHashTable_num_tasks());
}
- Result<bool> QueueBatchIfNeeded(size_t thread_index, int side, ExecBatch&
batch) {
- if (side == 0) {
- // We don't want to do the filtering while holding the lock, since that
can get
- // expensive.
- bool needs_filtering;
- {
- std::lock_guard<std::mutex> lock(left_batches_mutex_);
- bloom_filters_ready_ = bloom_filters_ready_ ||
num_expected_bloom_filters_ == 0;
- needs_filtering = bloom_filters_ready_ && num_expected_bloom_filters_
!= 0;
- }
- if (needs_filtering)
RETURN_NOT_OK(ApplyBloomFiltersToBatch(thread_index, batch));
-
- bool queued;
- {
- std::lock_guard<std::mutex> lock(left_batches_mutex_);
- queued = !bloom_filters_ready_ || !has_hash_table_;
- if (queued) left_batches_.emplace_back(std::move(batch));
- }
- return queued;
- } else {
- std::lock_guard<std::mutex> lock(right_batches_mutex_);
- right_input_row_count_ += batch.length;
- right_batches_.emplace_back(std::move(batch));
- return true;
- }
- }
-
- Status OnRightSideFinished(size_t thread_index) {
- if (pushdown_target_ == nullptr) {
- return BuildHashTable(thread_index);
- } else {
- return BuildBloomFilter(thread_index);
- }
- }
-
- Status OnLeftSideAndQueueFinished(size_t thread_index) {
+ Status ProbingFinished(size_t thread_index) override {
return ScanHashTable(thread_index);
}
@@ -1105,16 +745,14 @@ class HashJoinBasicImpl : public HashJoinImpl {
HashJoinSchema* schema_mgr_;
std::vector<JoinKeyCmp> key_cmp_;
Expression filter_;
- std::unique_ptr<TaskScheduler> scheduler_;
- int task_group_bloom_;
+ TaskScheduler* scheduler_;
int task_group_build_;
- int task_group_bloom_filter_queued_;
- int task_group_queued_;
int task_group_scan_;
// Callbacks
//
OutputBatchCallback output_batch_callback_;
+ BuildFinishedCallback build_finished_callback_;
FinishedCallback finished_callback_;
// Thread local runtime state
@@ -1129,7 +767,6 @@ class HashJoinBasicImpl : public HashJoinImpl {
std::vector<int32_t> match_right;
bool is_has_match_initialized;
std::vector<uint8_t> has_match;
- util::TempVectorStack temp_stack;
};
std::vector<ThreadLocalState> local_states_;
@@ -1146,34 +783,12 @@ class HashJoinBasicImpl : public HashJoinImpl {
HashJoinDictBuildMulti dict_build_;
HashJoinDictProbeMulti dict_probe_;
- std::vector<ExecBatch> left_batches_;
bool has_hash_table_;
- std::mutex left_batches_mutex_;
-
- size_t right_input_row_count_; // Sum of the lengths of ExecBatches in
right_batches_
- std::vector<ExecBatch> right_batches_;
- std::mutex right_batches_mutex_;
- // Bloom filter stuff
- //
- std::unique_ptr<BloomFilterBuilder> bloom_filter_builder_;
- std::unique_ptr<BlockedBloomFilter> bloom_filter_;
- std::vector<int> column_map_;
- std::vector<std::unique_ptr<BlockedBloomFilter>> pushed_bloom_filters_;
- std::vector<std::vector<int>> bloom_filter_column_maps_;
- std::mutex bloom_filters_mutex_;
- size_t num_expected_bloom_filters_;
- HashJoinImpl* pushdown_target_;
+ AccumulationQueue build_batches_;
std::atomic<int64_t> num_batches_produced_;
bool cancelled_;
-
- bool bloom_filters_ready_;
- bool right_side_finished_;
- bool left_side_finished_;
- bool left_queue_bloom_finished_;
- bool left_queue_probe_finished_;
- std::mutex finished_mutex_;
};
Result<std::unique_ptr<HashJoinImpl>> HashJoinImpl::MakeBasic() {
diff --git a/cpp/src/arrow/compute/exec/hash_join.h b/cpp/src/arrow/compute/exec/hash_join.h
index 2b3bef0ead..97bdf166a0 100644
--- a/cpp/src/arrow/compute/exec/hash_join.h
+++ b/cpp/src/arrow/compute/exec/hash_join.h
@@ -21,6 +21,7 @@
#include <memory>
#include <vector>
+#include "arrow/compute/exec/accumulation_queue.h"
#include "arrow/compute/exec/bloom_filter.h"
#include "arrow/compute/exec/options.h"
#include "arrow/compute/exec/schema_util.h"
@@ -33,6 +34,8 @@
namespace arrow {
namespace compute {
+using arrow::util::AccumulationQueue;
+
class ARROW_EXPORT HashJoinSchema {
public:
Status Init(JoinType join_type, const Schema& left_schema,
@@ -100,22 +103,20 @@ class ARROW_EXPORT HashJoinSchema {
class HashJoinImpl {
public:
using OutputBatchCallback = std::function<void(ExecBatch)>;
+ using BuildFinishedCallback = std::function<Status(size_t)>;
+ using ProbeFinishedCallback = std::function<Status(size_t)>;
using FinishedCallback = std::function<void(int64_t)>;
virtual ~HashJoinImpl() = default;
- virtual Status Init(ExecContext* ctx, JoinType join_type, bool
use_sync_execution,
- size_t num_threads, HashJoinSchema* schema_mgr,
- std::vector<JoinKeyCmp> key_cmp, Expression filter,
- OutputBatchCallback output_batch_callback,
- FinishedCallback finished_callback,
- TaskScheduler::ScheduleImpl schedule_task_callback,
- HashJoinImpl* pushdown_target, std::vector<int>
column_map) = 0;
- virtual void ExpectBloomFilter() = 0;
- virtual Status PushBloomFilter(size_t thread_index,
- std::unique_ptr<BlockedBloomFilter> filter,
- std::vector<int> column_map) = 0;
- virtual Status InputReceived(size_t thread_index, int side, ExecBatch batch)
= 0;
- virtual Status InputFinished(size_t thread_index, int side) = 0;
+ virtual Status Init(ExecContext* ctx, JoinType join_type, size_t num_threads,
+ HashJoinSchema* schema_mgr, std::vector<JoinKeyCmp>
key_cmp,
+ Expression filter, OutputBatchCallback
output_batch_callback,
+ FinishedCallback finished_callback, TaskScheduler*
scheduler) = 0;
+
+ virtual Status BuildHashTable(size_t thread_index, AccumulationQueue batches,
+ BuildFinishedCallback on_finished) = 0;
+ virtual Status ProbeSingleBatch(size_t thread_index, ExecBatch batch) = 0;
+ virtual Status ProbingFinished(size_t thread_index) = 0;
virtual void Abort(TaskScheduler::AbortContinuationImpl pos_abort_callback)
= 0;
static Result<std::unique_ptr<HashJoinImpl>> MakeBasic();
diff --git a/cpp/src/arrow/compute/exec/hash_join_benchmark.cc b/cpp/src/arrow/compute/exec/hash_join_benchmark.cc
index 8d8be7f904..0786071f99 100644
--- a/cpp/src/arrow/compute/exec/hash_join_benchmark.cc
+++ b/cpp/src/arrow/compute/exec/hash_join_benchmark.cc
@@ -36,7 +36,6 @@
namespace arrow {
namespace compute {
struct BenchmarkSettings {
- bool bloom_filter = false;
int num_threads = 1;
JoinType join_type = JoinType::INNER;
int batch_size = 1024;
@@ -111,11 +110,16 @@ class JoinBenchmark {
auto l_schema = *l_schema_builder.Finish();
auto r_schema = *r_schema_builder.Finish();
- l_batches_ =
+ BatchesWithSchema l_batches_with_schema =
MakeRandomBatches(l_schema, settings.num_probe_batches,
settings.batch_size);
- r_batches_ =
+ BatchesWithSchema r_batches_with_schema =
MakeRandomBatches(r_schema, settings.num_build_batches,
settings.batch_size);
+ for (ExecBatch& batch : l_batches_with_schema.batches)
+ l_batches_.InsertBatch(std::move(batch));
+ for (ExecBatch& batch : r_batches_with_schema.batches)
+ r_batches_.InsertBatch(std::move(batch));
+
stats_.num_probe_rows = settings.num_probe_batches * settings.batch_size;
ctx_ = arrow::internal::make_unique<ExecContext>(
@@ -124,27 +128,12 @@ class JoinBenchmark {
schema_mgr_ = arrow::internal::make_unique<HashJoinSchema>();
Expression filter = literal(true);
- DCHECK_OK(schema_mgr_->Init(settings.join_type, *l_batches_.schema,
left_keys,
- *r_batches_.schema, right_keys, filter, "l_",
"r_"));
+ DCHECK_OK(schema_mgr_->Init(settings.join_type,
*l_batches_with_schema.schema,
+ left_keys, *r_batches_with_schema.schema,
right_keys,
+ filter, "l_", "r_"));
join_ = *HashJoinImpl::MakeBasic();
- HashJoinImpl* bloom_filter_pushdown_target = nullptr;
- std::vector<int> key_input_map;
-
- bool bloom_filter_does_not_apply_to_join =
- settings.join_type == JoinType::LEFT_ANTI ||
- settings.join_type == JoinType::LEFT_OUTER ||
- settings.join_type == JoinType::FULL_OUTER;
- if (settings.bloom_filter && !bloom_filter_does_not_apply_to_join) {
- bloom_filter_pushdown_target = join_.get();
- SchemaProjectionMap probe_key_to_input = schema_mgr_->proj_maps[0].map(
- HashJoinProjection::KEY, HashJoinProjection::INPUT);
- int num_keys = probe_key_to_input.num_cols;
- for (int i = 0; i < num_keys; i++)
- key_input_map.push_back(probe_key_to_input.get(i));
- }
-
omp_set_num_threads(settings.num_threads);
auto schedule_callback = [](std::function<Status(size_t)> func) -> Status {
#pragma omp task
@@ -152,39 +141,47 @@ class JoinBenchmark {
return Status::OK();
};
+ scheduler_ = TaskScheduler::Make();
DCHECK_OK(join_->Init(
- ctx_.get(), settings.join_type, !is_parallel, settings.num_threads,
- schema_mgr_.get(), std::move(key_cmp), std::move(filter),
[](ExecBatch) {},
- [](int64_t x) {}, schedule_callback, bloom_filter_pushdown_target,
- std::move(key_input_map)));
+ ctx_.get(), settings.join_type, settings.num_threads,
schema_mgr_.get(),
+ std::move(key_cmp), std::move(filter), [](ExecBatch) {}, [](int64_t x)
{},
+ scheduler_.get()));
+
+ task_group_probe_ = scheduler_->RegisterTaskGroup(
+ [this](size_t thread_index, int64_t task_id) -> Status {
+ return join_->ProbeSingleBatch(thread_index,
std::move(l_batches_[task_id]));
+ },
+ [this](size_t thread_index) -> Status {
+ return join_->ProbingFinished(thread_index);
+ });
+
+ scheduler_->RegisterEnd();
+
+ DCHECK_OK(scheduler_->StartScheduling(
+ 0 /*thread index*/, std::move(schedule_callback),
+ static_cast<int>(2 * settings.num_threads) /*concurrent tasks*/,
!is_parallel));
}
void RunJoin() {
#pragma omp parallel
{
int tid = omp_get_thread_num();
-#pragma omp for nowait
- for (auto it = r_batches_.batches.begin(); it !=
r_batches_.batches.end(); ++it)
- DCHECK_OK(join_->InputReceived(tid, /*side=*/1, *it));
-#pragma omp for nowait
- for (auto it = l_batches_.batches.begin(); it !=
l_batches_.batches.end(); ++it)
- DCHECK_OK(join_->InputReceived(tid, /*side=*/0, *it));
-
-#pragma omp barrier
-
-#pragma omp single nowait
- { DCHECK_OK(join_->InputFinished(tid, /*side=*/1)); }
-
-#pragma omp single nowait
- { DCHECK_OK(join_->InputFinished(tid, /*side=*/0)); }
+ // Build, then probe: one thread of the parallel region hands the whole build
+ // side (r_batches_) to BuildHashTable. When the hash table is complete, the
+ // callback starts task_group_probe_ with one task per batch in l_batches_.
+ // The other threads wait at the implicit barrier that ends the `single`
+ // construct; that barrier is an OpenMP task scheduling point, so they can run
+ // the tasks schedule_callback creates with `#pragma omp task`.
+ // r_batches_ is moved from here, so a JoinBenchmark instance should only be
+ // run once.
+#pragma omp single
+ DCHECK_OK(
+ join_->BuildHashTable(tid, std::move(r_batches_), [this](size_t
thread_index) {
+ return scheduler_->StartTaskGroup(thread_index, task_group_probe_,
+ l_batches_.batch_count());
+ }));
}
}
- BatchesWithSchema l_batches_;
- BatchesWithSchema r_batches_;
+ std::unique_ptr<TaskScheduler> scheduler_;
+ AccumulationQueue l_batches_;
+ AccumulationQueue r_batches_;
std::unique_ptr<HashJoinSchema> schema_mgr_;
std::unique_ptr<HashJoinImpl> join_;
std::unique_ptr<ExecContext> ctx_;
+ int task_group_probe_;
struct {
uint64_t num_probe_rows;
@@ -193,11 +190,17 @@ class JoinBenchmark {
static void HashJoinBasicBenchmarkImpl(benchmark::State& st,
BenchmarkSettings& settings) {
- JoinBenchmark bm(settings);
uint64_t total_rows = 0;
for (auto _ : st) {
- bm.RunJoin();
- total_rows += bm.stats_.num_probe_rows;
+ // A fresh JoinBenchmark is built on every iteration because RunJoin moves
+ // the benchmark's input batches out. Timing is paused around construction,
+ // the row-count bookkeeping and destruction (end of the inner scope), so
+ // only RunJoin itself is measured.
+ // NOTE(review): Google Benchmark documents PauseTiming/ResumeTiming as having
+ // non-trivial overhead; for short joins it may show up in the results.
+ st.PauseTiming();
+ {
+ JoinBenchmark bm(settings);
+ st.ResumeTiming();
+ bm.RunJoin();
+ st.PauseTiming();
+ total_rows += bm.stats_.num_probe_rows;
+ }
+ st.ResumeTiming();
}
st.counters["rows/sec"] = benchmark::Counter(total_rows,
benchmark::Counter::kIsRate);
}
@@ -288,16 +291,6 @@ static void
BM_HashJoinBasic_NullPercentage(benchmark::State& st) {
HashJoinBasicBenchmarkImpl(st, settings);
}
-
-static void BM_HashJoinBasic_BloomFilter(benchmark::State& st, bool
bloom_filter) {
- BenchmarkSettings settings;
- settings.bloom_filter = bloom_filter;
- settings.selectivity = static_cast<double>(st.range(0)) / 100.0;
- settings.num_build_batches = static_cast<int>(st.range(1));
- settings.num_probe_batches = settings.num_build_batches;
-
- HashJoinBasicBenchmarkImpl(st, settings);
-}
#endif
std::vector<int64_t> hashtable_krows = benchmark::CreateRange(1, 4096, 8);
@@ -425,16 +418,6 @@ BENCHMARK(BM_HashJoinBasic_BuildParallelism)
BENCHMARK(BM_HashJoinBasic_NullPercentage)
->ArgNames({"Null Percentage"})
->DenseRange(0, 100, 10);
-
-std::vector<std::string> bloomfilter_argnames = {"Selectivity", "HashTable
krows"};
-std::vector<std::vector<int64_t>> bloomfilter_args = {
- benchmark::CreateDenseRange(0, 100, 10), hashtable_krows};
-BENCHMARK_CAPTURE(BM_HashJoinBasic_BloomFilter, "Bloom Filter", true)
- ->ArgNames(bloomfilter_argnames)
- ->ArgsProduct(selectivity_args);
-BENCHMARK_CAPTURE(BM_HashJoinBasic_BloomFilter, "No Bloom Filter", false)
- ->ArgNames(bloomfilter_argnames)
- ->ArgsProduct(selectivity_args);
#else
BENCHMARK_CAPTURE(BM_HashJoinBasic_KeyTypes, "{int32}", {int32()})
diff --git a/cpp/src/arrow/compute/exec/hash_join_node.cc
b/cpp/src/arrow/compute/exec/hash_join_node.cc
index 60ad75228a..baa8267125 100644
--- a/cpp/src/arrow/compute/exec/hash_join_node.cc
+++ b/cpp/src/arrow/compute/exec/hash_join_node.cc
@@ -15,12 +15,14 @@
// specific language governing permissions and limitations
// under the License.
+#include <mutex>
#include <unordered_set>
#include <utility>
#include "arrow/compute/exec/exec_plan.h"
#include "arrow/compute/exec/hash_join.h"
#include "arrow/compute/exec/hash_join_dict.h"
+#include "arrow/compute/exec/key_hash.h"
#include "arrow/compute/exec/options.h"
#include "arrow/compute/exec/schema_util.h"
#include "arrow/compute/exec/util.h"
@@ -470,6 +472,189 @@ Status ValidateHashJoinNodeOptions(const
HashJoinNodeOptions& join_options) {
return Status::OK();
}
+class HashJoinNode;
+// Forward-declared because BloomFilterPushdownContext (below) stores and
+// receives HashJoinNode pointers but is defined before HashJoinNode.
+
+// This is a struct encapsulating things related to Bloom filters and pushing
them around
+// between HashJoinNodes. The general strategy is to notify other joins at
plan-creation
+// time for that join to expect a Bloom filter. Once the full build side has
been
+// accumulated for a given join, it will build the Bloom filter and push it to
its
+// pushdown target. Once a join has received all of its Bloom filters, it will
evaluate it
+// on every batch that has been queued so far as well as any new probe-side
batch that
+// comes in.
+struct BloomFilterPushdownContext {
+ // BuildFinishedCallback / FilterFinishedCallback receive the thread index and
+ // get back ownership of the batches that were handed to BuildBloomFilter /
+ // FilterBatches. FiltersReceivedCallback is invoked once every expected Bloom
+ // filter has been received (or from StartProducing when none are expected).
+ using BuildFinishedCallback = std::function<Status(size_t,
AccumulationQueue)>;
+ using FiltersReceivedCallback = std::function<Status()>;
+ using FilterFinishedCallback = std::function<Status(size_t,
AccumulationQueue)>;
+ // Binds this context to its owning node. It caches the owner's schema manager
+ // and exec context, sizes the per-thread scratch space to num_threads,
+ // resolves the pushdown target and registers task groups on `scheduler`.
+ // disable_bloom_filter turns off building/pushing this node's own filter;
+ // GetPushdownTarget may also turn it off.
+ void Init(HashJoinNode* owner, size_t num_threads, TaskScheduler* scheduler,
+ FiltersReceivedCallback on_bloom_filters_received, bool
disable_bloom_filter,
+ bool use_sync_execution);
+
+ // If no Bloom filter is expected, invokes the filters-received callback
+ // immediately (see the out-of-line definition).
+ Status StartProducing();
+
+ // Records that one more Bloom filter will be pushed into this context. It is
+ // called from the pushing node's Init. No lock is taken, so presumably all
+ // calls happen during plan preparation, before any filter arrives (confirm).
+ void ExpectBloomFilter() { eval_.num_expected_bloom_filters_ += 1; }
+
+ // Builds the Bloom filter, taking ownership of the batches until the build
+ // is done.
+ Status BuildBloomFilter(size_t thread_index, AccumulationQueue batches,
+ BuildFinishedCallback on_finished);
+
+ // Sends the Bloom filter to the pushdown target.
+ Status PushBloomFilter();
+
+ // Receives a Bloom filter and its associated column map.
+ // Thread-safe: the filter and map are appended under receive_mutex_. The
+ // thread that delivers the last expected filter invokes
+ // all_received_callback_ after the lock has been released.
+ Status ReceiveBloomFilter(std::unique_ptr<BlockedBloomFilter> filter,
+ std::vector<int> column_map) {
+ bool proceed;
+ {
+ std::lock_guard<std::mutex> guard(eval_.receive_mutex_);
+ eval_.received_filters_.emplace_back(std::move(filter));
+ eval_.received_maps_.emplace_back(std::move(column_map));
+ proceed = eval_.num_expected_bloom_filters_ ==
eval_.received_filters_.size();
+
+ ARROW_DCHECK_EQ(eval_.received_filters_.size(),
eval_.received_maps_.size());
+ ARROW_DCHECK_LE(eval_.received_filters_.size(),
eval_.num_expected_bloom_filters_);
+ }
+ if (proceed) {
+ return eval_.all_received_callback_();
+ }
+ return Status::OK();
+ }
+
+ // Evaluates the Bloom filter on a group of batches, taking ownership of them
+ // until the whole filtering process is complete.
+ // With no filters expected, the batches go straight back to on_finished.
+ // Otherwise one task per batch runs FilterSingleBatch, and the eval task
+ // group's finish callback returns the batches. eval_.batches_ and
+ // eval_.on_finished_ are single slots, so only one FilterBatches call can be
+ // in flight at a time.
+ Status FilterBatches(size_t thread_index, AccumulationQueue batches,
+ FilterFinishedCallback on_finished) {
+ eval_.batches_ = std::move(batches);
+ eval_.on_finished_ = std::move(on_finished);
+
+ if (eval_.num_expected_bloom_filters_ == 0)
+ return eval_.on_finished_(thread_index, std::move(eval_.batches_));
+
+ return scheduler_->StartTaskGroup(thread_index, eval_.task_id_,
+
/*num_tasks=*/eval_.batches_.batch_count());
+ }
+
+ // Applies all Bloom filters on the input batch.
+ // The batch is filtered in place, in three steps:
+ //   1. Each received filter is looked up with the batch's key columns,
+ //      which are picked via received_maps_.
+ //   2. The per-row results are ANDed into `selected`.
+ //   3. Every non-scalar column is filtered with that selection, and
+ //      batch.length is recomputed.
+ // Callers are expected to call this only after all expected filters have
+ // arrived; HashJoinNode gates it on bloom_filters_ready_. That is why
+ // received_filters_ / received_maps_ are read without receive_mutex_.
+ Status FilterSingleBatch(size_t thread_index, ExecBatch* batch_ptr) {
+ ExecBatch& batch = *batch_ptr;
+ if (eval_.num_expected_bloom_filters_ == 0 || batch.length == 0) return
Status::OK();
+
+ // selected: running selection bitmap; hashes: one 32-bit hash per row;
+ // bv: per-row result bits of the current filter's Find.
+ int64_t bit_vector_bytes = bit_util::BytesForBits(batch.length);
+ std::vector<uint8_t> selected(bit_vector_bytes);
+ std::vector<uint32_t> hashes(batch.length);
+ std::vector<uint8_t> bv(bit_vector_bytes);
+
+ ARROW_ASSIGN_OR_RAISE(util::TempVectorStack * stack,
GetStack(thread_index));
+
+ // Start with full selection for the current batch
+ memset(selected.data(), 0xff, bit_vector_bytes);
+ for (size_t ifilter = 0; ifilter < eval_.num_expected_bloom_filters_;
ifilter++) {
+ // Gather this filter's key columns. Scalars are broadcast to full-length
+ // arrays so they can be hashed row by row.
+ std::vector<Datum> keys(eval_.received_maps_[ifilter].size());
+ for (size_t i = 0; i < keys.size(); i++) {
+ int input_idx = eval_.received_maps_[ifilter][i];
+ keys[i] = batch[input_idx];
+ if (keys[i].is_scalar()) {
+ ARROW_ASSIGN_OR_RAISE(
+ keys[i],
+ MakeArrayFromScalar(*keys[i].scalar(), batch.length,
ctx_->memory_pool()));
+ }
+ }
+ ARROW_ASSIGN_OR_RAISE(ExecBatch key_batch,
ExecBatch::Make(std::move(keys)));
+ RETURN_NOT_OK(Hashing32::HashBatch(key_batch, hashes.data(),
+ ctx_->cpu_info()->hardware_flags(),
stack, 0,
+ key_batch.length));
+
+ // Rows rejected by this filter are cleared from `selected`.
+
eval_.received_filters_[ifilter]->Find(ctx_->cpu_info()->hardware_flags(),
+ key_batch.length, hashes.data(),
bv.data());
+ arrow::internal::BitmapAnd(bv.data(), 0, selected.data(), 0,
key_batch.length, 0,
+ selected.data());
+ }
+ // selected_buffer wraps the memory of `selected` without copying it.
+ // `selected` stays alive until this function returns, which covers the
+ // Filter calls below.
+ // NOTE(review): this assumes Filter does not retain the selection array
+ // beyond the call -- confirm.
+ auto selected_buffer =
+ arrow::internal::make_unique<Buffer>(selected.data(),
bit_vector_bytes);
+ ArrayData selected_arraydata(boolean(), batch.length,
+ {nullptr, std::move(selected_buffer)});
+ Datum selected_datum(selected_arraydata);
+ FilterOptions options;
+ // first_nonscalar ends up as the index of the first array column, whose
+ // filtered length becomes the new batch length.
+ size_t first_nonscalar = batch.values.size();
+ for (size_t i = 0; i < batch.values.size(); i++) {
+ if (!batch.values[i].is_scalar()) {
+ ARROW_ASSIGN_OR_RAISE(batch.values[i],
+ Filter(batch.values[i], selected_datum, options,
ctx_));
+ first_nonscalar = std::min(first_nonscalar, i);
+ ARROW_DCHECK_EQ(batch.values[i].length(),
batch.values[first_nonscalar].length());
+ }
+ }
+ // If they're all Scalar, then the length of the batch is the number of
set bits
+ if (first_nonscalar == batch.values.size())
+ batch.length = arrow::internal::CountSetBits(selected.data(), 0,
batch.length);
+ else
+ batch.length = batch.values[first_nonscalar].length();
+ return Status::OK();
+ }
+
+ private:
+ // Build task body: hashes the keys of build_.batches_[task_id] into the
+ // builder.
+ Status BuildBloomFilter_exec_task(size_t thread_index, int64_t task_id);
+
+ // Finish callback of the build task group: returns the build batches to the
+ // BuildFinishedCallback.
+ Status BuildBloomFilter_on_finished(size_t thread_index) {
+ return build_.on_finished_(thread_index, std::move(build_.batches_));
+ }
+
+ // The Bloom filter is built on the build side of some upstream join. For a
join to
+ // evaluate the Bloom filter on its input columns, it has to rearrange its
input columns
+ // to match the column order of the Bloom filter.
+ //
+ // The first part of the pair is the HashJoin to actually perform the
pushdown into.
+ // The second part is a mapping such that column_map[i] is the index of key
i in
+ // the first part's input.
+ // If we should disable Bloom filter, returns nullptr and an empty vector,
and sets
+ // the disable_bloom_filter_ flag.
+ std::pair<HashJoinNode*, std::vector<int>> GetPushdownTarget(HashJoinNode*
start);
+
+ // Returns this thread's TempVectorStack, initializing it on first use.
+ Result<util::TempVectorStack*> GetStack(size_t thread_index) {
+ if (!tld_[thread_index].is_init) {
+ RETURN_NOT_OK(tld_[thread_index].stack.Init(
+ ctx_->memory_pool(), 4 * util::MiniBatch::kMiniBatchLength *
sizeof(uint32_t)));
+ tld_[thread_index].is_init = true;
+ }
+ return &tld_[thread_index].stack;
+ }
+
+ // When true, this node neither builds nor pushes a Bloom filter. It may still
+ // receive and evaluate filters pushed into it by other joins.
+ bool disable_bloom_filter_;
+ HashJoinSchema* schema_mgr_;
+ ExecContext* ctx_;
+ TaskScheduler* scheduler_;
+
+ // Per-thread scratch space, indexed by thread index; sized in Init.
+ struct ThreadLocalData {
+ bool is_init = false;
+ util::TempVectorStack stack;
+ };
+ std::vector<ThreadLocalData> tld_;
+
+ // State for building this node's own Bloom filter from its build side.
+ struct {
+ int task_id_;
+ std::unique_ptr<BloomFilterBuilder> builder_;
+ AccumulationQueue batches_;
+ BuildFinishedCallback on_finished_;
+ } build_;
+
+ // The filter built here, the node it is sent to, and the index of each key
+ // in that node's probe-side input.
+ struct {
+ std::unique_ptr<BlockedBloomFilter> bloom_filter_;
+ HashJoinNode* pushdown_target_;
+ std::vector<int> column_map_;
+ } push_;
+
+ // State for evaluating the filters pushed into this node on its probe side.
+ struct {
+ int task_id_;
+ size_t num_expected_bloom_filters_ = 0;
+ std::mutex receive_mutex_;
+ std::vector<std::unique_ptr<BlockedBloomFilter>> received_filters_;
+ std::vector<std::vector<int>> received_maps_;
+ AccumulationQueue batches_;
+ FiltersReceivedCallback all_received_callback_;
+ FilterFinishedCallback on_finished_;
+ } eval_;
+};
+
class HashJoinNode : public ExecNode {
public:
HashJoinNode(ExecPlan* plan, NodeVector inputs, const HashJoinNodeOptions&
join_options,
@@ -533,6 +718,120 @@ class HashJoinNode : public ExecNode {
const char* kind_name() const override { return "HashJoinNode"; }
+ // Buffers one build-side batch. Nothing is built until the whole build side
+ // has arrived (see OnBuildSideFinished). build_side_mutex_ serializes
+ // concurrent inserts.
+ Status OnBuildSideBatch(size_t thread_index, ExecBatch batch) {
+ std::lock_guard<std::mutex> guard(build_side_mutex_);
+ build_accumulator_.InsertBatch(std::move(batch));
+ return Status::OK();
+ }
+
+ // Called once the build side is complete. The accumulated batches go to the
+ // Bloom filter builder, which hands them straight back when filters are
+ // disabled, and come back in OnBloomFilterFinished.
+ // build_accumulator_ is moved without build_side_mutex_. That is presumably
+ // safe because no build batch can arrive once the side has finished.
+ Status OnBuildSideFinished(size_t thread_index) {
+ return pushdown_context_.BuildBloomFilter(
+ thread_index, std::move(build_accumulator_),
+ [this](size_t thread_index, AccumulationQueue batches) {
+ return OnBloomFilterFinished(thread_index, std::move(batches));
+ });
+ }
+
+ // Pushes this node's Bloom filter (if any) to its pushdown target. It then
+ // builds the hash table from the same build batches; OnHashTableFinished is
+ // the completion callback of that build.
+ Status OnBloomFilterFinished(size_t thread_index, AccumulationQueue batches)
{
+ RETURN_NOT_OK(pushdown_context_.PushBloomFilter());
+ return impl_->BuildHashTable(
+ thread_index, std::move(batches),
+ [this](size_t thread_index) { return
OnHashTableFinished(thread_index); });
+ }
+
+ // Marks the hash table as ready. Probing of the queued batches is started by
+ // whichever of this method and OnQueuedBatchesFiltered runs second. Both read
+ // the other's flag and set their own under probe_side_mutex_, so exactly one
+ // of them calls ProbeQueuedBatches.
+ Status OnHashTableFinished(size_t thread_index) {
+ bool should_probe;
+ {
+ std::lock_guard<std::mutex> guard(probe_side_mutex_);
+ should_probe = queued_batches_filtered_ && !hash_table_ready_;
+ hash_table_ready_ = true;
+ }
+ if (should_probe) {
+ return ProbeQueuedBatches(thread_index);
+ }
+ return Status::OK();
+ }
+
+ // Routes one probe-side batch.
+ // - Before all Bloom filters are in, the raw batch is queued; it is filtered
+ //   later as part of OnFiltersReceived.
+ // - Afterwards it is filtered here, outside the lock. It is then either
+ //   queued again (hash table not built yet) or probed right away.
+ Status OnProbeSideBatch(size_t thread_index, ExecBatch batch) {
+ {
+ std::lock_guard<std::mutex> guard(probe_side_mutex_);
+ if (!bloom_filters_ready_) {
+ probe_accumulator_.InsertBatch(std::move(batch));
+ return Status::OK();
+ }
+ }
+ RETURN_NOT_OK(pushdown_context_.FilterSingleBatch(thread_index, &batch));
+
+ {
+ std::lock_guard<std::mutex> guard(probe_side_mutex_);
+ if (!hash_table_ready_) {
+ probe_accumulator_.InsertBatch(std::move(batch));
+ return Status::OK();
+ }
+ }
+ RETURN_NOT_OK(impl_->ProbeSingleBatch(thread_index, std::move(batch)));
+ return Status::OK();
+ }
+
+ // Called once all probe-side batches have been received. ProbingFinished is
+ // reported only after the queued batches have been probed as well.
+ // Whichever of this method and OnQueuedBatchesProbed runs second reports it.
+ Status OnProbeSideFinished(size_t thread_index) {
+ bool probing_finished;
+ {
+ std::lock_guard<std::mutex> guard(probe_side_mutex_);
+ probing_finished = queued_batches_probed_ && !probe_side_finished_;
+ probe_side_finished_ = true;
+ }
+ if (probing_finished) return impl_->ProbingFinished(thread_index);
+ return Status::OK();
+ }
+
+ // Fired by pushdown_context_ once every expected Bloom filter has arrived,
+ // or right at StartProducing when none are expected.
+ // From then on, OnProbeSideBatch filters new batches itself. The batches
+ // queued so far are taken out under the lock and filtered as a task group
+ // after the lock is released.
+ Status OnFiltersReceived() {
+ std::unique_lock<std::mutex> guard(probe_side_mutex_);
+ bloom_filters_ready_ = true;
+ size_t thread_index = thread_indexer_();
+ AccumulationQueue batches = std::move(probe_accumulator_);
+ guard.unlock();
+ return pushdown_context_.FilterBatches(
+ thread_index, std::move(batches),
+ [this](size_t thread_index, AccumulationQueue batches) {
+ return OnQueuedBatchesFiltered(thread_index, std::move(batches));
+ });
+ }
+
+ // Receives the filtered backlog and merges it back into probe_accumulator_.
+ // By now the accumulator may also hold batches that OnProbeSideBatch has
+ // already filtered. Probing starts here if the hash table is already built
+ // (see OnHashTableFinished for the hand-off).
+ Status OnQueuedBatchesFiltered(size_t thread_index, AccumulationQueue
batches) {
+ bool should_probe;
+ {
+ std::lock_guard<std::mutex> guard(probe_side_mutex_);
+ probe_accumulator_.Concatenate(std::move(batches));
+ should_probe = !queued_batches_filtered_ && hash_table_ready_;
+ queued_batches_filtered_ = true;
+ }
+ if (should_probe) {
+ return ProbeQueuedBatches(thread_index);
+ }
+ return Status::OK();
+ }
+
+ // Moves every queued probe batch into queued_batches_to_probe_ and probes
+ // them as task_group_probe_, one task per batch. The batch count is read
+ // after the lock is released. The flag hand-off in OnHashTableFinished /
+ // OnQueuedBatchesFiltered is expected to let only one caller reach this
+ // method.
+ Status ProbeQueuedBatches(size_t thread_index) {
+ {
+ std::lock_guard<std::mutex> guard(probe_side_mutex_);
+ queued_batches_to_probe_ = std::move(probe_accumulator_);
+ }
+ return scheduler_->StartTaskGroup(thread_index, task_group_probe_,
+ queued_batches_to_probe_.batch_count());
+ }
+
+ // Finish callback of task_group_probe_. It drops the probed backlog and
+ // reports ProbingFinished if the probe side has already finished (see
+ // OnProbeSideFinished).
+ Status OnQueuedBatchesProbed(size_t thread_index) {
+ queued_batches_to_probe_.Clear();
+ bool probing_finished;
+ {
+ std::lock_guard<std::mutex> guard(probe_side_mutex_);
+ probing_finished = !queued_batches_probed_ && probe_side_finished_;
+ queued_batches_probed_ = true;
+ }
+ if (probing_finished) return impl_->ProbingFinished(thread_index);
+ return Status::OK();
+ }
+
void InputReceived(ExecNode* input, ExecBatch batch) override {
ARROW_DCHECK(std::find(inputs_.begin(), inputs_.end(), input) !=
inputs_.end());
if (complete_.load()) {
@@ -547,16 +846,19 @@ class HashJoinNode : public ExecNode {
START_COMPUTE_SPAN_WITH_PARENT(span, span_, "InputReceived",
{{"batch.length", batch.length}});
- {
- Status status = impl_->InputReceived(thread_index, side,
std::move(batch));
- if (!status.ok()) {
- StopProducing();
- ErrorIfNotOk(status);
- return;
- }
+ Status status = side == 0 ? OnProbeSideBatch(thread_index,
std::move(batch))
+ : OnBuildSideBatch(thread_index,
std::move(batch));
+
+ if (!status.ok()) {
+ StopProducing();
+ ErrorIfNotOk(status);
+ return;
}
+
if (batch_count_[side].Increment()) {
- Status status = impl_->InputFinished(thread_index, side);
+ status = side == 0 ? OnProbeSideFinished(thread_index)
+ : OnBuildSideFinished(thread_index);
+
if (!status.ok()) {
StopProducing();
ErrorIfNotOk(status);
@@ -581,7 +883,9 @@ class HashJoinNode : public ExecNode {
EVENT(span_, "InputFinished", {{"side", side}, {"batches.length",
total_batches}});
if (batch_count_[side].SetTotal(total_batches)) {
- Status status = impl_->InputFinished(thread_index, side);
+ Status status = side == 0 ? OnProbeSideFinished(thread_index)
+ : OnBuildSideFinished(thread_index);
+
if (!status.ok()) {
StopProducing();
ErrorIfNotOk(status);
@@ -590,131 +894,42 @@ class HashJoinNode : public ExecNode {
}
}
- // The Bloom filter is built on the build side of some upstream join. For a
join to
- // evaluate the Bloom filter on its input columns, it has to rearrange its
input columns
- // to match the column order of the Bloom filter.
- //
- // The first part of the pair is the HashJoin to actually perform the
pushdown into.
- // The second part is a mapping such that column_map[i] is the index of key
i in
- // the first part's input.
- // If we should disable Bloom filter, returns nullptr and an empty vector,
and sets
- // the disable_bloom_filter_ flag.
- std::pair<HashJoinImpl*, std::vector<int>> GetPushdownTarget() {
-#if !ARROW_LITTLE_ENDIAN
- // TODO (ARROW-16591): Debug bloom_filter.cc to enable on Big endian. It
probably just
- // needs a few byte swaps in the proper spots.
- disable_bloom_filter_ = true;
- return {nullptr, {}};
-#else
- // A build-side Bloom filter tells us if a row is definitely not in the
build side.
- // This allows us to early-eliminate rows or early-accept rows depending
on the type
- // of join. Left Outer Join and Full Outer Join output all rows, so a
build-side Bloom
- // filter would only allow us to early-output. Left Antijoin outputs only
if there is
- // no match, so again early output. We don't implement early output for
now, so we
- // must disallow these types of joins.
- bool bloom_filter_does_not_apply_to_join = join_type_ ==
JoinType::LEFT_ANTI ||
- join_type_ ==
JoinType::LEFT_OUTER ||
- join_type_ ==
JoinType::FULL_OUTER;
- disable_bloom_filter_ = disable_bloom_filter_ ||
bloom_filter_does_not_apply_to_join;
-
- for (int side = 0; side <= 1 && !disable_bloom_filter_; side++) {
- SchemaProjectionMap keys_to_input = schema_mgr_->proj_maps[side].map(
- HashJoinProjection::KEY, HashJoinProjection::INPUT);
- // Bloom filter currently doesn't support dictionaries.
- for (int i = 0; i < keys_to_input.num_cols; i++) {
- int idx = keys_to_input.get(i);
- bool is_dict =
- inputs_[side]->output_schema()->field(idx)->type()->id() ==
Type::DICTIONARY;
- if (is_dict) {
- disable_bloom_filter_ = true;
- break;
- }
- }
- }
-
- bool all_comparisons_is = true;
- for (JoinKeyCmp cmp : key_cmp_) all_comparisons_is &= (cmp ==
JoinKeyCmp::IS);
-
- if ((join_type_ == JoinType::RIGHT_OUTER || join_type_ ==
JoinType::FULL_OUTER) &&
- all_comparisons_is)
- disable_bloom_filter_ = true;
-
- if (disable_bloom_filter_) return {nullptr, {}};
-
- // We currently only push Bloom filters on the probe side, and only if
that input is
- // also a join.
- SchemaProjectionMap probe_key_to_input =
- schema_mgr_->proj_maps[0].map(HashJoinProjection::KEY,
HashJoinProjection::INPUT);
- int num_keys = probe_key_to_input.num_cols;
-
- // A mapping such that bloom_to_target[i] is the index of key i in the
pushdown
- // target's input
- std::vector<int> bloom_to_target(num_keys);
- HashJoinNode* pushdown_target = this;
- for (int i = 0; i < num_keys; i++) bloom_to_target[i] =
probe_key_to_input.get(i);
-
- for (ExecNode* candidate = inputs()[0]; candidate->kind_name() ==
this->kind_name();
- candidate = candidate->inputs()[0]) {
- auto* candidate_as_join = checked_cast<HashJoinNode*>(candidate);
- SchemaProjectionMap candidate_output_to_input =
-
candidate_as_join->schema_mgr_->proj_maps[0].map(HashJoinProjection::OUTPUT,
-
HashJoinProjection::INPUT);
-
- // Check if any of the keys are missing, if they are, break
- bool break_outer = false;
- for (int i = 0; i < num_keys; i++) {
- // Since all of the probe side columns are before the build side
columns,
- // if the index of an output is greater than the number of probe-side
input
- // columns, it must have come from the candidate's build side.
- if (bloom_to_target[i] >= candidate_output_to_input.num_cols) {
- break_outer = true;
- break;
- }
- int candidate_input_idx =
candidate_output_to_input.get(bloom_to_target[i]);
- // The output column has to have come from somewhere...
- ARROW_DCHECK_NE(candidate_input_idx, schema_mgr_->kMissingField());
- }
- if (break_outer) break;
-
- // The Bloom filter will filter out nulls, which may cause a Right/Full
Outer Join
- // to incorrectly output some rows with nulls padding the probe-side
rows. This may
- // cause a row with all null keys to be emitted. This is normally not an
issue
- // with EQ, but if all comparisons are IS (i.e. all-null is accepted),
this could
- // produce incorrect rows.
- bool can_produce_build_side_nulls =
- candidate_as_join->join_type_ == JoinType::RIGHT_OUTER ||
- candidate_as_join->join_type_ == JoinType::FULL_OUTER;
-
- if (all_comparisons_is || can_produce_build_side_nulls) break;
-
- // All keys are present, we can update the mapping
- for (int i = 0; i < num_keys; i++) {
- int candidate_input_idx =
candidate_output_to_input.get(bloom_to_target[i]);
- bloom_to_target[i] = candidate_input_idx;
- }
- pushdown_target = candidate_as_join;
- }
- return std::make_pair(pushdown_target->impl_.get(),
std::move(bloom_to_target));
-#endif // ARROW_LITTLE_ENDIAN
- }
-
Status PrepareToProduce() override {
bool use_sync_execution = !(plan_->exec_context()->executor());
- size_t num_threads = use_sync_execution ? 1 : thread_indexer_.Capacity();
+ // TODO(ARROW-15732)
+ // Each side of join might have an IO thread being called from. Once this
is fixed
+ // we will change it back to just the CPU's thread pool capacity.
+ size_t num_threads = (GetCpuThreadPoolCapacity() +
io::GetIOThreadPoolCapacity() + 1);
+
+ // One scheduler per node. The setup order is:
+ //   1. pushdown_context_.Init registers the Bloom filter task groups on it,
+ //      and tells the pushdown target (possibly this node) to expect a filter.
+ //   2. impl_->Init receives it too, presumably to register the join's own
+ //      task groups.
+ //   3. task_group_probe_ is registered below.
+ //   4. RegisterEnd() and StartScheduling follow.
+ scheduler_ = TaskScheduler::Make();
+ pushdown_context_.Init(
+ this, num_threads, scheduler_.get(), [this]() { return
OnFiltersReceived(); },
+ disable_bloom_filter_, use_sync_execution);
+
+ RETURN_NOT_OK(impl_->Init(
+ plan_->exec_context(), join_type_, num_threads, schema_mgr_.get(),
key_cmp_,
+ filter_, [this](ExecBatch batch) { this->OutputBatchCallback(batch); },
+ [this](int64_t total_num_batches) {
this->FinishedCallback(total_num_batches); },
+ scheduler_.get()));
- HashJoinImpl* pushdown_target = nullptr;
- std::vector<int> column_map;
- std::tie(pushdown_target, column_map) = GetPushdownTarget();
+ // Probes the backlog moved out by ProbeQueuedBatches, one task per batch.
+ // OnQueuedBatchesProbed runs when the whole group has finished.
+ task_group_probe_ = scheduler_->RegisterTaskGroup(
+ [this](size_t thread_index, int64_t task_id) -> Status {
+ return impl_->ProbeSingleBatch(thread_index,
+
std::move(queued_batches_to_probe_[task_id]));
+ },
+ [this](size_t thread_index) -> Status {
+ return OnQueuedBatchesProbed(thread_index);
+ });
- return impl_->Init(
- plan_->exec_context(), join_type_, use_sync_execution, num_threads,
- schema_mgr_.get(), key_cmp_, filter_,
- [this](ExecBatch batch) { this->OutputBatchCallback(batch); },
- [this](int64_t total_num_batches) {
this->FinishedCallback(total_num_batches); },
+ scheduler_->RegisterEnd();
+
+ RETURN_NOT_OK(scheduler_->StartScheduling(
+ 0 /*thread index*/,
[this](std::function<Status(size_t)> func) -> Status {
return this->ScheduleTaskCallback(std::move(func));
},
- pushdown_target, std::move(column_map));
+ static_cast<int>(2 * num_threads) /*concurrent tasks*/,
use_sync_execution));
+ return Status::OK();
}
Status StartProducing() override {
@@ -723,7 +938,7 @@ class HashJoinNode : public ExecNode {
{"node.detail", ToString()},
{"node.kind", kind_name()}});
END_SPAN_ON_FUTURE_COMPLETION(span_, finished(), this);
-
+ RETURN_NOT_OK(pushdown_context_.StartProducing());
return Status::OK();
}
@@ -797,9 +1012,229 @@ class HashJoinNode : public ExecNode {
std::unique_ptr<HashJoinSchema> schema_mgr_;
std::unique_ptr<HashJoinImpl> impl_;
util::AsyncTaskGroup task_group_;
+ std::unique_ptr<TaskScheduler> scheduler_;
+ util::AccumulationQueue build_accumulator_;
+ util::AccumulationQueue probe_accumulator_;
+ util::AccumulationQueue queued_batches_to_probe_;
+
+ std::mutex build_side_mutex_;
+ std::mutex probe_side_mutex_;
+
+ int task_group_probe_;
+ bool bloom_filters_ready_ = false;
+ bool hash_table_ready_ = false;
+ bool queued_batches_filtered_ = false;
+ bool queued_batches_probed_ = false;
+ bool probe_side_finished_ = false;
+
+ friend struct BloomFilterPushdownContext;
bool disable_bloom_filter_;
+ BloomFilterPushdownContext pushdown_context_;
};
+// Called from HashJoinNode::PrepareToProduce. It resolves where this node's
+// Bloom filter should be evaluated. Unless filtering ends up disabled, it then:
+//   - tells that target to expect one more filter,
+//   - creates the builder (single-threaded for synchronous execution, parallel
+//     otherwise), and
+//   - registers the build task group.
+// The evaluation task group is registered unconditionally, because this node
+// may still receive filters pushed by joins that consume its output.
+void BloomFilterPushdownContext::Init(HashJoinNode* owner, size_t num_threads,
+ TaskScheduler* scheduler,
+ FiltersReceivedCallback
on_bloom_filters_received,
+ bool disable_bloom_filter,
+ bool use_sync_execution) {
+ schema_mgr_ = owner->schema_mgr_.get();
+ ctx_ = owner->plan_->exec_context();
+ scheduler_ = scheduler;
+ tld_.resize(num_threads);
+ disable_bloom_filter_ = disable_bloom_filter;
+ // Must follow the assignment above: GetPushdownTarget reads and may set
+ // disable_bloom_filter_.
+ std::tie(push_.pushdown_target_, push_.column_map_) =
GetPushdownTarget(owner);
+ eval_.all_received_callback_ = std::move(on_bloom_filters_received);
+ if (!disable_bloom_filter_) {
+ ARROW_CHECK(push_.pushdown_target_);
+ push_.bloom_filter_ = arrow::internal::make_unique<BlockedBloomFilter>();
+ // The target may be `owner` itself, in which case this increments our own
+ // eval_ counter.
+ push_.pushdown_target_->pushdown_context_.ExpectBloomFilter();
+
+ build_.builder_ = BloomFilterBuilder::Make(
+ use_sync_execution ? BloomFilterBuildStrategy::SINGLE_THREADED
+ : BloomFilterBuildStrategy::PARALLEL);
+
+ build_.task_id_ = scheduler_->RegisterTaskGroup(
+ [this](size_t thread_index, int64_t task_id) {
+ return BuildBloomFilter_exec_task(thread_index, task_id);
+ },
+ [this](size_t thread_index) {
+ return BuildBloomFilter_on_finished(thread_index);
+ });
+ }
+
+ eval_.task_id_ = scheduler_->RegisterTaskGroup(
+ [this](size_t thread_index, int64_t task_id) {
+ return FilterSingleBatch(thread_index, &eval_.batches_[task_id]);
+ },
+ [this](size_t thread_index) {
+ return eval_.on_finished_(thread_index, std::move(eval_.batches_));
+ });
+}
+
+// Called from HashJoinNode::StartProducing. When no Bloom filter will be pushed
+// into this node, the filters-received callback fires immediately.
+// NOTE(review): num_expected_bloom_filters_ is incremented from other nodes'
+// PrepareToProduce. This therefore relies on every node being prepared before
+// any node starts producing -- presumably guaranteed by ExecPlan; confirm.
+Status BloomFilterPushdownContext::StartProducing() {
+ if (eval_.num_expected_bloom_filters_ == 0) return
eval_.all_received_callback_();
+ return Status::OK();
+}
+
+// Takes ownership of the build-side batches.
+// - With Bloom filters disabled, they are handed straight back via on_finished.
+// - Otherwise the builder is sized for all rows and batches, and one build task
+//   per batch is started. The task group's finish callback
+//   (BuildBloomFilter_on_finished) then returns the batches to on_finished.
+Status BloomFilterPushdownContext::BuildBloomFilter(size_t thread_index,
+ AccumulationQueue batches,
+ BuildFinishedCallback
on_finished) {
+ build_.batches_ = std::move(batches);
+ build_.on_finished_ = std::move(on_finished);
+
+ if (disable_bloom_filter_)
+ return build_.on_finished_(thread_index, std::move(build_.batches_));
+
+ RETURN_NOT_OK(build_.builder_->Begin(
+ /*num_threads=*/tld_.size(), ctx_->cpu_info()->hardware_flags(),
+ ctx_->memory_pool(), build_.batches_.row_count(),
build_.batches_.batch_count(),
+ push_.bloom_filter_.get()));
+
+ return scheduler_->StartTaskGroup(thread_index, build_.task_id_,
+
/*num_tasks=*/build_.batches_.batch_count());
+}
+
+// Sends the finished filter and its key-column map to the pushdown target. Both
+// are moved out, so this is meant to run once; a second call would push an
+// empty filter. No-op when Bloom filters are disabled.
+Status BloomFilterPushdownContext::PushBloomFilter() {
+ if (!disable_bloom_filter_)
+ return push_.pushdown_target_->pushdown_context_.ReceiveBloomFilter(
+ std::move(push_.bloom_filter_), std::move(push_.column_map_));
+ return Status::OK();
+}
+
+// Build task body for one build-side batch. It:
+//   1. extracts the batch's build-side key columns, broadcasting scalars to
+//      arrays;
+//   2. hashes them in chunks of at most kMiniBatchLength rows into a scratch
+//      buffer taken from this thread's TempVectorStack;
+//   3. pushes each chunk of hashes into the builder.
+Status BloomFilterPushdownContext::BuildBloomFilter_exec_task(size_t
thread_index,
+ int64_t task_id)
{
+ const ExecBatch& input_batch = build_.batches_[task_id];
+ SchemaProjectionMap key_to_in =
+ schema_mgr_->proj_maps[1].map(HashJoinProjection::KEY,
HashJoinProjection::INPUT);
+ std::vector<Datum> key_columns(key_to_in.num_cols);
+ for (size_t i = 0; i < key_columns.size(); i++) {
+ int input_idx = key_to_in.get(static_cast<int>(i));
+ key_columns[i] = input_batch[input_idx];
+ if (key_columns[i].is_scalar()) {
+ ARROW_ASSIGN_OR_RAISE(key_columns[i],
+ MakeArrayFromScalar(*key_columns[i].scalar(),
+ input_batch.length,
ctx_->memory_pool()));
+ }
+ }
+ ARROW_ASSIGN_OR_RAISE(ExecBatch key_batch,
ExecBatch::Make(std::move(key_columns)));
+
+ ARROW_ASSIGN_OR_RAISE(util::TempVectorStack * stack, GetStack(thread_index));
+ util::TempVectorHolder<uint32_t> hash_holder(stack,
util::MiniBatch::kMiniBatchLength);
+ uint32_t* hashes = hash_holder.mutable_data();
+ // Chunking keeps the scratch hash buffer at kMiniBatchLength entries
+ // regardless of the batch size.
+ for (int64_t i = 0; i < key_batch.length; i +=
util::MiniBatch::kMiniBatchLength) {
+ int64_t length = std::min(static_cast<int64_t>(key_batch.length - i),
+
static_cast<int64_t>(util::MiniBatch::kMiniBatchLength));
+ RETURN_NOT_OK(Hashing32::HashBatch(
+ key_batch, hashes, ctx_->cpu_info()->hardware_flags(), stack, i,
length));
+ RETURN_NOT_OK(build_.builder_->PushNextBatch(thread_index, length,
hashes));
+ }
+ return Status::OK();
+}
+
+// Decides whether this join's Bloom filter is usable and where to evaluate it.
+// Starting from `start`, it walks down the chain of probe-side inputs
+// (inputs()[0]) while they are HashJoinNodes. Each time every key can be traced
+// back to that join's probe-side input, the target moves one join further and
+// the key indices are rewritten. If no candidate qualifies, the target is
+// `start` itself, i.e. the filter is evaluated on this join's own probe side.
+std::pair<HashJoinNode*, std::vector<int>>
BloomFilterPushdownContext::GetPushdownTarget(
+ HashJoinNode* start) {
+#if !ARROW_LITTLE_ENDIAN
+ // TODO (ARROW-16591): Debug bloom_filter.cc to enable on Big endian. It
probably just
+ // needs a few byte swaps in the proper spots.
+ disable_bloom_filter_ = true;
+ return {nullptr, {}};
+#else
+ if (disable_bloom_filter_) return {nullptr, {}};
+ JoinType join_type = start->join_type_;
+
+ // A build-side Bloom filter tells us if a row is definitely not in the
build side.
+ // This allows us to early-eliminate rows or early-accept rows depending on
the type
+ // of join. Left Outer Join and Full Outer Join output all rows, so a
build-side Bloom
+ // filter would only allow us to early-output. Left Antijoin outputs only if
there is
+ // no match, so again early output. We don't implement early output for now,
so we
+ // must disallow these types of joins.
+ bool bloom_filter_does_not_apply_to_join = join_type == JoinType::LEFT_ANTI
||
+ join_type == JoinType::LEFT_OUTER
||
+ join_type == JoinType::FULL_OUTER;
+ disable_bloom_filter_ = disable_bloom_filter_ ||
bloom_filter_does_not_apply_to_join;
+
+ // Bloom filter currently doesn't support dictionaries.
+ for (int side = 0; side <= 1 && !disable_bloom_filter_; side++) {
+ SchemaProjectionMap keys_to_input =
start->schema_mgr_->proj_maps[side].map(
+ HashJoinProjection::KEY, HashJoinProjection::INPUT);
+ // Bloom filter currently doesn't support dictionaries.
+ for (int i = 0; i < keys_to_input.num_cols; i++) {
+ int idx = keys_to_input.get(i);
+ bool is_dict =
start->inputs_[side]->output_schema()->field(idx)->type()->id() ==
+ Type::DICTIONARY;
+ if (is_dict) {
+ disable_bloom_filter_ = true;
+ break;
+ }
+ }
+ }
+
+ bool all_comparisons_is = true;
+ for (JoinKeyCmp cmp : start->key_cmp_) all_comparisons_is &= (cmp ==
JoinKeyCmp::IS);
+
+ if ((join_type == JoinType::RIGHT_OUTER || join_type ==
JoinType::FULL_OUTER) &&
+ all_comparisons_is)
+ disable_bloom_filter_ = true;
+
+ if (disable_bloom_filter_) return {nullptr, {}};
+
+ // We currently only push Bloom filters on the probe side, and only if that
input is
+ // also a join.
+ SchemaProjectionMap probe_key_to_input =
start->schema_mgr_->proj_maps[0].map(
+ HashJoinProjection::KEY, HashJoinProjection::INPUT);
+ int num_keys = probe_key_to_input.num_cols;
+
+ // A mapping such that bloom_to_target[i] is the index of key i in the
pushdown
+ // target's input
+ std::vector<int> bloom_to_target(num_keys);
+ HashJoinNode* pushdown_target = start;
+ for (int i = 0; i < num_keys; i++) bloom_to_target[i] =
probe_key_to_input.get(i);
+
+ // NOTE(review): kind_name() returns const char*, so this compares pointers,
+ // not string contents. It works because every HashJoinNode returns the same
+ // literal from its kind_name(); a strcmp or a type check would be sturdier.
+ for (ExecNode* candidate = start->inputs()[0];
+ candidate->kind_name() == start->kind_name(); candidate =
candidate->inputs()[0]) {
+ auto* candidate_as_join = checked_cast<HashJoinNode*>(candidate);
+ SchemaProjectionMap candidate_output_to_input =
+
candidate_as_join->schema_mgr_->proj_maps[0].map(HashJoinProjection::OUTPUT,
+
HashJoinProjection::INPUT);
+
+ // Check if any of the keys are missing, if they are, break
+ bool break_outer = false;
+ for (int i = 0; i < num_keys; i++) {
+ // Since all of the probe side columns are before the build side columns,
+ // if the index of an output is greater than the number of probe-side
input
+ // columns, it must have come from the candidate's build side.
+ // (candidate_output_to_input.num_cols presumably counts the columns of
+ // the candidate's probe-side output projection; the test below is `>=`.)
+ if (bloom_to_target[i] >= candidate_output_to_input.num_cols) {
+ break_outer = true;
+ break;
+ }
+ int candidate_input_idx =
candidate_output_to_input.get(bloom_to_target[i]);
+ // The output column has to have come from somewhere...
+ ARROW_DCHECK_NE(candidate_input_idx,
start->schema_mgr_->kMissingField());
+ }
+ if (break_outer) break;
+
+ // The Bloom filter will filter out nulls, which may cause a Right/Full
Outer Join
+ // to incorrectly output some rows with nulls padding the probe-side rows.
This may
+ // cause a row with all null keys to be emitted. This is normally not an
issue
+ // with EQ, but if all comparisons are IS (i.e. all-null is accepted),
this could
+ // produce incorrect rows.
+ bool can_produce_build_side_nulls =
+ candidate_as_join->join_type_ == JoinType::RIGHT_OUTER ||
+ candidate_as_join->join_type_ == JoinType::FULL_OUTER;
+
+ if (all_comparisons_is || can_produce_build_side_nulls) break;
+
+ // All keys are present, we can update the mapping
+ for (int i = 0; i < num_keys; i++) {
+ int candidate_input_idx =
candidate_output_to_input.get(bloom_to_target[i]);
+ bloom_to_target[i] = candidate_input_idx;
+ }
+ pushdown_target = candidate_as_join;
+ }
+ return std::make_pair(pushdown_target, std::move(bloom_to_target));
+#endif // ARROW_LITTLE_ENDIAN
+}
+
namespace internal {
void RegisterHashJoinNode(ExecFactoryRegistry* registry) {
DCHECK_OK(registry->AddFactory("hashjoin", HashJoinNode::Make));