westonpace commented on code in PR #13332:
URL: https://github.com/apache/arrow/pull/13332#discussion_r892487699
##########
cpp/src/arrow/compute/exec/hash_join.cc:
##########
@@ -789,159 +603,30 @@ class HashJoinBasicImpl : public HashJoinImpl {
return Status::OK();
}
- Status BuildBloomFilter_on_finished(size_t thread_index) {
- if (cancelled_) return Status::Cancelled("Hash join cancelled");
- ARROW_DCHECK(pushdown_target_);
- RETURN_NOT_OK(pushdown_target_->PushBloomFilter(
- thread_index, std::move(bloom_filter_), std::move(column_map_)));
- return BuildHashTable(thread_index);
- }
-
Status BuildHashTable_on_finished(size_t thread_index) {
- if (cancelled_) {
- return Status::Cancelled("Hash join cancelled");
- }
-
- right_batches_.clear();
-
- bool proceed;
- {
- std::lock_guard<std::mutex> lock(left_batches_mutex_);
- std::lock_guard<std::mutex> lock_finish(finished_mutex_);
- left_queue_bloom_finished_ =
- left_queue_bloom_finished_ || num_expected_bloom_filters_ == 0;
- proceed = !has_hash_table_ && left_queue_bloom_finished_;
- has_hash_table_ = true;
- }
- if (proceed) RETURN_NOT_OK(ProbeQueuedBatches(thread_index));
-
- return Status::OK();
- }
-
- void RegisterBuildBloomFilter() {
- task_group_bloom_ = scheduler_->RegisterTaskGroup(
- [this](size_t thread_index, int64_t task_id) -> Status {
- return BuildBloomFilter_exec_task(thread_index, task_id);
- },
- [this](size_t thread_index) -> Status {
- return BuildBloomFilter_on_finished(thread_index);
- });
- }
-
- void RegisterBuildHashTable() {
- task_group_build_ = scheduler_->RegisterTaskGroup(
- [this](size_t thread_index, int64_t task_id) -> Status {
- return BuildHashTable_exec_task(thread_index, task_id);
- },
- [this](size_t thread_index) -> Status {
- return BuildHashTable_on_finished(thread_index);
- });
+ build_batches_.Clear();
Review Comment:
This seems redundant given that you moved out of here above. Maybe just a
`DCHECK_EQ(0, build_batches_.size())`?
##########
cpp/src/arrow/compute/exec/accumulation_queue.h:
##########
@@ -0,0 +1,52 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+#include <cstdint>
+#include <vector>
+#include "arrow/compute/exec.h"
Review Comment:
```suggestion
#pragma once
#include <cstdint>
#include <vector>
#include "arrow/compute/exec.h"
```
##########
cpp/src/arrow/compute/exec/hash_join_node.cc:
##########
@@ -454,6 +456,172 @@ Status
HashJoinSchema::CollectFilterColumns(std::vector<FieldRef>& left_filter,
return Status::OK();
}
+class HashJoinNode;
+
+struct BloomFilterPushdownContext {
+ using BuildFinishedCallback = std::function<Status(size_t,
AccumulationQueue)>;
+ using FiltersReceivedCallback = std::function<Status()>;
+ using FilterFinishedCallback = std::function<Status(size_t,
AccumulationQueue)>;
+ void Init(HashJoinNode* owner, size_t num_threads, TaskScheduler* scheduler,
+ FiltersReceivedCallback on_bloom_filters_received, bool
disable_bloom_filter,
+ bool use_sync_execution);
+
+ Status StartProducing();
+
+ void ExpectBloomFilter() { eval_.num_expected_bloom_filters_ += 1; }
+
+ Status BuildBloomFilter(size_t thread_index, AccumulationQueue batches,
+ BuildFinishedCallback on_finished);
+
+ Status PushBloomFilter();
+
+ Status ReceiveBloomFilter(std::unique_ptr<BlockedBloomFilter> filter,
+ std::vector<int> column_map) {
+ bool proceed;
+ {
+ std::lock_guard<std::mutex> guard(eval_.receive_mutex_);
+ eval_.received_filters_.emplace_back(std::move(filter));
+ eval_.received_maps_.emplace_back(std::move(column_map));
+ proceed = eval_.num_expected_bloom_filters_ ==
eval_.received_filters_.size();
+
+ ARROW_DCHECK(eval_.received_filters_.size() ==
eval_.received_maps_.size());
+ ARROW_DCHECK(eval_.received_filters_.size() <=
eval_.num_expected_bloom_filters_);
+ }
+ if (proceed) {
+ return push_.all_received_callback_();
+ }
+ return Status::OK();
+ }
+
+ Status FilterBatches(size_t thread_index, AccumulationQueue batches,
+ FilterFinishedCallback on_finished) {
+ eval_.batches_ = std::move(batches);
+ eval_.on_finished_ = std::move(on_finished);
+
+ if (eval_.num_expected_bloom_filters_ == 0)
+ return eval_.on_finished_(thread_index, std::move(eval_.batches_));
+
+ return scheduler_->StartTaskGroup(thread_index, eval_.task_id_,
+
/*num_tasks=*/eval_.batches_.batch_count());
+ }
+
+ Status FilterSingleBatch(size_t thread_index, ExecBatch& batch) {
+ if (disable_bloom_filter_ || batch.length == 0) return Status::OK();
+ int64_t bit_vector_bytes = bit_util::BytesForBits(batch.length);
+ std::vector<uint8_t> selected(bit_vector_bytes);
+ std::vector<uint32_t> hashes(batch.length);
+ std::vector<uint8_t> bv(bit_vector_bytes);
+
+ ARROW_ASSIGN_OR_RAISE(util::TempVectorStack * stack,
GetStack(thread_index));
+
+ // Start with full selection for the current batch
+ memset(selected.data(), 0xff, bit_vector_bytes);
+ for (size_t ifilter = 0; ifilter < eval_.num_expected_bloom_filters_;
ifilter++) {
+ std::vector<Datum> keys(eval_.received_maps_[ifilter].size());
+ for (size_t i = 0; i < keys.size(); i++) {
+ int input_idx = eval_.received_maps_[ifilter][i];
+ keys[i] = batch[input_idx];
+ if (keys[i].is_scalar()) {
+ ARROW_ASSIGN_OR_RAISE(
+ keys[i],
+ MakeArrayFromScalar(*keys[i].scalar(), batch.length,
ctx_->memory_pool()));
+ }
+ }
+ ARROW_ASSIGN_OR_RAISE(ExecBatch key_batch,
ExecBatch::Make(std::move(keys)));
+ RETURN_NOT_OK(Hashing32::HashBatch(key_batch, hashes.data(),
+ ctx_->cpu_info()->hardware_flags(),
stack, 0,
+ key_batch.length));
+
+
eval_.received_filters_[ifilter]->Find(ctx_->cpu_info()->hardware_flags(),
+ key_batch.length, hashes.data(),
bv.data());
+ arrow::internal::BitmapAnd(bv.data(), 0, selected.data(), 0,
key_batch.length, 0,
+ selected.data());
+ }
+ auto selected_buffer =
+ arrow::internal::make_unique<Buffer>(selected.data(),
bit_vector_bytes);
+ ArrayData selected_arraydata(boolean(), batch.length,
+ {nullptr, std::move(selected_buffer)});
+ Datum selected_datum(selected_arraydata);
+ FilterOptions options;
+ size_t first_nonscalar = batch.values.size();
+ for (size_t i = 0; i < batch.values.size(); i++) {
+ if (!batch.values[i].is_scalar()) {
+ ARROW_ASSIGN_OR_RAISE(batch.values[i],
+ Filter(batch.values[i], selected_datum, options,
ctx_));
+ first_nonscalar = std::min(first_nonscalar, i);
+ ARROW_DCHECK_EQ(batch.values[i].length(),
batch.values[first_nonscalar].length());
+ }
+ }
+ // If they're all Scalar, then the length of the batch is the number of
set bits
+ if (first_nonscalar == batch.values.size())
+ batch.length = arrow::internal::CountSetBits(selected.data(), 0,
batch.length);
+ else
+ batch.length = batch.values[first_nonscalar].length();
+ return Status::OK();
+ }
+
+ Status BuildBloomFilter_exec_task(size_t thread_index, int64_t task_id);
+
+ Status BuildBloomFilter_on_finished(size_t thread_index) {
+ return build_.on_finished_(thread_index, std::move(build_.batches_));
+ }
+
+ // The Bloom filter is built on the build side of some upstream join. For a
join to
+ // evaluate the Bloom filter on its input columns, it has to rearrange its
input columns
+ // to match the column order of the Bloom filter.
+ //
+ // The first part of the pair is the HashJoin to actually perform the
pushdown into.
+ // The second part is a mapping such that column_map[i] is the index of key
i in
+ // the first part's input.
+ // If we should disable Bloom filter, returns nullptr and an empty vector,
and sets
+ // the disable_bloom_filter_ flag.
+ std::pair<HashJoinNode*, std::vector<int>> GetPushdownTarget(HashJoinNode*
start);
+
+ Result<util::TempVectorStack*> GetStack(size_t thread_index) {
+ if (!tld_[thread_index].is_init) {
+ RETURN_NOT_OK(tld_[thread_index].stack.Init(
+ ctx_->memory_pool(), 4 * util::MiniBatch::kMiniBatchLength *
sizeof(uint32_t)));
+ tld_[thread_index].is_init = true;
+ }
+ return &tld_[thread_index].stack;
+ }
+
+ bool disable_bloom_filter_;
+ HashJoinNode* owner_;
+ ExecContext* ctx_;
+ TaskScheduler* scheduler_;
+
+ struct ThreadLocalData {
+ bool is_init = false;
+ util::TempVectorStack stack;
+ };
+ std::vector<ThreadLocalData> tld_;
+
+ struct {
+ int task_id_;
+ std::unique_ptr<BloomFilterBuilder> builder_;
+ AccumulationQueue batches_;
+ BuildFinishedCallback on_finished_;
+ } build_;
+
+ struct {
+ std::unique_ptr<BlockedBloomFilter> bloom_filter_;
+ HashJoinNode* pushdown_target_;
+ std::vector<int> column_map_;
+ FiltersReceivedCallback all_received_callback_;
Review Comment:
This feels more like an `eval_` thing?
##########
cpp/src/arrow/compute/exec/hash_join_node.cc:
##########
@@ -454,6 +456,172 @@ Status
HashJoinSchema::CollectFilterColumns(std::vector<FieldRef>& left_filter,
return Status::OK();
}
+class HashJoinNode;
+
+struct BloomFilterPushdownContext {
+ using BuildFinishedCallback = std::function<Status(size_t,
AccumulationQueue)>;
+ using FiltersReceivedCallback = std::function<Status()>;
+ using FilterFinishedCallback = std::function<Status(size_t,
AccumulationQueue)>;
+ void Init(HashJoinNode* owner, size_t num_threads, TaskScheduler* scheduler,
+ FiltersReceivedCallback on_bloom_filters_received, bool
disable_bloom_filter,
+ bool use_sync_execution);
+
+ Status StartProducing();
+
+ void ExpectBloomFilter() { eval_.num_expected_bloom_filters_ += 1; }
+
+ Status BuildBloomFilter(size_t thread_index, AccumulationQueue batches,
+ BuildFinishedCallback on_finished);
+
+ Status PushBloomFilter();
+
+ Status ReceiveBloomFilter(std::unique_ptr<BlockedBloomFilter> filter,
+ std::vector<int> column_map) {
+ bool proceed;
+ {
+ std::lock_guard<std::mutex> guard(eval_.receive_mutex_);
+ eval_.received_filters_.emplace_back(std::move(filter));
+ eval_.received_maps_.emplace_back(std::move(column_map));
+ proceed = eval_.num_expected_bloom_filters_ ==
eval_.received_filters_.size();
+
+ ARROW_DCHECK(eval_.received_filters_.size() ==
eval_.received_maps_.size());
+ ARROW_DCHECK(eval_.received_filters_.size() <=
eval_.num_expected_bloom_filters_);
+ }
+ if (proceed) {
+ return push_.all_received_callback_();
+ }
+ return Status::OK();
+ }
+
+ Status FilterBatches(size_t thread_index, AccumulationQueue batches,
+ FilterFinishedCallback on_finished) {
+ eval_.batches_ = std::move(batches);
+ eval_.on_finished_ = std::move(on_finished);
+
+ if (eval_.num_expected_bloom_filters_ == 0)
+ return eval_.on_finished_(thread_index, std::move(eval_.batches_));
+
+ return scheduler_->StartTaskGroup(thread_index, eval_.task_id_,
+
/*num_tasks=*/eval_.batches_.batch_count());
+ }
+
+ Status FilterSingleBatch(size_t thread_index, ExecBatch& batch) {
+ if (disable_bloom_filter_ || batch.length == 0) return Status::OK();
+ int64_t bit_vector_bytes = bit_util::BytesForBits(batch.length);
+ std::vector<uint8_t> selected(bit_vector_bytes);
+ std::vector<uint32_t> hashes(batch.length);
+ std::vector<uint8_t> bv(bit_vector_bytes);
+
+ ARROW_ASSIGN_OR_RAISE(util::TempVectorStack * stack,
GetStack(thread_index));
+
+ // Start with full selection for the current batch
+ memset(selected.data(), 0xff, bit_vector_bytes);
+ for (size_t ifilter = 0; ifilter < eval_.num_expected_bloom_filters_;
ifilter++) {
+ std::vector<Datum> keys(eval_.received_maps_[ifilter].size());
+ for (size_t i = 0; i < keys.size(); i++) {
+ int input_idx = eval_.received_maps_[ifilter][i];
+ keys[i] = batch[input_idx];
+ if (keys[i].is_scalar()) {
+ ARROW_ASSIGN_OR_RAISE(
+ keys[i],
+ MakeArrayFromScalar(*keys[i].scalar(), batch.length,
ctx_->memory_pool()));
+ }
+ }
+ ARROW_ASSIGN_OR_RAISE(ExecBatch key_batch,
ExecBatch::Make(std::move(keys)));
+ RETURN_NOT_OK(Hashing32::HashBatch(key_batch, hashes.data(),
+ ctx_->cpu_info()->hardware_flags(),
stack, 0,
+ key_batch.length));
+
+
eval_.received_filters_[ifilter]->Find(ctx_->cpu_info()->hardware_flags(),
+ key_batch.length, hashes.data(),
bv.data());
+ arrow::internal::BitmapAnd(bv.data(), 0, selected.data(), 0,
key_batch.length, 0,
+ selected.data());
+ }
+ auto selected_buffer =
+ arrow::internal::make_unique<Buffer>(selected.data(),
bit_vector_bytes);
+ ArrayData selected_arraydata(boolean(), batch.length,
+ {nullptr, std::move(selected_buffer)});
+ Datum selected_datum(selected_arraydata);
+ FilterOptions options;
+ size_t first_nonscalar = batch.values.size();
+ for (size_t i = 0; i < batch.values.size(); i++) {
+ if (!batch.values[i].is_scalar()) {
+ ARROW_ASSIGN_OR_RAISE(batch.values[i],
+ Filter(batch.values[i], selected_datum, options,
ctx_));
+ first_nonscalar = std::min(first_nonscalar, i);
+ ARROW_DCHECK_EQ(batch.values[i].length(),
batch.values[first_nonscalar].length());
+ }
+ }
+ // If they're all Scalar, then the length of the batch is the number of
set bits
+ if (first_nonscalar == batch.values.size())
+ batch.length = arrow::internal::CountSetBits(selected.data(), 0,
batch.length);
+ else
+ batch.length = batch.values[first_nonscalar].length();
+ return Status::OK();
+ }
+
+ Status BuildBloomFilter_exec_task(size_t thread_index, int64_t task_id);
+
+ Status BuildBloomFilter_on_finished(size_t thread_index) {
+ return build_.on_finished_(thread_index, std::move(build_.batches_));
+ }
+
+ // The Bloom filter is built on the build side of some upstream join. For a
join to
+ // evaluate the Bloom filter on its input columns, it has to rearrange its
input columns
+ // to match the column order of the Bloom filter.
+ //
+ // The first part of the pair is the HashJoin to actually perform the
pushdown into.
+ // The second part is a mapping such that column_map[i] is the index of key
i in
+ // the first part's input.
+ // If we should disable Bloom filter, returns nullptr and an empty vector,
and sets
+ // the disable_bloom_filter_ flag.
+ std::pair<HashJoinNode*, std::vector<int>> GetPushdownTarget(HashJoinNode*
start);
+
+ Result<util::TempVectorStack*> GetStack(size_t thread_index) {
+ if (!tld_[thread_index].is_init) {
+ RETURN_NOT_OK(tld_[thread_index].stack.Init(
+ ctx_->memory_pool(), 4 * util::MiniBatch::kMiniBatchLength *
sizeof(uint32_t)));
+ tld_[thread_index].is_init = true;
+ }
+ return &tld_[thread_index].stack;
+ }
+
+ bool disable_bloom_filter_;
+ HashJoinNode* owner_;
Review Comment:
It seems this is only used to access the schema projection maps. Maybe just
take a pointer to that instead? Then it would be clearer why this is needed.
##########
cpp/src/arrow/compute/exec/hash_join.cc:
##########
@@ -1105,16 +745,15 @@ class HashJoinBasicImpl : public HashJoinImpl {
HashJoinSchema* schema_mgr_;
std::vector<JoinKeyCmp> key_cmp_;
Expression filter_;
- std::unique_ptr<TaskScheduler> scheduler_;
- int task_group_bloom_;
+ TaskScheduler* scheduler_;
int task_group_build_;
- int task_group_bloom_filter_queued_;
- int task_group_queued_;
int task_group_scan_;
// Callbacks
//
OutputBatchCallback output_batch_callback_;
+ BuildFinishedCallback build_finished_callback_;
+ ProbeFinishedCallback probe_finished_callback_;
Review Comment:
Is this used?
##########
cpp/src/arrow/compute/exec/hash_join_node.cc:
##########
@@ -454,6 +456,172 @@ Status
HashJoinSchema::CollectFilterColumns(std::vector<FieldRef>& left_filter,
return Status::OK();
}
+class HashJoinNode;
+
+struct BloomFilterPushdownContext {
+ using BuildFinishedCallback = std::function<Status(size_t,
AccumulationQueue)>;
+ using FiltersReceivedCallback = std::function<Status()>;
+ using FilterFinishedCallback = std::function<Status(size_t,
AccumulationQueue)>;
+ void Init(HashJoinNode* owner, size_t num_threads, TaskScheduler* scheduler,
+ FiltersReceivedCallback on_bloom_filters_received, bool
disable_bloom_filter,
+ bool use_sync_execution);
+
+ Status StartProducing();
+
+ void ExpectBloomFilter() { eval_.num_expected_bloom_filters_ += 1; }
+
+ Status BuildBloomFilter(size_t thread_index, AccumulationQueue batches,
+ BuildFinishedCallback on_finished);
+
+ Status PushBloomFilter();
+
+ Status ReceiveBloomFilter(std::unique_ptr<BlockedBloomFilter> filter,
+ std::vector<int> column_map) {
Review Comment:
I think there would be some value here in comments explaining the general
strategy of what we are trying to do (or maybe, as has also been discussed, we
just need more general design documents). Maybe just some short comments on
each of these methods and a paragraph on the struct itself.
##########
cpp/src/arrow/compute/exec/hash_join_node.cc:
##########
@@ -454,6 +456,172 @@ Status
HashJoinSchema::CollectFilterColumns(std::vector<FieldRef>& left_filter,
return Status::OK();
}
+class HashJoinNode;
+
+struct BloomFilterPushdownContext {
+ using BuildFinishedCallback = std::function<Status(size_t,
AccumulationQueue)>;
+ using FiltersReceivedCallback = std::function<Status()>;
+ using FilterFinishedCallback = std::function<Status(size_t,
AccumulationQueue)>;
+ void Init(HashJoinNode* owner, size_t num_threads, TaskScheduler* scheduler,
+ FiltersReceivedCallback on_bloom_filters_received, bool
disable_bloom_filter,
+ bool use_sync_execution);
+
+ Status StartProducing();
+
+ void ExpectBloomFilter() { eval_.num_expected_bloom_filters_ += 1; }
+
+ Status BuildBloomFilter(size_t thread_index, AccumulationQueue batches,
+ BuildFinishedCallback on_finished);
+
+ Status PushBloomFilter();
+
+ Status ReceiveBloomFilter(std::unique_ptr<BlockedBloomFilter> filter,
+ std::vector<int> column_map) {
+ bool proceed;
+ {
+ std::lock_guard<std::mutex> guard(eval_.receive_mutex_);
+ eval_.received_filters_.emplace_back(std::move(filter));
+ eval_.received_maps_.emplace_back(std::move(column_map));
+ proceed = eval_.num_expected_bloom_filters_ ==
eval_.received_filters_.size();
+
+ ARROW_DCHECK(eval_.received_filters_.size() ==
eval_.received_maps_.size());
+ ARROW_DCHECK(eval_.received_filters_.size() <=
eval_.num_expected_bloom_filters_);
Review Comment:
`ARROW_DCHECK_EQ` & `ARROW_DCHECK_LE`?
##########
cpp/src/arrow/compute/exec/hash_join_node.cc:
##########
@@ -516,6 +684,125 @@ class HashJoinNode : public ExecNode {
const char* kind_name() const override { return "HashJoinNode"; }
+ Status OnBuildSideBatch(size_t thread_index, ExecBatch batch) {
+ std::lock_guard<std::mutex> guard(build_side_mutex_);
+ accumulators_[1].InsertBatch(std::move(batch));
Review Comment:
Can we use constants instead of `1`? Something like
`accumulators_[kBuildSide]`. Or, if there is only ever going to be two, can we
just have `build_accumulator` and `probe_accumulator`?
##########
cpp/src/arrow/compute/exec/hash_join_node.cc:
##########
@@ -454,6 +456,172 @@ Status
HashJoinSchema::CollectFilterColumns(std::vector<FieldRef>& left_filter,
return Status::OK();
}
+class HashJoinNode;
+
+struct BloomFilterPushdownContext {
+ using BuildFinishedCallback = std::function<Status(size_t,
AccumulationQueue)>;
+ using FiltersReceivedCallback = std::function<Status()>;
+ using FilterFinishedCallback = std::function<Status(size_t,
AccumulationQueue)>;
+ void Init(HashJoinNode* owner, size_t num_threads, TaskScheduler* scheduler,
+ FiltersReceivedCallback on_bloom_filters_received, bool
disable_bloom_filter,
+ bool use_sync_execution);
+
+ Status StartProducing();
+
+ void ExpectBloomFilter() { eval_.num_expected_bloom_filters_ += 1; }
+
+ Status BuildBloomFilter(size_t thread_index, AccumulationQueue batches,
+ BuildFinishedCallback on_finished);
+
+ Status PushBloomFilter();
+
+ Status ReceiveBloomFilter(std::unique_ptr<BlockedBloomFilter> filter,
+ std::vector<int> column_map) {
+ bool proceed;
+ {
+ std::lock_guard<std::mutex> guard(eval_.receive_mutex_);
+ eval_.received_filters_.emplace_back(std::move(filter));
+ eval_.received_maps_.emplace_back(std::move(column_map));
+ proceed = eval_.num_expected_bloom_filters_ ==
eval_.received_filters_.size();
+
+ ARROW_DCHECK(eval_.received_filters_.size() ==
eval_.received_maps_.size());
+ ARROW_DCHECK(eval_.received_filters_.size() <=
eval_.num_expected_bloom_filters_);
+ }
+ if (proceed) {
+ return push_.all_received_callback_();
+ }
+ return Status::OK();
+ }
+
+ Status FilterBatches(size_t thread_index, AccumulationQueue batches,
+ FilterFinishedCallback on_finished) {
+ eval_.batches_ = std::move(batches);
+ eval_.on_finished_ = std::move(on_finished);
+
+ if (eval_.num_expected_bloom_filters_ == 0)
+ return eval_.on_finished_(thread_index, std::move(eval_.batches_));
+
+ return scheduler_->StartTaskGroup(thread_index, eval_.task_id_,
+
/*num_tasks=*/eval_.batches_.batch_count());
+ }
+
+ Status FilterSingleBatch(size_t thread_index, ExecBatch& batch) {
Review Comment:
Arrow's style suggests we take in `ExecBatch*` for a mutable argument.
##########
cpp/src/arrow/compute/exec/hash_join_node.cc:
##########
@@ -516,6 +684,125 @@ class HashJoinNode : public ExecNode {
const char* kind_name() const override { return "HashJoinNode"; }
+ Status OnBuildSideBatch(size_t thread_index, ExecBatch batch) {
+ std::lock_guard<std::mutex> guard(build_side_mutex_);
+ accumulators_[1].InsertBatch(std::move(batch));
+ return Status::OK();
+ }
+
+ Status OnBuildSideFinished(size_t thread_index) {
+ return pushdown_context_.BuildBloomFilter(
+ thread_index, std::move(accumulators_[1]),
+ [this](size_t thread_index, AccumulationQueue batches) {
+ return OnBloomFilterFinished(thread_index, std::move(batches));
+ });
+ }
+
+ Status OnBloomFilterFinished(size_t thread_index, AccumulationQueue batches)
{
+ RETURN_NOT_OK(pushdown_context_.PushBloomFilter());
+ return impl_->BuildHashTable(
+ thread_index, std::move(batches),
+ [this](size_t thread_index) { return
OnHashTableFinished(thread_index); });
+ }
+
+ Status OnHashTableFinished(size_t thread_index) {
+ bool should_probe;
+ {
+ std::lock_guard<std::mutex> guard(probe_side_mutex_);
+ should_probe = queued_batches_filtered_ && !hash_table_ready_;
+ hash_table_ready_ = true;
+ }
+ if (should_probe) {
+ return ProbeQueuedBatches(thread_index);
+ }
+ return Status::OK();
+ }
+
+ Status OnProbeSideBatch(size_t thread_index, ExecBatch batch) {
+ bool should_filter;
+ bool should_accum;
+ {
+ std::lock_guard<std::mutex> guard(probe_side_mutex_);
+ should_filter = bloom_filters_ready_;
+ should_accum = !should_filter || !hash_table_ready_;
+ if (should_accum) {
+ accumulators_[0].InsertBatch(std::move(batch));
+ return Status::OK();
+ }
+ }
+ if (should_filter)
Review Comment:
Is this check needed? How can we get here if `should_filter` is false?
##########
cpp/src/arrow/compute/exec/hash_join_node.cc:
##########
@@ -516,6 +684,125 @@ class HashJoinNode : public ExecNode {
const char* kind_name() const override { return "HashJoinNode"; }
+ Status OnBuildSideBatch(size_t thread_index, ExecBatch batch) {
+ std::lock_guard<std::mutex> guard(build_side_mutex_);
+ accumulators_[1].InsertBatch(std::move(batch));
+ return Status::OK();
+ }
+
+ Status OnBuildSideFinished(size_t thread_index) {
+ return pushdown_context_.BuildBloomFilter(
+ thread_index, std::move(accumulators_[1]),
+ [this](size_t thread_index, AccumulationQueue batches) {
+ return OnBloomFilterFinished(thread_index, std::move(batches));
+ });
+ }
+
+ Status OnBloomFilterFinished(size_t thread_index, AccumulationQueue batches)
{
+ RETURN_NOT_OK(pushdown_context_.PushBloomFilter());
+ return impl_->BuildHashTable(
+ thread_index, std::move(batches),
+ [this](size_t thread_index) { return
OnHashTableFinished(thread_index); });
+ }
+
+ Status OnHashTableFinished(size_t thread_index) {
+ bool should_probe;
+ {
+ std::lock_guard<std::mutex> guard(probe_side_mutex_);
+ should_probe = queued_batches_filtered_ && !hash_table_ready_;
+ hash_table_ready_ = true;
+ }
+ if (should_probe) {
+ return ProbeQueuedBatches(thread_index);
+ }
+ return Status::OK();
+ }
+
+ Status OnProbeSideBatch(size_t thread_index, ExecBatch batch) {
+ bool should_filter;
+ bool should_accum;
+ {
+ std::lock_guard<std::mutex> guard(probe_side_mutex_);
+ should_filter = bloom_filters_ready_;
+ should_accum = !should_filter || !hash_table_ready_;
+ if (should_accum) {
Review Comment:
I'm not 100% sure I'm following this but it seems like `should_accum` will
be true if the bloom filters are ready but the hash table is not ready.
Couldn't we run filter single batch and then accumulate in that case?
##########
cpp/src/arrow/compute/exec/hash_join_node.cc:
##########
@@ -454,6 +456,172 @@ Status
HashJoinSchema::CollectFilterColumns(std::vector<FieldRef>& left_filter,
return Status::OK();
}
+class HashJoinNode;
+
+struct BloomFilterPushdownContext {
+ using BuildFinishedCallback = std::function<Status(size_t,
AccumulationQueue)>;
+ using FiltersReceivedCallback = std::function<Status()>;
+ using FilterFinishedCallback = std::function<Status(size_t,
AccumulationQueue)>;
+ void Init(HashJoinNode* owner, size_t num_threads, TaskScheduler* scheduler,
+ FiltersReceivedCallback on_bloom_filters_received, bool
disable_bloom_filter,
+ bool use_sync_execution);
+
+ Status StartProducing();
+
+ void ExpectBloomFilter() { eval_.num_expected_bloom_filters_ += 1; }
+
+ Status BuildBloomFilter(size_t thread_index, AccumulationQueue batches,
+ BuildFinishedCallback on_finished);
+
+ Status PushBloomFilter();
Review Comment:
Given that this is a .cc file, why are some of these defined inline and some
are only declared here?
##########
cpp/src/arrow/compute/exec/hash_join_node.cc:
##########
@@ -516,6 +684,125 @@ class HashJoinNode : public ExecNode {
const char* kind_name() const override { return "HashJoinNode"; }
+ Status OnBuildSideBatch(size_t thread_index, ExecBatch batch) {
+ std::lock_guard<std::mutex> guard(build_side_mutex_);
+ accumulators_[1].InsertBatch(std::move(batch));
+ return Status::OK();
+ }
+
+ Status OnBuildSideFinished(size_t thread_index) {
+ return pushdown_context_.BuildBloomFilter(
+ thread_index, std::move(accumulators_[1]),
+ [this](size_t thread_index, AccumulationQueue batches) {
+ return OnBloomFilterFinished(thread_index, std::move(batches));
+ });
+ }
+
+ Status OnBloomFilterFinished(size_t thread_index, AccumulationQueue batches)
{
+ RETURN_NOT_OK(pushdown_context_.PushBloomFilter());
+ return impl_->BuildHashTable(
+ thread_index, std::move(batches),
+ [this](size_t thread_index) { return
OnHashTableFinished(thread_index); });
+ }
+
+ Status OnHashTableFinished(size_t thread_index) {
+ bool should_probe;
+ {
+ std::lock_guard<std::mutex> guard(probe_side_mutex_);
+ should_probe = queued_batches_filtered_ && !hash_table_ready_;
+ hash_table_ready_ = true;
+ }
+ if (should_probe) {
+ return ProbeQueuedBatches(thread_index);
+ }
+ return Status::OK();
+ }
+
+ Status OnProbeSideBatch(size_t thread_index, ExecBatch batch) {
+ bool should_filter;
+ bool should_accum;
+ {
+ std::lock_guard<std::mutex> guard(probe_side_mutex_);
+ should_filter = bloom_filters_ready_;
+ should_accum = !should_filter || !hash_table_ready_;
+ if (should_accum) {
+ accumulators_[0].InsertBatch(std::move(batch));
+ return Status::OK();
+ }
+ }
+ if (should_filter)
+ RETURN_NOT_OK(pushdown_context_.FilterSingleBatch(thread_index, batch));
+
+ {
+ std::lock_guard<std::mutex> guard(probe_side_mutex_);
+ should_accum = !should_filter || !hash_table_ready_;
+ if (should_accum) accumulators_[0].InsertBatch(std::move(batch));
Review Comment:
Couldn't this just be `if (!hash_table_ready_)`?
##########
cpp/src/arrow/compute/exec/hash_join_node.cc:
##########
@@ -454,6 +456,172 @@ Status
HashJoinSchema::CollectFilterColumns(std::vector<FieldRef>& left_filter,
return Status::OK();
}
+class HashJoinNode;
+
+struct BloomFilterPushdownContext {
+ using BuildFinishedCallback = std::function<Status(size_t,
AccumulationQueue)>;
+ using FiltersReceivedCallback = std::function<Status()>;
+ using FilterFinishedCallback = std::function<Status(size_t,
AccumulationQueue)>;
+ void Init(HashJoinNode* owner, size_t num_threads, TaskScheduler* scheduler,
+ FiltersReceivedCallback on_bloom_filters_received, bool
disable_bloom_filter,
+ bool use_sync_execution);
+
+ Status StartProducing();
+
+ void ExpectBloomFilter() { eval_.num_expected_bloom_filters_ += 1; }
+
+ Status BuildBloomFilter(size_t thread_index, AccumulationQueue batches,
+ BuildFinishedCallback on_finished);
+
+ Status PushBloomFilter();
+
+ Status ReceiveBloomFilter(std::unique_ptr<BlockedBloomFilter> filter,
+ std::vector<int> column_map) {
+ bool proceed;
+ {
+ std::lock_guard<std::mutex> guard(eval_.receive_mutex_);
+ eval_.received_filters_.emplace_back(std::move(filter));
+ eval_.received_maps_.emplace_back(std::move(column_map));
+ proceed = eval_.num_expected_bloom_filters_ ==
eval_.received_filters_.size();
+
+ ARROW_DCHECK(eval_.received_filters_.size() ==
eval_.received_maps_.size());
+ ARROW_DCHECK(eval_.received_filters_.size() <=
eval_.num_expected_bloom_filters_);
+ }
+ if (proceed) {
+ return push_.all_received_callback_();
+ }
+ return Status::OK();
+ }
+
+ Status FilterBatches(size_t thread_index, AccumulationQueue batches,
+ FilterFinishedCallback on_finished) {
+ eval_.batches_ = std::move(batches);
+ eval_.on_finished_ = std::move(on_finished);
+
+ if (eval_.num_expected_bloom_filters_ == 0)
Review Comment:
Why not check for `disable_bloom_filter_` here too? Then it seems like you
wouldn't have to check in `FilterSingleBatch`
##########
cpp/src/arrow/compute/exec/hash_join_node.cc:
##########
@@ -780,9 +983,228 @@ class HashJoinNode : public ExecNode {
std::unique_ptr<HashJoinSchema> schema_mgr_;
std::unique_ptr<HashJoinImpl> impl_;
util::AsyncTaskGroup task_group_;
+ std::unique_ptr<TaskScheduler> scheduler_;
+ util::AccumulationQueue accumulators_[2];
+ util::AccumulationQueue queued_batches_to_probe_;
+
+ std::mutex build_side_mutex_;
+ std::mutex probe_side_mutex_;
+
+ int task_group_probe_;
+ bool bloom_filters_ready_ = false;
+ bool hash_table_ready_ = false;
+ bool queued_batches_filtered_ = false;
+ bool queued_batches_probed_ = false;
+ bool probe_side_finished_ = false;
+
+ friend struct BloomFilterPushdownContext;
bool disable_bloom_filter_;
+ BloomFilterPushdownContext pushdown_context_;
};
+void BloomFilterPushdownContext::Init(HashJoinNode* owner, size_t num_threads,
+ TaskScheduler* scheduler,
+ FiltersReceivedCallback
on_bloom_filters_received,
+ bool disable_bloom_filter,
+ bool use_sync_execution) {
+ owner_ = owner;
+ ctx_ = owner->plan_->exec_context();
+ scheduler_ = scheduler;
+ tld_.resize(num_threads);
+ disable_bloom_filter_ = disable_bloom_filter;
+ std::tie(push_.pushdown_target_, push_.column_map_) =
GetPushdownTarget(owner);
+ push_.all_received_callback_ = std::move(on_bloom_filters_received);
+ if (!disable_bloom_filter_) {
+ ARROW_CHECK(push_.pushdown_target_);
+ push_.bloom_filter_ = arrow::internal::make_unique<BlockedBloomFilter>();
+ push_.pushdown_target_->pushdown_context_.ExpectBloomFilter();
+
+ build_.builder_ = BloomFilterBuilder::Make(
+ use_sync_execution ? BloomFilterBuildStrategy::SINGLE_THREADED
+ : BloomFilterBuildStrategy::PARALLEL);
+
+ build_.task_id_ = scheduler_->RegisterTaskGroup(
+ [this](size_t thread_index, int64_t task_id) {
+ return BuildBloomFilter_exec_task(thread_index, task_id);
+ },
+ [this](size_t thread_index) {
+ return BuildBloomFilter_on_finished(thread_index);
+ });
+ }
+
+ eval_.task_id_ = scheduler_->RegisterTaskGroup(
+ [this](size_t thread_index, int64_t task_id) {
+ return FilterSingleBatch(thread_index, eval_.batches_[task_id]);
+ },
+ [this](size_t thread_index) {
+ return eval_.on_finished_(thread_index, std::move(eval_.batches_));
+ });
+}
+
+Status BloomFilterPushdownContext::StartProducing() {
+ if (eval_.num_expected_bloom_filters_ == 0) return
push_.all_received_callback_();
Review Comment:
This is fine but is there any reason we can't just call this from `Init`?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]