pitrou commented on code in PR #13706:
URL: https://github.com/apache/arrow/pull/13706#discussion_r968544775
##########
r/src/compute-exec.cpp:
##########
@@ -56,118 +56,151 @@ std::shared_ptr<compute::ExecNode> MakeExecNodeOrStop(
});
}
-std::pair<std::shared_ptr<compute::ExecPlan>,
std::shared_ptr<arrow::RecordBatchReader>>
-ExecPlan_prepare(const std::shared_ptr<compute::ExecPlan>& plan,
- const std::shared_ptr<compute::ExecNode>& final_node,
- cpp11::list sort_options, cpp11::strings metadata, int64_t
head = -1) {
- // a section of this code is copied and used in ExecPlan_BuildAndShow - the
2 need
- // to be in sync
- // Start of chunk used in ExecPlan_BuildAndShow
+// This class is a special RecordBatchReader that holds a reference to the
+// underlying exec plan so that (1) it can request that the ExecPlan *stop*
+// producing when this object is deleted and (2) it can defer requesting
+// the ExecPlan to *start* producing until the first batch has been pulled.
+// This allows it to be transformed (e.g., using map_batches() or head())
+// and queried (i.e., used as input to another ExecPlan), at the R level
+// while maintaining the ability for the entire plan to be executed at once
+// (e.g., to support user-defined functions) or never executed at all (e.g.,
+// to support printing a nested ExecPlan without having to execute it).
+class ExecPlanReader : public arrow::RecordBatchReader {
+ public:
+ enum ExecPlanReaderStatus { PLAN_NOT_STARTED, PLAN_RUNNING, PLAN_FINISHED };
+
+ ExecPlanReader(
+ const std::shared_ptr<arrow::compute::ExecPlan>& plan,
+ const std::shared_ptr<arrow::Schema>& schema,
+ arrow::AsyncGenerator<arrow::util::optional<compute::ExecBatch>>
sink_gen)
+ : schema_(schema), plan_(plan), sink_gen_(sink_gen),
status_(PLAN_NOT_STARTED) {}
+
+ std::string PlanStatus() {
+ switch (status_) {
+ case PLAN_NOT_STARTED:
+ return "PLAN_NOT_STARTED";
+ case PLAN_RUNNING:
+ return "PLAN_RUNNING";
+ case PLAN_FINISHED:
+ return "PLAN_FINISHED";
+ default:
+ return "UNKNOWN";
+ }
+ }
- // For now, don't require R to construct SinkNodes.
- // Instead, just pass the node we should collect as an argument.
- arrow::AsyncGenerator<arrow::util::optional<compute::ExecBatch>> sink_gen;
+ std::shared_ptr<arrow::Schema> schema() const { return schema_; }
- // Sorting uses a different sink node; there is no general sort yet
- if (sort_options.size() > 0) {
- if (head >= 0) {
- // Use the SelectK node to take only what we need
- MakeExecNodeOrStop(
- "select_k_sink", plan.get(), {final_node.get()},
- compute::SelectKSinkNodeOptions{
- arrow::compute::SelectKOptions(
- head, std::dynamic_pointer_cast<compute::SortOptions>(
- make_compute_options("sort_indices", sort_options))
- ->sort_keys),
- &sink_gen});
+ arrow::Status ReadNext(std::shared_ptr<arrow::RecordBatch>* batch_out) {
+ // TODO(ARROW-11841) check a StopToken to potentially cancel this plan
+
+ // If this is the first batch getting pulled, tell the exec plan to
+ // start producing
+ if (status_ == PLAN_NOT_STARTED) {
+ ARROW_RETURN_NOT_OK(StartProducing());
+ }
+
+ // If we've closed the reader, this is invalid
+ if (status_ == PLAN_FINISHED) {
+ return arrow::Status::Invalid("ExecPlanReader has been closed");
+ }
+
+ auto out = sink_gen_().result();
+ if (!out.ok()) {
+ StopProducing();
+ return out.status();
+ }
+
+ if (out.ValueUnsafe()) {
+ auto batch_result = out.ValueUnsafe()->ToRecordBatch(schema_,
gc_memory_pool());
+ if (!batch_result.ok()) {
+ StopProducing();
+ return batch_result.status();
+ }
+
+ *batch_out = batch_result.ValueUnsafe();
} else {
- MakeExecNodeOrStop("order_by_sink", plan.get(), {final_node.get()},
- compute::OrderBySinkNodeOptions{
- *std::dynamic_pointer_cast<compute::SortOptions>(
- make_compute_options("sort_indices",
sort_options)),
- &sink_gen});
+ batch_out->reset();
+ StopProducing();
}
- } else {
- MakeExecNodeOrStop("sink", plan.get(), {final_node.get()},
- compute::SinkNodeOptions{&sink_gen});
+
+ return arrow::Status::OK();
}
- // End of chunk used in ExecPlan_BuildAndShow
+ arrow::Status Close() {
Review Comment:
```suggestion
arrow::Status Close() override {
```
##########
r/tests/testthat/test-query-engine.R:
##########
@@ -17,6 +17,76 @@
library(dplyr, warn.conflicts = FALSE)
+test_that("ExecPlanReader does not start evaluating a query", {
+ rbr <- as_record_batch_reader(
+ function(x) stop("This query will error if started"),
+ schema = schema(a = int32())
+ )
+
+ reader <- as_record_batch_reader(as_adq(rbr))
+ expect_identical(reader$PlanStatus(), "PLAN_NOT_STARTED")
+ expect_error(reader$read_table(), "This query will error if started")
+ expect_identical(reader$PlanStatus(), "PLAN_FINISHED")
+})
+
+test_that("ExecPlanReader evaluates nested exec plans lazily", {
+ reader <- as_record_batch_reader(as_adq(arrow_table(a = 1:10)))
+ expect_identical(reader$PlanStatus(), "PLAN_NOT_STARTED")
+
+ head_reader <- head(reader, 4)
+ expect_identical(reader$PlanStatus(), "PLAN_NOT_STARTED")
+
+ expect_equal(
+ head_reader$read_table(),
+ arrow_table(a = 1:4)
+ )
+
+ expect_identical(reader$PlanStatus(), "PLAN_FINISHED")
+})
+
+test_that("ExecPlanReader evaluates head() lazily", {
+ reader <- as_record_batch_reader(as_adq(arrow_table(a = 1:10)))
+ expect_identical(reader$PlanStatus(), "PLAN_NOT_STARTED")
+
+ head_reader <- head(reader, 4)
+ expect_identical(reader$PlanStatus(), "PLAN_NOT_STARTED")
+
+ expect_equal(
+ head_reader$read_table(),
+ arrow_table(a = 1:4)
+ )
+
+ expect_identical(reader$PlanStatus(), "PLAN_FINISHED")
+})
+
+test_that("ExecPlanReader evaluates head() lazily", {
+ # make a 500-row RecordBatchReader
+ reader <- RecordBatchReader$create(
+ batches = rep(
+ list(
+ record_batch(
+ line = c(
+ "this is the RecordBatchReader that never ends",
Review Comment:
I'm not sure I understand the comment. According to the `rep` documentation,
this will actually be repeated only a finite number of times, won't it? Is it
even possible to create an infinite iterator in R?
##########
r/src/compute-exec.cpp:
##########
@@ -56,118 +56,151 @@ std::shared_ptr<compute::ExecNode> MakeExecNodeOrStop(
});
}
-std::pair<std::shared_ptr<compute::ExecPlan>,
std::shared_ptr<arrow::RecordBatchReader>>
-ExecPlan_prepare(const std::shared_ptr<compute::ExecPlan>& plan,
- const std::shared_ptr<compute::ExecNode>& final_node,
- cpp11::list sort_options, cpp11::strings metadata, int64_t
head = -1) {
- // a section of this code is copied and used in ExecPlan_BuildAndShow - the
2 need
- // to be in sync
- // Start of chunk used in ExecPlan_BuildAndShow
+// This class is a special RecordBatchReader that holds a reference to the
+// underlying exec plan so that (1) it can request that the ExecPlan *stop*
+// producing when this object is deleted and (2) it can defer requesting
+// the ExecPlan to *start* producing until the first batch has been pulled.
+// This allows it to be transformed (e.g., using map_batches() or head())
+// and queried (i.e., used as input to another ExecPlan), at the R level
+// while maintaining the ability for the entire plan to be executed at once
+// (e.g., to support user-defined functions) or never executed at all (e.g.,
+// to support printing a nested ExecPlan without having to execute it).
+class ExecPlanReader : public arrow::RecordBatchReader {
+ public:
+ enum ExecPlanReaderStatus { PLAN_NOT_STARTED, PLAN_RUNNING, PLAN_FINISHED };
+
+ ExecPlanReader(
+ const std::shared_ptr<arrow::compute::ExecPlan>& plan,
+ const std::shared_ptr<arrow::Schema>& schema,
+ arrow::AsyncGenerator<arrow::util::optional<compute::ExecBatch>>
sink_gen)
+ : schema_(schema), plan_(plan), sink_gen_(sink_gen),
status_(PLAN_NOT_STARTED) {}
+
+ std::string PlanStatus() {
+ switch (status_) {
+ case PLAN_NOT_STARTED:
+ return "PLAN_NOT_STARTED";
+ case PLAN_RUNNING:
+ return "PLAN_RUNNING";
+ case PLAN_FINISHED:
+ return "PLAN_FINISHED";
+ default:
+ return "UNKNOWN";
+ }
+ }
- // For now, don't require R to construct SinkNodes.
- // Instead, just pass the node we should collect as an argument.
- arrow::AsyncGenerator<arrow::util::optional<compute::ExecBatch>> sink_gen;
+ std::shared_ptr<arrow::Schema> schema() const { return schema_; }
Review Comment:
```suggestion
std::shared_ptr<arrow::Schema> schema() const override { return schema_; }
```
##########
r/src/compute-exec.cpp:
##########
@@ -56,118 +56,151 @@ std::shared_ptr<compute::ExecNode> MakeExecNodeOrStop(
});
}
-std::pair<std::shared_ptr<compute::ExecPlan>,
std::shared_ptr<arrow::RecordBatchReader>>
-ExecPlan_prepare(const std::shared_ptr<compute::ExecPlan>& plan,
- const std::shared_ptr<compute::ExecNode>& final_node,
- cpp11::list sort_options, cpp11::strings metadata, int64_t
head = -1) {
- // a section of this code is copied and used in ExecPlan_BuildAndShow - the
2 need
- // to be in sync
- // Start of chunk used in ExecPlan_BuildAndShow
+// This class is a special RecordBatchReader that holds a reference to the
+// underlying exec plan so that (1) it can request that the ExecPlan *stop*
+// producing when this object is deleted and (2) it can defer requesting
+// the ExecPlan to *start* producing until the first batch has been pulled.
+// This allows it to be transformed (e.g., using map_batches() or head())
+// and queried (i.e., used as input to another ExecPlan), at the R level
+// while maintaining the ability for the entire plan to be executed at once
+// (e.g., to support user-defined functions) or never executed at all (e.g.,
+// to support printing a nested ExecPlan without having to execute it).
+class ExecPlanReader : public arrow::RecordBatchReader {
+ public:
+ enum ExecPlanReaderStatus { PLAN_NOT_STARTED, PLAN_RUNNING, PLAN_FINISHED };
+
+ ExecPlanReader(
+ const std::shared_ptr<arrow::compute::ExecPlan>& plan,
+ const std::shared_ptr<arrow::Schema>& schema,
+ arrow::AsyncGenerator<arrow::util::optional<compute::ExecBatch>>
sink_gen)
+ : schema_(schema), plan_(plan), sink_gen_(sink_gen),
status_(PLAN_NOT_STARTED) {}
+
+ std::string PlanStatus() {
+ switch (status_) {
+ case PLAN_NOT_STARTED:
+ return "PLAN_NOT_STARTED";
+ case PLAN_RUNNING:
+ return "PLAN_RUNNING";
+ case PLAN_FINISHED:
+ return "PLAN_FINISHED";
+ default:
+ return "UNKNOWN";
+ }
+ }
- // For now, don't require R to construct SinkNodes.
- // Instead, just pass the node we should collect as an argument.
- arrow::AsyncGenerator<arrow::util::optional<compute::ExecBatch>> sink_gen;
+ std::shared_ptr<arrow::Schema> schema() const { return schema_; }
- // Sorting uses a different sink node; there is no general sort yet
- if (sort_options.size() > 0) {
- if (head >= 0) {
- // Use the SelectK node to take only what we need
- MakeExecNodeOrStop(
- "select_k_sink", plan.get(), {final_node.get()},
- compute::SelectKSinkNodeOptions{
- arrow::compute::SelectKOptions(
- head, std::dynamic_pointer_cast<compute::SortOptions>(
- make_compute_options("sort_indices", sort_options))
- ->sort_keys),
- &sink_gen});
+ arrow::Status ReadNext(std::shared_ptr<arrow::RecordBatch>* batch_out) {
Review Comment:
```suggestion
arrow::Status ReadNext(std::shared_ptr<arrow::RecordBatch>* batch_out)
override {
```
##########
r/src/recordbatchreader.cpp:
##########
@@ -111,19 +116,77 @@ std::shared_ptr<arrow::Table>
Table__from_RecordBatchReader(
return ValueOrStop(reader->ToTable());
}
+// Because the head() operation can leave a RecordBatchReader whose contents
+// will never be drained, we implement a wrapper class here that takes care
+// to (1) return only the requested number of rows (or fewer) and (2) Close
+// and release the underlying reader as soon as possible. This is mostly
+// useful for the ExecPlanReader, whose Close() method also requests
+// that the ExecPlan stop producing, but may also be useful for readers
+// that point to an open file and whose Close() or delete method releases
+// the file.
+class RecordBatchReaderHead : public arrow::RecordBatchReader {
+ public:
+ RecordBatchReaderHead(std::shared_ptr<arrow::RecordBatchReader> reader,
+ int64_t num_rows)
+ : schema_(reader->schema()), reader_(reader), num_rows_(num_rows) {}
+
+ std::shared_ptr<arrow::Schema> schema() const { return schema_; }
+
+ arrow::Status ReadNext(std::shared_ptr<arrow::RecordBatch>* batch_out) {
+ if (!reader_) {
+ // Close() has been called
+ batch_out = nullptr;
+ return arrow::Status::OK();
+ }
+
+ ARROW_RETURN_NOT_OK(reader_->ReadNext(batch_out));
+ if (batch_out->get()) {
+ num_rows_ -= batch_out->get()->num_rows();
+ if (num_rows_ < 0) {
+ auto smaller_batch =
+ batch_out->get()->Slice(0, batch_out->get()->num_rows() +
num_rows_);
+ *batch_out = smaller_batch;
+ }
+
+ if (num_rows_ <= 0) {
+ // We've run out of num_rows before batches
+ ARROW_RETURN_NOT_OK(Close());
+ }
+ } else {
+ // We've run out of batches before num_rows
+ ARROW_RETURN_NOT_OK(Close());
+ }
+
+ return arrow::Status::OK();
+ }
+
+ arrow::Status Close() {
Review Comment:
```suggestion
arrow::Status Close() override {
```
##########
r/src/recordbatchreader.cpp:
##########
@@ -111,19 +116,77 @@ std::shared_ptr<arrow::Table>
Table__from_RecordBatchReader(
return ValueOrStop(reader->ToTable());
}
+// Because the head() operation can leave a RecordBatchReader whose contents
+// will never be drained, we implement a wrapper class here that takes care
+// to (1) return only the requested number of rows (or fewer) and (2) Close
+// and release the underlying reader as soon as possible. This is mostly
+// useful for the ExecPlanReader, whose Close() method also requests
+// that the ExecPlan stop producing, but may also be useful for readers
+// that point to an open file and whose Close() or delete method releases
+// the file.
+class RecordBatchReaderHead : public arrow::RecordBatchReader {
+ public:
+ RecordBatchReaderHead(std::shared_ptr<arrow::RecordBatchReader> reader,
+ int64_t num_rows)
+ : schema_(reader->schema()), reader_(reader), num_rows_(num_rows) {}
+
+ std::shared_ptr<arrow::Schema> schema() const { return schema_; }
+
+ arrow::Status ReadNext(std::shared_ptr<arrow::RecordBatch>* batch_out) {
Review Comment:
```suggestion
arrow::Status ReadNext(std::shared_ptr<arrow::RecordBatch>* batch_out)
override {
```
##########
r/src/compute-exec.cpp:
##########
@@ -56,118 +56,151 @@ std::shared_ptr<compute::ExecNode> MakeExecNodeOrStop(
});
}
-std::pair<std::shared_ptr<compute::ExecPlan>,
std::shared_ptr<arrow::RecordBatchReader>>
-ExecPlan_prepare(const std::shared_ptr<compute::ExecPlan>& plan,
- const std::shared_ptr<compute::ExecNode>& final_node,
- cpp11::list sort_options, cpp11::strings metadata, int64_t
head = -1) {
- // a section of this code is copied and used in ExecPlan_BuildAndShow - the
2 need
- // to be in sync
- // Start of chunk used in ExecPlan_BuildAndShow
+// This class is a special RecordBatchReader that holds a reference to the
+// underlying exec plan so that (1) it can request that the ExecPlan *stop*
+// producing when this object is deleted and (2) it can defer requesting
+// the ExecPlan to *start* producing until the first batch has been pulled.
+// This allows it to be transformed (e.g., using map_batches() or head())
+// and queried (i.e., used as input to another ExecPlan), at the R level
+// while maintaining the ability for the entire plan to be executed at once
+// (e.g., to support user-defined functions) or never executed at all (e.g.,
+// to support printing a nested ExecPlan without having to execute it).
+class ExecPlanReader : public arrow::RecordBatchReader {
+ public:
+ enum ExecPlanReaderStatus { PLAN_NOT_STARTED, PLAN_RUNNING, PLAN_FINISHED };
+
+ ExecPlanReader(
+ const std::shared_ptr<arrow::compute::ExecPlan>& plan,
+ const std::shared_ptr<arrow::Schema>& schema,
+ arrow::AsyncGenerator<arrow::util::optional<compute::ExecBatch>>
sink_gen)
+ : schema_(schema), plan_(plan), sink_gen_(sink_gen),
status_(PLAN_NOT_STARTED) {}
+
+ std::string PlanStatus() {
+ switch (status_) {
+ case PLAN_NOT_STARTED:
+ return "PLAN_NOT_STARTED";
+ case PLAN_RUNNING:
+ return "PLAN_RUNNING";
+ case PLAN_FINISHED:
+ return "PLAN_FINISHED";
+ default:
+ return "UNKNOWN";
+ }
+ }
- // For now, don't require R to construct SinkNodes.
- // Instead, just pass the node we should collect as an argument.
- arrow::AsyncGenerator<arrow::util::optional<compute::ExecBatch>> sink_gen;
+ std::shared_ptr<arrow::Schema> schema() const { return schema_; }
- // Sorting uses a different sink node; there is no general sort yet
- if (sort_options.size() > 0) {
- if (head >= 0) {
- // Use the SelectK node to take only what we need
- MakeExecNodeOrStop(
- "select_k_sink", plan.get(), {final_node.get()},
- compute::SelectKSinkNodeOptions{
- arrow::compute::SelectKOptions(
- head, std::dynamic_pointer_cast<compute::SortOptions>(
- make_compute_options("sort_indices", sort_options))
- ->sort_keys),
- &sink_gen});
+ arrow::Status ReadNext(std::shared_ptr<arrow::RecordBatch>* batch_out) {
+ // TODO(ARROW-11841) check a StopToken to potentially cancel this plan
+
+ // If this is the first batch getting pulled, tell the exec plan to
+ // start producing
+ if (status_ == PLAN_NOT_STARTED) {
+ ARROW_RETURN_NOT_OK(StartProducing());
+ }
+
+ // If we've closed the reader, this is invalid
+ if (status_ == PLAN_FINISHED) {
+ return arrow::Status::Invalid("ExecPlanReader has been closed");
+ }
+
+ auto out = sink_gen_().result();
+ if (!out.ok()) {
+ StopProducing();
+ return out.status();
+ }
+
+ if (out.ValueUnsafe()) {
+ auto batch_result = out.ValueUnsafe()->ToRecordBatch(schema_,
gc_memory_pool());
+ if (!batch_result.ok()) {
+ StopProducing();
+ return batch_result.status();
+ }
+
+ *batch_out = batch_result.ValueUnsafe();
} else {
- MakeExecNodeOrStop("order_by_sink", plan.get(), {final_node.get()},
- compute::OrderBySinkNodeOptions{
- *std::dynamic_pointer_cast<compute::SortOptions>(
- make_compute_options("sort_indices",
sort_options)),
- &sink_gen});
+ batch_out->reset();
+ StopProducing();
}
- } else {
- MakeExecNodeOrStop("sink", plan.get(), {final_node.get()},
- compute::SinkNodeOptions{&sink_gen});
+
+ return arrow::Status::OK();
}
- // End of chunk used in ExecPlan_BuildAndShow
+ arrow::Status Close() {
+ StopProducing();
+ return arrow::Status::OK();
+ }
- StopIfNotOk(plan->Validate());
+ const std::shared_ptr<arrow::compute::ExecPlan>& Plan() { return plan_; }
- // If the generator is destroyed before being completely drained, inform plan
- std::shared_ptr<void> stop_producing{nullptr, [plan](...) {
- bool not_finished_yet =
-
plan->finished().TryAddCallback([&plan] {
- return [plan](const
arrow::Status&) {};
- });
+ ~ExecPlanReader() { StopProducing(); }
- if (not_finished_yet) {
- plan->StopProducing();
- }
- }};
+ private:
+ std::shared_ptr<arrow::Schema> schema_;
+ std::shared_ptr<arrow::compute::ExecPlan> plan_;
+ arrow::AsyncGenerator<arrow::util::optional<compute::ExecBatch>> sink_gen_;
+ int status_;
- // Attach metadata to the schema
- auto out_schema = final_node->output_schema();
- if (metadata.size() > 0) {
- auto kv = strings_to_kvm(metadata);
- out_schema = out_schema->WithMetadata(kv);
+ arrow::Status StartProducing() {
+ ARROW_RETURN_NOT_OK(plan_->StartProducing());
+ status_ = PLAN_RUNNING;
+ return arrow::Status::OK();
}
- std::pair<std::shared_ptr<compute::ExecPlan>,
std::shared_ptr<arrow::RecordBatchReader>>
- out;
- out.first = plan;
- out.second = compute::MakeGeneratorReader(
- out_schema, [stop_producing, plan, sink_gen] { return sink_gen(); },
- gc_memory_pool());
- return out;
-}
+ void StopProducing() {
+ if (status_ == PLAN_RUNNING) {
+ std::shared_ptr<arrow::compute::ExecPlan> plan(plan_);
+ bool not_finished_yet = plan_->finished().TryAddCallback(
+ [&plan] { return [plan](const arrow::Status&) {}; });
Review Comment:
Hmm, I don't understand: what is the point of the inner callback here? And
why capture `plan` by reference?
##########
r/src/compute-exec.cpp:
##########
@@ -56,118 +56,151 @@ std::shared_ptr<compute::ExecNode> MakeExecNodeOrStop(
});
}
-std::pair<std::shared_ptr<compute::ExecPlan>,
std::shared_ptr<arrow::RecordBatchReader>>
-ExecPlan_prepare(const std::shared_ptr<compute::ExecPlan>& plan,
- const std::shared_ptr<compute::ExecNode>& final_node,
- cpp11::list sort_options, cpp11::strings metadata, int64_t
head = -1) {
- // a section of this code is copied and used in ExecPlan_BuildAndShow - the
2 need
- // to be in sync
- // Start of chunk used in ExecPlan_BuildAndShow
+// This class is a special RecordBatchReader that holds a reference to the
+// underlying exec plan so that (1) it can request that the ExecPlan *stop*
+// producing when this object is deleted and (2) it can defer requesting
+// the ExecPlan to *start* producing until the first batch has been pulled.
+// This allows it to be transformed (e.g., using map_batches() or head())
+// and queried (i.e., used as input to another ExecPlan), at the R level
+// while maintaining the ability for the entire plan to be executed at once
+// (e.g., to support user-defined functions) or never executed at all (e.g.,
+// to support printing a nested ExecPlan without having to execute it).
+class ExecPlanReader : public arrow::RecordBatchReader {
+ public:
+ enum ExecPlanReaderStatus { PLAN_NOT_STARTED, PLAN_RUNNING, PLAN_FINISHED };
+
+ ExecPlanReader(
+ const std::shared_ptr<arrow::compute::ExecPlan>& plan,
+ const std::shared_ptr<arrow::Schema>& schema,
+ arrow::AsyncGenerator<arrow::util::optional<compute::ExecBatch>>
sink_gen)
+ : schema_(schema), plan_(plan), sink_gen_(sink_gen),
status_(PLAN_NOT_STARTED) {}
+
+ std::string PlanStatus() {
+ switch (status_) {
+ case PLAN_NOT_STARTED:
+ return "PLAN_NOT_STARTED";
+ case PLAN_RUNNING:
+ return "PLAN_RUNNING";
+ case PLAN_FINISHED:
+ return "PLAN_FINISHED";
+ default:
+ return "UNKNOWN";
+ }
+ }
- // For now, don't require R to construct SinkNodes.
- // Instead, just pass the node we should collect as an argument.
- arrow::AsyncGenerator<arrow::util::optional<compute::ExecBatch>> sink_gen;
+ std::shared_ptr<arrow::Schema> schema() const { return schema_; }
- // Sorting uses a different sink node; there is no general sort yet
- if (sort_options.size() > 0) {
- if (head >= 0) {
- // Use the SelectK node to take only what we need
- MakeExecNodeOrStop(
- "select_k_sink", plan.get(), {final_node.get()},
- compute::SelectKSinkNodeOptions{
- arrow::compute::SelectKOptions(
- head, std::dynamic_pointer_cast<compute::SortOptions>(
- make_compute_options("sort_indices", sort_options))
- ->sort_keys),
- &sink_gen});
+ arrow::Status ReadNext(std::shared_ptr<arrow::RecordBatch>* batch_out) {
+ // TODO(ARROW-11841) check a StopToken to potentially cancel this plan
+
+ // If this is the first batch getting pulled, tell the exec plan to
+ // start producing
+ if (status_ == PLAN_NOT_STARTED) {
+ ARROW_RETURN_NOT_OK(StartProducing());
+ }
+
+ // If we've closed the reader, this is invalid
+ if (status_ == PLAN_FINISHED) {
+ return arrow::Status::Invalid("ExecPlanReader has been closed");
+ }
+
+ auto out = sink_gen_().result();
+ if (!out.ok()) {
+ StopProducing();
+ return out.status();
+ }
+
+ if (out.ValueUnsafe()) {
+ auto batch_result = out.ValueUnsafe()->ToRecordBatch(schema_,
gc_memory_pool());
+ if (!batch_result.ok()) {
+ StopProducing();
+ return batch_result.status();
+ }
+
+ *batch_out = batch_result.ValueUnsafe();
} else {
- MakeExecNodeOrStop("order_by_sink", plan.get(), {final_node.get()},
- compute::OrderBySinkNodeOptions{
- *std::dynamic_pointer_cast<compute::SortOptions>(
- make_compute_options("sort_indices",
sort_options)),
- &sink_gen});
+ batch_out->reset();
+ StopProducing();
}
- } else {
- MakeExecNodeOrStop("sink", plan.get(), {final_node.get()},
- compute::SinkNodeOptions{&sink_gen});
+
+ return arrow::Status::OK();
}
- // End of chunk used in ExecPlan_BuildAndShow
+ arrow::Status Close() {
+ StopProducing();
+ return arrow::Status::OK();
+ }
- StopIfNotOk(plan->Validate());
+ const std::shared_ptr<arrow::compute::ExecPlan>& Plan() { return plan_; }
Review Comment:
```suggestion
const std::shared_ptr<arrow::compute::ExecPlan>& Plan() const { return
plan_; }
```
##########
r/src/recordbatchreader.cpp:
##########
@@ -111,19 +116,77 @@ std::shared_ptr<arrow::Table>
Table__from_RecordBatchReader(
return ValueOrStop(reader->ToTable());
}
+// Because the head() operation can leave a RecordBatchReader whose contents
+// will never be drained, we implement a wrapper class here that takes care
+// to (1) return only the requested number of rows (or fewer) and (2) Close
+// and release the underlying reader as soon as possible. This is mostly
+// useful for the ExecPlanReader, whose Close() method also requests
+// that the ExecPlan stop producing, but may also be useful for readers
+// that point to an open file and whose Close() or delete method releases
+// the file.
+class RecordBatchReaderHead : public arrow::RecordBatchReader {
+ public:
+ RecordBatchReaderHead(std::shared_ptr<arrow::RecordBatchReader> reader,
+ int64_t num_rows)
+ : schema_(reader->schema()), reader_(reader), num_rows_(num_rows) {}
+
+ std::shared_ptr<arrow::Schema> schema() const { return schema_; }
Review Comment:
```suggestion
std::shared_ptr<arrow::Schema> schema() const override { return schema_; }
```
##########
r/src/compute-exec.cpp:
##########
@@ -56,118 +56,151 @@ std::shared_ptr<compute::ExecNode> MakeExecNodeOrStop(
});
}
-std::pair<std::shared_ptr<compute::ExecPlan>,
std::shared_ptr<arrow::RecordBatchReader>>
-ExecPlan_prepare(const std::shared_ptr<compute::ExecPlan>& plan,
- const std::shared_ptr<compute::ExecNode>& final_node,
- cpp11::list sort_options, cpp11::strings metadata, int64_t
head = -1) {
- // a section of this code is copied and used in ExecPlan_BuildAndShow - the
2 need
- // to be in sync
- // Start of chunk used in ExecPlan_BuildAndShow
+// This class is a special RecordBatchReader that holds a reference to the
+// underlying exec plan so that (1) it can request that the ExecPlan *stop*
+// producing when this object is deleted and (2) it can defer requesting
+// the ExecPlan to *start* producing until the first batch has been pulled.
+// This allows it to be transformed (e.g., using map_batches() or head())
+// and queried (i.e., used as input to another ExecPlan), at the R level
+// while maintaining the ability for the entire plan to be executed at once
+// (e.g., to support user-defined functions) or never executed at all (e.g.,
+// to support printing a nested ExecPlan without having to execute it).
+class ExecPlanReader : public arrow::RecordBatchReader {
+ public:
+ enum ExecPlanReaderStatus { PLAN_NOT_STARTED, PLAN_RUNNING, PLAN_FINISHED };
+
+ ExecPlanReader(
+ const std::shared_ptr<arrow::compute::ExecPlan>& plan,
+ const std::shared_ptr<arrow::Schema>& schema,
+ arrow::AsyncGenerator<arrow::util::optional<compute::ExecBatch>>
sink_gen)
+ : schema_(schema), plan_(plan), sink_gen_(sink_gen),
status_(PLAN_NOT_STARTED) {}
+
+ std::string PlanStatus() {
Review Comment:
Nit
```suggestion
std::string PlanStatus() const {
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]