This is an automated email from the ASF dual-hosted git repository.
zanmato pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/main by this push:
new 0aa622c52f GH-45196: [C++][Acero] Small refinement to hash join (#45197)
0aa622c52f is described below
commit 0aa622c52f97f76ffaccbd677b5ff7c36ff2f1a1
Author: Rossi Sun <[email protected]>
AuthorDate: Wed Jan 8 12:47:52 2025 +0800
GH-45196: [C++][Acero] Small refinement to hash join (#45197)
### Rationale for this change
See #45196
### What changes are included in this PR?
Refine/simplify the code mentioned in the issue.
### Are these changes tested?
Existing tests suffice.
### Are there any user-facing changes?
None.
* GitHub Issue: #45196
Authored-by: Rossi Sun <[email protected]>
Signed-off-by: Rossi Sun <[email protected]>
---
cpp/src/arrow/acero/hash_join.cc | 16 ++++------------
cpp/src/arrow/acero/hash_join_node.cc | 3 +++
cpp/src/arrow/acero/swiss_join.cc | 10 ++--------
3 files changed, 9 insertions(+), 20 deletions(-)
diff --git a/cpp/src/arrow/acero/hash_join.cc b/cpp/src/arrow/acero/hash_join.cc
index ddcd2a0995..1637f3bd35 100644
--- a/cpp/src/arrow/acero/hash_join.cc
+++ b/cpp/src/arrow/acero/hash_join.cc
@@ -568,20 +568,16 @@ class HashJoinBasicImpl : public HashJoinImpl {
if (has_payload) {
InitEncoder(1, HashJoinProjection::PAYLOAD, &hash_table_payloads_);
}
- hash_table_empty_ = true;
+ hash_table_empty_ = batches.empty();
+ RETURN_NOT_OK(dict_build_.Init(*schema_[1], hash_table_empty_ ? nullptr : &batches[0],
+ ctx_->exec_context()));
for (size_t ibatch = 0; ibatch < batches.batch_count(); ++ibatch) {
if (cancelled_) {
return Status::Cancelled("Hash join cancelled");
}
const ExecBatch& batch = batches[ibatch];
- if (batch.length == 0) {
- continue;
- } else if (hash_table_empty_) {
- hash_table_empty_ = false;
-
- RETURN_NOT_OK(dict_build_.Init(*schema_[1], &batch, ctx_->exec_context()));
- }
+ DCHECK_GT(batch.length, 0);
int32_t num_rows_before = hash_table_keys_.num_rows();
RETURN_NOT_OK(dict_build_.EncodeBatch(thread_index, *schema_[1], batch,
&hash_table_keys_, ctx_->exec_context()));
@@ -595,10 +591,6 @@ class HashJoinBasicImpl : public HashJoinImpl {
}
}
- if (hash_table_empty_) {
- RETURN_NOT_OK(dict_build_.Init(*schema_[1], nullptr, ctx_->exec_context()));
- }
-
return Status::OK();
}
diff --git a/cpp/src/arrow/acero/hash_join_node.cc b/cpp/src/arrow/acero/hash_join_node.cc
index 80dd163ced..f6959818c5 100644
--- a/cpp/src/arrow/acero/hash_join_node.cc
+++ b/cpp/src/arrow/acero/hash_join_node.cc
@@ -772,6 +772,9 @@ class HashJoinNode : public ExecNode, public TracedNode {
const char* kind_name() const override { return "HashJoinNode"; }
Status OnBuildSideBatch(size_t thread_index, ExecBatch batch) {
+ if (batch.length == 0) {
+ return Status::OK();
+ }
std::lock_guard<std::mutex> guard(build_side_mutex_);
build_accumulator_.InsertBatch(std::move(batch));
return Status::OK();
diff --git a/cpp/src/arrow/acero/swiss_join.cc b/cpp/src/arrow/acero/swiss_join.cc
index c88279fd54..c068eeb50f 100644
--- a/cpp/src/arrow/acero/swiss_join.cc
+++ b/cpp/src/arrow/acero/swiss_join.cc
@@ -2598,6 +2598,8 @@ class SwissJoin : public HashJoinImpl {
return Status::OK();
}
+ DCHECK_GT(build_side_batches_[batch_id].length, 0);
+
const HashJoinProjectionMaps* schema = schema_[1];
bool no_payload = hash_table_build_.no_payload();
@@ -2605,10 +2607,6 @@ class SwissJoin : public HashJoinImpl {
ARROW_ASSIGN_OR_RAISE(
input_batch, KeyPayloadFromInput(/*side=*/1, &build_side_batches_[batch_id]));
- if (input_batch.length == 0) {
- return Status::OK();
- }
-
// Split batch into key batch and optional payload batch
//
// Input batch is key-payload batch (key columns followed by payload
@@ -2637,10 +2635,6 @@ class SwissJoin : public HashJoinImpl {
static_cast<int64_t>(thread_id), key_batch, no_payload ? nullptr : &payload_batch,
temp_stack)));
- // Release input batch
- //
- input_batch.values.clear();
-
return Status::OK();
}