westonpace commented on a change in pull request #11923:
URL: https://github.com/apache/arrow/pull/11923#discussion_r768442345
##########
File path: cpp/src/arrow/compute/exec/filter_node.cc
##########
@@ -92,10 +92,13 @@ class FilterNode : public MapNode {
return ExecBatch::Make(std::move(values));
}
- void InputReceived(ExecNode* input, ExecBatch batch) override {
+ void InputReceived(ExecNode* input, std::function<Result<ExecBatch>()> task)
override {
DCHECK_EQ(input, inputs_[0]);
- auto func = [this](ExecBatch batch) { return DoFilter(std::move(batch)); };
- this->SubmitTask(std::move(func), std::move(batch));
+ auto func = [this, task]() -> Result<ExecBatch> {
+ ARROW_ASSIGN_OR_RAISE(auto batch, task());
+ return DoFilter(std::move(batch));
+ };
+ this->SubmitTask(std::move(func));
Review comment:
```suggestion
outputs_[0]->InputReceived(this, std::move(func));
```
Eventually I'm thinking we will end up with something like this. The only
calls to `SubmitTask` would be from pipeline breakers.
##########
File path: cpp/src/arrow/compute/exec/aggregate_node.cc
##########
@@ -235,17 +238,18 @@ class ScalarAggregateNode : public ExecNode {
private:
Status Finish() {
- ExecBatch batch{{}, 1};
- batch.values.resize(kernels_.size());
-
- for (size_t i = 0; i < kernels_.size(); ++i) {
- KernelContext ctx{plan()->exec_context()};
- ARROW_ASSIGN_OR_RAISE(auto merged, ScalarAggregateKernel::MergeAll(
- kernels_[i], &ctx,
std::move(states_[i])));
- RETURN_NOT_OK(kernels_[i]->finalize(&ctx, &batch.values[i]));
- }
-
- outputs_[0]->InputReceived(this, std::move(batch));
+ auto task = [this]() -> Result<ExecBatch> {
+ ExecBatch batch{{}, 1};
+ batch.values.resize(kernels_.size());
+ for (size_t i = 0; i < kernels_.size(); ++i) {
+ KernelContext ctx{plan()->exec_context()};
+ ARROW_ASSIGN_OR_RAISE(auto merged, ScalarAggregateKernel::MergeAll(
+ kernels_[i], &ctx,
std::move(states_[i])));
+ RETURN_NOT_OK(kernels_[i]->finalize(&ctx, &batch.values[i]));
+ }
+ return batch;
+ };
+ outputs_[0]->InputReceived(this, std::move(task));
Review comment:
This is good :+1:
##########
File path: cpp/src/arrow/compute/exec/source_node.cc
##########
@@ -107,19 +109,19 @@ struct SourceNode : ExecNode {
ExecBatch batch = std::move(*maybe_batch);
if (executor) {
- auto status =
- task_group_.AddTask([this, executor, batch]() ->
Result<Future<>> {
- return executor->Submit([=]() {
- outputs_[0]->InputReceived(this, std::move(batch));
- return Status::OK();
- });
- });
+ auto status = task_group_.AddTask([this, executor,
+ batch]() ->
Result<Future<>> {
+ return executor->Submit([=]() {
+ outputs_[0]->InputReceived(this,
IdentityTask(std::move(batch)));
+ return Status::OK();
+ });
+ });
if (!status.ok()) {
outputs_[0]->ErrorReceived(this, std::move(status));
return Break(total_batches);
}
} else {
- outputs_[0]->InputReceived(this, std::move(batch));
+ outputs_[0]->InputReceived(this,
IdentityTask(std::move(batch)));
Review comment:
This would actually **not** create a task, but forward downstream like
filter/project do.
##########
File path: cpp/src/arrow/compute/exec/union_node.cc
##########
@@ -74,13 +74,18 @@ class UnionNode : public ExecNode {
return plan->EmplaceNode<UnionNode>(plan, std::move(inputs));
}
- void InputReceived(ExecNode* input, ExecBatch batch) override {
+ void InputReceived(ExecNode* input, std::function<Result<ExecBatch>()> task)
override {
ARROW_DCHECK(std::find(inputs_.begin(), inputs_.end(), input) !=
inputs_.end());
if (finished_.is_finished()) {
return;
}
- outputs_[0]->InputReceived(this, std::move(batch));
+ auto batch = task();
+ if (!batch.ok()) {
+ ErrorIfNotOk(batch.status());
+ return;
+ }
+ outputs_[0]->InputReceived(this, IdentityTask(batch.MoveValueUnsafe()));
Review comment:
This looks good :+1:
##########
File path: cpp/src/arrow/compute/exec/aggregate_node.cc
##########
@@ -175,18 +175,21 @@ class ScalarAggregateNode : public ExecNode {
return Status::OK();
}
- void InputReceived(ExecNode* input, ExecBatch batch) override {
+ void InputReceived(ExecNode* input, std::function<Result<ExecBatch>()> task)
override {
DCHECK_EQ(input, inputs_[0]);
auto thread_index = get_thread_index_();
-
- if (ErrorIfNotOk(DoConsume(std::move(batch), thread_index))) return;
+ auto prev = task();
+ if (!prev.ok()) {
+ ErrorIfNotOk(prev.status());
+ return;
+ }
+ if (ErrorIfNotOk(DoConsume(prev.MoveValueUnsafe(), thread_index))) return;
Review comment:
```suggestion
auto func = [this] (Result<ExecBatch> task) {
ARROW_ASSIGN_OR_RAISE(auto prev, task());
auto thread_index = get_thread_index_();
return DoConsume(prev.MoveValueUnsafe(), thread_index);
};
plan_->scheduler()->SubmitTask(std::move(func));
```
This is what I'm thinking pipeline breakers would look like.
##########
File path: cpp/src/arrow/compute/exec/project_node.cc
##########
@@ -88,10 +88,13 @@ class ProjectNode : public MapNode {
return ExecBatch{std::move(values), target.length};
}
- void InputReceived(ExecNode* input, ExecBatch batch) override {
+ void InputReceived(ExecNode* input, std::function<Result<ExecBatch>()> task)
override {
DCHECK_EQ(input, inputs_[0]);
- auto func = [this](ExecBatch batch) { return DoProject(std::move(batch));
};
- this->SubmitTask(std::move(func), std::move(batch));
+ auto func = [this, task]() -> Result<ExecBatch> {
+ ARROW_ASSIGN_OR_RAISE(auto batch, task());
+ return DoProject(std::move(batch));
+ };
+ this->SubmitTask(std::move(func));
Review comment:
```suggestion
this->SubmitTask(std::move(func));
```
Should be: `this->outputs_[0]->InputReceived(this, std::move(func));`
##########
File path: cpp/src/arrow/compute/exec/hash_join_node.cc
##########
@@ -494,7 +494,13 @@ class HashJoinNode : public ExecNode {
size_t thread_index = thread_indexer_();
int side = (input == inputs_[0]) ? 0 : 1;
{
- Status status = impl_->InputReceived(thread_index, side,
std::move(batch));
+ auto batch = task();
+ if (!batch.ok()) {
+ StopProducing();
+ ErrorIfNotOk(batch.status());
+ return;
+ }
+ Status status = impl_->InputReceived(thread_index, side,
batch.MoveValueUnsafe());
Review comment:
This would change to submit task.
##########
File path: cpp/src/arrow/compute/exec/sink_node.cc
##########
@@ -95,10 +95,15 @@ class SinkNode : public ExecNode {
Future<> finished() override { return finished_; }
- void InputReceived(ExecNode* input, ExecBatch batch) override {
+ void InputReceived(ExecNode* input, std::function<Result<ExecBatch>()> task)
override {
DCHECK_EQ(input, inputs_[0]);
- bool did_push = producer_.Push(std::move(batch));
+ auto batch = task();
+ if (!batch.ok()) {
+ ErrorIfNotOk(batch.status());
+ return;
+ }
+ bool did_push = producer_.Push(batch.MoveValueUnsafe());
Review comment:
This changes to submit task.
##########
File path: cpp/src/arrow/compute/exec/exec_plan.cc
##########
@@ -350,37 +350,38 @@ void MapNode::StopProducing() {
Future<> MapNode::finished() { return finished_; }
-void MapNode::SubmitTask(std::function<Result<ExecBatch>(ExecBatch)> map_fn,
- ExecBatch batch) {
+void MapNode::SubmitTask(std::function<Result<ExecBatch>()> map_fn) {
Review comment:
I don't know if `MapNode` will still need to exist. Eventually this
logic will be moving over to the scheduler I think.
##########
File path: cpp/src/arrow/compute/exec/aggregate_node.cc
##########
@@ -483,13 +491,18 @@ class GroupByNode : public ExecNode {
return Status::OK();
}
- void InputReceived(ExecNode* input, ExecBatch batch) override {
+ void InputReceived(ExecNode* input, std::function<Result<ExecBatch>()> task)
override {
// bail if StopProducing was called
if (finished_.is_finished()) return;
DCHECK_EQ(input, inputs_[0]);
- if (ErrorIfNotOk(Consume(std::move(batch)))) return;
+ auto prev = task();
+ if (!prev.ok()) {
+ ErrorIfNotOk(prev.status());
+ return;
+ }
+ if (ErrorIfNotOk(Consume(prev.MoveValueUnsafe()))) return;
Review comment:
```suggestion
auto prev = task();
if (!prev.ok()) {
ErrorIfNotOk(prev.status());
return;
}
if (ErrorIfNotOk(Consume(prev.MoveValueUnsafe()))) return;
```
This should change to submit task.
##########
File path: cpp/src/arrow/compute/exec/sink_node.cc
##########
@@ -188,7 +193,12 @@ class ConsumingSinkNode : public ExecNode {
return;
}
- Status consumption_status = consumer_->Consume(std::move(batch));
+ auto batch = task();
+ if (!batch.ok()) {
+ ErrorIfNotOk(batch.status());
+ return;
+ }
+ Status consumption_status = consumer_->Consume(batch.MoveValueUnsafe());
Review comment:
This changes to submit task.
##########
File path: cpp/src/arrow/compute/exec/sink_node.cc
##########
@@ -274,11 +284,15 @@ struct OrderBySinkNode final : public SinkNode {
sink_options.backpressure);
}
- void InputReceived(ExecNode* input, ExecBatch batch) override {
+ void InputReceived(ExecNode* input, std::function<Result<ExecBatch>()> task)
override {
DCHECK_EQ(input, inputs_[0]);
-
- auto maybe_batch = batch.ToRecordBatch(inputs_[0]->output_schema(),
-
plan()->exec_context()->memory_pool());
+ auto batch = task();
+ if (!batch.ok()) {
+ ErrorIfNotOk(batch.status());
+ return;
+ }
+ auto maybe_batch = batch.ValueUnsafe().ToRecordBatch(
+ inputs_[0]->output_schema(), plan()->exec_context()->memory_pool());
Review comment:
This would build up a task and pass it to `impl_->InputReceived`. Or maybe
the task itself would call `impl_->InputReceived` and this method would call
submit task.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]