save-buffer commented on code in PR #13848:
URL: https://github.com/apache/arrow/pull/13848#discussion_r944979523


##########
cpp/src/arrow/compute/exec/project_node.cc:
##########
@@ -91,26 +95,34 @@ class ProjectNode : public MapNode {
       ARROW_ASSIGN_OR_RAISE(values[i], 
ExecuteScalarExpression(simplified_expr, target,
                                                                
plan()->exec_context()));
     }
+    END_SPAN(span);
+
     return ExecBatch{std::move(values), target.length};
   }
 
-  void InputReceived(ExecNode* input, ExecBatch batch) override {
-    EVENT(span_, "InputReceived", {{"batch.length", batch.length}});
+  Status StartProducing() override { return Status::OK(); }
+
+  void PauseProducing(ExecNode* output, int32_t counter) override {
+    inputs_[0]->PauseProducing(this, counter);
+  }
+
+  void ResumeProducing(ExecNode* output, int32_t counter) override {
+    inputs_[0]->ResumeProducing(this, counter);
+  }
+
+  Status InputReceived(ExecNode* input, ExecBatch batch) override {
     DCHECK_EQ(input, inputs_[0]);
-    auto func = [this](ExecBatch batch) {
-      util::tracing::Span span;
-      START_COMPUTE_SPAN_WITH_PARENT(span, span_, "InputReceived",
-                                     {{"project", ToStringExtra()},
-                                      {"node.label", label()},
-                                      {"batch.length", batch.length}});
-      auto result = DoProject(std::move(batch));
-      MARK_SPAN(span, result.status());
-      END_SPAN(span);
-      return result;
-    };
-    this->SubmitTask(std::move(func), std::move(batch));
+    ARROW_ASSIGN_OR_RAISE(ExecBatch projected, DoProject(std::move(batch)));
+    return output_->InputReceived(this, std::move(projected));
   }
 
+  Status InputFinished(ExecNode* input, int num_batches) override {
+    END_SPAN(span_);
+    return output_->InputFinished(this, num_batches);
+  }

Review Comment:
   Yeah it probably can be. Actually this span thing is a bit broken right now 
in general because we don't enforce that `InputFinished` is called _after_ all 
batches have been output. `InputFinished` merely specifies the total number 
of batches that will be output, so e.g. in the case of scalar aggregates, which 
only ever output one row, `InputFinished` is called in `StopProducing`, and so 
the span of a project node above a scalar aggregate node would be ended 
immediately. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to