pitrou commented on a change in pull request #10397:
URL: https://github.com/apache/arrow/pull/10397#discussion_r643987448
##########
File path: cpp/src/arrow/dataset/scanner.h
##########
@@ -25,6 +25,7 @@
#include <utility>
#include <vector>
+#include "arrow/compute/exec/exec_plan.h"
Review comment:
Do we have a `type_fwd.h` for this instead? Only pointers are exposed in
this header.
##########
File path: cpp/src/arrow/util/future.h
##########
@@ -937,4 +940,19 @@ Future<BreakValueType> Loop(Iterate iterate) {
return break_fut;
}
+template <typename T>
+struct EnsureFuture {
+ using type = Future<T>;
+};
+
+template <typename T>
+struct EnsureFuture<Result<T>> {
Review comment:
Should there also be `EnsureFuture<Status>`?
##########
File path: cpp/src/arrow/compute/exec/exec_plan.h
##########
@@ -23,6 +23,7 @@
#include "arrow/compute/type_fwd.h"
#include "arrow/type_fwd.h"
+#include "arrow/util/async_generator.h"
Review comment:
I would prefer if we didn't include this large header file here. We can
use the `std::function<Future<T>()>` spelling directly instead.
##########
File path: cpp/src/arrow/compute/exec.h
##########
@@ -28,6 +28,7 @@
#include <vector>
#include "arrow/array/data.h"
+#include "arrow/compute/exec/expression.h"
Review comment:
Usual concern: is there a way to avoid including too much (I see this
pulls in `unordered_map`, for example)?
##########
File path: cpp/src/arrow/util/iterator.h
##########
@@ -66,6 +66,12 @@ bool IsIterationEnd(const T& val) {
return IterationTraits<T>::IsEnd(val);
}
+template <typename T>
+bool IsIterationEnd(const Result<T>& maybe_val) {
+ if (!maybe_val.ok()) return true;
Review comment:
Are we sure a non-ok status is an end of iteration? I don't think it's
the case.
##########
File path: cpp/src/arrow/util/async_generator.h
##########
@@ -258,43 +258,27 @@ class MappingGenerator {
/// Note: Errors returned from the `map` function will be propagated
///
/// If the source generator is async-reentrant then this generator will be also
-template <typename T, typename V>
-AsyncGenerator<V> MakeMappedGenerator(AsyncGenerator<T> source_generator,
- std::function<Result<V>(const T&)> map) {
- std::function<Future<V>(const T&)> future_map = [map](const T& val) ->
Future<V> {
- return Future<V>::MakeFinished(map(val));
- };
- return MappingGenerator<T, V>(std::move(source_generator),
std::move(future_map));
-}
-template <typename T, typename V>
-AsyncGenerator<V> MakeMappedGenerator(AsyncGenerator<T> source_generator,
- std::function<V(const T&)> map) {
- std::function<Future<V>(const T&)> maybe_future_map = [map](const T& val) ->
Future<V> {
- return Future<V>::MakeFinished(map(val));
- };
- return MappingGenerator<T, V>(std::move(source_generator),
std::move(maybe_future_map));
-}
-template <typename T, typename V>
-AsyncGenerator<V> MakeMappedGenerator(AsyncGenerator<T> source_generator,
- std::function<Future<V>(const T&)> map) {
- return MappingGenerator<T, V>(std::move(source_generator), std::move(map));
-}
-
-template <typename V, typename T, typename MapFunc>
-AsyncGenerator<V> MakeMappedGenerator(AsyncGenerator<T> source_generator,
MapFunc map) {
+template <typename T, typename MapFn,
+ typename Mapped = detail::result_of_t<MapFn(const T&)>,
+ typename V = typename EnsureFuture<Mapped>::type::ValueType>
+AsyncGenerator<V> MakeMappedGenerator(AsyncGenerator<T> source_generator,
MapFn map) {
struct MapCallback {
- MapFunc map;
+ MapFn map_;
- Future<V> operator()(const T& val) { return EnsureFuture(map(val)); }
+ Future<V> operator()(const T& val) { return EnsureFuture(map_(val)); }
+
+ Future<V> EnsureFuture(V mapped) {
+ return Future<V>::MakeFinished(std::move(mapped));
+ }
- Future<V> EnsureFuture(Result<V> val) {
- return Future<V>::MakeFinished(std::move(val));
+ Future<V> EnsureFuture(Result<V> mapped) {
+ return Future<V>::MakeFinished(std::move(mapped));
}
- Future<V> EnsureFuture(V val) { return
Future<V>::MakeFinished(std::move(val)); }
- Future<V> EnsureFuture(Future<V> val) { return val; }
+
+ Future<V> EnsureFuture(Future<V> mapped) { return mapped; }
Review comment:
Is this related to the `EnsureFuture` struct in `future.h`? Should it be
moved there?
##########
File path: cpp/src/arrow/compute/exec/exec_plan.h
##########
@@ -225,22 +212,43 @@ class ARROW_EXPORT ExecNode {
virtual void StopProducing() = 0;
protected:
- ExecNode(ExecPlan* plan, std::string label, std::vector<BatchDescr>
input_descrs,
+ ExecNode(ExecPlan*, std::string label, NodeVector inputs,
std::vector<std::string> input_labels, BatchDescr output_descr,
int num_outputs);
ExecPlan* plan_;
-
std::string label_;
- std::vector<BatchDescr> input_descrs_;
- std::vector<std::string> input_labels_;
NodeVector inputs_;
+ std::vector<std::string> input_labels_;
BatchDescr output_descr_;
int num_outputs_;
NodeVector outputs_;
};
+/// \brief Adapt an AsyncGenerator<ExecBatch> as a source node
+ARROW_EXPORT
+ExecNode* MakeSourceNode(ExecPlan*, std::string label, ExecNode::BatchDescr
output_descr,
+ AsyncGenerator<util::optional<ExecBatch>>);
Review comment:
I wonder if we should move the concrete ExecNode implementations to a
separate file. `exec_plan.h` would only expose the base API. What do you think?
##########
File path: cpp/src/arrow/compute/exec/exec_plan.cc
##########
@@ -170,48 +165,409 @@ Status ExecPlan::Validate() { return
ToDerived(this)->Validate(); }
Status ExecPlan::StartProducing() { return ToDerived(this)->StartProducing(); }
-ExecNode::ExecNode(ExecPlan* plan, std::string label,
- std::vector<BatchDescr> input_descrs,
+ExecNode::ExecNode(ExecPlan* plan, std::string label, NodeVector inputs,
std::vector<std::string> input_labels, BatchDescr
output_descr,
int num_outputs)
: plan_(plan),
label_(std::move(label)),
- input_descrs_(std::move(input_descrs)),
+ inputs_(std::move(inputs)),
input_labels_(std::move(input_labels)),
output_descr_(std::move(output_descr)),
- num_outputs_(num_outputs) {}
+ num_outputs_(num_outputs) {
+ for (auto input : inputs_) {
+ input->outputs_.push_back(this);
+ }
+}
Status ExecNode::Validate() const {
- if (inputs_.size() != input_descrs_.size()) {
+ if (inputs_.size() != input_labels_.size()) {
return Status::Invalid("Invalid number of inputs for '", label(), "'
(expected ",
- num_inputs(), ", actual ", inputs_.size(), ")");
+ num_inputs(), ", actual ", input_labels_.size(),
")");
}
if (static_cast<int>(outputs_.size()) != num_outputs_) {
return Status::Invalid("Invalid number of outputs for '", label(), "'
(expected ",
num_outputs(), ", actual ", outputs_.size(), ")");
}
- DCHECK_EQ(input_descrs_.size(), input_labels_.size());
-
for (auto out : outputs_) {
auto input_index = GetNodeIndex(out->inputs(), this);
if (!input_index) {
return Status::Invalid("Node '", label(), "' outputs to node '",
out->label(),
"' but is not listed as an input.");
}
+ }
- const auto& in_descr = out->input_descrs_[*input_index];
- if (in_descr != output_descr_) {
- return Status::Invalid(
- "Node '", label(), "' (bound to input ", input_labels_[*input_index],
- ") produces batches with type '",
ValueDescr::ToString(output_descr_),
- "' inconsistent with consumer '", out->label(), "' which accepts '",
- ValueDescr::ToString(in_descr), "'");
+ return Status::OK();
+}
+
+struct SourceNode : ExecNode {
+ SourceNode(ExecPlan* plan, std::string label, ExecNode::BatchDescr
output_descr,
+ AsyncGenerator<util::optional<ExecBatch>> generator)
+ : ExecNode(plan, std::move(label), {}, {}, std::move(output_descr),
+ /*num_outputs=*/1),
+ generator_(std::move(generator)) {}
+
+ const char* kind_name() override { return "SourceNode"; }
+
+ static void NoInputs() { DCHECK(false) << "no inputs; this should never be
called"; }
+ void InputReceived(ExecNode*, int, ExecBatch) override { NoInputs(); }
+ void ErrorReceived(ExecNode*, Status) override { NoInputs(); }
+ void InputFinished(ExecNode*, int) override { NoInputs(); }
+
+ Status StartProducing() override {
+ if (finished_) {
+ return Status::Invalid("Restarted SourceNode '", label(), "'");
}
+
+ auto gen = std::move(generator_);
+
+ /// XXX should we wait on this future anywhere? In StopProducing() maybe?
+ auto done_fut =
+ Loop([gen, this] {
Review comment:
Is the `Loop` construct necessary? I find that it produces unreadable
code.
##########
File path: cpp/src/arrow/compute/exec/exec_plan.cc
##########
@@ -170,48 +165,409 @@ Status ExecPlan::Validate() { return
ToDerived(this)->Validate(); }
Status ExecPlan::StartProducing() { return ToDerived(this)->StartProducing(); }
-ExecNode::ExecNode(ExecPlan* plan, std::string label,
- std::vector<BatchDescr> input_descrs,
+ExecNode::ExecNode(ExecPlan* plan, std::string label, NodeVector inputs,
std::vector<std::string> input_labels, BatchDescr
output_descr,
int num_outputs)
: plan_(plan),
label_(std::move(label)),
- input_descrs_(std::move(input_descrs)),
+ inputs_(std::move(inputs)),
input_labels_(std::move(input_labels)),
output_descr_(std::move(output_descr)),
- num_outputs_(num_outputs) {}
+ num_outputs_(num_outputs) {
+ for (auto input : inputs_) {
+ input->outputs_.push_back(this);
+ }
+}
Status ExecNode::Validate() const {
- if (inputs_.size() != input_descrs_.size()) {
+ if (inputs_.size() != input_labels_.size()) {
return Status::Invalid("Invalid number of inputs for '", label(), "'
(expected ",
- num_inputs(), ", actual ", inputs_.size(), ")");
+ num_inputs(), ", actual ", input_labels_.size(),
")");
}
if (static_cast<int>(outputs_.size()) != num_outputs_) {
return Status::Invalid("Invalid number of outputs for '", label(), "'
(expected ",
num_outputs(), ", actual ", outputs_.size(), ")");
}
- DCHECK_EQ(input_descrs_.size(), input_labels_.size());
-
for (auto out : outputs_) {
auto input_index = GetNodeIndex(out->inputs(), this);
if (!input_index) {
return Status::Invalid("Node '", label(), "' outputs to node '",
out->label(),
"' but is not listed as an input.");
}
+ }
- const auto& in_descr = out->input_descrs_[*input_index];
- if (in_descr != output_descr_) {
- return Status::Invalid(
- "Node '", label(), "' (bound to input ", input_labels_[*input_index],
- ") produces batches with type '",
ValueDescr::ToString(output_descr_),
- "' inconsistent with consumer '", out->label(), "' which accepts '",
- ValueDescr::ToString(in_descr), "'");
+ return Status::OK();
+}
+
+struct SourceNode : ExecNode {
+ SourceNode(ExecPlan* plan, std::string label, ExecNode::BatchDescr
output_descr,
+ AsyncGenerator<util::optional<ExecBatch>> generator)
+ : ExecNode(plan, std::move(label), {}, {}, std::move(output_descr),
+ /*num_outputs=*/1),
+ generator_(std::move(generator)) {}
+
+ const char* kind_name() override { return "SourceNode"; }
+
+ static void NoInputs() { DCHECK(false) << "no inputs; this should never be
called"; }
+ void InputReceived(ExecNode*, int, ExecBatch) override { NoInputs(); }
+ void ErrorReceived(ExecNode*, Status) override { NoInputs(); }
+ void InputFinished(ExecNode*, int) override { NoInputs(); }
+
+ Status StartProducing() override {
+ if (finished_) {
+ return Status::Invalid("Restarted SourceNode '", label(), "'");
}
+
+ auto gen = std::move(generator_);
+
+ /// XXX should we wait on this future anywhere? In StopProducing() maybe?
Review comment:
Perhaps in `~SourceNode()`... but destructors may not be called in
deterministic order, so perhaps we need a `Close` method instead that would be
called in order by `~ExecPlan()`.
##########
File path: cpp/src/arrow/compute/exec/exec_plan.cc
##########
@@ -105,18 +107,11 @@ struct ExecPlanImpl : public ExecPlan {
return Status::OK();
}
- auto it_success = visiting.insert(node);
- if (!it_success.second) {
- // Insertion failed => node is already being visited
- return Status::Invalid("Cycle detected in execution plan");
- }
Review comment:
Why did you remove this?
##########
File path: cpp/src/arrow/compute/exec/exec_plan.cc
##########
@@ -170,48 +165,409 @@ Status ExecPlan::Validate() { return
ToDerived(this)->Validate(); }
Status ExecPlan::StartProducing() { return ToDerived(this)->StartProducing(); }
-ExecNode::ExecNode(ExecPlan* plan, std::string label,
- std::vector<BatchDescr> input_descrs,
+ExecNode::ExecNode(ExecPlan* plan, std::string label, NodeVector inputs,
std::vector<std::string> input_labels, BatchDescr
output_descr,
int num_outputs)
: plan_(plan),
label_(std::move(label)),
- input_descrs_(std::move(input_descrs)),
+ inputs_(std::move(inputs)),
input_labels_(std::move(input_labels)),
output_descr_(std::move(output_descr)),
- num_outputs_(num_outputs) {}
+ num_outputs_(num_outputs) {
+ for (auto input : inputs_) {
+ input->outputs_.push_back(this);
+ }
+}
Status ExecNode::Validate() const {
- if (inputs_.size() != input_descrs_.size()) {
+ if (inputs_.size() != input_labels_.size()) {
return Status::Invalid("Invalid number of inputs for '", label(), "'
(expected ",
- num_inputs(), ", actual ", inputs_.size(), ")");
+ num_inputs(), ", actual ", input_labels_.size(),
")");
}
if (static_cast<int>(outputs_.size()) != num_outputs_) {
return Status::Invalid("Invalid number of outputs for '", label(), "'
(expected ",
num_outputs(), ", actual ", outputs_.size(), ")");
}
- DCHECK_EQ(input_descrs_.size(), input_labels_.size());
-
for (auto out : outputs_) {
auto input_index = GetNodeIndex(out->inputs(), this);
if (!input_index) {
return Status::Invalid("Node '", label(), "' outputs to node '",
out->label(),
"' but is not listed as an input.");
}
+ }
- const auto& in_descr = out->input_descrs_[*input_index];
- if (in_descr != output_descr_) {
- return Status::Invalid(
- "Node '", label(), "' (bound to input ", input_labels_[*input_index],
- ") produces batches with type '",
ValueDescr::ToString(output_descr_),
- "' inconsistent with consumer '", out->label(), "' which accepts '",
- ValueDescr::ToString(in_descr), "'");
+ return Status::OK();
+}
+
+struct SourceNode : ExecNode {
+ SourceNode(ExecPlan* plan, std::string label, ExecNode::BatchDescr
output_descr,
+ AsyncGenerator<util::optional<ExecBatch>> generator)
+ : ExecNode(plan, std::move(label), {}, {}, std::move(output_descr),
+ /*num_outputs=*/1),
+ generator_(std::move(generator)) {}
+
+ const char* kind_name() override { return "SourceNode"; }
+
+ static void NoInputs() { DCHECK(false) << "no inputs; this should never be
called"; }
+ void InputReceived(ExecNode*, int, ExecBatch) override { NoInputs(); }
+ void ErrorReceived(ExecNode*, Status) override { NoInputs(); }
+ void InputFinished(ExecNode*, int) override { NoInputs(); }
+
+ Status StartProducing() override {
+ if (finished_) {
+ return Status::Invalid("Restarted SourceNode '", label(), "'");
}
+
+ auto gen = std::move(generator_);
+
+ /// XXX should we wait on this future anywhere? In StopProducing() maybe?
+ auto done_fut =
+ Loop([gen, this] {
+ std::unique_lock<std::mutex> lock(mutex_);
+ int seq = next_batch_index_++;
+ if (finished_) {
+ return Future<ControlFlow<int>>::MakeFinished(Break(seq));
+ }
+ lock.unlock();
+
+ return gen().Then(
+ [=](const util::optional<ExecBatch>& batch) -> ControlFlow<int> {
+ std::unique_lock<std::mutex> lock(mutex_);
+ if (!batch || finished_) {
+ finished_ = true;
+ return Break(seq);
+ }
+ lock.unlock();
+
+ outputs_[0]->InputReceived(this, seq, *batch);
+ return Continue();
+ },
+ [=](const Status& error) -> ControlFlow<int> {
+ std::unique_lock<std::mutex> lock(mutex_);
+ if (!finished_) {
+ finished_ = true;
+ lock.unlock();
+ // unless we were already finished, push the error to our
output
+ // XXX is this correct? Is it reasonable for a consumer to
ignore errors
+ // from a finished producer?
+ outputs_[0]->ErrorReceived(this, error);
+ }
+ return Break(seq);
+ });
+ }).Then([&](int seq) {
+ /// XXX this is probably redundant: do we always call InputFinished
after
+ /// ErrorReceived or will ErrorRecieved be sufficient?
+ outputs_[0]->InputFinished(this, seq);
+ });
+
+ return Status::OK();
}
- return Status::OK();
+ void PauseProducing(ExecNode* output) override {}
+
+ void ResumeProducing(ExecNode* output) override {}
Review comment:
The problem is that we'll want to be able to apply backpressure at some
point. But a generator doesn't allow for that. So it seems that, instead of
wrapping a generator, you should really have an `ExecNode` that wraps a dataset
scanner directly.
##########
File path: cpp/src/arrow/compute/exec/exec_plan.cc
##########
@@ -170,48 +165,409 @@ Status ExecPlan::Validate() { return
ToDerived(this)->Validate(); }
Status ExecPlan::StartProducing() { return ToDerived(this)->StartProducing(); }
-ExecNode::ExecNode(ExecPlan* plan, std::string label,
- std::vector<BatchDescr> input_descrs,
+ExecNode::ExecNode(ExecPlan* plan, std::string label, NodeVector inputs,
std::vector<std::string> input_labels, BatchDescr
output_descr,
int num_outputs)
: plan_(plan),
label_(std::move(label)),
- input_descrs_(std::move(input_descrs)),
+ inputs_(std::move(inputs)),
input_labels_(std::move(input_labels)),
output_descr_(std::move(output_descr)),
- num_outputs_(num_outputs) {}
+ num_outputs_(num_outputs) {
+ for (auto input : inputs_) {
+ input->outputs_.push_back(this);
+ }
+}
Status ExecNode::Validate() const {
- if (inputs_.size() != input_descrs_.size()) {
+ if (inputs_.size() != input_labels_.size()) {
return Status::Invalid("Invalid number of inputs for '", label(), "'
(expected ",
- num_inputs(), ", actual ", inputs_.size(), ")");
+ num_inputs(), ", actual ", input_labels_.size(),
")");
}
if (static_cast<int>(outputs_.size()) != num_outputs_) {
return Status::Invalid("Invalid number of outputs for '", label(), "'
(expected ",
num_outputs(), ", actual ", outputs_.size(), ")");
}
- DCHECK_EQ(input_descrs_.size(), input_labels_.size());
-
for (auto out : outputs_) {
auto input_index = GetNodeIndex(out->inputs(), this);
if (!input_index) {
return Status::Invalid("Node '", label(), "' outputs to node '",
out->label(),
"' but is not listed as an input.");
}
+ }
- const auto& in_descr = out->input_descrs_[*input_index];
- if (in_descr != output_descr_) {
- return Status::Invalid(
- "Node '", label(), "' (bound to input ", input_labels_[*input_index],
- ") produces batches with type '",
ValueDescr::ToString(output_descr_),
- "' inconsistent with consumer '", out->label(), "' which accepts '",
- ValueDescr::ToString(in_descr), "'");
+ return Status::OK();
+}
+
+struct SourceNode : ExecNode {
+ SourceNode(ExecPlan* plan, std::string label, ExecNode::BatchDescr
output_descr,
+ AsyncGenerator<util::optional<ExecBatch>> generator)
+ : ExecNode(plan, std::move(label), {}, {}, std::move(output_descr),
+ /*num_outputs=*/1),
+ generator_(std::move(generator)) {}
+
+ const char* kind_name() override { return "SourceNode"; }
+
+ static void NoInputs() { DCHECK(false) << "no inputs; this should never be
called"; }
+ void InputReceived(ExecNode*, int, ExecBatch) override { NoInputs(); }
+ void ErrorReceived(ExecNode*, Status) override { NoInputs(); }
+ void InputFinished(ExecNode*, int) override { NoInputs(); }
+
+ Status StartProducing() override {
+ if (finished_) {
+ return Status::Invalid("Restarted SourceNode '", label(), "'");
}
+
+ auto gen = std::move(generator_);
+
+ /// XXX should we wait on this future anywhere? In StopProducing() maybe?
+ auto done_fut =
+ Loop([gen, this] {
+ std::unique_lock<std::mutex> lock(mutex_);
+ int seq = next_batch_index_++;
+ if (finished_) {
+ return Future<ControlFlow<int>>::MakeFinished(Break(seq));
+ }
+ lock.unlock();
+
+ return gen().Then(
+ [=](const util::optional<ExecBatch>& batch) -> ControlFlow<int> {
+ std::unique_lock<std::mutex> lock(mutex_);
+ if (!batch || finished_) {
+ finished_ = true;
+ return Break(seq);
+ }
+ lock.unlock();
+
+ outputs_[0]->InputReceived(this, seq, *batch);
+ return Continue();
+ },
+ [=](const Status& error) -> ControlFlow<int> {
+ std::unique_lock<std::mutex> lock(mutex_);
+ if (!finished_) {
+ finished_ = true;
+ lock.unlock();
+ // unless we were already finished, push the error to our
output
+ // XXX is this correct? Is it reasonable for a consumer to
ignore errors
+ // from a finished producer?
+ outputs_[0]->ErrorReceived(this, error);
+ }
+ return Break(seq);
+ });
+ }).Then([&](int seq) {
+ /// XXX this is probably redundant: do we always call InputFinished
after
+ /// ErrorReceived or will ErrorRecieved be sufficient?
+ outputs_[0]->InputFinished(this, seq);
+ });
+
+ return Status::OK();
}
- return Status::OK();
+ void PauseProducing(ExecNode* output) override {}
+
+ void ResumeProducing(ExecNode* output) override {}
+
+ void StopProducing(ExecNode* output) override {
+ DCHECK_EQ(output, outputs_[0]);
+ std::unique_lock<std::mutex> lock(mutex_);
+ finished_ = true;
+ }
+
+ void StopProducing() override { StopProducing(outputs_[0]); }
+
+ private:
+ std::mutex mutex_;
+ bool finished_{false};
+ int next_batch_index_{0};
+ AsyncGenerator<util::optional<ExecBatch>> generator_;
+};
+
+ExecNode* MakeSourceNode(ExecPlan* plan, std::string label,
+ ExecNode::BatchDescr output_descr,
+ AsyncGenerator<util::optional<ExecBatch>> generator) {
+ return plan->EmplaceNode<SourceNode>(plan, std::move(label),
std::move(output_descr),
+ std::move(generator));
+}
+
+struct FilterNode : ExecNode {
+ FilterNode(ExecNode* input, std::string label, Expression filter)
+ : ExecNode(input->plan(), std::move(label), {input}, {"target"},
+ /*output_descr=*/{input->output_descr()},
+ /*num_outputs=*/1),
+ filter_(std::move(filter)) {}
+
+ const char* kind_name() override { return "FilterNode"; }
+
+ Result<ExecBatch> DoFilter(const ExecBatch& target) {
+ ARROW_ASSIGN_OR_RAISE(Expression simplified_filter,
+ SimplifyWithGuarantee(filter_, target.guarantee));
+
+ // XXX get a non-default exec context
+ ARROW_ASSIGN_OR_RAISE(Datum mask,
ExecuteScalarExpression(simplified_filter, target));
+
+ if (mask.is_scalar()) {
+ const auto& mask_scalar = mask.scalar_as<BooleanScalar>();
+ if (mask_scalar.is_valid && mask_scalar.value) {
+ return target;
+ }
+
+ return target.Slice(0, 0);
+ }
+
+ auto values = target.values;
+ for (auto& value : values) {
+ if (value.is_scalar()) continue;
+ ARROW_ASSIGN_OR_RAISE(value, Filter(value, mask,
FilterOptions::Defaults()));
+ }
+ return ExecBatch::Make(std::move(values));
+ }
+
+ void InputReceived(ExecNode* input, int seq, ExecBatch batch) override {
+ DCHECK_EQ(input, inputs_[0]);
+
+ auto maybe_filtered = DoFilter(std::move(batch));
+ if (!maybe_filtered.ok()) {
+ outputs_[0]->ErrorReceived(this, maybe_filtered.status());
+ inputs_[0]->StopProducing(this);
+ return;
+ }
+
+ maybe_filtered->guarantee = batch.guarantee;
+ outputs_[0]->InputReceived(this, seq, maybe_filtered.MoveValueUnsafe());
+ }
+
+ void ErrorReceived(ExecNode* input, Status error) override {
+ DCHECK_EQ(input, inputs_[0]);
+ outputs_[0]->ErrorReceived(this, std::move(error));
+ inputs_[0]->StopProducing(this);
+ }
+
+ void InputFinished(ExecNode* input, int seq) override {
+ DCHECK_EQ(input, inputs_[0]);
+ outputs_[0]->InputFinished(this, seq);
+ inputs_[0]->StopProducing(this);
+ }
+
+ Status StartProducing() override {
+ // XXX validate inputs_[0]->output_descr() against filter_
+ return Status::OK();
+ }
+
+ void PauseProducing(ExecNode* output) override {}
+
+ void ResumeProducing(ExecNode* output) override {}
+
+ void StopProducing(ExecNode* output) override {
+ DCHECK_EQ(output, outputs_[0]);
+ inputs_[0]->StopProducing(this);
+ }
+
+ void StopProducing() override { StopProducing(outputs_[0]); }
+
+ private:
+ Expression filter_;
+};
+
+ExecNode* MakeFilterNode(ExecNode* input, std::string label, Expression
filter) {
+ return input->plan()->EmplaceNode<FilterNode>(input, std::move(label),
+ std::move(filter));
+}
+
+struct ProjectNode : ExecNode {
+ ProjectNode(ExecNode* input, std::string label, std::vector<Expression>
exprs)
+ : ExecNode(input->plan(), std::move(label), {input}, {"target"},
+ /*output_descr=*/{input->output_descr()},
+ /*num_outputs=*/1),
+ exprs_(std::move(exprs)) {}
+
+ const char* kind_name() override { return "ProjectNode"; }
+
+ Result<ExecBatch> DoProject(const ExecBatch& target) {
+ // XXX get a non-default exec context
+ std::vector<Datum> values{exprs_.size()};
+ for (size_t i = 0; i < exprs_.size(); ++i) {
+ ARROW_ASSIGN_OR_RAISE(Expression simplified_expr,
+ SimplifyWithGuarantee(exprs_[i],
target.guarantee));
+
+ ARROW_ASSIGN_OR_RAISE(values[i],
ExecuteScalarExpression(simplified_expr, target));
+ }
+ return ExecBatch::Make(std::move(values));
+ }
+
+ void InputReceived(ExecNode* input, int seq, ExecBatch batch) override {
+ DCHECK_EQ(input, inputs_[0]);
+
+ auto maybe_projected = DoProject(std::move(batch));
+ if (!maybe_projected.ok()) {
+ outputs_[0]->ErrorReceived(this, maybe_projected.status());
+ inputs_[0]->StopProducing(this);
+ return;
+ }
+
+ maybe_projected->guarantee = batch.guarantee;
+ outputs_[0]->InputReceived(this, seq, maybe_projected.MoveValueUnsafe());
+ }
+
+ void ErrorReceived(ExecNode* input, Status error) override {
+ DCHECK_EQ(input, inputs_[0]);
+ outputs_[0]->ErrorReceived(this, std::move(error));
+ inputs_[0]->StopProducing(this);
+ }
+
+ void InputFinished(ExecNode* input, int seq) override {
+ DCHECK_EQ(input, inputs_[0]);
+ outputs_[0]->InputFinished(this, seq);
+ inputs_[0]->StopProducing(this);
+ }
+
+ Status StartProducing() override {
+ // XXX validate inputs_[0]->output_descr() against filter_
+ return Status::OK();
+ }
+
+ void PauseProducing(ExecNode* output) override {}
+
+ void ResumeProducing(ExecNode* output) override {}
+
+ void StopProducing(ExecNode* output) override {
+ DCHECK_EQ(output, outputs_[0]);
+ inputs_[0]->StopProducing(this);
+ }
+
+ void StopProducing() override { StopProducing(outputs_[0]); }
+
+ private:
+ std::vector<Expression> exprs_;
+};
+
+ExecNode* MakeProjectNode(ExecNode* input, std::string label,
+ std::vector<Expression> exprs) {
+ return input->plan()->EmplaceNode<ProjectNode>(input, std::move(label),
+ std::move(exprs));
+}
+
+struct SinkNode : ExecNode {
+ SinkNode(ExecNode* input, std::string label,
+ AsyncGenerator<util::optional<ExecBatch>>* generator)
+ : ExecNode(input->plan(), std::move(label), {input}, {"collected"}, {},
+ /*num_outputs=*/0),
+ producer_(MakeProducer(generator)) {}
+
+ static PushGenerator<util::optional<ExecBatch>>::Producer MakeProducer(
+ AsyncGenerator<util::optional<ExecBatch>>* out_gen) {
+ PushGenerator<util::optional<ExecBatch>> gen;
+ auto out = gen.producer();
+ *out_gen = std::move(gen);
+ return out;
+ }
+
+ const char* kind_name() override { return "SinkNode"; }
+
+ Status StartProducing() override { return Status::OK(); }
+
+ // sink nodes have no outputs from which to feel backpressure
+ static void NoOutputs() { DCHECK(false) << "no outputs; this should never be
called"; }
+ void ResumeProducing(ExecNode* output) override { NoOutputs(); }
+ void PauseProducing(ExecNode* output) override { NoOutputs(); }
+ void StopProducing(ExecNode* output) override { NoOutputs(); }
+
+ void StopProducing() override {
+ std::unique_lock<std::mutex> lock(mutex_);
+ StopProducingUnlocked();
+ }
+
+ void InputReceived(ExecNode* input, int seq_num, ExecBatch exec_batch)
override {
+ std::unique_lock<std::mutex> lock(mutex_);
+ if (stopped_) return;
+
+ // TODO would be nice to factor this out in a ReorderQueue
+ if (seq_num <= static_cast<int>(received_batches_.size())) {
+ received_batches_.resize(seq_num + 1);
+ emitted_.resize(seq_num + 1, false);
+ }
+ received_batches_[seq_num] = std::move(exec_batch);
+ ++num_received_;
+
+ if (seq_num != num_emitted_) {
+ // Cannot emit yet as there is a hole at `num_emitted_`
+ DCHECK_GT(seq_num, num_emitted_);
+ return;
+ }
+
+ if (num_received_ == emit_stop_) {
+ StopProducingUnlocked();
+ }
+
+ // Emit batches in order as far as possible
+ // First collect these batches, then unlock before producing.
+ const auto seq_start = seq_num;
+ while (seq_num < static_cast<int>(emitted_.size()) && !emitted_[seq_num]) {
+ emitted_[seq_num] = true;
+ ++seq_num;
+ }
+ DCHECK_GT(seq_num, seq_start);
+ // By moving the values now, we make sure another thread won't emit the
same values
+ // below
+ std::vector<ExecBatch> to_emit(
+ std::make_move_iterator(received_batches_.begin() + seq_start),
+ std::make_move_iterator(received_batches_.begin() + seq_num));
+
+ lock.unlock();
+ for (auto&& batch : to_emit) {
+ producer_.Push(std::move(batch));
+ }
+ lock.lock();
+
+ DCHECK_EQ(seq_start, num_emitted_); // num_emitted_ wasn't bumped in the
meantime
+ num_emitted_ = seq_num;
Review comment:
@westonpace If another thread comes here, it will have a different
`seq_num` and will hit the `seq_num != num_emitted_` condition above.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]