lidavidm commented on a change in pull request #11210:
URL: https://github.com/apache/arrow/pull/11210#discussion_r723384862
##########
File path: cpp/src/arrow/compute/exec/exec_plan.h
##########
@@ -258,11 +266,57 @@ class MapNode : public ExecNode {
executor_ = plan_->exec_context()->executor();
}
- void SubmitTask(std::function<Status()> task) {
+ void ErrorReceived(ExecNode* input, Status error) override {
+ DCHECK_EQ(input, inputs_[0]);
+ outputs_[0]->ErrorReceived(this, std::move(error));
+ }
+
+ void InputFinished(ExecNode* input, int total_batches) override {
+ DCHECK_EQ(input, inputs_[0]);
+ outputs_[0]->InputFinished(this, total_batches);
+ if (input_counter_.SetTotal(total_batches)) {
+ this->Finish();
+ }
+ }
+
+ Status StartProducing() override { return Status::OK(); }
+
+ void PauseProducing(ExecNode* output) override {}
+
+ void ResumeProducing(ExecNode* output) override {}
+
+ void StopProducing(ExecNode* output) override {
+ DCHECK_EQ(output, outputs_[0]);
+ StopProducing();
+ }
+
+ void StopProducing() override {
+ if (executor_) {
+ this->stop_source_.RequestStop();
+ }
+ if (input_counter_.Cancel()) {
+ this->Finish();
+ }
+ inputs_[0]->StopProducing(this);
+ }
+
+ Future<> finished() override { return finished_; }
+
+ protected:
+ void SubmitTask(std::function<Result<ExecBatch>(ExecBatch)> map_fn,
ExecBatch batch) {
+ Status status;
if (finished_.is_finished()) {
return;
}
- Status status;
+ auto task = [this, map_fn, batch]() {
Review comment:
This copies both `batch` and `map_fn` into the closure, which is a little
unfortunate. We don't have capture-by-move, but you could manually write out
a callable struct to avoid the copy (though it's probably not a big deal).
##########
File path: cpp/src/arrow/compute/exec/exec_plan.h
##########
@@ -258,11 +266,57 @@ class MapNode : public ExecNode {
executor_ = plan_->exec_context()->executor();
}
- void SubmitTask(std::function<Status()> task) {
+ void ErrorReceived(ExecNode* input, Status error) override {
+ DCHECK_EQ(input, inputs_[0]);
+ outputs_[0]->ErrorReceived(this, std::move(error));
+ }
+
+ void InputFinished(ExecNode* input, int total_batches) override {
+ DCHECK_EQ(input, inputs_[0]);
+ outputs_[0]->InputFinished(this, total_batches);
+ if (input_counter_.SetTotal(total_batches)) {
+ this->Finish();
+ }
+ }
+
+ Status StartProducing() override { return Status::OK(); }
+
+ void PauseProducing(ExecNode* output) override {}
+
+ void ResumeProducing(ExecNode* output) override {}
+
+ void StopProducing(ExecNode* output) override {
+ DCHECK_EQ(output, outputs_[0]);
+ StopProducing();
+ }
+
+ void StopProducing() override {
+ if (executor_) {
+ this->stop_source_.RequestStop();
+ }
+ if (input_counter_.Cancel()) {
+ this->Finish();
+ }
+ inputs_[0]->StopProducing(this);
+ }
+
+ Future<> finished() override { return finished_; }
+
+ protected:
+ void SubmitTask(std::function<Result<ExecBatch>(ExecBatch)> map_fn,
ExecBatch batch) {
+ Status status;
if (finished_.is_finished()) {
return;
}
- Status status;
+ auto task = [this, map_fn, batch]() {
Review comment:
(And even then, the task itself gets copied into the other closures below, so
you'd have to rewrite those as callable structs in turn... maybe not worth it.)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]