This is an automated email from the ASF dual-hosted git repository.

zanmato pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/main by this push:
     new 0aa622c52f GH-45196: [C++][Acero] Small refinement to hash join (#45197)
0aa622c52f is described below

commit 0aa622c52f97f76ffaccbd677b5ff7c36ff2f1a1
Author: Rossi Sun <[email protected]>
AuthorDate: Wed Jan 8 12:47:52 2025 +0800

    GH-45196: [C++][Acero] Small refinement to hash join (#45197)
    
    
    
    ### Rationale for this change
    
    See #45196
    
    ### What changes are included in this PR?
    
    Refine/simplify the code mentioned in the issue.
    
    ### Are these changes tested?
    
    Existing tests suffice.
    
    ### Are there any user-facing changes?
    
    None.
    
    * GitHub Issue: #45196
    
    Authored-by: Rossi Sun <[email protected]>
    Signed-off-by: Rossi Sun <[email protected]>
---
 cpp/src/arrow/acero/hash_join.cc      | 16 ++++------------
 cpp/src/arrow/acero/hash_join_node.cc |  3 +++
 cpp/src/arrow/acero/swiss_join.cc     | 10 ++--------
 3 files changed, 9 insertions(+), 20 deletions(-)

diff --git a/cpp/src/arrow/acero/hash_join.cc b/cpp/src/arrow/acero/hash_join.cc
index ddcd2a0995..1637f3bd35 100644
--- a/cpp/src/arrow/acero/hash_join.cc
+++ b/cpp/src/arrow/acero/hash_join.cc
@@ -568,20 +568,16 @@ class HashJoinBasicImpl : public HashJoinImpl {
     if (has_payload) {
       InitEncoder(1, HashJoinProjection::PAYLOAD, &hash_table_payloads_);
     }
-    hash_table_empty_ = true;
+    hash_table_empty_ = batches.empty();
+    RETURN_NOT_OK(dict_build_.Init(*schema_[1], hash_table_empty_ ? nullptr : 
&batches[0],
+                                   ctx_->exec_context()));
 
     for (size_t ibatch = 0; ibatch < batches.batch_count(); ++ibatch) {
       if (cancelled_) {
         return Status::Cancelled("Hash join cancelled");
       }
       const ExecBatch& batch = batches[ibatch];
-      if (batch.length == 0) {
-        continue;
-      } else if (hash_table_empty_) {
-        hash_table_empty_ = false;
-
-        RETURN_NOT_OK(dict_build_.Init(*schema_[1], &batch, 
ctx_->exec_context()));
-      }
+      DCHECK_GT(batch.length, 0);
       int32_t num_rows_before = hash_table_keys_.num_rows();
       RETURN_NOT_OK(dict_build_.EncodeBatch(thread_index, *schema_[1], batch,
                                             &hash_table_keys_, 
ctx_->exec_context()));
@@ -595,10 +591,6 @@ class HashJoinBasicImpl : public HashJoinImpl {
       }
     }
 
-    if (hash_table_empty_) {
-      RETURN_NOT_OK(dict_build_.Init(*schema_[1], nullptr, 
ctx_->exec_context()));
-    }
-
     return Status::OK();
   }
 
diff --git a/cpp/src/arrow/acero/hash_join_node.cc b/cpp/src/arrow/acero/hash_join_node.cc
index 80dd163ced..f6959818c5 100644
--- a/cpp/src/arrow/acero/hash_join_node.cc
+++ b/cpp/src/arrow/acero/hash_join_node.cc
@@ -772,6 +772,9 @@ class HashJoinNode : public ExecNode, public TracedNode {
   const char* kind_name() const override { return "HashJoinNode"; }
 
   Status OnBuildSideBatch(size_t thread_index, ExecBatch batch) {
+    if (batch.length == 0) {
+      return Status::OK();
+    }
     std::lock_guard<std::mutex> guard(build_side_mutex_);
     build_accumulator_.InsertBatch(std::move(batch));
     return Status::OK();
diff --git a/cpp/src/arrow/acero/swiss_join.cc b/cpp/src/arrow/acero/swiss_join.cc
index c88279fd54..c068eeb50f 100644
--- a/cpp/src/arrow/acero/swiss_join.cc
+++ b/cpp/src/arrow/acero/swiss_join.cc
@@ -2598,6 +2598,8 @@ class SwissJoin : public HashJoinImpl {
       return Status::OK();
     }
 
+    DCHECK_GT(build_side_batches_[batch_id].length, 0);
+
     const HashJoinProjectionMaps* schema = schema_[1];
     bool no_payload = hash_table_build_.no_payload();
 
@@ -2605,10 +2607,6 @@ class SwissJoin : public HashJoinImpl {
     ARROW_ASSIGN_OR_RAISE(
         input_batch, KeyPayloadFromInput(/*side=*/1, 
&build_side_batches_[batch_id]));
 
-    if (input_batch.length == 0) {
-      return Status::OK();
-    }
-
     // Split batch into key batch and optional payload batch
     //
     // Input batch is key-payload batch (key columns followed by payload
@@ -2637,10 +2635,6 @@ class SwissJoin : public HashJoinImpl {
         static_cast<int64_t>(thread_id), key_batch, no_payload ? nullptr : 
&payload_batch,
         temp_stack)));
 
-    // Release input batch
-    //
-    input_batch.values.clear();
-
     return Status::OK();
   }
 

Reply via email to