lidavidm commented on a change in pull request #11210:
URL: https://github.com/apache/arrow/pull/11210#discussion_r721345038
##########
File path: cpp/src/arrow/compute/exec/filter_node.cc
##########
@@ -61,9 +62,16 @@ class FilterNode : public ExecNode {
filter_expression.ToString(), " evaluates to ",
filter_expression.type()->ToString());
}
-
+ std::unique_ptr<ExecNodeRunnerImpl> impl;
+ if (plan->exec_context()->executor() == nullptr) {
Review comment:
This check could perhaps just be folded into a `MakeSimpleRunner`.
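Roughly, the suggestion could look like this sketch. All types here are hypothetical stand-ins, not Arrow's `Executor` and not the PR's actual `ExecNodeRunnerImpl`, and the `MakeSimpleRunner` signature is assumed. The point is only that the `executor() == nullptr` check lives in one factory instead of in every node.

```cpp
#include <cassert>
#include <functional>
#include <memory>
#include <string>
#include <utility>

// Hypothetical simplified types; only the names ExecNodeRunnerImpl and
// MakeSimpleRunner come from the PR/review, and their shapes are assumed.
using Task = std::function<void()>;

// Minimal stand-in for an executor / thread pool.
struct Executor {
  virtual ~Executor() = default;
  virtual void Spawn(Task task) = 0;
};

class ExecNodeRunnerImpl {
 public:
  virtual ~ExecNodeRunnerImpl() = default;
  virtual void SubmitTask(Task task) = 0;
  virtual std::string name() const = 0;
};

// Runs each task inline on the calling thread.
class SimpleRunner : public ExecNodeRunnerImpl {
 public:
  void SubmitTask(Task task) override { task(); }
  std::string name() const override { return "simple"; }
};

// Hands each task to the executor.
class ExecutorRunner : public ExecNodeRunnerImpl {
 public:
  explicit ExecutorRunner(Executor* executor) : executor_(executor) {}
  void SubmitTask(Task task) override { executor_->Spawn(std::move(task)); }
  std::string name() const override { return "executor"; }

 private:
  Executor* executor_;
};

// The null-executor check is written once here rather than in each node.
std::unique_ptr<ExecNodeRunnerImpl> MakeSimpleRunner(Executor* executor) {
  if (executor == nullptr) {
    return std::make_unique<SimpleRunner>();
  }
  return std::make_unique<ExecutorRunner>(executor);
}
```

Each node's factory would then just call the helper with `plan->exec_context()->executor()`.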
##########
File path: cpp/src/arrow/compute/exec/exec_node_runner_impl.h
##########
@@ -0,0 +1,63 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <functional>
+#include <memory>
+#include <vector>
+
+#include "arrow/compute/exec/util.h"
+#include "arrow/result.h"
+#include "arrow/status.h"
+#include "arrow/type.h"
+#include "arrow/util/future.h"
+
+namespace arrow {
+namespace compute {
+
+class ExecNodeRunnerImpl {
Review comment:
Looking at the implementations, I wonder if this shouldn't just implement
ExecNode instead. Most of the boilerplate we want to get rid of is still there
right now. There's also more opportunity for implementer mistakes: you have to
remember to call these new methods on the impl, and because the impl class
hides the implementation, it's hard to tell at a glance whether things are
still correct. A subclass approach may not compose as well, but I'm not sure
there's any composition to be had here in the first place, and it would
actually remove most of the boilerplate.
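To make the subclass idea concrete, here is a heavily simplified sketch. `Node`, `MapNodeBase`, `Batch` and the string-based errors are hypothetical stand-ins, not Arrow's `ExecNode` API. The base class owns error propagation and forwarding, so a concrete node only overrides one `Process` hook and has no separate impl to forget to call.

```cpp
#include <cassert>
#include <string>
#include <utility>
#include <vector>

// Hypothetical simplified types; not Arrow's ExecNode/ExecBatch.
struct Batch { std::vector<int> values; };

class Node {
 public:
  virtual ~Node() = default;
  virtual void InputReceived(Batch batch) = 0;
  virtual void ErrorReceived(const std::string& error) = 0;
};

// Collects whatever reaches it so the behavior can be checked.
class CollectingSink : public Node {
 public:
  void InputReceived(Batch batch) override { batches.push_back(std::move(batch)); }
  void ErrorReceived(const std::string& error) override { errors.push_back(error); }
  std::vector<Batch> batches;
  std::vector<std::string> errors;
};

// Base class that owns the boilerplate: error propagation and forwarding.
// Subclasses only implement Process().
class MapNodeBase : public Node {
 public:
  explicit MapNodeBase(Node* output) : output_(output) {}

  void InputReceived(Batch batch) final {
    Batch out;
    std::string error;
    if (!Process(std::move(batch), &out, &error)) {
      output_->ErrorReceived(error);
      return;
    }
    output_->InputReceived(std::move(out));
  }

  void ErrorReceived(const std::string& error) final { output_->ErrorReceived(error); }

 protected:
  // Returns false and sets *error on failure.
  virtual bool Process(Batch in, Batch* out, std::string* error) = 0;

 private:
  Node* output_;
};

// A filter-like node only has to say how to transform one batch.
class KeepPositive : public MapNodeBase {
 public:
  using MapNodeBase::MapNodeBase;

 protected:
  bool Process(Batch in, Batch* out, std::string* error) override {
    if (in.values.empty()) {
      *error = "empty batch";
      return false;
    }
    for (int v : in.values) {
      if (v > 0) out->values.push_back(v);
    }
    return true;
  }
};
```

The trade-off is the one noted above: each node gets a single base class rather than a composable helper, but the boilerplate lives in one place.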
##########
File path: cpp/src/arrow/compute/exec/filter_node.cc
##########
@@ -98,12 +106,23 @@ class FilterNode : public ExecNode {
void InputReceived(ExecNode* input, ExecBatch batch) override {
DCHECK_EQ(input, inputs_[0]);
-
- auto maybe_filtered = DoFilter(std::move(batch));
- if (ErrorIfNotOk(maybe_filtered.status())) return;
-
- maybe_filtered->guarantee = batch.guarantee;
- outputs_[0]->InputReceived(this, maybe_filtered.MoveValueUnsafe());
+ auto task = [this, batch]() {
+ auto maybe_filtered = DoFilter(std::move(batch));
+ if (ErrorIfNotOk(maybe_filtered.status())) {
+ return maybe_filtered.status();
+ }
+ maybe_filtered->guarantee = batch.guarantee;
+ outputs_[0]->InputReceived(this, maybe_filtered.MoveValueUnsafe());
+ return Status::OK();
+ };
+ auto status = impl_->SubmitTask(task);
+ if (!status.ok()) {
Review comment:
Why not fold most of this logic into the impl itself?
```cpp
if (!impl_->SubmitTask(task)) {
  StopProducing();
}
```
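One way to read this suggestion: the runner owns the error-reporting step and `SubmitTask` returns a `bool`, which is what would let the call site shrink to the form above. In the diff as shown, `SubmitTask` returns a `Status`, so the `bool` signature is an assumption. `Runner`, the simplified `Status` and the `on_error` callback below are hypothetical, and tasks run inline.

```cpp
#include <cassert>
#include <functional>
#include <string>
#include <utility>

// Hypothetical simplified stand-in for arrow::Status; empty message means OK.
struct Status {
  std::string message;
  bool ok() const { return message.empty(); }
  static Status OK() { return {}; }
  static Status Invalid(std::string msg) { return {std::move(msg)}; }
};

// Hypothetical runner: it reports errors itself, so callers only see a bool.
class Runner {
 public:
  explicit Runner(std::function<void(const Status&)> on_error)
      : on_error_(std::move(on_error)) {}

  // Runs the task inline; on failure reports the error and returns false.
  bool SubmitTask(const std::function<Status()>& task) {
    Status st = task();
    if (!st.ok()) {
      on_error_(st);
      return false;
    }
    return true;
  }

 private:
  std::function<void(const Status&)> on_error_;
};
```

With this shape the node no longer needs its own `ErrorIfNotOk` branch; it only decides whether to stop producing.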
##########
File path: cpp/src/arrow/util/future.h
##########
@@ -840,6 +840,18 @@ inline Future<>::Future(Status s) :
Future(internal::Empty::ToResult(std::move(s
ARROW_EXPORT
Future<> AllComplete(const std::vector<Future<>>& futures);
+
+/// \brief Create a Future which completes when all of `futures` complete.
+///
+/// The future will be marked complete if all `futures` complete
+/// successfully. Otherwise, it will be marked failed with the status of
+/// the first failing future.
+///
+/// Unlike AllComplete this Future will not complete immediately when a
+/// failure occurs. It will wait until all futures have finished.
+ARROW_EXPORT
+Future<> AllCompleteOrFailed(const std::vector<Future<>>& futures);
Review comment:
nit: maybe `AllFinished`?
##########
File path: cpp/src/arrow/util/future.h
##########
@@ -840,6 +840,18 @@ inline Future<>::Future(Status s) :
Future(internal::Empty::ToResult(std::move(s
ARROW_EXPORT
Future<> AllComplete(const std::vector<Future<>>& futures);
+
+/// \brief Create a Future which completes when all of `futures` complete.
+///
+/// The future will be marked complete if all `futures` complete
+/// successfully. Otherwise, it will be marked failed with the status of
+/// the first failing future.
Review comment:
nit: for clarity, this could say "…status of a failing future", since
"first failing future" above suggests the first one to fail, not the first one
in the list.
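The difference between "first to fail" and "first in the list" can be shown with a blocking sketch. It uses `std::shared_future` and a hypothetical `Status` struct instead of `arrow::Future<>` and `arrow::Status`, and it blocks instead of returning a future, so it only illustrates the documented semantics: every input is waited on before anything is reported. In this sketch the reported failure is the first one in list order, which need not be the one that failed first in time. The name `AllFinished` is taken from the naming nit above.

```cpp
#include <cassert>
#include <future>
#include <string>
#include <vector>

// Hypothetical simplified stand-in for arrow::Status; empty message means OK.
struct Status {
  std::string message;
  bool ok() const { return message.empty(); }
};

// Blocking sketch: wait for *every* future, then report a failure if any.
// The first failure in list order wins, regardless of completion order.
Status AllFinished(const std::vector<std::shared_future<Status>>& futures) {
  Status result;
  for (const auto& f : futures) {
    Status st = f.get();  // blocks until this future is ready
    if (result.ok() && !st.ok()) result = st;
  }
  return result;
}
```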
##########
File path: cpp/src/arrow/compute/exec/sink_node.cc
##########
@@ -175,9 +175,9 @@ struct OrderBySinkNode final : public SinkNode {
plan()->exec_context()->memory_pool());
if (ErrorIfNotOk(maybe_batch.status())) {
StopProducing();
- bool cancelled = input_counter_.Cancel();
- DCHECK(cancelled);
- finished_.MarkFinished(maybe_batch.status());
+ if (input_counter_.Cancel()) {
Review comment:
Hmm, is it possible for input_counter_.Cancel() to return false here?
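Whether it can here depends on the real counter and its other call sites. In a simplified model where the counter completes either when the last input arrives or when it is cancelled, and only the first of those transitions wins, `Cancel()` returns false once the counter is already done. `InputCounter` below is a hypothetical stand-in, not Arrow's `AtomicCounter`, so it only shows the general mechanism.

```cpp
#include <atomic>
#include <cassert>

// Hypothetical simplified counter (not Arrow's AtomicCounter). It completes
// when all inputs arrive or when cancelled; only the first transition wins.
class InputCounter {
 public:
  explicit InputCounter(int total) : total_(total) {}

  // Returns true if this call completed the counter.
  bool Increment() {
    if (count_.fetch_add(1) + 1 == total_) return MarkDoneOnce();
    return false;
  }

  // Returns true only if the counter was not already completed or cancelled.
  bool Cancel() { return MarkDoneOnce(); }

 private:
  bool MarkDoneOnce() {
    bool expected = false;
    return done_.compare_exchange_strong(expected, true);
  }

  const int total_;
  std::atomic<int> count_{0};
  std::atomic<bool> done_{false};
};
```

Under this model, a `DCHECK(cancelled)` would fire if the last input had already completed the counter, or if `Cancel()` were reached twice.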
--
This is an automated message from the Apache Git Service.