save-buffer commented on code in PR #13332:
URL: https://github.com/apache/arrow/pull/13332#discussion_r892750760
##########
cpp/src/arrow/compute/exec/hash_join_node.cc:
##########
@@ -780,9 +983,228 @@ class HashJoinNode : public ExecNode {
std::unique_ptr<HashJoinSchema> schema_mgr_;
std::unique_ptr<HashJoinImpl> impl_;
util::AsyncTaskGroup task_group_;
+ std::unique_ptr<TaskScheduler> scheduler_;
+ util::AccumulationQueue accumulators_[2];
+ util::AccumulationQueue queued_batches_to_probe_;
+
+ std::mutex build_side_mutex_;
+ std::mutex probe_side_mutex_;
+
+ int task_group_probe_;
+ bool bloom_filters_ready_ = false;
+ bool hash_table_ready_ = false;
+ bool queued_batches_filtered_ = false;
+ bool queued_batches_probed_ = false;
+ bool probe_side_finished_ = false;
+
+ friend struct BloomFilterPushdownContext;
bool disable_bloom_filter_;
+ BloomFilterPushdownContext pushdown_context_;
};
+void BloomFilterPushdownContext::Init(HashJoinNode* owner, size_t num_threads,
+ TaskScheduler* scheduler,
+ FiltersReceivedCallback
on_bloom_filters_received,
+ bool disable_bloom_filter,
+ bool use_sync_execution) {
+ owner_ = owner;
+ ctx_ = owner->plan_->exec_context();
+ scheduler_ = scheduler;
+ tld_.resize(num_threads);
+ disable_bloom_filter_ = disable_bloom_filter;
+ std::tie(push_.pushdown_target_, push_.column_map_) =
GetPushdownTarget(owner);
+ push_.all_received_callback_ = std::move(on_bloom_filters_received);
+ if (!disable_bloom_filter_) {
+ ARROW_CHECK(push_.pushdown_target_);
+ push_.bloom_filter_ = arrow::internal::make_unique<BlockedBloomFilter>();
+ push_.pushdown_target_->pushdown_context_.ExpectBloomFilter();
+
+ build_.builder_ = BloomFilterBuilder::Make(
+ use_sync_execution ? BloomFilterBuildStrategy::SINGLE_THREADED
+ : BloomFilterBuildStrategy::PARALLEL);
+
+ build_.task_id_ = scheduler_->RegisterTaskGroup(
+ [this](size_t thread_index, int64_t task_id) {
+ return BuildBloomFilter_exec_task(thread_index, task_id);
+ },
+ [this](size_t thread_index) {
+ return BuildBloomFilter_on_finished(thread_index);
+ });
+ }
+
+ eval_.task_id_ = scheduler_->RegisterTaskGroup(
+ [this](size_t thread_index, int64_t task_id) {
+ return FilterSingleBatch(thread_index, eval_.batches_[task_id]);
+ },
+ [this](size_t thread_index) {
+ return eval_.on_finished_(thread_index, std::move(eval_.batches_));
+ });
+}
+
+Status BloomFilterPushdownContext::StartProducing() {
+ if (eval_.num_expected_bloom_filters_ == 0) return
push_.all_received_callback_();
Review Comment:
We may not have received all calls to `ExpectBloomFilter` inside of
`PrepareToProduce`, so we need `StartProducing`: that's the earliest point at
which we're guaranteed that the Bloom filters have sorted themselves out.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]