save-buffer commented on code in PR #13332:
URL: https://github.com/apache/arrow/pull/13332#discussion_r892758327


##########
cpp/src/arrow/compute/exec/hash_join_node.cc:
##########
@@ -516,6 +684,125 @@ class HashJoinNode : public ExecNode {
 
   const char* kind_name() const override { return "HashJoinNode"; }
 
+  // Buffers one build-side batch in accumulators_[1]. Insertion is serialized
+  // by build_side_mutex_, so callers on different threads do not race on the
+  // accumulator. thread_index is not used here. Always returns Status::OK().
+  Status OnBuildSideBatch(size_t thread_index, ExecBatch batch) {
+    std::lock_guard<std::mutex> guard(build_side_mutex_);
+    accumulators_[1].InsertBatch(std::move(batch));
+    return Status::OK();
+  }
+
+  // Hands all accumulated build-side batches (moved out of accumulators_[1])
+  // to pushdown_context_.BuildBloomFilter(). The continuation passed to it
+  // forwards the batches to OnBloomFilterFinished().
+  // NOTE(review): accumulators_[1] is moved without holding build_side_mutex_;
+  // this assumes no OnBuildSideBatch() call can still be in flight -- confirm.
+  Status OnBuildSideFinished(size_t thread_index) {
+    return pushdown_context_.BuildBloomFilter(
+        thread_index, std::move(accumulators_[1]),
+        [this](size_t thread_index, AccumulationQueue batches) {
+          return OnBloomFilterFinished(thread_index, std::move(batches));
+        });
+  }
+
+  // Continuation of BuildBloomFilter(). It first calls
+  // pushdown_context_.PushBloomFilter(). If that fails, the error is returned
+  // and no hash table is built. Otherwise the build-side batches go to
+  // impl_->BuildHashTable(), whose completion continues in OnHashTableFinished().
+  Status OnBloomFilterFinished(size_t thread_index, AccumulationQueue batches) {
+    RETURN_NOT_OK(pushdown_context_.PushBloomFilter());
+    return impl_->BuildHashTable(
+        thread_index, std::move(batches),
+        [this](size_t thread_index) { return OnHashTableFinished(thread_index); });
+  }
+
+  // Called when the hash table build completes. It marks hash_table_ready_
+  // under probe_side_mutex_ and decides whether the queued probe-side batches
+  // should now be probed.
+  Status OnHashTableFinished(size_t thread_index) {
+    bool should_probe;
+    {
+      std::lock_guard<std::mutex> guard(probe_side_mutex_);
+      // Probe only if the queued batches have already been filtered and this is
+      // the first call to see hash_table_ready_ == false. The check and the set
+      // happen under the same lock, so at most one caller triggers the probe.
+      should_probe = queued_batches_filtered_ && !hash_table_ready_;
+      hash_table_ready_ = true;
+    }
+    // Probing runs outside the lock.
+    if (should_probe) {
+      return ProbeQueuedBatches(thread_index);
+    }
+    return Status::OK();
+  }
+
+  Status OnProbeSideBatch(size_t thread_index, ExecBatch batch) {
+    bool should_filter;
+    bool should_accum;
+    {
+      std::lock_guard<std::mutex> guard(probe_side_mutex_);
+      should_filter = bloom_filters_ready_;
+      should_accum = !should_filter || !hash_table_ready_;
+      if (should_accum) {
+        accumulators_[0].InsertBatch(std::move(batch));
+        return Status::OK();
+      }
+    }
+    if (should_filter)
+      RETURN_NOT_OK(pushdown_context_.FilterSingleBatch(thread_index, batch));
+
+    {
+      std::lock_guard<std::mutex> guard(probe_side_mutex_);
+      should_accum = !should_filter || !hash_table_ready_;
+      if (should_accum) accumulators_[0].InsertBatch(std::move(batch));

Review Comment:
   Yep, good point



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to