aocsa commented on a change in pull request #11210:
URL: https://github.com/apache/arrow/pull/11210#discussion_r724689820
##########
File path: cpp/src/arrow/compute/exec/exec_plan.h
##########
@@ -243,6 +248,128 @@ class ARROW_EXPORT ExecNode {
NodeVector outputs_;
};
+/// \brief MapNode is an ExecNode type class which process a task like
filter/project
+/// (See SubmitTask method) to each given ExecBatch object, which have one
input, one
+/// output, and are pure functions on the input
+///
+/// A simple parallel runner is created with a "map_fn" which is just a
function that
+/// takes a batch in and returns a batch. This simple parallel runner also
needs an
+/// executor (use simple synchronous runner if there is no executor)
+
+class MapNode : public ExecNode {
+ public:
+ MapNode(ExecPlan* plan, std::vector<ExecNode*> inputs,
+ std::shared_ptr<Schema> output_schema)
+ : ExecNode(plan, std::move(inputs), /*input_labels=*/{"target"},
+ std::move(output_schema),
+ /*num_outputs=*/1) {
+ executor_ = plan_->exec_context()->executor();
+ }
+
+ void ErrorReceived(ExecNode* input, Status error) override {
+ DCHECK_EQ(input, inputs_[0]);
+ outputs_[0]->ErrorReceived(this, std::move(error));
+ }
+
+ void InputFinished(ExecNode* input, int total_batches) override {
+ DCHECK_EQ(input, inputs_[0]);
+ outputs_[0]->InputFinished(this, total_batches);
+ if (input_counter_.SetTotal(total_batches)) {
+ this->Finish();
+ }
+ }
+
+ Status StartProducing() override { return Status::OK(); }
+
+ void PauseProducing(ExecNode* output) override {}
+
+ void ResumeProducing(ExecNode* output) override {}
+
+ void StopProducing(ExecNode* output) override {
+ DCHECK_EQ(output, outputs_[0]);
+ StopProducing();
+ }
+
+ void StopProducing() override {
+ if (executor_) {
+ this->stop_source_.RequestStop();
+ }
+ if (input_counter_.Cancel()) {
+ this->Finish();
+ }
+ inputs_[0]->StopProducing(this);
+ }
+
+ Future<> finished() override { return finished_; }
+
+ protected:
+ void SubmitTask(std::function<Result<ExecBatch>(ExecBatch)> map_fn,
ExecBatch batch) {
+ Status status;
+ if (finished_.is_finished()) {
+ return;
+ }
+ auto task = [this, map_fn, batch]() {
+ auto output_batch = map_fn(std::move(batch));
+ if (ErrorIfNotOk(output_batch.status())) {
+ return output_batch.status();
+ }
+ output_batch->guarantee = batch.guarantee;
+ outputs_[0]->InputReceived(this, output_batch.MoveValueUnsafe());
+ return Status::OK();
+ };
+ if (executor_) {
+ status = task_group_.AddTask([this, task]() -> Result<Future<>> {
+ return this->executor_->Submit(this->stop_source_.token(), [this,
task]() {
+ auto status = task();
+ if (this->input_counter_.Increment()) {
+ this->Finish(status);
+ }
+ return status;
+ });
+ });
+ } else {
+ status = task();
+ if (input_counter_.Increment()) {
+ this->Finish(status);
+ }
+ }
+ if (!status.ok()) {
+ if (input_counter_.Cancel()) {
+ this->Finish(status);
+ }
+ inputs_[0]->StopProducing(this);
Review comment:
I think this part of the code is OK. It follows the same pattern as
https://github.com/apache/arrow/blob/4b8ffe41518ec7778ba420fdc00772df217e9145/cpp/src/arrow/compute/exec/sink_node.cc#L189-L193
ErrorIfNotOk is already called inside the task:
https://github.com/apache/arrow/blob/85af59892b83fb49af58c2919d98853b9c1779fd/cpp/src/arrow/compute/exec/exec_plan.h#L313
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]