save-buffer commented on code in PR #13848:
URL: https://github.com/apache/arrow/pull/13848#discussion_r944977316


##########
cpp/src/arrow/compute/exec/exec_plan.cc:
##########
@@ -166,162 +169,74 @@ struct ExecPlanImpl : public ExecPlan {
         /*concurrent_tasks=*/2 * num_threads, sync_execution));
 
     started_ = true;
-    // producers precede consumers
-    sorted_nodes_ = TopoSort();
-
-    Status st = Status::OK();
-
-    using rev_it = std::reverse_iterator<NodeVector::iterator>;
-    for (rev_it it(sorted_nodes_.end()), end(sorted_nodes_.begin()); it != 
end; ++it) {
-      auto node = *it;
-
-      EVENT(span_, "StartProducing:" + node->label(),
-            {{"node.label", node->label()}, {"node.kind_name", 
node->kind_name()}});
-      st = node->StartProducing();
-      EVENT(span_, "StartProducing:" + node->label(), {{"status", 
st.ToString()}});
+    for (std::unique_ptr<ExecNode>& n : nodes_) {
+      Status st = n->StartProducing();
       if (!st.ok()) {
-        // Stop nodes that successfully started, in reverse order
-        stopped_ = true;
-        StopProducingImpl(it.base(), sorted_nodes_.end());
-        for (NodeVector::iterator fw_it = sorted_nodes_.begin(); fw_it != 
it.base();
-             ++fw_it) {
-          Future<> fut = (*fw_it)->finished();
-          if (!fut.is_finished()) fut.MarkFinished();
-        }
+        Abort();
         return st;
       }
     }
-    return st;
+    // StartProducing will have added some tasks to the task group.
+    // Now we end the task group so that as soon as we run out of tasks,
+    // we've finished executing.
+    EndTaskGroup();
+    return Status::OK();
   }
 
   void EndTaskGroup() {
     bool expected = false;
     if (group_ended_.compare_exchange_strong(expected, true)) {
       task_group_.End().AddCallback([this](const Status& st) {
-        MARK_SPAN(span_, error_st_ & st);
-        END_SPAN(span_);
-        finished_.MarkFinished(error_st_ & st);
+        if (aborted_) {
+          for (std::unique_ptr<ExecNode>& node : nodes_) node->Abort();
+        }

Review Comment:
   We want to avoid any possible race between abort/cleanup work and tasks 
that are still running, so it's only safe to call Abort once we're sure that 
no other tasks are running. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to