This is an automated email from the ASF dual-hosted git repository.
westonpace pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/main by this push:
new fd44a33229 GH-39045: [C++][Acero] union node output batches should be
unordered (#39046)
fd44a33229 is described below
commit fd44a332290229d1dfff5a0f599d82fdc8e276bb
Author: Yue <[email protected]>
AuthorDate: Wed Dec 6 03:10:43 2023 +0800
GH-39045: [C++][Acero] union node output batches should be unordered
(#39046)
### Rationale for this change
Acero's union node produces duplicate batch indices when it has multiple
ordered input nodes. This leaves downstream nodes unable to process these
batches when ordering matters. This PR tries to address this issue.
### What changes are included in this PR?
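This PR fixes the issue by setting each output batch's index to
`compute::kUnsequencedIndex` whenever the union node has more than one input,
because the output order cannot be guaranteed in that case.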
### Are these changes tested?
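Yes. The union node test now assigns sequential indices to the generated input
batches. It also asserts that, with multiple inputs, every output batch has
`compute::kUnsequencedIndex`.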
### Are there any user-facing changes?
No
* Closes: #39045
Authored-by: Yue Ni <[email protected]>
Signed-off-by: Weston Pace <[email protected]>
---
cpp/src/arrow/acero/union_node.cc | 3 +++
cpp/src/arrow/acero/union_node_test.cc | 12 ++++++++++--
2 files changed, 13 insertions(+), 2 deletions(-)
diff --git a/cpp/src/arrow/acero/union_node.cc
b/cpp/src/arrow/acero/union_node.cc
index 054fcdaba2..dc3ee102d4 100644
--- a/cpp/src/arrow/acero/union_node.cc
+++ b/cpp/src/arrow/acero/union_node.cc
@@ -80,6 +80,9 @@ class UnionNode : public ExecNode, public TracedNode {
NoteInputReceived(batch);
ARROW_DCHECK(std::find(inputs_.begin(), inputs_.end(), input) !=
inputs_.end());
+ if (inputs_.size() > 1) {
+ batch.index = compute::kUnsequencedIndex;
+ }
return output_->InputReceived(this, std::move(batch));
}
diff --git a/cpp/src/arrow/acero/union_node_test.cc
b/cpp/src/arrow/acero/union_node_test.cc
index 8c07ece939..d925ac378e 100644
--- a/cpp/src/arrow/acero/union_node_test.cc
+++ b/cpp/src/arrow/acero/union_node_test.cc
@@ -63,8 +63,9 @@ struct TestUnionNode : public ::testing::Test {
out_batches->batches.push_back(empty_record_batch);
} else {
for (size_t j = 0; j < num_batches; j++) {
- out_batches->batches.push_back(
- ExecBatch(*rng_.BatchOf(schema->fields(), batch_size)));
+ auto out_batch = ExecBatch(*rng_.BatchOf(schema->fields(),
batch_size));
+ out_batch.index = j;
+ out_batches->batches.push_back(std::move(out_batch));
}
}
@@ -108,6 +109,13 @@ struct TestUnionNode : public ::testing::Test {
auto expected_matcher =
Finishes(ResultWith(UnorderedElementsAreArray(exp_batches.batches)));
ASSERT_THAT(actual, expected_matcher);
+
+ // union node with multiple inputs should produce unordered batches
+ if (batches.size() > 1) {
+ for (const auto& batch : *actual.result()) {
+ ASSERT_EQ(batch.index, compute::kUnsequencedIndex);
+ }
+ }
}
void CheckUnionExecNode(size_t num_input_nodes, size_t num_batches, bool
parallel) {