lidavidm commented on a change in pull request #11210:
URL: https://github.com/apache/arrow/pull/11210#discussion_r717838373



##########
File path: cpp/src/arrow/util/thread_pool.h
##########
@@ -341,6 +341,8 @@ class ARROW_EXPORT ThreadPool : public Executor {
   // tasks are finished.
   Status Shutdown(bool wait = true);
 
+  Status SpawnReal(TaskHints hints, FnOnce<void()> task, StopToken,
+                   StopCallback&&) override;

Review comment:
       Do we still need to make this public?

##########
File path: cpp/src/arrow/compute/exec/exec_plan.cc
##########
@@ -283,6 +290,35 @@ bool ExecNode::ErrorIfNotOk(Status status) {
   return true;
 }
 
+Status ExecNode::SubmitTask(std::function<Status()> task) {
+  if (finished_.is_finished()) {
+    return Status::OK();
+  }
+  if (this->has_executor()) {
+    DCHECK(task_group_ != nullptr);
+    task_group_->Append(std::move(task));
+  } else {
+    std::move(task)();
+  }
+  if (batch_count_.Increment()) {
+    this->MarkFinished();
+  }
+  return Status::OK();
+}
+
+void ExecNode::MarkFinished(bool request_stop) {
+  if (this->has_executor()) {
+    if (request_stop) {
+      this->stop_source_.RequestStop();
+    }
+    task_group_->FinishAsync().AddCallback([this](const Status& status) {
+      if (!this->finished_.is_finished()) this->finished_.MarkFinished(status);

Review comment:
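       This is racy: it's a TOCTOU (time-of-check to time-of-use) error. Another thread could mark `finished_` finished between the `is_finished()` check and the `MarkFinished(status)` call.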

##########
File path: cpp/src/arrow/compute/exec/filter_node.cc
##########
@@ -127,21 +131,24 @@ class FilterNode : public ExecNode {
     StopProducing();
   }
 
-  void StopProducing() override { inputs_[0]->StopProducing(this); }
+  void StopProducing() override {
+    if (batch_count_.Cancel()) {
+      this->MarkFinished(/*request_stop=*/true);
+    }
+    inputs_[0]->StopProducing(this);
+  }
 
-  Future<> finished() override { return inputs_[0]->finished(); }
+  Future<> finished() override { return finished_; }

Review comment:
       (And in general, if we're codifying patterns like `AtomicCounter 
batch_count_`, I think more of these boilerplate methods could be implemented 
in the base class instead, once we're OK with the overall design in this PR.)

##########
File path: cpp/src/arrow/compute/exec/exec_plan.cc
##########
@@ -283,6 +290,35 @@ bool ExecNode::ErrorIfNotOk(Status status) {
   return true;
 }
 
+Status ExecNode::SubmitTask(std::function<Status()> task) {
+  if (finished_.is_finished()) {
+    return Status::OK();
+  }
+  if (this->has_executor()) {
+    DCHECK(task_group_ != nullptr);
+    task_group_->Append(std::move(task));
+  } else {
+    std::move(task)();
+  }
+  if (batch_count_.Increment()) {
+    this->MarkFinished();
+  }
+  return Status::OK();
+}
+
+void ExecNode::MarkFinished(bool request_stop) {
+  if (this->has_executor()) {
+    if (request_stop) {
+      this->stop_source_.RequestStop();
+    }
+    task_group_->FinishAsync().AddCallback([this](const Status& status) {
+      if (!this->finished_.is_finished()) this->finished_.MarkFinished(status);

Review comment:
       So long as this is only called after checking AtomicCounter, is the 
check here necessary?

##########
File path: cpp/src/arrow/compute/exec/filter_node.cc
##########
@@ -127,21 +131,24 @@ class FilterNode : public ExecNode {
     StopProducing();
   }
 
-  void StopProducing() override { inputs_[0]->StopProducing(this); }
+  void StopProducing() override {
+    if (batch_count_.Cancel()) {
+      this->MarkFinished(/*request_stop=*/true);
+    }
+    inputs_[0]->StopProducing(this);
+  }
 
-  Future<> finished() override { return inputs_[0]->finished(); }
+  Future<> finished() override { return finished_; }

Review comment:
       Maybe this should be the default implementation in the base class now? 
(Though I guess for this PR that requires more refactoring.)

##########
File path: cpp/src/arrow/compute/exec/exec_plan.h
##########
@@ -221,6 +228,18 @@ class ARROW_EXPORT ExecNode {
 
   std::string ToString() const;
 
+  /// \brief Is an executor available?
+  bool has_executor() { return plan()->exec_context()->executor() != nullptr; }

Review comment:
       I think all three new methods should be protected. (Also, can't 
`has_executor` be `const`?)

##########
File path: cpp/src/arrow/compute/exec/exec_plan.h
##########
@@ -241,6 +260,15 @@ class ARROW_EXPORT ExecNode {
   std::shared_ptr<Schema> output_schema_;
   int num_outputs_;
   NodeVector outputs_;
+
+  // Counter for the number of batches received
+  AtomicCounter batch_count_;
+  // Future to sync finished
+  Future<> finished_ = Future<>::MakeFinished();

Review comment:
       No need to initialize here if we're also initializing in the constructor.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to