paleolimbot commented on code in PR #13706:
URL: https://github.com/apache/arrow/pull/13706#discussion_r971430832
##########
r/src/compute-exec.cpp:
##########
@@ -56,118 +56,151 @@ std::shared_ptr<compute::ExecNode> MakeExecNodeOrStop(
});
}
-std::pair<std::shared_ptr<compute::ExecPlan>,
std::shared_ptr<arrow::RecordBatchReader>>
-ExecPlan_prepare(const std::shared_ptr<compute::ExecPlan>& plan,
- const std::shared_ptr<compute::ExecNode>& final_node,
- cpp11::list sort_options, cpp11::strings metadata, int64_t
head = -1) {
- // a section of this code is copied and used in ExecPlan_BuildAndShow - the
2 need
- // to be in sync
- // Start of chunk used in ExecPlan_BuildAndShow
+// This class is a special RecordBatchReader that holds a reference to the
+// underlying exec plan so that (1) it can request that the ExecPlan *stop*
+// producing when this object is deleted and (2) it can defer requesting
+// the ExecPlan to *start* producing until the first batch has been pulled.
+// This allows it to be transformed (e.g., using map_batches() or head())
+// and queried (i.e., used as input to another ExecPlan), at the R level
+// while maintaining the ability for the entire plan to be executed at once
+// (e.g., to support user-defined functions) or never executed at all (e.g.,
+// to support printing a nested ExecPlan without having to execute it).
+class ExecPlanReader : public arrow::RecordBatchReader {
+ public:
+ enum ExecPlanReaderStatus { PLAN_NOT_STARTED, PLAN_RUNNING, PLAN_FINISHED };
+
+ ExecPlanReader(
+ const std::shared_ptr<arrow::compute::ExecPlan>& plan,
+ const std::shared_ptr<arrow::Schema>& schema,
+ arrow::AsyncGenerator<arrow::util::optional<compute::ExecBatch>>
sink_gen)
+ : schema_(schema), plan_(plan), sink_gen_(sink_gen),
status_(PLAN_NOT_STARTED) {}
+
+ std::string PlanStatus() {
+ switch (status_) {
+ case PLAN_NOT_STARTED:
+ return "PLAN_NOT_STARTED";
+ case PLAN_RUNNING:
+ return "PLAN_RUNNING";
+ case PLAN_FINISHED:
+ return "PLAN_FINISHED";
+ default:
+ return "UNKNOWN";
+ }
+ }
- // For now, don't require R to construct SinkNodes.
- // Instead, just pass the node we should collect as an argument.
- arrow::AsyncGenerator<arrow::util::optional<compute::ExecBatch>> sink_gen;
+ std::shared_ptr<arrow::Schema> schema() const { return schema_; }
- // Sorting uses a different sink node; there is no general sort yet
- if (sort_options.size() > 0) {
- if (head >= 0) {
- // Use the SelectK node to take only what we need
- MakeExecNodeOrStop(
- "select_k_sink", plan.get(), {final_node.get()},
- compute::SelectKSinkNodeOptions{
- arrow::compute::SelectKOptions(
- head, std::dynamic_pointer_cast<compute::SortOptions>(
- make_compute_options("sort_indices", sort_options))
- ->sort_keys),
- &sink_gen});
+ arrow::Status ReadNext(std::shared_ptr<arrow::RecordBatch>* batch_out) {
+ // TODO(ARROW-11841) check a StopToken to potentially cancel this plan
+
+ // If this is the first batch getting pulled, tell the exec plan to
+ // start producing
+ if (status_ == PLAN_NOT_STARTED) {
+ ARROW_RETURN_NOT_OK(StartProducing());
+ }
+
+ // If we've closed the reader, this is invalid
+ if (status_ == PLAN_FINISHED) {
+ return arrow::Status::Invalid("ExecPlanReader has been closed");
+ }
+
+ auto out = sink_gen_().result();
+ if (!out.ok()) {
+ StopProducing();
+ return out.status();
+ }
+
+ if (out.ValueUnsafe()) {
+ auto batch_result = out.ValueUnsafe()->ToRecordBatch(schema_,
gc_memory_pool());
+ if (!batch_result.ok()) {
+ StopProducing();
+ return batch_result.status();
+ }
+
+ *batch_out = batch_result.ValueUnsafe();
} else {
- MakeExecNodeOrStop("order_by_sink", plan.get(), {final_node.get()},
- compute::OrderBySinkNodeOptions{
- *std::dynamic_pointer_cast<compute::SortOptions>(
- make_compute_options("sort_indices",
sort_options)),
- &sink_gen});
+ batch_out->reset();
+ StopProducing();
}
- } else {
- MakeExecNodeOrStop("sink", plan.get(), {final_node.get()},
- compute::SinkNodeOptions{&sink_gen});
+
+ return arrow::Status::OK();
}
- // End of chunk used in ExecPlan_BuildAndShow
+ arrow::Status Close() {
+ StopProducing();
+ return arrow::Status::OK();
+ }
- StopIfNotOk(plan->Validate());
+ const std::shared_ptr<arrow::compute::ExecPlan>& Plan() { return plan_; }
- // If the generator is destroyed before being completely drained, inform plan
- std::shared_ptr<void> stop_producing{nullptr, [plan](...) {
- bool not_finished_yet =
-
plan->finished().TryAddCallback([&plan] {
- return [plan](const
arrow::Status&) {};
- });
+ ~ExecPlanReader() { StopProducing(); }
- if (not_finished_yet) {
- plan->StopProducing();
- }
- }};
+ private:
+ std::shared_ptr<arrow::Schema> schema_;
+ std::shared_ptr<arrow::compute::ExecPlan> plan_;
+ arrow::AsyncGenerator<arrow::util::optional<compute::ExecBatch>> sink_gen_;
+ int status_;
- // Attach metadata to the schema
- auto out_schema = final_node->output_schema();
- if (metadata.size() > 0) {
- auto kv = strings_to_kvm(metadata);
- out_schema = out_schema->WithMetadata(kv);
+ arrow::Status StartProducing() {
+ ARROW_RETURN_NOT_OK(plan_->StartProducing());
+ status_ = PLAN_RUNNING;
+ return arrow::Status::OK();
}
- std::pair<std::shared_ptr<compute::ExecPlan>,
std::shared_ptr<arrow::RecordBatchReader>>
- out;
- out.first = plan;
- out.second = compute::MakeGeneratorReader(
- out_schema, [stop_producing, plan, sink_gen] { return sink_gen(); },
- gc_memory_pool());
- return out;
-}
+ void StopProducing() {
+ if (status_ == PLAN_RUNNING) {
+ std::shared_ptr<arrow::compute::ExecPlan> plan(plan_);
+ bool not_finished_yet = plan_->finished().TryAddCallback(
+ [&plan] { return [plan](const arrow::Status&) {}; });
Review Comment:
OK — the CI reminded me why I keep adding this back in after trying to
simplify it (see
https://github.com/apache/arrow/actions/runs/3055062036/jobs/4927693653#step:9:8990
). I added a comment here to remind the next person who wants to simplify it,
too.
The point of this method is to call `plan_->StopProducing()` and then give up
our reference to the plan — but because it may not be safe to delete the plan
yet, we need something to keep it alive until it has actually finished. I
believe the outer lambda captures `plan` by reference while the inner one
captures it by value (presumably so that only one extra reference is added to
the shared pointer, held just until the finished-callback fires).
I would love any leads on a better way to do this!
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]