save-buffer commented on code in PR #13848:
URL: https://github.com/apache/arrow/pull/13848#discussion_r944976331
##########
cpp/src/arrow/compute/exec/exec_plan.cc:
##########
@@ -166,162 +169,74 @@ struct ExecPlanImpl : public ExecPlan {
/*concurrent_tasks=*/2 * num_threads, sync_execution));
started_ = true;
- // producers precede consumers
- sorted_nodes_ = TopoSort();
-
- Status st = Status::OK();
-
- using rev_it = std::reverse_iterator<NodeVector::iterator>;
- for (rev_it it(sorted_nodes_.end()), end(sorted_nodes_.begin()); it !=
end; ++it) {
- auto node = *it;
-
- EVENT(span_, "StartProducing:" + node->label(),
- {{"node.label", node->label()}, {"node.kind_name",
node->kind_name()}});
- st = node->StartProducing();
- EVENT(span_, "StartProducing:" + node->label(), {{"status",
st.ToString()}});
+ for (std::unique_ptr<ExecNode>& n : nodes_) {
+ Status st = n->StartProducing();
if (!st.ok()) {
- // Stop nodes that successfully started, in reverse order
- stopped_ = true;
- StopProducingImpl(it.base(), sorted_nodes_.end());
- for (NodeVector::iterator fw_it = sorted_nodes_.begin(); fw_it !=
it.base();
- ++fw_it) {
- Future<> fut = (*fw_it)->finished();
- if (!fut.is_finished()) fut.MarkFinished();
- }
+ Abort();
return st;
}
}
- return st;
+ // StartProducing will have added some tasks to the task group.
+ // Now we end the task group so that as soon as we run out of tasks,
+ // we've finished executing.
+ EndTaskGroup();
Review Comment:
`EndTaskGroup` has a nice property: the task group ends as soon as it runs out
of tasks to perform. Here is the relevant doc comment:
```
/// It is allowed for tasks to be added after this call provided the future has not yet
/// completed. This should be safe as long as the tasks being added are added as part
/// of a task that is tracked. As soon as the count of running tasks reaches 0 this
/// future will be marked complete.
```
So we will end when all of the tasks have finished running and no new tasks
have been scheduled.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]