This is an automated email from the ASF dual-hosted git repository.

westonpace pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/main by this push:
     new fd44a33229 GH-39045: [C++][Acero] union node output batches should be 
unordered (#39046)
fd44a33229 is described below

commit fd44a332290229d1dfff5a0f599d82fdc8e276bb
Author: Yue <[email protected]>
AuthorDate: Wed Dec 6 03:10:43 2023 +0800

    GH-39045: [C++][Acero] union node output batches should be unordered 
(#39046)
    
    ### Rationale for this change
    Acero's union node produces duplicate batch indices when it has multiple 
ordered input nodes, which makes downstream nodes unable to process these 
batches if ordering is a concern. This PR addresses this issue.
    
    ### What changes are included in this PR?
    This PR fixes this issue by setting the batch index to unsequenced when 
the union node has more than one input, since the order cannot be guaranteed.
    
    ### Are these changes tested?
    Yes
    
    ### Are there any user-facing changes?
    No
    * Closes: #39045
    
    Authored-by: Yue Ni <[email protected]>
    Signed-off-by: Weston Pace <[email protected]>
---
 cpp/src/arrow/acero/union_node.cc      |  3 +++
 cpp/src/arrow/acero/union_node_test.cc | 12 ++++++++++--
 2 files changed, 13 insertions(+), 2 deletions(-)

diff --git a/cpp/src/arrow/acero/union_node.cc 
b/cpp/src/arrow/acero/union_node.cc
index 054fcdaba2..dc3ee102d4 100644
--- a/cpp/src/arrow/acero/union_node.cc
+++ b/cpp/src/arrow/acero/union_node.cc
@@ -80,6 +80,9 @@ class UnionNode : public ExecNode, public TracedNode {
     NoteInputReceived(batch);
     ARROW_DCHECK(std::find(inputs_.begin(), inputs_.end(), input) != 
inputs_.end());
 
+    if (inputs_.size() > 1) {
+      batch.index = compute::kUnsequencedIndex;
+    }
     return output_->InputReceived(this, std::move(batch));
   }
 
diff --git a/cpp/src/arrow/acero/union_node_test.cc 
b/cpp/src/arrow/acero/union_node_test.cc
index 8c07ece939..d925ac378e 100644
--- a/cpp/src/arrow/acero/union_node_test.cc
+++ b/cpp/src/arrow/acero/union_node_test.cc
@@ -63,8 +63,9 @@ struct TestUnionNode : public ::testing::Test {
       out_batches->batches.push_back(empty_record_batch);
     } else {
       for (size_t j = 0; j < num_batches; j++) {
-        out_batches->batches.push_back(
-            ExecBatch(*rng_.BatchOf(schema->fields(), batch_size)));
+        auto out_batch = ExecBatch(*rng_.BatchOf(schema->fields(), 
batch_size));
+        out_batch.index = j;
+        out_batches->batches.push_back(std::move(out_batch));
       }
     }
 
@@ -108,6 +109,13 @@ struct TestUnionNode : public ::testing::Test {
     auto expected_matcher =
         Finishes(ResultWith(UnorderedElementsAreArray(exp_batches.batches)));
     ASSERT_THAT(actual, expected_matcher);
+
+    // union node with multiple inputs should produce unordered batches
+    if (batches.size() > 1) {
+      for (const auto& batch : *actual.result()) {
+        ASSERT_EQ(batch.index, compute::kUnsequencedIndex);
+      }
+    }
   }
 
   void CheckUnionExecNode(size_t num_input_nodes, size_t num_batches, bool 
parallel) {

Reply via email to