westonpace commented on a change in pull request #11210:
URL: https://github.com/apache/arrow/pull/11210#discussion_r724398136



##########
File path: cpp/src/arrow/compute/exec/exec_plan.h
##########
@@ -243,6 +248,128 @@ class ARROW_EXPORT ExecNode {
   NodeVector outputs_;
 };
 
+/// \brief MapNode is an ExecNode type class which applies a task like filter/project
+/// (see the SubmitTask method) to each given ExecBatch object; it has one input, one
+/// output, and is a pure function on the input
+///
+/// A simple parallel runner is created with a "map_fn", which is just a function that
+/// takes a batch in and returns a batch.  This simple parallel runner also needs an
+/// executor (use the simple synchronous runner if there is no executor)
+
+class MapNode : public ExecNode {
+ public:
+  MapNode(ExecPlan* plan, std::vector<ExecNode*> inputs,
+          std::shared_ptr<Schema> output_schema)
+      : ExecNode(plan, std::move(inputs), /*input_labels=*/{"target"},
+                 std::move(output_schema),
+                 /*num_outputs=*/1) {
+    executor_ = plan_->exec_context()->executor();
+  }
+
+  void ErrorReceived(ExecNode* input, Status error) override {
+    DCHECK_EQ(input, inputs_[0]);
+    outputs_[0]->ErrorReceived(this, std::move(error));
+  }
+
+  void InputFinished(ExecNode* input, int total_batches) override {
+    DCHECK_EQ(input, inputs_[0]);
+    outputs_[0]->InputFinished(this, total_batches);
+    if (input_counter_.SetTotal(total_batches)) {
+      this->Finish();
+    }
+  }
+
+  Status StartProducing() override { return Status::OK(); }
+
+  void PauseProducing(ExecNode* output) override {}
+
+  void ResumeProducing(ExecNode* output) override {}
+
+  void StopProducing(ExecNode* output) override {
+    DCHECK_EQ(output, outputs_[0]);
+    StopProducing();
+  }
+
+  void StopProducing() override {
+    if (executor_) {
+      this->stop_source_.RequestStop();
+    }
+    if (input_counter_.Cancel()) {
+      this->Finish();
+    }
+    inputs_[0]->StopProducing(this);
+  }
+
+  Future<> finished() override { return finished_; }
+
+ protected:
+  void SubmitTask(std::function<Result<ExecBatch>(ExecBatch)> map_fn, 
ExecBatch batch) {
+    Status status;
+    if (finished_.is_finished()) {
+      return;
+    }

Review comment:
       In theory this shouldn't be possible.  Maybe 
`DCHECK(!finished_.is_finished())`?  You would have to move this into the `.cc` 
file though since `DCHECK` isn't allowed in public headers.

##########
File path: cpp/src/arrow/compute/exec/exec_plan.h
##########
@@ -243,6 +248,128 @@ class ARROW_EXPORT ExecNode {
   NodeVector outputs_;
 };
 
+/// \brief MapNode is an ExecNode type class which applies a task like filter/project
+/// (see the SubmitTask method) to each given ExecBatch object; it has one input, one
+/// output, and is a pure function on the input
+///
+/// A simple parallel runner is created with a "map_fn", which is just a function that
+/// takes a batch in and returns a batch.  This simple parallel runner also needs an
+/// executor (use the simple synchronous runner if there is no executor)
+
+class MapNode : public ExecNode {
+ public:
+  MapNode(ExecPlan* plan, std::vector<ExecNode*> inputs,
+          std::shared_ptr<Schema> output_schema)
+      : ExecNode(plan, std::move(inputs), /*input_labels=*/{"target"},
+                 std::move(output_schema),
+                 /*num_outputs=*/1) {
+    executor_ = plan_->exec_context()->executor();
+  }
+
+  void ErrorReceived(ExecNode* input, Status error) override {
+    DCHECK_EQ(input, inputs_[0]);
+    outputs_[0]->ErrorReceived(this, std::move(error));
+  }
+
+  void InputFinished(ExecNode* input, int total_batches) override {
+    DCHECK_EQ(input, inputs_[0]);
+    outputs_[0]->InputFinished(this, total_batches);
+    if (input_counter_.SetTotal(total_batches)) {
+      this->Finish();
+    }
+  }
+
+  Status StartProducing() override { return Status::OK(); }
+
+  void PauseProducing(ExecNode* output) override {}
+
+  void ResumeProducing(ExecNode* output) override {}
+
+  void StopProducing(ExecNode* output) override {
+    DCHECK_EQ(output, outputs_[0]);
+    StopProducing();
+  }
+
+  void StopProducing() override {
+    if (executor_) {
+      this->stop_source_.RequestStop();
+    }
+    if (input_counter_.Cancel()) {
+      this->Finish();
+    }
+    inputs_[0]->StopProducing(this);
+  }
+
+  Future<> finished() override { return finished_; }
+
+ protected:
+  void SubmitTask(std::function<Result<ExecBatch>(ExecBatch)> map_fn, 
ExecBatch batch) {
+    Status status;
+    if (finished_.is_finished()) {
+      return;
+    }
+    auto task = [this, map_fn, batch]() {
+      auto output_batch = map_fn(std::move(batch));
+      if (ErrorIfNotOk(output_batch.status())) {
+        return output_batch.status();
+      }
+      output_batch->guarantee = batch.guarantee;
+      outputs_[0]->InputReceived(this, output_batch.MoveValueUnsafe());
+      return Status::OK();
+    };
+    if (executor_) {
+      status = task_group_.AddTask([this, task]() -> Result<Future<>> {
+        return this->executor_->Submit(this->stop_source_.token(), [this, 
task]() {
+          auto status = task();
+          if (this->input_counter_.Increment()) {
+            this->Finish(status);
+          }
+          return status;
+        });
+      });
+    } else {
+      status = task();
+      if (input_counter_.Increment()) {
+        this->Finish(status);
+      }
+    }
+    if (!status.ok()) {
+      if (input_counter_.Cancel()) {
+        this->Finish(status);
+      }
+      inputs_[0]->StopProducing(this);
+      return;
+    }
+  }
+
+  void Finish(Status finish_st = Status::OK()) {
+    if (executor_) {
+      task_group_.End().AddCallback([this, finish_st](const Status& st) {
+        Status final_status = finish_st & st;
+        this->finished_.MarkFinished(final_status);
+      });
+    } else {
+      this->finished_.MarkFinished(finish_st);
+    }
+  }
+
+ protected:
+  // Counter for the number of batches received
+  AtomicCounter input_counter_;
+
+  // Future to sync finished
+  Future<> finished_ = Future<>::Make();
+
+  // The task group for the corresponding batches
+  util::AsyncTaskGroup task_group_;
+
+  // Executor

Review comment:
       Nit: This comment isn't providing much.
   ```suggestion
   ```

##########
File path: cpp/src/arrow/compute/exec/exec_plan.h
##########
@@ -243,6 +248,128 @@ class ARROW_EXPORT ExecNode {
   NodeVector outputs_;
 };
 
+/// \brief MapNode is an ExecNode type class which applies a task like filter/project
+/// (see the SubmitTask method) to each given ExecBatch object; it has one input, one
+/// output, and is a pure function on the input
+///
+/// A simple parallel runner is created with a "map_fn", which is just a function that
+/// takes a batch in and returns a batch.  This simple parallel runner also needs an
+/// executor (use the simple synchronous runner if there is no executor)
+
+class MapNode : public ExecNode {
+ public:
+  MapNode(ExecPlan* plan, std::vector<ExecNode*> inputs,
+          std::shared_ptr<Schema> output_schema)
+      : ExecNode(plan, std::move(inputs), /*input_labels=*/{"target"},
+                 std::move(output_schema),
+                 /*num_outputs=*/1) {
+    executor_ = plan_->exec_context()->executor();
+  }
+
+  void ErrorReceived(ExecNode* input, Status error) override {
+    DCHECK_EQ(input, inputs_[0]);
+    outputs_[0]->ErrorReceived(this, std::move(error));
+  }
+
+  void InputFinished(ExecNode* input, int total_batches) override {
+    DCHECK_EQ(input, inputs_[0]);
+    outputs_[0]->InputFinished(this, total_batches);
+    if (input_counter_.SetTotal(total_batches)) {
+      this->Finish();
+    }
+  }
+
+  Status StartProducing() override { return Status::OK(); }
+
+  void PauseProducing(ExecNode* output) override {}
+
+  void ResumeProducing(ExecNode* output) override {}
+
+  void StopProducing(ExecNode* output) override {
+    DCHECK_EQ(output, outputs_[0]);
+    StopProducing();
+  }
+
+  void StopProducing() override {
+    if (executor_) {
+      this->stop_source_.RequestStop();
+    }
+    if (input_counter_.Cancel()) {
+      this->Finish();
+    }
+    inputs_[0]->StopProducing(this);
+  }
+
+  Future<> finished() override { return finished_; }
+
+ protected:
+  void SubmitTask(std::function<Result<ExecBatch>(ExecBatch)> map_fn, 
ExecBatch batch) {
+    Status status;
+    if (finished_.is_finished()) {
+      return;
+    }
+    auto task = [this, map_fn, batch]() {
+      auto output_batch = map_fn(std::move(batch));
+      if (ErrorIfNotOk(output_batch.status())) {
+        return output_batch.status();
+      }
+      output_batch->guarantee = batch.guarantee;

Review comment:
       I'm not sure this is universally applicable but it is ok for now.  For 
example, a filter node should be modifying the guarantee to `batch.guarantee && 
filter expression` but I don't think we do that now.  This behavior would be 
correct for a project node though.  I think it would be more correct for the 
subclasses to own this responsibility.

##########
File path: cpp/src/arrow/compute/exec/exec_plan.h
##########
@@ -243,6 +248,128 @@ class ARROW_EXPORT ExecNode {
   NodeVector outputs_;
 };
 
+/// \brief MapNode is an ExecNode type class which applies a task like filter/project
+/// (see the SubmitTask method) to each given ExecBatch object; it has one input, one
+/// output, and is a pure function on the input
+///
+/// A simple parallel runner is created with a "map_fn", which is just a function that
+/// takes a batch in and returns a batch.  This simple parallel runner also needs an
+/// executor (use the simple synchronous runner if there is no executor)
+
+class MapNode : public ExecNode {
+ public:
+  MapNode(ExecPlan* plan, std::vector<ExecNode*> inputs,
+          std::shared_ptr<Schema> output_schema)
+      : ExecNode(plan, std::move(inputs), /*input_labels=*/{"target"},
+                 std::move(output_schema),
+                 /*num_outputs=*/1) {
+    executor_ = plan_->exec_context()->executor();
+  }
+
+  void ErrorReceived(ExecNode* input, Status error) override {
+    DCHECK_EQ(input, inputs_[0]);
+    outputs_[0]->ErrorReceived(this, std::move(error));
+  }
+
+  void InputFinished(ExecNode* input, int total_batches) override {
+    DCHECK_EQ(input, inputs_[0]);
+    outputs_[0]->InputFinished(this, total_batches);
+    if (input_counter_.SetTotal(total_batches)) {
+      this->Finish();
+    }
+  }
+
+  Status StartProducing() override { return Status::OK(); }
+
+  void PauseProducing(ExecNode* output) override {}
+
+  void ResumeProducing(ExecNode* output) override {}
+
+  void StopProducing(ExecNode* output) override {
+    DCHECK_EQ(output, outputs_[0]);
+    StopProducing();
+  }
+
+  void StopProducing() override {
+    if (executor_) {
+      this->stop_source_.RequestStop();
+    }
+    if (input_counter_.Cancel()) {
+      this->Finish();
+    }
+    inputs_[0]->StopProducing(this);
+  }
+
+  Future<> finished() override { return finished_; }
+
+ protected:
+  void SubmitTask(std::function<Result<ExecBatch>(ExecBatch)> map_fn, 
ExecBatch batch) {
+    Status status;
+    if (finished_.is_finished()) {
+      return;
+    }
+    auto task = [this, map_fn, batch]() {
+      auto output_batch = map_fn(std::move(batch));
+      if (ErrorIfNotOk(output_batch.status())) {
+        return output_batch.status();
+      }
+      output_batch->guarantee = batch.guarantee;
+      outputs_[0]->InputReceived(this, output_batch.MoveValueUnsafe());
+      return Status::OK();
+    };
+    if (executor_) {
+      status = task_group_.AddTask([this, task]() -> Result<Future<>> {
+        return this->executor_->Submit(this->stop_source_.token(), [this, 
task]() {
+          auto status = task();
+          if (this->input_counter_.Increment()) {
+            this->Finish(status);
+          }
+          return status;
+        });
+      });
+    } else {
+      status = task();
+      if (input_counter_.Increment()) {
+        this->Finish(status);
+      }
+    }
+    if (!status.ok()) {
+      if (input_counter_.Cancel()) {
+        this->Finish(status);
+      }
+      inputs_[0]->StopProducing(this);

Review comment:
       Should this call `ErrorReceived` on the outputs instead?  Or could we 
just call `ErrorIfNotOk` here?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to