bkietz commented on a change in pull request #10397:
URL: https://github.com/apache/arrow/pull/10397#discussion_r640907832



##########
File path: cpp/src/arrow/compute/exec/exec_plan.cc
##########
@@ -170,48 +165,409 @@ Status ExecPlan::Validate() { return 
ToDerived(this)->Validate(); }
 
 Status ExecPlan::StartProducing() { return ToDerived(this)->StartProducing(); }
 
-ExecNode::ExecNode(ExecPlan* plan, std::string label,
-                   std::vector<BatchDescr> input_descrs,
+ExecNode::ExecNode(ExecPlan* plan, std::string label, NodeVector inputs,
                    std::vector<std::string> input_labels, BatchDescr 
output_descr,
                    int num_outputs)
     : plan_(plan),
       label_(std::move(label)),
-      input_descrs_(std::move(input_descrs)),
+      inputs_(std::move(inputs)),
       input_labels_(std::move(input_labels)),
       output_descr_(std::move(output_descr)),
-      num_outputs_(num_outputs) {}
+      num_outputs_(num_outputs) {
+  for (auto input : inputs_) {
+    input->outputs_.push_back(this);
+  }
+}
 
 Status ExecNode::Validate() const {
-  if (inputs_.size() != input_descrs_.size()) {
+  if (inputs_.size() != input_labels_.size()) {
     return Status::Invalid("Invalid number of inputs for '", label(), "' 
(expected ",
-                           num_inputs(), ", actual ", inputs_.size(), ")");
+                           num_inputs(), ", actual ", input_labels_.size(), 
")");
   }
 
   if (static_cast<int>(outputs_.size()) != num_outputs_) {
     return Status::Invalid("Invalid number of outputs for '", label(), "' 
(expected ",
                            num_outputs(), ", actual ", outputs_.size(), ")");
   }
 
-  DCHECK_EQ(input_descrs_.size(), input_labels_.size());
-
   for (auto out : outputs_) {
     auto input_index = GetNodeIndex(out->inputs(), this);
     if (!input_index) {
       return Status::Invalid("Node '", label(), "' outputs to node '", 
out->label(),
                              "' but is not listed as an input.");
     }
+  }
 
-    const auto& in_descr = out->input_descrs_[*input_index];
-    if (in_descr != output_descr_) {
-      return Status::Invalid(
-          "Node '", label(), "' (bound to input ", input_labels_[*input_index],
-          ") produces batches with type '", 
ValueDescr::ToString(output_descr_),
-          "' inconsistent with consumer '", out->label(), "' which accepts '",
-          ValueDescr::ToString(in_descr), "'");
+  return Status::OK();
+}
+
+struct SourceNode : ExecNode {
+  SourceNode(ExecPlan* plan, std::string label, ExecNode::BatchDescr 
output_descr,
+             AsyncGenerator<util::optional<ExecBatch>> generator)
+      : ExecNode(plan, std::move(label), {}, {}, std::move(output_descr),
+                 /*num_outputs=*/1),
+        generator_(std::move(generator)) {}
+
+  const char* kind_name() override { return "SourceNode"; }
+
+  static void NoInputs() { DCHECK(false) << "no inputs; this should never be 
called"; }
+  void InputReceived(ExecNode*, int, ExecBatch) override { NoInputs(); }
+  void ErrorReceived(ExecNode*, Status) override { NoInputs(); }
+  void InputFinished(ExecNode*, int) override { NoInputs(); }
+
+  Status StartProducing() override {
+    if (finished_) {
+      return Status::Invalid("Restarted SourceNode '", label(), "'");
     }
+
+    auto gen = std::move(generator_);
+
+    /// XXX should we wait on this future anywhere? In StopProducing() maybe?
+    auto done_fut =
+        Loop([gen, this] {
+          std::unique_lock<std::mutex> lock(mutex_);
+          int seq = next_batch_index_++;
+          if (finished_) {
+            return Future<ControlFlow<int>>::MakeFinished(Break(seq));
+          }
+          lock.unlock();
+
+          return gen().Then(
+              [=](const util::optional<ExecBatch>& batch) -> ControlFlow<int> {
+                std::unique_lock<std::mutex> lock(mutex_);
+                if (!batch || finished_) {
+                  finished_ = true;
+                  return Break(seq);
+                }
+                lock.unlock();
+
+                outputs_[0]->InputReceived(this, seq, *batch);
+                return Continue();
+              },
+              [=](const Status& error) -> ControlFlow<int> {
+                std::unique_lock<std::mutex> lock(mutex_);
+                if (!finished_) {
+                  finished_ = true;
+                  lock.unlock();
+                  // unless we were already finished, push the error to our 
output
+                  // XXX is this correct? Is it reasonable for a consumer to 
ignore errors
+                  // from a finished producer?

Review comment:
       I'd agree that handling/ignoring trailing batches is necessary; the 
producer may take a while to stop. However I wonder if it's reasonable to do 
the same for trailing errors. For example: let's say we have a plan where a 
LimitNode is taking the first 99 rows from EmitsErrorAfterHundredthRowNode. 
There's a race condition here (also depends on chunking) because the LimitNode 
will sometimes receive the trailing error before it can stop the producer and 
sometimes  will succeed in stopping its producer before it gets around to 
raising an error. I'm not sure what the correct answer here is, but I lean 
toward: if any node emits any error, that always puts all subsequent nodes into 
an error state too (unless explicitly intercepted). The above example seems 
like a problem we need to fix in EmitsErrorAfterHundredthRowNode rather than 
requiring all consumers to ignore post-stop errors




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to