This is an automated email from the ASF dual-hosted git repository.

kszucs pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new 2ace2cdf06 ARROW-15938: [C++][Compute] Fixing HashJoinBasicImpl in 
case of zero batches on build side (#13686)
2ace2cdf06 is described below

commit 2ace2cdf06b7a82e1c2024bef89a7d70ec4031ce
Author: michalursa <[email protected]>
AuthorDate: Wed Jul 27 09:14:21 2022 -0700

    ARROW-15938: [C++][Compute] Fixing HashJoinBasicImpl in case of zero 
batches on build side (#13686)
    
    Hash join implementation using HashJoinBasicImpl class was missing 
initialization in case of no batches on the build side.
    Initialization of a few data structures, mainly two RowEncoder instances 
for holding key and payload columns for rows on the build side, was missing inside 
BuildHashTable_exec_task, the method responsible for transforming accumulated 
batches on the build side of the hash join into a hash table.
    
    The initialization of RowEncoder inserts a single special row containing 
null values for all columns. This special row is accessed when outputting probe 
side rows with no matches in case of left outer and full outer join (in that 
case, these joins are supposed to output nulls in place of all fields that 
would come from the build side).
    
    Interestingly, the initialization was present in a similar case when 
batches were present on the build side but all of them included zero rows. I 
modified the code to use the same code path for both of these logically equivalent 
cases: a) zero build side batches and b) non-zero batches but with zero rows 
each.
    
    Authored-by: michalursa <[email protected]>
    Signed-off-by: Krisztián Szűcs <[email protected]>
---
 cpp/src/arrow/compute/exec/hash_join.cc           | 57 +++++++++++------------
 cpp/src/arrow/compute/exec/hash_join_node_test.cc |  9 ++++
 2 files changed, 36 insertions(+), 30 deletions(-)

diff --git a/cpp/src/arrow/compute/exec/hash_join.cc 
b/cpp/src/arrow/compute/exec/hash_join.cc
index 07a3083fb9..5cf66b3d09 100644
--- a/cpp/src/arrow/compute/exec/hash_join.cc
+++ b/cpp/src/arrow/compute/exec/hash_join.cc
@@ -561,38 +561,35 @@ class HashJoinBasicImpl : public HashJoinImpl {
 
   Status BuildHashTable_exec_task(size_t thread_index, int64_t /*task_id*/) {
     AccumulationQueue batches = std::move(build_batches_);
-    if (batches.empty()) {
-      hash_table_empty_ = true;
-    } else {
-      dict_build_.InitEncoder(*schema_[1], &hash_table_keys_, ctx_);
-      bool has_payload = (schema_[1]->num_cols(HashJoinProjection::PAYLOAD) > 
0);
-      if (has_payload) {
-        InitEncoder(1, HashJoinProjection::PAYLOAD, &hash_table_payloads_);
+    dict_build_.InitEncoder(*schema_[1], &hash_table_keys_, ctx_);
+    bool has_payload = (schema_[1]->num_cols(HashJoinProjection::PAYLOAD) > 0);
+    if (has_payload) {
+      InitEncoder(1, HashJoinProjection::PAYLOAD, &hash_table_payloads_);
+    }
+    hash_table_empty_ = true;
+
+    for (size_t ibatch = 0; ibatch < batches.batch_count(); ++ibatch) {
+      if (cancelled_) {
+        return Status::Cancelled("Hash join cancelled");
       }
-      hash_table_empty_ = true;
-      for (size_t ibatch = 0; ibatch < batches.batch_count(); ++ibatch) {
-        if (cancelled_) {
-          return Status::Cancelled("Hash join cancelled");
-        }
-        const ExecBatch& batch = batches[ibatch];
-        if (batch.length == 0) {
-          continue;
-        } else if (hash_table_empty_) {
-          hash_table_empty_ = false;
+      const ExecBatch& batch = batches[ibatch];
+      if (batch.length == 0) {
+        continue;
+      } else if (hash_table_empty_) {
+        hash_table_empty_ = false;
 
-          RETURN_NOT_OK(dict_build_.Init(*schema_[1], &batch, ctx_));
-        }
-        int32_t num_rows_before = hash_table_keys_.num_rows();
-        RETURN_NOT_OK(dict_build_.EncodeBatch(thread_index, *schema_[1], batch,
-                                              &hash_table_keys_, ctx_));
-        if (has_payload) {
-          RETURN_NOT_OK(
-              EncodeBatch(1, HashJoinProjection::PAYLOAD, 
&hash_table_payloads_, batch));
-        }
-        int32_t num_rows_after = hash_table_keys_.num_rows();
-        for (int32_t irow = num_rows_before; irow < num_rows_after; ++irow) {
-          
hash_table_.insert(std::make_pair(hash_table_keys_.encoded_row(irow), irow));
-        }
+        RETURN_NOT_OK(dict_build_.Init(*schema_[1], &batch, ctx_));
+      }
+      int32_t num_rows_before = hash_table_keys_.num_rows();
+      RETURN_NOT_OK(dict_build_.EncodeBatch(thread_index, *schema_[1], batch,
+                                            &hash_table_keys_, ctx_));
+      if (has_payload) {
+        RETURN_NOT_OK(
+            EncodeBatch(1, HashJoinProjection::PAYLOAD, &hash_table_payloads_, 
batch));
+      }
+      int32_t num_rows_after = hash_table_keys_.num_rows();
+      for (int32_t irow = num_rows_before; irow < num_rows_after; ++irow) {
+        hash_table_.insert(std::make_pair(hash_table_keys_.encoded_row(irow), 
irow));
       }
     }
 
diff --git a/cpp/src/arrow/compute/exec/hash_join_node_test.cc 
b/cpp/src/arrow/compute/exec/hash_join_node_test.cc
index b4fd7ee643..d06b76159d 100644
--- a/cpp/src/arrow/compute/exec/hash_join_node_test.cc
+++ b/cpp/src/arrow/compute/exec/hash_join_node_test.cc
@@ -1298,6 +1298,15 @@ void TestHashJoinDictionaryHelper(
     }
   }
 
+  // Instead of sending 2 batches of size 0 we should not send any batches
+  // at all to more accurately simulate real world use cases
+  if (l_length == 0) {
+    l_batches.batches.resize(0);
+  }
+  if (r_length == 0) {
+    r_batches.batches.resize(0);
+  }
+
   auto exec_ctx = arrow::internal::make_unique<ExecContext>(
       default_memory_pool(), parallel ? arrow::internal::GetCpuThreadPool() : 
nullptr);
   ASSERT_OK_AND_ASSIGN(auto plan, ExecPlan::Make(exec_ctx.get()));

Reply via email to