aocsa commented on a change in pull request #11210:
URL: https://github.com/apache/arrow/pull/11210#discussion_r717897778
##########
File path: cpp/src/arrow/compute/exec/exec_plan.cc
##########
@@ -283,6 +290,35 @@ bool ExecNode::ErrorIfNotOk(Status status) {
return true;
}
+Status ExecNode::SubmitTask(std::function<Status()> task) {
+ if (finished_.is_finished()) {
+ return Status::OK();
+ }
+ if (this->has_executor()) {
+ DCHECK(task_group_ != nullptr);
+ task_group_->Append(std::move(task));
+ } else {
+ std::move(task)();
+ }
+ if (batch_count_.Increment()) {
+ this->MarkFinished();
+ }
+ return Status::OK();
+}
+
+void ExecNode::MarkFinished(bool request_stop) {
+ if (this->has_executor()) {
+ if (request_stop) {
+ this->stop_source_.RequestStop();
+ }
+ task_group_->FinishAsync().AddCallback([this](const Status& status) {
+ if (!this->finished_.is_finished()) this->finished_.MarkFinished(status);
Review comment:
Based on my tests and profiling, yes, this check is necessary. The first
call to this function can happen when `batch_count_` reaches the total number
of batches, but an `ExecNode::StopProducing` signal can arrive at the same time.
`finished_` must therefore be marked as finished only once.
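To illustrate the "finish exactly once" requirement as a general C++ pattern, here is a standalone sketch. It assumes two completion paths race, one for "last batch counted" and one for "stop requested". The names `FinishOnce` and `RaceOnce` are hypothetical and are not Arrow classes. The sketch makes no claim about whether `arrow::Future` already tolerates a second completion internally. It uses a single atomic compare-and-swap so that checking and setting the flag happen in one step, and exactly one caller wins.

```cpp
#include <atomic>
#include <cassert>
#include <string>
#include <thread>

// Hypothetical sketch, not Arrow's API: a completion flag that may be
// marked finished from several threads but completes exactly once.
class FinishOnce {
 public:
  // Returns true only for the single caller that performs the completion.
  bool MarkFinished(const std::string& reason) {
    bool expected = false;
    // compare_exchange_strong fuses "check" and "set" into one atomic step,
    // so two concurrent callers can never both observe "not finished".
    if (!finished_.compare_exchange_strong(expected, true,
                                           std::memory_order_acq_rel)) {
      return false;  // another caller already finished it
    }
    reason_ = reason;  // only the winning caller writes this
    return true;
  }

  bool is_finished() const { return finished_.load(std::memory_order_acquire); }

  // Only safe to read after every MarkFinished caller has been joined.
  const std::string& reason() const { return reason_; }

 private:
  std::atomic<bool> finished_{false};
  std::string reason_;
};

// Races a hypothetical "all batches counted" completion against a
// "StopProducing" completion, mirroring the scenario described above.
// Returns how many of the two callers actually completed the flag.
int RaceOnce() {
  FinishOnce finished;
  std::atomic<int> winners{0};
  std::thread last_batch([&] {
    if (finished.MarkFinished("all batches processed")) ++winners;
  });
  std::thread stop([&] {
    if (finished.MarkFinished("stop requested")) ++winners;
  });
  last_batch.join();
  stop.join();
  return winners.load();
}
```

Whichever path arrives first wins, and the loser becomes a no-op. A plain `if (!is_finished()) MarkFinished(...)` is two separate steps, so it only stays safe when the underlying completion primitive itself tolerates or rejects a second call.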
--
This is an automated message from the Apache Git Service.