westonpace commented on a change in pull request #11210:
URL: https://github.com/apache/arrow/pull/11210#discussion_r715528675



##########
File path: cpp/src/arrow/compute/exec/exec_plan.cc
##########
@@ -283,6 +285,19 @@ bool ExecNode::ErrorIfNotOk(Status status) {
   return true;
 }
 
+Status ExecNode::SubmitTask(std::function<Status()> task) {
+  auto executor = plan()->exec_context()->executor();
+  auto maybe_future = executor->Submit(std::move(task));
+  if (!maybe_future.ok()) {
+    outputs_[0]->ErrorReceived(this, maybe_future.status());

Review comment:
       Nit: It doesn't matter today, but if this is a general method, we may want
to loop through all outputs rather than reporting the error only to `outputs_[0]`.

##########
File path: cpp/src/arrow/compute/exec/filter_node.cc
##########
@@ -114,6 +126,10 @@ class FilterNode : public ExecNode {
   void InputFinished(ExecNode* input, int total_batches) override {
     DCHECK_EQ(input, inputs_[0]);
     outputs_[0]->InputFinished(this, total_batches);
+    if (batch_count_.SetTotal(total_batches)) {
+      task_group_.WaitForTasksToFinish().AddCallback(
+          [this](const Status& status) { this->finished_.MarkFinished(status); 
});
+    }

Review comment:
       This also feels like code that will have to be repeated in every node.

##########
File path: cpp/src/arrow/compute/exec/filter_node.cc
##########
@@ -98,12 +97,25 @@ class FilterNode : public ExecNode {
 
   void InputReceived(ExecNode* input, ExecBatch batch) override {
     DCHECK_EQ(input, inputs_[0]);
-
-    auto maybe_filtered = DoFilter(std::move(batch));
-    if (ErrorIfNotOk(maybe_filtered.status())) return;
-
-    maybe_filtered->guarantee = batch.guarantee;
-    outputs_[0]->InputReceived(this, maybe_filtered.MoveValueUnsafe());
+    if (finished_.is_finished()) {
+      return;
+    }
+    auto task = [this, batch]() {
+      auto maybe_filtered = DoFilter(std::move(batch));
+      if (ErrorIfNotOk(maybe_filtered.status())) return Status::OK();
+      maybe_filtered->guarantee = batch.guarantee;
+      outputs_[0]->InputReceived(this, maybe_filtered.MoveValueUnsafe());
+      return Status::OK();
+    };
+    if (this->has_executor()) {
+      DCHECK_OK(this->SubmitTask(task));
+    } else {
+      DCHECK_OK(task());
+    }
+    if (batch_count_.Increment()) {
+      task_group_.WaitForTasksToFinish().AddCallback(
+          [this](const Status& status) { this->finished_.MarkFinished(status); 
});
+    }

Review comment:
       Could this be part of SubmitTask so that this if/else block doesn't have
to be repeated in every node?
   

##########
File path: cpp/src/arrow/compute/exec/filter_node.cc
##########
@@ -98,12 +97,25 @@ class FilterNode : public ExecNode {
 
   void InputReceived(ExecNode* input, ExecBatch batch) override {
     DCHECK_EQ(input, inputs_[0]);
-
-    auto maybe_filtered = DoFilter(std::move(batch));
-    if (ErrorIfNotOk(maybe_filtered.status())) return;
-
-    maybe_filtered->guarantee = batch.guarantee;
-    outputs_[0]->InputReceived(this, maybe_filtered.MoveValueUnsafe());
+    if (finished_.is_finished()) {
+      return;
+    }

Review comment:
       Could this `finished_` early-return check be part of SubmitTask?

##########
File path: cpp/src/arrow/compute/exec/filter_node.cc
##########
@@ -127,21 +143,25 @@ class FilterNode : public ExecNode {
     StopProducing();
   }
 
-  void StopProducing() override { inputs_[0]->StopProducing(this); }
+  void StopProducing() override {
+    if (batch_count_.Cancel()) {
+      task_group_.WaitForTasksToFinish().AddCallback(
+          [this](const Status& status) { this->finished_.MarkFinished(status); 
});
+    }

Review comment:
       This code feels like it will need to be repeated in every node.

##########
File path: cpp/src/arrow/util/task_scheduler.h
##########
@@ -0,0 +1,81 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#ifndef _WIN32
+#include <unistd.h>
+#endif
+
+#include <atomic>
+#include <cstdint>
+#include <memory>
+#include <queue>
+#include <type_traits>
+#include <utility>
+
+#include "arrow/result.h"
+#include "arrow/status.h"
+#include "arrow/util/cancel.h"
+#include "arrow/util/functional.h"
+#include "arrow/util/future.h"
+#include "arrow/util/macros.h"
+#include "arrow/util/thread_pool.h"
+#include "arrow/util/visibility.h"
+
+#if defined(_MSC_VER)
+// Disable harmless warning for decorated name length limit
+#pragma warning(disable : 4503)
+#endif
+
+namespace arrow {
+namespace internal {
+
+class ARROW_EXPORT TaskScheduler : public Executor {
+ public:
+  static Result<std::shared_ptr<TaskScheduler>> Make(int threads);
+  static Result<std::shared_ptr<TaskScheduler>> MakeEternal(int threads);
+
+  ~TaskScheduler() override;
+
+  int GetCapacity() override;
+
+  bool OwnsThisThread() override;
+
+  std::shared_ptr<ThreadPool> pool() { return thread_pool_; }
+
+ protected:
+  friend ARROW_EXPORT TaskScheduler* GetCpuTaskScheduler();
+
+  TaskScheduler();
+
+  Status SpawnReal(TaskHints hints, FnOnce<void()> task, StopToken,
+                   StopCallback&&) override;
+
+  static std::shared_ptr<TaskScheduler> MakeCpuTaskScheduler();
+
+ private:
+  std::shared_ptr<ThreadPool> thread_pool_;
+  // std::queue<std::unique_ptr<Task>> task_queue_;
+  // std::atomic<int> active_tasks_counter_;
+  //
+};

Review comment:
       I'm not really sure what the purpose is here. I thought "a scheduler
has many executors", but this seems like "a scheduler is an executor". I don't
think the scheduler should need `MakeEternal`, and there shouldn't be a
`MakeCpuTaskScheduler`.
   
   Perhaps there could be a `MakeDefaultScheduler` that has only a single executor
(the CPU executor). The scheduler shouldn't need to worry about `OwnsThisThread` or
`GetCapacity`.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to