save-buffer commented on code in PR #13848:
URL: https://github.com/apache/arrow/pull/13848#discussion_r944977646


##########
cpp/src/arrow/compute/exec/exec_plan.cc:
##########
@@ -166,162 +169,74 @@ struct ExecPlanImpl : public ExecPlan {
         /*concurrent_tasks=*/2 * num_threads, sync_execution));
 
     started_ = true;
-    // producers precede consumers
-    sorted_nodes_ = TopoSort();
-
-    Status st = Status::OK();
-
-    using rev_it = std::reverse_iterator<NodeVector::iterator>;
-    for (rev_it it(sorted_nodes_.end()), end(sorted_nodes_.begin()); it != 
end; ++it) {
-      auto node = *it;
-
-      EVENT(span_, "StartProducing:" + node->label(),
-            {{"node.label", node->label()}, {"node.kind_name", 
node->kind_name()}});
-      st = node->StartProducing();
-      EVENT(span_, "StartProducing:" + node->label(), {{"status", 
st.ToString()}});
+    for (std::unique_ptr<ExecNode>& n : nodes_) {
+      Status st = n->StartProducing();
       if (!st.ok()) {
-        // Stop nodes that successfully started, in reverse order
-        stopped_ = true;
-        StopProducingImpl(it.base(), sorted_nodes_.end());
-        for (NodeVector::iterator fw_it = sorted_nodes_.begin(); fw_it != 
it.base();
-             ++fw_it) {
-          Future<> fut = (*fw_it)->finished();
-          if (!fut.is_finished()) fut.MarkFinished();
-        }
+        Abort();
         return st;
       }
     }
-    return st;
+    // StartProducing will have added some tasks to the task group.
+    // Now we end the task group so that as soon as we run out of tasks,
+    // we've finished executing.
+    EndTaskGroup();
+    return Status::OK();
   }
 
   void EndTaskGroup() {
     bool expected = false;
     if (group_ended_.compare_exchange_strong(expected, true)) {
       task_group_.End().AddCallback([this](const Status& st) {
-        MARK_SPAN(span_, error_st_ & st);
-        END_SPAN(span_);
-        finished_.MarkFinished(error_st_ & st);
+        if (aborted_) {
+          for (std::unique_ptr<ExecNode>& node : nodes_) node->Abort();
+        }
+        if (!errors_.empty())
+          finished_.MarkFinished(errors_[0]);

Review Comment:
   I was thinking it would mainly be for debugging: in theory `errors_` will 
hold at most `num_threads` errors, so you'd be able to see which thread raised 
which error. That said, this isn't a problem yet, so maybe it's not worth solving. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to