westonpace commented on code in PR #13848:
URL: https://github.com/apache/arrow/pull/13848#discussion_r944016109
##########
cpp/src/arrow/compute/exec/exec_plan.cc:
##########
@@ -52,29 +55,27 @@ struct ExecPlanImpl : public ExecPlan {
~ExecPlanImpl() override {
if (started_ && !finished_.is_finished()) {
ARROW_LOG(WARNING) << "Plan was destroyed before finishing";
- StopProducing();
+ Abort();
finished().Wait();
}
}
size_t GetThreadIndex() { return thread_indexer_(); }
size_t max_concurrency() const { return thread_indexer_.Capacity(); }
+ const std::vector<std::unique_ptr<ExecNode>>& nodes() const { return nodes_;
}
ExecNode* AddNode(std::unique_ptr<ExecNode> node) {
if (node->label().empty()) {
node->SetLabel(std::to_string(auto_label_counter_++));
}
- if (node->num_inputs() == 0) {
- sources_.push_back(node.get());
- }
- if (node->num_outputs() == 0) {
- sinks_.push_back(node.get());
- }
nodes_.push_back(std::move(node));
return nodes_.back().get();
}
Result<Future<>> BeginExternalTask() {
+ // The task group isn't relevant in synchronous execution mode
+ if (!exec_context_->executor()) return Future<>::Make();
+
Review Comment:
Eventually this case will go away.
##########
cpp/src/arrow/compute/exec/exec_plan.cc:
##########
@@ -166,162 +169,74 @@ struct ExecPlanImpl : public ExecPlan {
/*concurrent_tasks=*/2 * num_threads, sync_execution));
started_ = true;
- // producers precede consumers
- sorted_nodes_ = TopoSort();
-
- Status st = Status::OK();
-
- using rev_it = std::reverse_iterator<NodeVector::iterator>;
- for (rev_it it(sorted_nodes_.end()), end(sorted_nodes_.begin()); it !=
end; ++it) {
- auto node = *it;
-
- EVENT(span_, "StartProducing:" + node->label(),
- {{"node.label", node->label()}, {"node.kind_name",
node->kind_name()}});
- st = node->StartProducing();
- EVENT(span_, "StartProducing:" + node->label(), {{"status",
st.ToString()}});
+ for (std::unique_ptr<ExecNode>& n : nodes_) {
+ Status st = n->StartProducing();
if (!st.ok()) {
- // Stop nodes that successfully started, in reverse order
- stopped_ = true;
- StopProducingImpl(it.base(), sorted_nodes_.end());
- for (NodeVector::iterator fw_it = sorted_nodes_.begin(); fw_it !=
it.base();
- ++fw_it) {
- Future<> fut = (*fw_it)->finished();
- if (!fut.is_finished()) fut.MarkFinished();
- }
+ Abort();
Review Comment:
Why does the order no longer matter here?
##########
cpp/src/arrow/compute/exec/exec_plan.cc:
##########
@@ -166,162 +169,74 @@ struct ExecPlanImpl : public ExecPlan {
/*concurrent_tasks=*/2 * num_threads, sync_execution));
started_ = true;
- // producers precede consumers
- sorted_nodes_ = TopoSort();
-
- Status st = Status::OK();
-
- using rev_it = std::reverse_iterator<NodeVector::iterator>;
- for (rev_it it(sorted_nodes_.end()), end(sorted_nodes_.begin()); it !=
end; ++it) {
- auto node = *it;
-
- EVENT(span_, "StartProducing:" + node->label(),
- {{"node.label", node->label()}, {"node.kind_name",
node->kind_name()}});
- st = node->StartProducing();
- EVENT(span_, "StartProducing:" + node->label(), {{"status",
st.ToString()}});
+ for (std::unique_ptr<ExecNode>& n : nodes_) {
+ Status st = n->StartProducing();
if (!st.ok()) {
- // Stop nodes that successfully started, in reverse order
- stopped_ = true;
- StopProducingImpl(it.base(), sorted_nodes_.end());
- for (NodeVector::iterator fw_it = sorted_nodes_.begin(); fw_it !=
it.base();
- ++fw_it) {
- Future<> fut = (*fw_it)->finished();
- if (!fut.is_finished()) fut.MarkFinished();
- }
+ Abort();
return st;
}
}
- return st;
+ // StartProducing will have added some tasks to the task group.
+ // Now we end the task group so that as soon as we run out of tasks,
+ // we've finished executing.
+ EndTaskGroup();
Review Comment:
Wouldn't a more appropriate place to trigger `EndTaskGroup` be when
`InputFinished` is received on all sinks?
##########
cpp/src/arrow/compute/exec/exec_plan.cc:
##########
@@ -166,162 +169,74 @@ struct ExecPlanImpl : public ExecPlan {
/*concurrent_tasks=*/2 * num_threads, sync_execution));
started_ = true;
- // producers precede consumers
- sorted_nodes_ = TopoSort();
-
- Status st = Status::OK();
-
- using rev_it = std::reverse_iterator<NodeVector::iterator>;
- for (rev_it it(sorted_nodes_.end()), end(sorted_nodes_.begin()); it !=
end; ++it) {
- auto node = *it;
-
- EVENT(span_, "StartProducing:" + node->label(),
- {{"node.label", node->label()}, {"node.kind_name",
node->kind_name()}});
- st = node->StartProducing();
- EVENT(span_, "StartProducing:" + node->label(), {{"status",
st.ToString()}});
+ for (std::unique_ptr<ExecNode>& n : nodes_) {
+ Status st = n->StartProducing();
if (!st.ok()) {
- // Stop nodes that successfully started, in reverse order
- stopped_ = true;
- StopProducingImpl(it.base(), sorted_nodes_.end());
- for (NodeVector::iterator fw_it = sorted_nodes_.begin(); fw_it !=
it.base();
- ++fw_it) {
- Future<> fut = (*fw_it)->finished();
- if (!fut.is_finished()) fut.MarkFinished();
- }
+ Abort();
return st;
}
}
- return st;
+ // StartProducing will have added some tasks to the task group.
+ // Now we end the task group so that as soon as we run out of tasks,
+ // we've finished executing.
+ EndTaskGroup();
+ return Status::OK();
}
void EndTaskGroup() {
bool expected = false;
if (group_ended_.compare_exchange_strong(expected, true)) {
task_group_.End().AddCallback([this](const Status& st) {
- MARK_SPAN(span_, error_st_ & st);
- END_SPAN(span_);
- finished_.MarkFinished(error_st_ & st);
+ if (aborted_) {
+ for (std::unique_ptr<ExecNode>& node : nodes_) node->Abort();
+ }
Review Comment:
Wouldn't we call `node->Abort` when we transition to `aborted_ = true`?
##########
cpp/src/arrow/compute/exec/exec_plan.cc:
##########
@@ -166,162 +169,74 @@ struct ExecPlanImpl : public ExecPlan {
/*concurrent_tasks=*/2 * num_threads, sync_execution));
started_ = true;
- // producers precede consumers
- sorted_nodes_ = TopoSort();
-
- Status st = Status::OK();
-
- using rev_it = std::reverse_iterator<NodeVector::iterator>;
- for (rev_it it(sorted_nodes_.end()), end(sorted_nodes_.begin()); it !=
end; ++it) {
- auto node = *it;
-
- EVENT(span_, "StartProducing:" + node->label(),
- {{"node.label", node->label()}, {"node.kind_name",
node->kind_name()}});
- st = node->StartProducing();
- EVENT(span_, "StartProducing:" + node->label(), {{"status",
st.ToString()}});
+ for (std::unique_ptr<ExecNode>& n : nodes_) {
+ Status st = n->StartProducing();
if (!st.ok()) {
- // Stop nodes that successfully started, in reverse order
- stopped_ = true;
- StopProducingImpl(it.base(), sorted_nodes_.end());
- for (NodeVector::iterator fw_it = sorted_nodes_.begin(); fw_it !=
it.base();
- ++fw_it) {
- Future<> fut = (*fw_it)->finished();
- if (!fut.is_finished()) fut.MarkFinished();
- }
+ Abort();
return st;
}
}
- return st;
+ // StartProducing will have added some tasks to the task group.
+ // Now we end the task group so that as soon as we run out of tasks,
+ // we've finished executing.
+ EndTaskGroup();
+ return Status::OK();
}
void EndTaskGroup() {
bool expected = false;
if (group_ended_.compare_exchange_strong(expected, true)) {
task_group_.End().AddCallback([this](const Status& st) {
- MARK_SPAN(span_, error_st_ & st);
- END_SPAN(span_);
- finished_.MarkFinished(error_st_ & st);
+ if (aborted_) {
+ for (std::unique_ptr<ExecNode>& node : nodes_) node->Abort();
+ }
+ if (!errors_.empty())
+ finished_.MarkFinished(errors_[0]);
Review Comment:
It seems like a lot of work to collect all the errors only to emit the first
one. Is there another reason you want to collect all errors?
##########
cpp/src/arrow/compute/exec/exec_plan.cc:
##########
@@ -15,8 +15,11 @@
// specific language governing permissions and limitations
// under the License.
+// COMMIT cd5346e14450d7e5ca156acb4c2f396885c77aa0
+
Review Comment:
```suggestion
```
##########
cpp/src/arrow/compute/exec/project_node.cc:
##########
@@ -91,26 +95,34 @@ class ProjectNode : public MapNode {
ARROW_ASSIGN_OR_RAISE(values[i],
ExecuteScalarExpression(simplified_expr, target,
plan()->exec_context()));
}
+ END_SPAN(span);
+
return ExecBatch{std::move(values), target.length};
}
- void InputReceived(ExecNode* input, ExecBatch batch) override {
- EVENT(span_, "InputReceived", {{"batch.length", batch.length}});
+ Status StartProducing() override { return Status::OK(); }
+
+ void PauseProducing(ExecNode* output, int32_t counter) override {
+ inputs_[0]->PauseProducing(this, counter);
+ }
+
+ void ResumeProducing(ExecNode* output, int32_t counter) override {
+ inputs_[0]->ResumeProducing(this, counter);
+ }
+
+ Status InputReceived(ExecNode* input, ExecBatch batch) override {
DCHECK_EQ(input, inputs_[0]);
- auto func = [this](ExecBatch batch) {
- util::tracing::Span span;
- START_COMPUTE_SPAN_WITH_PARENT(span, span_, "InputReceived",
- {{"project", ToStringExtra()},
- {"node.label", label()},
- {"batch.length", batch.length}});
- auto result = DoProject(std::move(batch));
- MARK_SPAN(span, result.status());
- END_SPAN(span);
- return result;
- };
- this->SubmitTask(std::move(func), std::move(batch));
+ ARROW_ASSIGN_OR_RAISE(ExecBatch projected, DoProject(std::move(batch)));
+ return output_->InputReceived(this, std::move(projected));
}
+ Status InputFinished(ExecNode* input, int num_batches) override {
+ END_SPAN(span_);
+ return output_->InputFinished(this, num_batches);
+ }
Review Comment:
Can this be a default implementation for `ExecNode::InputFinished`?
##########
cpp/src/arrow/compute/exec/exec_plan.cc:
##########
@@ -404,38 +320,36 @@ std::string ExecPlan::ToString() const { return
ToDerived(this)->ToString(); }
ExecNode::ExecNode(ExecPlan* plan, NodeVector inputs,
std::vector<std::string> input_labels,
- std::shared_ptr<Schema> output_schema, int num_outputs)
+ std::shared_ptr<Schema> output_schema)
: plan_(plan),
inputs_(std::move(inputs)),
input_labels_(std::move(input_labels)),
output_schema_(std::move(output_schema)),
- num_outputs_(num_outputs) {
+ output_(nullptr) {
for (auto input : inputs_) {
- input->outputs_.push_back(this);
+ input->output_ = this;
}
}
-Status ExecNode::Init() { return Status::OK(); }
+Status ExecNode::Init() {
+ START_COMPUTE_SPAN(
+ span_, std::string(kind_name()) + ":" + label(),
+ {{"node.label", label()}, {"node.detail", ToString()}, {"node.kind",
kind_name()}});
+ return Status::OK();
+}
Review Comment:
Very happy to see this move into the base class.
##########
cpp/src/arrow/compute/exec/exec_plan.cc:
##########
@@ -166,162 +169,74 @@ struct ExecPlanImpl : public ExecPlan {
/*concurrent_tasks=*/2 * num_threads, sync_execution));
started_ = true;
- // producers precede consumers
- sorted_nodes_ = TopoSort();
-
- Status st = Status::OK();
-
- using rev_it = std::reverse_iterator<NodeVector::iterator>;
- for (rev_it it(sorted_nodes_.end()), end(sorted_nodes_.begin()); it !=
end; ++it) {
- auto node = *it;
-
- EVENT(span_, "StartProducing:" + node->label(),
- {{"node.label", node->label()}, {"node.kind_name",
node->kind_name()}});
- st = node->StartProducing();
- EVENT(span_, "StartProducing:" + node->label(), {{"status",
st.ToString()}});
+ for (std::unique_ptr<ExecNode>& n : nodes_) {
+ Status st = n->StartProducing();
if (!st.ok()) {
- // Stop nodes that successfully started, in reverse order
- stopped_ = true;
- StopProducingImpl(it.base(), sorted_nodes_.end());
- for (NodeVector::iterator fw_it = sorted_nodes_.begin(); fw_it !=
it.base();
- ++fw_it) {
- Future<> fut = (*fw_it)->finished();
- if (!fut.is_finished()) fut.MarkFinished();
- }
+ Abort();
return st;
}
}
- return st;
+ // StartProducing will have added some tasks to the task group.
+ // Now we end the task group so that as soon as we run out of tasks,
+ // we've finished executing.
+ EndTaskGroup();
+ return Status::OK();
}
void EndTaskGroup() {
bool expected = false;
if (group_ended_.compare_exchange_strong(expected, true)) {
task_group_.End().AddCallback([this](const Status& st) {
- MARK_SPAN(span_, error_st_ & st);
- END_SPAN(span_);
- finished_.MarkFinished(error_st_ & st);
+ if (aborted_) {
+ for (std::unique_ptr<ExecNode>& node : nodes_) node->Abort();
+ }
+ if (!errors_.empty())
+ finished_.MarkFinished(errors_[0]);
+ else
+ finished_.MarkFinished(st);
});
}
}
- void StopProducing() {
- DCHECK(started_) << "stopped an ExecPlan which never started";
- EVENT(span_, "StopProducing");
- stopped_ = true;
- task_scheduler_->Abort(
- [this]() { StopProducingImpl(sorted_nodes_.begin(),
sorted_nodes_.end()); });
+ void Abort() {
+ DCHECK(started_) << "aborted an ExecPlan which never started";
+ EVENT(span_, "Abort");
+ if (finished_.is_finished()) return;
+ std::lock_guard<std::mutex> guard(abort_mutex_);
+ AbortUnlocked();
}
- template <typename It>
- void StopProducingImpl(It begin, It end) {
- for (auto it = begin; it != end; ++it) {
- auto node = *it;
- EVENT(span_, "StopProducing:" + node->label(),
- {{"node.label", node->label()}, {"node.kind_name",
node->kind_name()}});
- node->StopProducing();
+ void AbortUnlocked() {
+ if (!aborted_) {
+ aborted_ = true;
+ stop_source_.RequestStop();
+ EndTaskGroup();
+ task_scheduler_->Abort([]() {});
}
}
- NodeVector TopoSort() const {
- struct Impl {
- const std::vector<std::unique_ptr<ExecNode>>& nodes;
- std::unordered_set<ExecNode*> visited;
- NodeVector sorted;
-
- explicit Impl(const std::vector<std::unique_ptr<ExecNode>>& nodes) :
nodes(nodes) {
- visited.reserve(nodes.size());
- sorted.resize(nodes.size());
-
- for (const auto& node : nodes) {
- Visit(node.get());
- }
-
- DCHECK_EQ(visited.size(), nodes.size());
- }
-
- void Visit(ExecNode* node) {
- if (visited.count(node) != 0) return;
-
- for (auto input : node->inputs()) {
- // Ensure that producers are inserted before this consumer
- Visit(input);
- }
-
- sorted[visited.size()] = node;
- visited.insert(node);
- }
- };
-
- return std::move(Impl{nodes_}.sorted);
- }
-
- // This function returns a node vector and a vector of integers with the
- // number of spaces to add as an indentation. The main difference between
- // this function and the TopoSort function is that here we visit the nodes
- // in reverse order and we can have repeated nodes if necessary.
- // For example, in the following plan:
- // s1 --> s3 -
- // - -
- // - -> s5 --> s6
- // - -
- // s2 --> s4 -
- // Toposort node vector: s1 s2 s3 s4 s5 s6
- // OrderedNodes node vector: s6 s5 s3 s1 s4 s2 s1
- std::pair<NodeVector, std::vector<int>> OrderedNodes() const {
- struct Impl {
- const std::vector<std::unique_ptr<ExecNode>>& nodes;
- std::unordered_set<ExecNode*> visited;
- std::unordered_set<ExecNode*> marked;
- NodeVector sorted;
- std::vector<int> indents;
-
- explicit Impl(const std::vector<std::unique_ptr<ExecNode>>& nodes) :
nodes(nodes) {
- visited.reserve(nodes.size());
-
- for (auto it = nodes.rbegin(); it != nodes.rend(); ++it) {
- if (visited.count(it->get()) != 0) continue;
- Visit(it->get());
- }
-
- DCHECK_EQ(visited.size(), nodes.size());
- }
-
- void Visit(ExecNode* node, int indent = 0) {
- marked.insert(node);
- for (auto input : node->inputs()) {
- if (marked.count(input) != 0) continue;
- Visit(input, indent + 1);
- }
- marked.erase(node);
-
- indents.push_back(indent);
- sorted.push_back(node);
- visited.insert(node);
- }
- };
-
- auto result = Impl{nodes_};
- return std::make_pair(result.sorted, result.indents);
- }
-
std::string ToString() const {
std::stringstream ss;
ss << "ExecPlan with " << nodes_.size() << " nodes:" << std::endl;
- auto sorted = OrderedNodes();
- for (size_t i = sorted.first.size(); i > 0; --i) {
- for (int j = 0; j < sorted.second[i - 1]; ++j) ss << " ";
- ss << sorted.first[i - 1]->ToString(sorted.second[i - 1]) << std::endl;
+ for (const std::unique_ptr<ExecNode>& node : nodes_) {
+ if (!node->output()) {
+ PrintSubtree(node.get(), ss, /*indent=*/0);
+ }
Review Comment:
Can we keep the node ordering around just for the sake of printing?
##########
cpp/src/arrow/compute/exec/exec_plan.h:
##########
@@ -131,11 +127,7 @@ class ARROW_EXPORT ExecPlan : public
std::enable_shared_from_this<ExecPlan> {
/// is started before all of its inputs.
Status StartProducing();
- /// \brief Stop producing on all nodes
- ///
- /// Nodes are stopped in topological order, such that any node
- /// is stopped before all of its outputs.
- void StopProducing();
+ void Abort();
Review Comment:
Can we keep the comment?
##########
cpp/src/arrow/compute/exec/exec_plan.h:
##########
@@ -323,27 +311,14 @@ class ARROW_EXPORT ExecNode {
/// This may be called any number of times after StartProducing() succeeds.
virtual void ResumeProducing(ExecNode* output, int32_t counter) = 0;
- /// \brief Stop producing definitively to a single output
- ///
- /// This call is a hint that an output node has completed and is not willing
- /// to receive any further data.
- virtual void StopProducing(ExecNode* output) = 0;
-
- /// \brief Stop producing definitively to all outputs
- virtual void StopProducing() = 0;
-
- /// \brief A future which will be marked finished when this node has stopped
producing.
- virtual Future<> finished() { return finished_; }
+ /// \brief Abort execution and perform any needed cleanup (such as closing
files, etc.)
Review Comment:
What does `Abort execution` mean for a node? In theory all "execution" is
handled via the scheduler, so does a node really need to do anything here? Why
add `ExecNode::Abort` instead of doing the cleanup in the `ExecNode` destructor?
##########
cpp/src/arrow/compute/exec/project_node.cc:
##########
@@ -91,26 +95,34 @@ class ProjectNode : public MapNode {
ARROW_ASSIGN_OR_RAISE(values[i],
ExecuteScalarExpression(simplified_expr, target,
plan()->exec_context()));
}
+ END_SPAN(span);
+
return ExecBatch{std::move(values), target.length};
}
- void InputReceived(ExecNode* input, ExecBatch batch) override {
- EVENT(span_, "InputReceived", {{"batch.length", batch.length}});
+ Status StartProducing() override { return Status::OK(); }
+
+ void PauseProducing(ExecNode* output, int32_t counter) override {
+ inputs_[0]->PauseProducing(this, counter);
+ }
+
+ void ResumeProducing(ExecNode* output, int32_t counter) override {
+ inputs_[0]->ResumeProducing(this, counter);
+ }
+
+ Status InputReceived(ExecNode* input, ExecBatch batch) override {
DCHECK_EQ(input, inputs_[0]);
- auto func = [this](ExecBatch batch) {
- util::tracing::Span span;
- START_COMPUTE_SPAN_WITH_PARENT(span, span_, "InputReceived",
- {{"project", ToStringExtra()},
- {"node.label", label()},
- {"batch.length", batch.length}});
- auto result = DoProject(std::move(batch));
- MARK_SPAN(span, result.status());
- END_SPAN(span);
- return result;
- };
- this->SubmitTask(std::move(func), std::move(batch));
+ ARROW_ASSIGN_OR_RAISE(ExecBatch projected, DoProject(std::move(batch)));
+ return output_->InputReceived(this, std::move(projected));
Review Comment:
At this point, maybe we should just move the body of `DoProject` into this
method?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]