This is an automated email from the ASF dual-hosted git repository.
kszucs pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push:
new 2ace2cdf06 ARROW-15938: [C++][Compute] Fixing HashJoinBasicImpl in
case of zero batches on build side (#13686)
2ace2cdf06 is described below
commit 2ace2cdf06b7a82e1c2024bef89a7d70ec4031ce
Author: michalursa <[email protected]>
AuthorDate: Wed Jul 27 09:14:21 2022 -0700
ARROW-15938: [C++][Compute] Fixing HashJoinBasicImpl in case of zero
batches on build side (#13686)
Hash join implementation using HashJoinBasicImpl class was missing
initialization in case of no batches on the build side.
Initialization of a few data structures, mainly two RowEncoder instances
for holding key and payload columns for rows on build side, was missing inside
BuildHashTable_exec_task, the method responsible for transforming accumulated
batches on build side of the hash join into a hash table.
The initialization of RowEncoder inserts a single special row containing
null values for all columns. This special row is accessed when outputting probe
side rows with no matches in case of left outer and full outer join (in that
case these joins are supposed to output nulls in place of all fields that
would come from the build side).
Interestingly, the initialization was present in a similar case when
batches were present on build side but all of them included zero rows. I
modified the code to use the same code path for both these logically equivalent
cases: a) zero build side batches and b) non-zero batches but with zero rows
each.
Authored-by: michalursa <[email protected]>
Signed-off-by: Krisztián Szűcs <[email protected]>
---
cpp/src/arrow/compute/exec/hash_join.cc | 57 +++++++++++------------
cpp/src/arrow/compute/exec/hash_join_node_test.cc | 9 ++++
2 files changed, 36 insertions(+), 30 deletions(-)
diff --git a/cpp/src/arrow/compute/exec/hash_join.cc
b/cpp/src/arrow/compute/exec/hash_join.cc
index 07a3083fb9..5cf66b3d09 100644
--- a/cpp/src/arrow/compute/exec/hash_join.cc
+++ b/cpp/src/arrow/compute/exec/hash_join.cc
@@ -561,38 +561,35 @@ class HashJoinBasicImpl : public HashJoinImpl {
Status BuildHashTable_exec_task(size_t thread_index, int64_t /*task_id*/) {
AccumulationQueue batches = std::move(build_batches_);
- if (batches.empty()) {
- hash_table_empty_ = true;
- } else {
- dict_build_.InitEncoder(*schema_[1], &hash_table_keys_, ctx_);
- bool has_payload = (schema_[1]->num_cols(HashJoinProjection::PAYLOAD) >
0);
- if (has_payload) {
- InitEncoder(1, HashJoinProjection::PAYLOAD, &hash_table_payloads_);
+ dict_build_.InitEncoder(*schema_[1], &hash_table_keys_, ctx_);
+ bool has_payload = (schema_[1]->num_cols(HashJoinProjection::PAYLOAD) > 0);
+ if (has_payload) {
+ InitEncoder(1, HashJoinProjection::PAYLOAD, &hash_table_payloads_);
+ }
+ hash_table_empty_ = true;
+
+ for (size_t ibatch = 0; ibatch < batches.batch_count(); ++ibatch) {
+ if (cancelled_) {
+ return Status::Cancelled("Hash join cancelled");
}
- hash_table_empty_ = true;
- for (size_t ibatch = 0; ibatch < batches.batch_count(); ++ibatch) {
- if (cancelled_) {
- return Status::Cancelled("Hash join cancelled");
- }
- const ExecBatch& batch = batches[ibatch];
- if (batch.length == 0) {
- continue;
- } else if (hash_table_empty_) {
- hash_table_empty_ = false;
+ const ExecBatch& batch = batches[ibatch];
+ if (batch.length == 0) {
+ continue;
+ } else if (hash_table_empty_) {
+ hash_table_empty_ = false;
- RETURN_NOT_OK(dict_build_.Init(*schema_[1], &batch, ctx_));
- }
- int32_t num_rows_before = hash_table_keys_.num_rows();
- RETURN_NOT_OK(dict_build_.EncodeBatch(thread_index, *schema_[1], batch,
- &hash_table_keys_, ctx_));
- if (has_payload) {
- RETURN_NOT_OK(
- EncodeBatch(1, HashJoinProjection::PAYLOAD,
&hash_table_payloads_, batch));
- }
- int32_t num_rows_after = hash_table_keys_.num_rows();
- for (int32_t irow = num_rows_before; irow < num_rows_after; ++irow) {
-
hash_table_.insert(std::make_pair(hash_table_keys_.encoded_row(irow), irow));
- }
+ RETURN_NOT_OK(dict_build_.Init(*schema_[1], &batch, ctx_));
+ }
+ int32_t num_rows_before = hash_table_keys_.num_rows();
+ RETURN_NOT_OK(dict_build_.EncodeBatch(thread_index, *schema_[1], batch,
+ &hash_table_keys_, ctx_));
+ if (has_payload) {
+ RETURN_NOT_OK(
+ EncodeBatch(1, HashJoinProjection::PAYLOAD, &hash_table_payloads_,
batch));
+ }
+ int32_t num_rows_after = hash_table_keys_.num_rows();
+ for (int32_t irow = num_rows_before; irow < num_rows_after; ++irow) {
+ hash_table_.insert(std::make_pair(hash_table_keys_.encoded_row(irow),
irow));
}
}
diff --git a/cpp/src/arrow/compute/exec/hash_join_node_test.cc
b/cpp/src/arrow/compute/exec/hash_join_node_test.cc
index b4fd7ee643..d06b76159d 100644
--- a/cpp/src/arrow/compute/exec/hash_join_node_test.cc
+++ b/cpp/src/arrow/compute/exec/hash_join_node_test.cc
@@ -1298,6 +1298,15 @@ void TestHashJoinDictionaryHelper(
}
}
+ // Instead of sending 2 batches of size 0 we should not send any batches
+ // at all to more accurately simulate real world use cases
+ if (l_length == 0) {
+ l_batches.batches.resize(0);
+ }
+ if (r_length == 0) {
+ r_batches.batches.resize(0);
+ }
+
auto exec_ctx = arrow::internal::make_unique<ExecContext>(
default_memory_pool(), parallel ? arrow::internal::GetCpuThreadPool() :
nullptr);
ASSERT_OK_AND_ASSIGN(auto plan, ExecPlan::Make(exec_ctx.get()));