lidavidm commented on a change in pull request #11274:
URL: https://github.com/apache/arrow/pull/11274#discussion_r719616956



##########
File path: cpp/src/arrow/compute/exec/sink_node.cc
##########
@@ -131,54 +132,64 @@ class SinkNode : public ExecNode {
   PushGenerator<util::optional<ExecBatch>>::Producer producer_;
 };
 
-// A sink node that accumulates inputs, then sorts them before emitting them.
 struct OrderBySinkNode final : public SinkNode {
-  OrderBySinkNode(ExecPlan* plan, std::vector<ExecNode*> inputs, SortOptions 
sort_options,
+  OrderBySinkNode(ExecPlan* plan, std::vector<ExecNode*> inputs,
+                  std::unique_ptr<OrderByImpl> impl,
                   AsyncGenerator<util::optional<ExecBatch>>* generator)
-      : SinkNode(plan, std::move(inputs), generator),
-        sort_options_(std::move(sort_options)) {}
+      : SinkNode(plan, std::move(inputs), generator), impl_{std::move(impl)} {}
 
   const char* kind_name() const override { return "OrderBySinkNode"; }
 
-  static Result<ExecNode*> Make(ExecPlan* plan, std::vector<ExecNode*> inputs,
-                                const ExecNodeOptions& options) {
+  // A sink node that accumulates inputs, then sorts them before emitting them.
+  static Result<ExecNode*> MakeSort(ExecPlan* plan, std::vector<ExecNode*> 
inputs,
+                                    const ExecNodeOptions& options) {
     RETURN_NOT_OK(ValidateExecNodeInputs(plan, inputs, 1, "OrderBySinkNode"));
 
     const auto& sink_options = checked_cast<const 
OrderBySinkNodeOptions&>(options);
-    return plan->EmplaceNode<OrderBySinkNode>(
-        plan, std::move(inputs), sink_options.sort_options, 
sink_options.generator);
+    ARROW_ASSIGN_OR_RAISE(
+        std::unique_ptr<OrderByImpl> impl,
+        OrderByImpl::MakeSort(plan->exec_context(), inputs[0]->output_schema(),
+                              sink_options.sort_options));
+    return plan->EmplaceNode<OrderBySinkNode>(plan, std::move(inputs), 
std::move(impl),
+                                              sink_options.generator);
+  }
+
+  // A sink node that receives inputs and then compute top_k/bottom_k.
+  static Result<ExecNode*> MakeSelectK(ExecPlan* plan, std::vector<ExecNode*> 
inputs,
+                                       const ExecNodeOptions& options) {
+    RETURN_NOT_OK(ValidateExecNodeInputs(plan, inputs, 1, "OrderBySinkNode"));
+
+    const auto& sink_options = checked_cast<const 
SelectKSinkNodeOptions&>(options);
+    ARROW_ASSIGN_OR_RAISE(
+        std::unique_ptr<OrderByImpl> impl,
+        OrderByImpl::MakeSelectK(plan->exec_context(), 
inputs[0]->output_schema(),
+                                 sink_options.select_k_options));
+    return plan->EmplaceNode<OrderBySinkNode>(plan, std::move(inputs), 
std::move(impl),
+                                              sink_options.generator);
   }
 
   void InputReceived(ExecNode* input, ExecBatch batch) override {
     DCHECK_EQ(input, inputs_[0]);
 
-    // Accumulate data
-    {
-      std::unique_lock<std::mutex> lock(mutex_);
-      auto maybe_batch = batch.ToRecordBatch(inputs_[0]->output_schema(),
-                                             
plan()->exec_context()->memory_pool());
-      if (ErrorIfNotOk(maybe_batch.status())) return;
-      batches_.push_back(maybe_batch.MoveValueUnsafe());
-    }
+    auto maybe_batch = batch.ToRecordBatch(inputs_[0]->output_schema(),
+                                           
plan()->exec_context()->memory_pool());
+    if (ErrorIfNotOk(maybe_batch.status())) return;
+    auto record_batch = maybe_batch.MoveValueUnsafe();
 
+    Status status = impl_->InputReceived(std::move(record_batch));
+    if (!status.ok()) {

Review comment:
       This should use the same pattern as above.

##########
File path: cpp/src/arrow/compute/exec/sink_node.cc
##########
@@ -131,54 +132,64 @@ class SinkNode : public ExecNode {
   PushGenerator<util::optional<ExecBatch>>::Producer producer_;
 };
 
-// A sink node that accumulates inputs, then sorts them before emitting them.
 struct OrderBySinkNode final : public SinkNode {
-  OrderBySinkNode(ExecPlan* plan, std::vector<ExecNode*> inputs, SortOptions 
sort_options,
+  OrderBySinkNode(ExecPlan* plan, std::vector<ExecNode*> inputs,
+                  std::unique_ptr<OrderByImpl> impl,
                   AsyncGenerator<util::optional<ExecBatch>>* generator)
-      : SinkNode(plan, std::move(inputs), generator),
-        sort_options_(std::move(sort_options)) {}
+      : SinkNode(plan, std::move(inputs), generator), impl_{std::move(impl)} {}
 
   const char* kind_name() const override { return "OrderBySinkNode"; }
 
-  static Result<ExecNode*> Make(ExecPlan* plan, std::vector<ExecNode*> inputs,
-                                const ExecNodeOptions& options) {
+  // A sink node that accumulates inputs, then sorts them before emitting them.
+  static Result<ExecNode*> MakeSort(ExecPlan* plan, std::vector<ExecNode*> 
inputs,
+                                    const ExecNodeOptions& options) {
     RETURN_NOT_OK(ValidateExecNodeInputs(plan, inputs, 1, "OrderBySinkNode"));
 
     const auto& sink_options = checked_cast<const 
OrderBySinkNodeOptions&>(options);
-    return plan->EmplaceNode<OrderBySinkNode>(
-        plan, std::move(inputs), sink_options.sort_options, 
sink_options.generator);
+    ARROW_ASSIGN_OR_RAISE(
+        std::unique_ptr<OrderByImpl> impl,
+        OrderByImpl::MakeSort(plan->exec_context(), inputs[0]->output_schema(),
+                              sink_options.sort_options));
+    return plan->EmplaceNode<OrderBySinkNode>(plan, std::move(inputs), 
std::move(impl),
+                                              sink_options.generator);
+  }
+
+  // A sink node that receives inputs and then compute top_k/bottom_k.
+  static Result<ExecNode*> MakeSelectK(ExecPlan* plan, std::vector<ExecNode*> 
inputs,
+                                       const ExecNodeOptions& options) {
+    RETURN_NOT_OK(ValidateExecNodeInputs(plan, inputs, 1, "OrderBySinkNode"));
+
+    const auto& sink_options = checked_cast<const 
SelectKSinkNodeOptions&>(options);
+    ARROW_ASSIGN_OR_RAISE(
+        std::unique_ptr<OrderByImpl> impl,
+        OrderByImpl::MakeSelectK(plan->exec_context(), 
inputs[0]->output_schema(),
+                                 sink_options.select_k_options));
+    return plan->EmplaceNode<OrderBySinkNode>(plan, std::move(inputs), 
std::move(impl),
+                                              sink_options.generator);
   }
 
   void InputReceived(ExecNode* input, ExecBatch batch) override {
     DCHECK_EQ(input, inputs_[0]);
 
-    // Accumulate data
-    {
-      std::unique_lock<std::mutex> lock(mutex_);
-      auto maybe_batch = batch.ToRecordBatch(inputs_[0]->output_schema(),
-                                             
plan()->exec_context()->memory_pool());
-      if (ErrorIfNotOk(maybe_batch.status())) return;
-      batches_.push_back(maybe_batch.MoveValueUnsafe());
-    }
+    auto maybe_batch = batch.ToRecordBatch(inputs_[0]->output_schema(),
+                                           
plan()->exec_context()->memory_pool());
+    if (ErrorIfNotOk(maybe_batch.status())) return;

Review comment:
       It seems a little hard to write a test case for this, however. 
Practically this can only come up if there's an allocation failure.

##########
File path: cpp/src/arrow/compute/exec/sink_node.cc
##########
@@ -131,54 +132,64 @@ class SinkNode : public ExecNode {
   PushGenerator<util::optional<ExecBatch>>::Producer producer_;
 };
 
-// A sink node that accumulates inputs, then sorts them before emitting them.
 struct OrderBySinkNode final : public SinkNode {
-  OrderBySinkNode(ExecPlan* plan, std::vector<ExecNode*> inputs, SortOptions 
sort_options,
+  OrderBySinkNode(ExecPlan* plan, std::vector<ExecNode*> inputs,
+                  std::unique_ptr<OrderByImpl> impl,
                   AsyncGenerator<util::optional<ExecBatch>>* generator)
-      : SinkNode(plan, std::move(inputs), generator),
-        sort_options_(std::move(sort_options)) {}
+      : SinkNode(plan, std::move(inputs), generator), impl_{std::move(impl)} {}
 
   const char* kind_name() const override { return "OrderBySinkNode"; }
 
-  static Result<ExecNode*> Make(ExecPlan* plan, std::vector<ExecNode*> inputs,
-                                const ExecNodeOptions& options) {
+  // A sink node that accumulates inputs, then sorts them before emitting them.
+  static Result<ExecNode*> MakeSort(ExecPlan* plan, std::vector<ExecNode*> 
inputs,
+                                    const ExecNodeOptions& options) {
     RETURN_NOT_OK(ValidateExecNodeInputs(plan, inputs, 1, "OrderBySinkNode"));
 
     const auto& sink_options = checked_cast<const 
OrderBySinkNodeOptions&>(options);
-    return plan->EmplaceNode<OrderBySinkNode>(
-        plan, std::move(inputs), sink_options.sort_options, 
sink_options.generator);
+    ARROW_ASSIGN_OR_RAISE(
+        std::unique_ptr<OrderByImpl> impl,
+        OrderByImpl::MakeSort(plan->exec_context(), inputs[0]->output_schema(),
+                              sink_options.sort_options));
+    return plan->EmplaceNode<OrderBySinkNode>(plan, std::move(inputs), 
std::move(impl),
+                                              sink_options.generator);
+  }
+
+  // A sink node that receives inputs and then compute top_k/bottom_k.
+  static Result<ExecNode*> MakeSelectK(ExecPlan* plan, std::vector<ExecNode*> 
inputs,
+                                       const ExecNodeOptions& options) {
+    RETURN_NOT_OK(ValidateExecNodeInputs(plan, inputs, 1, "OrderBySinkNode"));
+
+    const auto& sink_options = checked_cast<const 
SelectKSinkNodeOptions&>(options);
+    ARROW_ASSIGN_OR_RAISE(
+        std::unique_ptr<OrderByImpl> impl,
+        OrderByImpl::MakeSelectK(plan->exec_context(), 
inputs[0]->output_schema(),
+                                 sink_options.select_k_options));
+    return plan->EmplaceNode<OrderBySinkNode>(plan, std::move(inputs), 
std::move(impl),
+                                              sink_options.generator);
   }
 
   void InputReceived(ExecNode* input, ExecBatch batch) override {
     DCHECK_EQ(input, inputs_[0]);
 
-    // Accumulate data
-    {
-      std::unique_lock<std::mutex> lock(mutex_);
-      auto maybe_batch = batch.ToRecordBatch(inputs_[0]->output_schema(),
-                                             
plan()->exec_context()->memory_pool());
-      if (ErrorIfNotOk(maybe_batch.status())) return;
-      batches_.push_back(maybe_batch.MoveValueUnsafe());
-    }
+    auto maybe_batch = batch.ToRecordBatch(inputs_[0]->output_schema(),
+                                           
plan()->exec_context()->memory_pool());
+    if (ErrorIfNotOk(maybe_batch.status())) return;

Review comment:
       Hmm, I realize this error was in the original impl, but ErrorIfNotOk 
here basically just throws away the error since there are no outputs. Instead, we 
should do the same thing as Finish.

##########
File path: cpp/src/arrow/compute/exec/sink_node.cc
##########
@@ -131,54 +132,64 @@ class SinkNode : public ExecNode {
   PushGenerator<util::optional<ExecBatch>>::Producer producer_;
 };
 
-// A sink node that accumulates inputs, then sorts them before emitting them.
 struct OrderBySinkNode final : public SinkNode {
-  OrderBySinkNode(ExecPlan* plan, std::vector<ExecNode*> inputs, SortOptions 
sort_options,
+  OrderBySinkNode(ExecPlan* plan, std::vector<ExecNode*> inputs,
+                  std::unique_ptr<OrderByImpl> impl,
                   AsyncGenerator<util::optional<ExecBatch>>* generator)
-      : SinkNode(plan, std::move(inputs), generator),
-        sort_options_(std::move(sort_options)) {}
+      : SinkNode(plan, std::move(inputs), generator), impl_{std::move(impl)} {}
 
   const char* kind_name() const override { return "OrderBySinkNode"; }
 
-  static Result<ExecNode*> Make(ExecPlan* plan, std::vector<ExecNode*> inputs,
-                                const ExecNodeOptions& options) {
+  // A sink node that accumulates inputs, then sorts them before emitting them.
+  static Result<ExecNode*> MakeSort(ExecPlan* plan, std::vector<ExecNode*> 
inputs,
+                                    const ExecNodeOptions& options) {
     RETURN_NOT_OK(ValidateExecNodeInputs(plan, inputs, 1, "OrderBySinkNode"));
 
     const auto& sink_options = checked_cast<const 
OrderBySinkNodeOptions&>(options);
-    return plan->EmplaceNode<OrderBySinkNode>(
-        plan, std::move(inputs), sink_options.sort_options, 
sink_options.generator);
+    ARROW_ASSIGN_OR_RAISE(
+        std::unique_ptr<OrderByImpl> impl,
+        OrderByImpl::MakeSort(plan->exec_context(), inputs[0]->output_schema(),
+                              sink_options.sort_options));
+    return plan->EmplaceNode<OrderBySinkNode>(plan, std::move(inputs), 
std::move(impl),
+                                              sink_options.generator);
+  }
+
+  // A sink node that receives inputs and then compute top_k/bottom_k.
+  static Result<ExecNode*> MakeSelectK(ExecPlan* plan, std::vector<ExecNode*> 
inputs,
+                                       const ExecNodeOptions& options) {
+    RETURN_NOT_OK(ValidateExecNodeInputs(plan, inputs, 1, "OrderBySinkNode"));
+
+    const auto& sink_options = checked_cast<const 
SelectKSinkNodeOptions&>(options);
+    ARROW_ASSIGN_OR_RAISE(
+        std::unique_ptr<OrderByImpl> impl,
+        OrderByImpl::MakeSelectK(plan->exec_context(), 
inputs[0]->output_schema(),
+                                 sink_options.select_k_options));
+    return plan->EmplaceNode<OrderBySinkNode>(plan, std::move(inputs), 
std::move(impl),
+                                              sink_options.generator);
   }
 
   void InputReceived(ExecNode* input, ExecBatch batch) override {
     DCHECK_EQ(input, inputs_[0]);
 
-    // Accumulate data
-    {
-      std::unique_lock<std::mutex> lock(mutex_);
-      auto maybe_batch = batch.ToRecordBatch(inputs_[0]->output_schema(),
-                                             
plan()->exec_context()->memory_pool());
-      if (ErrorIfNotOk(maybe_batch.status())) return;
-      batches_.push_back(maybe_batch.MoveValueUnsafe());
-    }
+    auto maybe_batch = batch.ToRecordBatch(inputs_[0]->output_schema(),
+                                           
plan()->exec_context()->memory_pool());
+    if (ErrorIfNotOk(maybe_batch.status())) return;

Review comment:
       ```suggestion
       if (ErrorIfNotOk(maybe_batch.status())) {
         StopProducing();
         bool cancelled = input_counter_.Cancel();
         DCHECK(cancelled);
         finished_.MarkFinished(maybe_batch.status());
         return;
       }
       ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to