aocsa commented on a change in pull request #11210:
URL: https://github.com/apache/arrow/pull/11210#discussion_r717642037
##########
File path: cpp/src/arrow/compute/exec/filter_node.cc
##########
@@ -98,12 +97,25 @@ class FilterNode : public ExecNode {
void InputReceived(ExecNode* input, ExecBatch batch) override {
DCHECK_EQ(input, inputs_[0]);
-
- auto maybe_filtered = DoFilter(std::move(batch));
- if (ErrorIfNotOk(maybe_filtered.status())) return;
-
- maybe_filtered->guarantee = batch.guarantee;
- outputs_[0]->InputReceived(this, maybe_filtered.MoveValueUnsafe());
+ if (finished_.is_finished()) {
+ return;
+ }
Review comment:
Sure
##########
File path: cpp/src/arrow/util/task_scheduler.h
##########
@@ -0,0 +1,81 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#ifndef _WIN32
+#include <unistd.h>
+#endif
+
+#include <atomic>
+#include <cstdint>
+#include <memory>
+#include <queue>
+#include <type_traits>
+#include <utility>
+
+#include "arrow/result.h"
+#include "arrow/status.h"
+#include "arrow/util/cancel.h"
+#include "arrow/util/functional.h"
+#include "arrow/util/future.h"
+#include "arrow/util/macros.h"
+#include "arrow/util/thread_pool.h"
+#include "arrow/util/visibility.h"
+
+#if defined(_MSC_VER)
+// Disable harmless warning for decorated name length limit
+#pragma warning(disable : 4503)
+#endif
+
+namespace arrow {
+namespace internal {
+
+class ARROW_EXPORT TaskScheduler : public Executor {
+ public:
+ static Result<std::shared_ptr<TaskScheduler>> Make(int threads);
+ static Result<std::shared_ptr<TaskScheduler>> MakeEternal(int threads);
+
+ ~TaskScheduler() override;
+
+ int GetCapacity() override;
+
+ bool OwnsThisThread() override;
+
+ std::shared_ptr<ThreadPool> pool() { return thread_pool_; }
+
+ protected:
+ friend ARROW_EXPORT TaskScheduler* GetCpuTaskScheduler();
+
+ TaskScheduler();
+
+ Status SpawnReal(TaskHints hints, FnOnce<void()> task, StopToken,
+ StopCallback&&) override;
+
+ static std::shared_ptr<TaskScheduler> MakeCpuTaskScheduler();
+
+ private:
+ std::shared_ptr<ThreadPool> thread_pool_;
+ // std::queue<std::unique_ptr<Task>> task_queue_;
+ // std::atomic<int> active_tasks_counter_;
+ //
+};
Review comment:
Agreed. TaskScheduler should have multiple executors. I think that can be
part of a follow-up PR after discussing the next steps.
##########
File path: cpp/src/arrow/compute/exec/filter_node.cc
##########
@@ -99,11 +105,35 @@ class FilterNode : public ExecNode {
void InputReceived(ExecNode* input, ExecBatch batch) override {
DCHECK_EQ(input, inputs_[0]);
- auto maybe_filtered = DoFilter(std::move(batch));
- if (ErrorIfNotOk(maybe_filtered.status())) return;
+ ARROW_LOG(DEBUG) << "FilterNode: >> input";
+
+ auto executor = plan()->exec_context()->executor();
+ if (executor) {
+ auto maybe_future = executor->Submit([this, batch] {
Review comment:
done
##########
File path: cpp/src/arrow/util/thread_pool.h
##########
@@ -341,6 +341,8 @@ class ARROW_EXPORT ThreadPool : public Executor {
// tasks are finished.
Status Shutdown(bool wait = true);
+ Status SpawnReal(TaskHints hints, FnOnce<void()> task, StopToken,
+ StopCallback&&) override;
Review comment:
No, we don't
##########
File path: cpp/src/arrow/compute/exec/exec_plan.cc
##########
@@ -283,6 +290,35 @@ bool ExecNode::ErrorIfNotOk(Status status) {
return true;
}
+Status ExecNode::SubmitTask(std::function<Status()> task) {
+ if (finished_.is_finished()) {
+ return Status::OK();
+ }
+ if (this->has_executor()) {
+ DCHECK(task_group_ != nullptr);
+ task_group_->Append(std::move(task));
+ } else {
+ std::move(task)();
+ }
+ if (batch_count_.Increment()) {
+ this->MarkFinished();
+ }
+ return Status::OK();
+}
+
+void ExecNode::MarkFinished(bool request_stop) {
+ if (this->has_executor()) {
+ if (request_stop) {
+ this->stop_source_.RequestStop();
+ }
+ task_group_->FinishAsync().AddCallback([this](const Status& status) {
+ if (!this->finished_.is_finished()) this->finished_.MarkFinished(status);
Review comment:
Based on my tests and profiling, yes, this is necessary. The first
call to this function can happen when `batch_count_` reaches total_batches, but
an ExecNode::StopProducing signal can arrive at the same time. So the check is
necessary to ensure that `finished_` is marked as finished only once.
##########
File path: cpp/src/arrow/compute/exec/exec_plan.h
##########
@@ -221,6 +228,18 @@ class ARROW_EXPORT ExecNode {
std::string ToString() const;
+ /// \brief Is an executor available?
+ bool has_executor() { return plan()->exec_context()->executor() != nullptr; }
Review comment:
For sure
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]