bkietz commented on a change in pull request #10397:
URL: https://github.com/apache/arrow/pull/10397#discussion_r644304045
##########
File path: cpp/src/arrow/compute/exec/exec_plan.cc
##########
@@ -170,48 +165,409 @@ Status ExecPlan::Validate() { return
ToDerived(this)->Validate(); }
Status ExecPlan::StartProducing() { return ToDerived(this)->StartProducing(); }
-ExecNode::ExecNode(ExecPlan* plan, std::string label,
- std::vector<BatchDescr> input_descrs,
+ExecNode::ExecNode(ExecPlan* plan, std::string label, NodeVector inputs,
std::vector<std::string> input_labels, BatchDescr
output_descr,
int num_outputs)
: plan_(plan),
label_(std::move(label)),
- input_descrs_(std::move(input_descrs)),
+ inputs_(std::move(inputs)),
input_labels_(std::move(input_labels)),
output_descr_(std::move(output_descr)),
- num_outputs_(num_outputs) {}
+ num_outputs_(num_outputs) {
+ for (auto input : inputs_) {
+ input->outputs_.push_back(this);
+ }
+}
Status ExecNode::Validate() const {
- if (inputs_.size() != input_descrs_.size()) {
+ if (inputs_.size() != input_labels_.size()) {
return Status::Invalid("Invalid number of inputs for '", label(), "'
(expected ",
- num_inputs(), ", actual ", inputs_.size(), ")");
+ num_inputs(), ", actual ", input_labels_.size(),
")");
}
if (static_cast<int>(outputs_.size()) != num_outputs_) {
return Status::Invalid("Invalid number of outputs for '", label(), "'
(expected ",
num_outputs(), ", actual ", outputs_.size(), ")");
}
- DCHECK_EQ(input_descrs_.size(), input_labels_.size());
-
for (auto out : outputs_) {
auto input_index = GetNodeIndex(out->inputs(), this);
if (!input_index) {
return Status::Invalid("Node '", label(), "' outputs to node '",
out->label(),
"' but is not listed as an input.");
}
+ }
- const auto& in_descr = out->input_descrs_[*input_index];
- if (in_descr != output_descr_) {
- return Status::Invalid(
- "Node '", label(), "' (bound to input ", input_labels_[*input_index],
- ") produces batches with type '",
ValueDescr::ToString(output_descr_),
- "' inconsistent with consumer '", out->label(), "' which accepts '",
- ValueDescr::ToString(in_descr), "'");
+ return Status::OK();
+}
+
+struct SourceNode : ExecNode {
+ SourceNode(ExecPlan* plan, std::string label, ExecNode::BatchDescr
output_descr,
+ AsyncGenerator<util::optional<ExecBatch>> generator)
+ : ExecNode(plan, std::move(label), {}, {}, std::move(output_descr),
+ /*num_outputs=*/1),
+ generator_(std::move(generator)) {}
+
+ const char* kind_name() override { return "SourceNode"; }
+
+ static void NoInputs() { DCHECK(false) << "no inputs; this should never be
called"; }
+ void InputReceived(ExecNode*, int, ExecBatch) override { NoInputs(); }
+ void ErrorReceived(ExecNode*, Status) override { NoInputs(); }
+ void InputFinished(ExecNode*, int) override { NoInputs(); }
+
+ Status StartProducing() override {
+ if (finished_) {
+ return Status::Invalid("Restarted SourceNode '", label(), "'");
}
+
+ auto gen = std::move(generator_);
+
+ /// XXX should we wait on this future anywhere? In StopProducing() maybe?
+ auto done_fut =
+ Loop([gen, this] {
Review comment:
The alternative is something like
[GenerateOne](https://github.com/apache/arrow/pull/10397/commits/4bb7404b0101a3c4ebb6a929f28229da44b6bcc5#diff-f001186af47c4e8a2d4ca433e9eec579745bbd5e0bbb42fdafb970b625a7ed20L238),
which is just as bad or worse, IMO.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org