bkietz commented on a change in pull request #10397: URL: https://github.com/apache/arrow/pull/10397#discussion_r644836145
########## File path: cpp/src/arrow/dataset/dataset_internal.h ########## @@ -204,5 +204,35 @@ arrow::Result<std::shared_ptr<T>> GetFragmentScanOptions( return internal::checked_pointer_cast<T>(source); } +class FragmentDataset : public Dataset { Review comment: Added https://issues.apache.org/jira/browse/ARROW-12945 to track this improvement. ########## File path: cpp/src/arrow/compute/exec/exec_plan.cc ########## @@ -170,48 +165,409 @@ Status ExecPlan::Validate() { return ToDerived(this)->Validate(); } Status ExecPlan::StartProducing() { return ToDerived(this)->StartProducing(); } -ExecNode::ExecNode(ExecPlan* plan, std::string label, - std::vector<BatchDescr> input_descrs, +ExecNode::ExecNode(ExecPlan* plan, std::string label, NodeVector inputs, std::vector<std::string> input_labels, BatchDescr output_descr, int num_outputs) : plan_(plan), label_(std::move(label)), - input_descrs_(std::move(input_descrs)), + inputs_(std::move(inputs)), input_labels_(std::move(input_labels)), output_descr_(std::move(output_descr)), - num_outputs_(num_outputs) {} + num_outputs_(num_outputs) { + for (auto input : inputs_) { + input->outputs_.push_back(this); + } +} Status ExecNode::Validate() const { - if (inputs_.size() != input_descrs_.size()) { + if (inputs_.size() != input_labels_.size()) { return Status::Invalid("Invalid number of inputs for '", label(), "' (expected ", - num_inputs(), ", actual ", inputs_.size(), ")"); + num_inputs(), ", actual ", input_labels_.size(), ")"); } if (static_cast<int>(outputs_.size()) != num_outputs_) { return Status::Invalid("Invalid number of outputs for '", label(), "' (expected ", num_outputs(), ", actual ", outputs_.size(), ")"); } - DCHECK_EQ(input_descrs_.size(), input_labels_.size()); - for (auto out : outputs_) { auto input_index = GetNodeIndex(out->inputs(), this); if (!input_index) { return Status::Invalid("Node '", label(), "' outputs to node '", out->label(), "' but is not listed as an input."); } + } - const auto& in_descr = out->input_descrs_[*input_index]; - if (in_descr != output_descr_) { - return Status::Invalid( - "Node '", label(), "' (bound to input ", input_labels_[*input_index], - ") produces batches with type '", ValueDescr::ToString(output_descr_), - "' inconsistent with consumer '", out->label(), "' which accepts '", - ValueDescr::ToString(in_descr), "'"); + return Status::OK(); +} + +struct SourceNode : ExecNode { + SourceNode(ExecPlan* plan, std::string label, ExecNode::BatchDescr output_descr, + AsyncGenerator<util::optional<ExecBatch>> generator) + : ExecNode(plan, std::move(label), {}, {}, std::move(output_descr), + /*num_outputs=*/1), + generator_(std::move(generator)) {} + + const char* kind_name() override { return "SourceNode"; } + + static void NoInputs() { DCHECK(false) << "no inputs; this should never be called"; } + void InputReceived(ExecNode*, int, ExecBatch) override { NoInputs(); } + void ErrorReceived(ExecNode*, Status) override { NoInputs(); } + void InputFinished(ExecNode*, int) override { NoInputs(); } + + Status StartProducing() override { + if (finished_) { + return Status::Invalid("Restarted SourceNode '", label(), "'"); } + + auto gen = std::move(generator_); + + /// XXX should we wait on this future anywhere? In StopProducing() maybe? Review comment: Perhaps in `StopProducing()`? Then we append to the ExecNode contract that StopProducing must be called on a producing node before it is destroyed? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org