bkietz commented on a change in pull request #10397:
URL: https://github.com/apache/arrow/pull/10397#discussion_r644836145



##########
File path: cpp/src/arrow/dataset/dataset_internal.h
##########
@@ -204,5 +204,35 @@ arrow::Result<std::shared_ptr<T>> GetFragmentScanOptions(
   return internal::checked_pointer_cast<T>(source);
 }
 
+class FragmentDataset : public Dataset {

Review comment:
       Added https://issues.apache.org/jira/browse/ARROW-12945 to track this improvement.

##########
File path: cpp/src/arrow/compute/exec/exec_plan.cc
##########
@@ -170,48 +165,409 @@ Status ExecPlan::Validate() { return ToDerived(this)->Validate(); }
 
 Status ExecPlan::StartProducing() { return ToDerived(this)->StartProducing(); }
 
-ExecNode::ExecNode(ExecPlan* plan, std::string label,
-                   std::vector<BatchDescr> input_descrs,
+ExecNode::ExecNode(ExecPlan* plan, std::string label, NodeVector inputs,
                    std::vector<std::string> input_labels, BatchDescr output_descr,
                    int num_outputs)
     : plan_(plan),
       label_(std::move(label)),
-      input_descrs_(std::move(input_descrs)),
+      inputs_(std::move(inputs)),
       input_labels_(std::move(input_labels)),
       output_descr_(std::move(output_descr)),
-      num_outputs_(num_outputs) {}
+      num_outputs_(num_outputs) {
+  for (auto input : inputs_) {
+    input->outputs_.push_back(this);
+  }
+}
 
 Status ExecNode::Validate() const {
-  if (inputs_.size() != input_descrs_.size()) {
+  if (inputs_.size() != input_labels_.size()) {
     return Status::Invalid("Invalid number of inputs for '", label(), "' 
(expected ",
-                           num_inputs(), ", actual ", inputs_.size(), ")");
+                           num_inputs(), ", actual ", input_labels_.size(), 
")");
   }
 
   if (static_cast<int>(outputs_.size()) != num_outputs_) {
     return Status::Invalid("Invalid number of outputs for '", label(), "' 
(expected ",
                            num_outputs(), ", actual ", outputs_.size(), ")");
   }
 
-  DCHECK_EQ(input_descrs_.size(), input_labels_.size());
-
   for (auto out : outputs_) {
     auto input_index = GetNodeIndex(out->inputs(), this);
     if (!input_index) {
       return Status::Invalid("Node '", label(), "' outputs to node '", 
out->label(),
                              "' but is not listed as an input.");
     }
+  }
 
-    const auto& in_descr = out->input_descrs_[*input_index];
-    if (in_descr != output_descr_) {
-      return Status::Invalid(
-          "Node '", label(), "' (bound to input ", input_labels_[*input_index],
-          ") produces batches with type '", 
ValueDescr::ToString(output_descr_),
-          "' inconsistent with consumer '", out->label(), "' which accepts '",
-          ValueDescr::ToString(in_descr), "'");
+  return Status::OK();
+}
+
+struct SourceNode : ExecNode {
+  SourceNode(ExecPlan* plan, std::string label, ExecNode::BatchDescr output_descr,
+             AsyncGenerator<util::optional<ExecBatch>> generator)
+      : ExecNode(plan, std::move(label), {}, {}, std::move(output_descr),
+                 /*num_outputs=*/1),
+        generator_(std::move(generator)) {}
+
+  const char* kind_name() override { return "SourceNode"; }
+
+  static void NoInputs() { DCHECK(false) << "no inputs; this should never be called"; }
+  void InputReceived(ExecNode*, int, ExecBatch) override { NoInputs(); }
+  void ErrorReceived(ExecNode*, Status) override { NoInputs(); }
+  void InputFinished(ExecNode*, int) override { NoInputs(); }
+
+  Status StartProducing() override {
+    if (finished_) {
+      return Status::Invalid("Restarted SourceNode '", label(), "'");
     }
+
+    auto gen = std::move(generator_);
+
+    /// XXX should we wait on this future anywhere? In StopProducing() maybe?

Review comment:
       Perhaps in `StopProducing()`? Then we could add to the ExecNode contract that StopProducing must be called on a producing node before it is destroyed?
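       A minimal sketch of that idea, assuming the future returned by the generator-visiting loop is kept as a member and blocked on in `StopProducing()`. The names `producing_future_`, `stop_requested_`, and `next_batch_index_` are hypothetical members introduced for illustration, not part of this PR, and the real `StartProducing()` body is elided in the diff above, so this only approximates its shape:

```cpp
// Hedged sketch only; these members and methods would live inside the
// SourceNode from this diff. `producing_future_`, `stop_requested_`, and
// `next_batch_index_` are hypothetical names, not part of the PR.
#include <atomic>

#include "arrow/compute/exec.h"          // ExecBatch
#include "arrow/util/async_generator.h"  // AsyncGenerator, VisitAsyncGenerator
#include "arrow/util/future.h"           // Future
#include "arrow/util/optional.h"         // util::optional

Future<> producing_future_ = Future<>::MakeFinished(Status::OK());
std::atomic<bool> stop_requested_{false};
int next_batch_index_ = 0;

Status StartProducing() override {
  auto gen = std::move(generator_);
  // Drive the generator; forward each batch to the single output. A real
  // implementation would also signal InputFinished once the loop completes.
  producing_future_ = VisitAsyncGenerator(
      std::move(gen), [this](util::optional<ExecBatch> batch) {
        if (stop_requested_.load()) {
          return Status::Cancelled("SourceNode '", label(), "' was stopped");
        }
        outputs_[0]->InputReceived(this, next_batch_index_++, std::move(*batch));
        return Status::OK();
      });
  return Status::OK();
}

void StopProducing() override {
  stop_requested_.store(true);
  // Per the proposed contract, StopProducing() must be called on a producing
  // node before destruction; waiting here makes destruction safe afterwards.
  producing_future_.Wait();
}
```

       The trade-off is that `StopProducing()` becomes blocking; an alternative would be to expose the loop's future from the node (for example through a hypothetical `finished()` accessor) and let the plan wait on it instead.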



