aocsa commented on a change in pull request #11210:
URL: https://github.com/apache/arrow/pull/11210#discussion_r717897778



##########
File path: cpp/src/arrow/compute/exec/exec_plan.cc
##########
@@ -283,6 +290,35 @@ bool ExecNode::ErrorIfNotOk(Status status) {
   return true;
 }
 
+Status ExecNode::SubmitTask(std::function<Status()> task) {
+  if (finished_.is_finished()) {
+    return Status::OK();
+  }
+  if (this->has_executor()) {
+    DCHECK(task_group_ != nullptr);
+    task_group_->Append(std::move(task));
+  } else {
+    std::move(task)();
+  }
+  if (batch_count_.Increment()) {
+    this->MarkFinished();
+  }
+  return Status::OK();
+}
+
+void ExecNode::MarkFinished(bool request_stop) {
+  if (this->has_executor()) {
+    if (request_stop) {
+      this->stop_source_.RequestStop();
+    }
+    task_group_->FinishAsync().AddCallback([this](const Status& status) {
+      if (!this->finished_.is_finished()) this->finished_.MarkFinished(status);

Review comment:
       Based on my testing and profiling, yes, this is necessary. The first
call to this function can happen when `batch_count_` reaches the total number
of batches, while an `ExecNode::StopProducing` signal arrives at the same time.
`finished_` must be marked as finished only once.




-- 
This is an automated message from the Apache Git Service.