save-buffer commented on code in PR #14524:
URL: https://github.com/apache/arrow/pull/14524#discussion_r1007212828
##########
cpp/src/arrow/compute/exec/exec_plan.cc:
##########
@@ -127,85 +127,89 @@ struct ExecPlanImpl : public ExecPlan {
}
Status StartProducing() {
- START_COMPUTE_SPAN(span_, "ExecPlan", {{"plan", ToString()}});
-#ifdef ARROW_WITH_OPENTELEMETRY
- if (HasMetadata()) {
- auto pairs = metadata().get()->sorted_pairs();
- opentelemetry::nostd::shared_ptr<opentelemetry::trace::Span> span =
- ::arrow::internal::tracing::UnwrapSpan(span_.details.get());
- std::for_each(std::begin(pairs), std::end(pairs),
- [span](std::pair<std::string, std::string> const& pair) {
- span->SetAttribute(pair.first, pair.second);
- });
- }
-#endif
if (started_) {
return Status::Invalid("restarted ExecPlan");
}
-
- std::vector<Future<>> futures;
- for (auto& n : nodes_) {
- RETURN_NOT_OK(n->Init());
- futures.push_back(n->finished());
- }
-
- AllFinished(futures).AddCallback([this](const Status& st) {
- error_st_ = st;
- EndTaskGroup();
- });
-
- task_scheduler_->RegisterEnd();
- int num_threads = 1;
- bool sync_execution = true;
- if (auto executor = exec_context()->executor()) {
- num_threads = executor->GetCapacity();
- sync_execution = false;
- }
- RETURN_NOT_OK(task_scheduler_->StartScheduling(
- 0 /* thread_index */,
- [this](std::function<Status(size_t)> fn) -> Status {
- return this->ScheduleTask(std::move(fn));
- },
- /*concurrent_tasks=*/2 * num_threads, sync_execution));
-
started_ = true;
- // producers precede consumers
- sorted_nodes_ = TopoSort();
-
- Status st = Status::OK();
-
- using rev_it = std::reverse_iterator<NodeVector::iterator>;
- for (rev_it it(sorted_nodes_.end()), end(sorted_nodes_.begin()); it !=
end; ++it) {
- auto node = *it;
- EVENT(span_, "StartProducing:" + node->label(),
- {{"node.label", node->label()}, {"node.kind_name",
node->kind_name()}});
- st = node->StartProducing();
- EVENT(span_, "StartProducing:" + node->label(), {{"status",
st.ToString()}});
- if (!st.ok()) {
- // Stop nodes that successfully started, in reverse order
- stopped_ = true;
- StopProducingImpl(it.base(), sorted_nodes_.end());
- for (NodeVector::iterator fw_it = sorted_nodes_.begin(); fw_it !=
it.base();
- ++fw_it) {
- Future<> fut = (*fw_it)->finished();
- if (!fut.is_finished()) fut.MarkFinished();
- }
- return st;
- }
- }
- return st;
- }
-
- void EndTaskGroup() {
- bool expected = false;
- if (group_ended_.compare_exchange_strong(expected, true)) {
- async_scheduler_->End();
- async_scheduler_->OnFinished().AddCallback([this](const Status& st) {
- MARK_SPAN(span_, error_st_ & st);
- END_SPAN(span_);
- finished_.MarkFinished(error_st_ & st);
- });
+ // We call StartProducing on each of the nodes. The source nodes should
generally
+ // start scheduling some tasks during this call.
+ //
+ // If no source node schedules any tasks (e.g. they do all their work
synchronously as
+ // part of StartProducing) then the plan may be finished before we return
from this
+ // call.
+ Future<> scheduler_finished =
+ util::AsyncTaskScheduler::Make([this](util::AsyncTaskScheduler*
async_scheduler) {
+ this->async_scheduler_ = async_scheduler;
+ START_COMPUTE_SPAN(span_, "ExecPlan", {{"plan", ToString()}});
+#ifdef ARROW_WITH_OPENTELEMETRY
+ if (HasMetadata()) {
+ auto pairs = metadata().get()->sorted_pairs();
+ opentelemetry::nostd::shared_ptr<opentelemetry::trace::Span> span =
+ ::arrow::internal::tracing::UnwrapSpan(span_.details.get());
+ std::for_each(std::begin(pairs), std::end(pairs),
+ [span](std::pair<std::string, std::string> const&
pair) {
+ span->SetAttribute(pair.first, pair.second);
+ });
+ }
+#endif
+ // TODO(weston) The entire concept of ExecNode::finished() will
hopefully go
Review Comment:
Yep, my simplification PR gets rid of those `ExecNode::finished()` futures.
##########
cpp/src/arrow/compute/exec/exec_plan.cc:
##########
@@ -127,85 +127,89 @@ struct ExecPlanImpl : public ExecPlan {
}
Status StartProducing() {
- START_COMPUTE_SPAN(span_, "ExecPlan", {{"plan", ToString()}});
-#ifdef ARROW_WITH_OPENTELEMETRY
- if (HasMetadata()) {
- auto pairs = metadata().get()->sorted_pairs();
- opentelemetry::nostd::shared_ptr<opentelemetry::trace::Span> span =
- ::arrow::internal::tracing::UnwrapSpan(span_.details.get());
- std::for_each(std::begin(pairs), std::end(pairs),
- [span](std::pair<std::string, std::string> const& pair) {
- span->SetAttribute(pair.first, pair.second);
- });
- }
-#endif
if (started_) {
return Status::Invalid("restarted ExecPlan");
}
-
- std::vector<Future<>> futures;
- for (auto& n : nodes_) {
- RETURN_NOT_OK(n->Init());
- futures.push_back(n->finished());
- }
-
- AllFinished(futures).AddCallback([this](const Status& st) {
- error_st_ = st;
- EndTaskGroup();
- });
-
- task_scheduler_->RegisterEnd();
- int num_threads = 1;
- bool sync_execution = true;
- if (auto executor = exec_context()->executor()) {
- num_threads = executor->GetCapacity();
- sync_execution = false;
- }
- RETURN_NOT_OK(task_scheduler_->StartScheduling(
- 0 /* thread_index */,
- [this](std::function<Status(size_t)> fn) -> Status {
- return this->ScheduleTask(std::move(fn));
- },
- /*concurrent_tasks=*/2 * num_threads, sync_execution));
-
started_ = true;
- // producers precede consumers
- sorted_nodes_ = TopoSort();
-
- Status st = Status::OK();
-
- using rev_it = std::reverse_iterator<NodeVector::iterator>;
- for (rev_it it(sorted_nodes_.end()), end(sorted_nodes_.begin()); it !=
end; ++it) {
- auto node = *it;
- EVENT(span_, "StartProducing:" + node->label(),
- {{"node.label", node->label()}, {"node.kind_name",
node->kind_name()}});
- st = node->StartProducing();
- EVENT(span_, "StartProducing:" + node->label(), {{"status",
st.ToString()}});
- if (!st.ok()) {
- // Stop nodes that successfully started, in reverse order
- stopped_ = true;
- StopProducingImpl(it.base(), sorted_nodes_.end());
- for (NodeVector::iterator fw_it = sorted_nodes_.begin(); fw_it !=
it.base();
- ++fw_it) {
- Future<> fut = (*fw_it)->finished();
- if (!fut.is_finished()) fut.MarkFinished();
- }
- return st;
- }
- }
- return st;
- }
-
- void EndTaskGroup() {
- bool expected = false;
- if (group_ended_.compare_exchange_strong(expected, true)) {
- async_scheduler_->End();
- async_scheduler_->OnFinished().AddCallback([this](const Status& st) {
- MARK_SPAN(span_, error_st_ & st);
- END_SPAN(span_);
- finished_.MarkFinished(error_st_ & st);
- });
+ // We call StartProducing on each of the nodes. The source nodes should
generally
+ // start scheduling some tasks during this call.
+ //
+ // If no source node schedules any tasks (e.g. they do all their work
synchronously as
+ // part of StartProducing) then the plan may be finished before we return
from this
+ // call.
+ Future<> scheduler_finished =
+ util::AsyncTaskScheduler::Make([this](util::AsyncTaskScheduler*
async_scheduler) {
Review Comment:
Could we move this giant lambda into its own function? The extra layer of
indentation is pretty confusing.
##########
cpp/src/arrow/util/async_util.cc:
##########
@@ -108,30 +108,37 @@ class AlreadyFailedScheduler : public AsyncTaskScheduler {
std::ignore = std::move(finish_callback_)(failure_reason_);
}
bool AddTask(std::unique_ptr<Task> task) override { return false; }
- void End() override {
- Status::UnknownError("Do not call End on a sub-scheduler.").Abort();
- }
- Future<> OnFinished() const override {
- Status::UnknownError(
- "You should not rely on sub-scheduler's OnFinished. Use a "
- "finished callback when creating the sub-scheduler instead")
- .Abort();
- }
- std::shared_ptr<AsyncTaskScheduler> MakeSubScheduler(
+ std::unique_ptr<AsyncTaskScheduler::Holder> MakeSubScheduler(
+ FnOnce<Status(AsyncTaskScheduler*)> initial_task,
Review Comment:
Could we guarantee that this `initial_task` runs synchronously? I understand
why it's needed (without it, the scheduler would end itself immediately after
creation), but I would like the control flow for scheduling the initial tasks
to be simple. We could make this function return a `Result<unique_ptr>` in case
a node has a problem scheduling its initial tasks.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]