bkietz commented on a change in pull request #10863:
URL: https://github.com/apache/arrow/pull/10863#discussion_r686968785



##########
File path: cpp/src/arrow/compute/exec/plan_test.cc
##########
@@ -262,6 +263,7 @@ BatchesWithSchema MakeBasicBatches() {
 BatchesWithSchema MakeRandomBatches(const std::shared_ptr<Schema>& schema,
                                     int num_batches = 10, int batch_size = 4) {
   BatchesWithSchema out;
+  out.schema = schema;

Review comment:
       :+1: 

##########
File path: cpp/src/arrow/compute/exec/plan_test.cc
##########
@@ -355,6 +387,52 @@ TEST(ExecPlanExecution, StressSourceSink) {
   }
 }
 
+TEST(ExecPlanExecution, StressSourceOrderBy) {
+  auto input_schema = schema({field("a", int32()), field("b", boolean())});
+  for (bool slow : {false, true}) {
+    SCOPED_TRACE(slow ? "slowed" : "unslowed");
+
+    for (bool parallel : {false, true}) {
+      SCOPED_TRACE(parallel ? "parallel" : "single threaded");
+
+      int num_batches = slow && !parallel ? 30 : 300;
+
+      ASSERT_OK_AND_ASSIGN(auto plan, ExecPlan::Make());
+      AsyncGenerator<util::optional<ExecBatch>> sink_gen;
+
+      auto random_data = MakeRandomBatches(input_schema, num_batches);
+
+      SortOptions options({SortKey("a", SortOrder::Ascending)});
+      ASSERT_OK(Declaration::Sequence(
+                    {
+                        {"source", SourceNodeOptions{random_data.schema,
+                                                     random_data.gen(parallel, slow)}},
+                        {"order_by_sink", OrderBySinkNodeOptions{options, &sink_gen}},
+                    })
+                    .AddToPlan(plan.get()));
+
+      // Check that data is sorted appropriately
+      ASSERT_FINISHES_OK_AND_ASSIGN(auto exec_batches,
+                                    StartAndCollect(plan.get(), sink_gen));
+      RecordBatchVector batches, original_batches;
+      for (const auto& batch : exec_batches) {

Review comment:
       I think it'd be worthwhile to extract `TableFromExecBatches` into exec/util.h

##########
File path: cpp/src/arrow/compute/exec/options.h
##########
@@ -111,5 +112,20 @@ class ARROW_EXPORT SinkNodeOptions : public ExecNodeOptions {
   std::function<Future<util::optional<ExecBatch>>()>* generator;
 };
 
+/// \brief Make a node which sorts rows passed through it
+///
+/// All batches pushed to this node will be accumulated, then sorted, by the given
+/// fields. Then sorted batches will be pushed to the next node, along a tag
+/// indicating the absolute order of the batches.

Review comment:
       I think this doc comment needs to be reworded

##########
File path: cpp/src/arrow/compute/exec/sink_node.cc
##########
@@ -148,7 +151,89 @@ class SinkNode : public ExecNode {
   PushGenerator<util::optional<ExecBatch>>::Producer producer_;
 };
 
+// A sink node that accumulates inputs, then sorts them before emitting them.
+struct OrderBySinkNode final : public SinkNode {
+  OrderBySinkNode(ExecPlan* plan, std::vector<ExecNode*> inputs, SortOptions sort_options,
+                  AsyncGenerator<util::optional<ExecBatch>>* generator)
+      : SinkNode(plan, std::move(inputs), generator),
+        sort_options_(std::move(sort_options)) {}
+
+  const char* kind_name() override { return "OrderBySinkNode"; }
+
+  static Result<ExecNode*> Make(ExecPlan* plan, std::vector<ExecNode*> inputs,
+                                const ExecNodeOptions& options) {
+    RETURN_NOT_OK(ValidateExecNodeInputs(plan, inputs, 1, "OrderBySinkNode"));
+
+    const auto& sink_options = checked_cast<const OrderBySinkNodeOptions&>(options);
+    return plan->EmplaceNode<OrderBySinkNode>(
+        plan, std::move(inputs), sink_options.sort_options, sink_options.generator);
+  }
+
+  void InputReceived(ExecNode* input, int seq, ExecBatch batch) override {
+    DCHECK_EQ(input, inputs_[0]);
+
+    // Accumulate data
+    {
+      std::unique_lock<std::mutex> lock(mutex_);
+      auto maybe_batch = batch.ToRecordBatch(inputs_[0]->output_schema(),
+                                             plan()->exec_context()->memory_pool());
+      if (ErrorIfNotOk(maybe_batch.status())) return;
+      batches_.push_back(maybe_batch.MoveValueUnsafe());
+    }
+
+    if (input_counter_.Increment()) {
+      Finish();
+    }
+  }
+
+ protected:
+  Result<std::shared_ptr<Table>> SortData() {
+    std::unique_lock<std::mutex> lock(mutex_);
+    ARROW_ASSIGN_OR_RAISE(
+        auto table,
+        Table::FromRecordBatches(inputs_[0]->output_schema(), std::move(batches_)));
+    ARROW_ASSIGN_OR_RAISE(auto indices,
+                          SortIndices(table, sort_options_, plan()->exec_context()));
+    ARROW_ASSIGN_OR_RAISE(auto sorted, Take(table, indices, TakeOptions::NoBoundsCheck(),
+                                            plan()->exec_context()));
+    return sorted.table();
+  }
+
+  void Finish() override {
+    auto maybe_sorted = SortData();

Review comment:
       Nit: this would be more readable as
   ```suggestion
       Status st = DoFinish();
       if (ErrorIfNotOk(st)) {
         producer_.Push(std::move(st));
       }
       SinkNode::Finish();
     }
     
     Status DoFinish() {
       ARROW_ASSIGN_OR_RAISE(auto sorted, SortData());
       //...
   ```

##########
File path: cpp/src/arrow/compute/exec/plan_test.cc
##########
@@ -301,6 +303,36 @@ TEST(ExecPlanExecution, SourceSink) {
   }
 }
 
+TEST(ExecPlanExecution, SourceOrderBy) {
+  std::vector<ExecBatch> expected = {
+      ExecBatchFromJSON({int32(), boolean()},
+                        "[[4, false], [5, null], [6, false], [7, false], [null, true]]")};
+  for (bool slow : {false, true}) {
+    SCOPED_TRACE(slow ? "slowed" : "unslowed");
+
+    for (bool parallel : {false, true}) {
+      SCOPED_TRACE(parallel ? "parallel" : "single threaded");
+
+      ASSERT_OK_AND_ASSIGN(auto plan, ExecPlan::Make());
+      AsyncGenerator<util::optional<ExecBatch>> sink_gen;
+
+      auto basic_data = MakeBasicBatches();
+
+      SortOptions options({SortKey("i32", SortOrder::Ascending)});
+      ASSERT_OK(Declaration::Sequence(
+                    {
+                        {"source", SourceNodeOptions{basic_data.schema,
+                                                     basic_data.gen(parallel, slow)}},
+                        {"order_by_sink", OrderBySinkNodeOptions{options, &sink_gen}},
+                    })
+                    .AddToPlan(plan.get()));
+
+      ASSERT_THAT(StartAndCollect(plan.get(), sink_gen),
+                  Finishes(ResultWith(::testing::ElementsAreArray(expected))));

Review comment:
       Nit: please add a `using` declaration for this above
   ```suggestion
                     Finishes(ResultWith(ElementsAreArray(expected))));
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to