paleolimbot commented on code in PR #13706:
URL: https://github.com/apache/arrow/pull/13706#discussion_r968577939


##########
r/src/compute-exec.cpp:
##########
@@ -56,118 +56,151 @@ std::shared_ptr<compute::ExecNode> MakeExecNodeOrStop(
       });
 }
 
-std::pair<std::shared_ptr<compute::ExecPlan>, 
std::shared_ptr<arrow::RecordBatchReader>>
-ExecPlan_prepare(const std::shared_ptr<compute::ExecPlan>& plan,
-                 const std::shared_ptr<compute::ExecNode>& final_node,
-                 cpp11::list sort_options, cpp11::strings metadata, int64_t 
head = -1) {
-  // a section of this code is copied and used in ExecPlan_BuildAndShow - the 
2 need
-  // to be in sync
-  // Start of chunk used in ExecPlan_BuildAndShow
+// This class is a special RecordBatchReader that holds a reference to the
+// underlying exec plan so that (1) it can request that the ExecPlan *stop*
+// producing when this object is deleted and (2) it can defer requesting
+// the ExecPlan to *start* producing until the first batch has been pulled.
+// This allows it to be transformed (e.g., using map_batches() or head())
+// and queried (i.e., used as input to another ExecPlan), at the R level
+// while maintaining the ability for the entire plan to be executed at once
+// (e.g., to support user-defined functions) or never executed at all (e.g.,
+// to support printing a nested ExecPlan without having to execute it).
+class ExecPlanReader : public arrow::RecordBatchReader {
+ public:
+  enum ExecPlanReaderStatus { PLAN_NOT_STARTED, PLAN_RUNNING, PLAN_FINISHED };
+
+  ExecPlanReader(
+      const std::shared_ptr<arrow::compute::ExecPlan>& plan,
+      const std::shared_ptr<arrow::Schema>& schema,
+      arrow::AsyncGenerator<arrow::util::optional<compute::ExecBatch>> 
sink_gen)
+      : schema_(schema), plan_(plan), sink_gen_(sink_gen), 
status_(PLAN_NOT_STARTED) {}
+
+  std::string PlanStatus() {
+    switch (status_) {
+      case PLAN_NOT_STARTED:
+        return "PLAN_NOT_STARTED";
+      case PLAN_RUNNING:
+        return "PLAN_RUNNING";
+      case PLAN_FINISHED:
+        return "PLAN_FINISHED";
+      default:
+        return "UNKNOWN";
+    }
+  }
 
-  // For now, don't require R to construct SinkNodes.
-  // Instead, just pass the node we should collect as an argument.
-  arrow::AsyncGenerator<arrow::util::optional<compute::ExecBatch>> sink_gen;
+  std::shared_ptr<arrow::Schema> schema() const { return schema_; }
 
-  // Sorting uses a different sink node; there is no general sort yet
-  if (sort_options.size() > 0) {
-    if (head >= 0) {
-      // Use the SelectK node to take only what we need
-      MakeExecNodeOrStop(
-          "select_k_sink", plan.get(), {final_node.get()},
-          compute::SelectKSinkNodeOptions{
-              arrow::compute::SelectKOptions(
-                  head, std::dynamic_pointer_cast<compute::SortOptions>(
-                            make_compute_options("sort_indices", sort_options))
-                            ->sort_keys),
-              &sink_gen});
+  arrow::Status ReadNext(std::shared_ptr<arrow::RecordBatch>* batch_out) {
+    // TODO(ARROW-11841) check a StopToken to potentially cancel this plan
+
+    // If this is the first batch getting pulled, tell the exec plan to
+    // start producing
+    if (status_ == PLAN_NOT_STARTED) {
+      ARROW_RETURN_NOT_OK(StartProducing());
+    }
+
+    // If we've closed the reader, this is invalid
+    if (status_ == PLAN_FINISHED) {
+      return arrow::Status::Invalid("ExecPlanReader has been closed");
+    }
+
+    auto out = sink_gen_().result();
+    if (!out.ok()) {
+      StopProducing();
+      return out.status();
+    }
+
+    if (out.ValueUnsafe()) {
+      auto batch_result = out.ValueUnsafe()->ToRecordBatch(schema_, 
gc_memory_pool());
+      if (!batch_result.ok()) {
+        StopProducing();
+        return batch_result.status();
+      }
+
+      *batch_out = batch_result.ValueUnsafe();
     } else {
-      MakeExecNodeOrStop("order_by_sink", plan.get(), {final_node.get()},
-                         compute::OrderBySinkNodeOptions{
-                             *std::dynamic_pointer_cast<compute::SortOptions>(
-                                 make_compute_options("sort_indices", 
sort_options)),
-                             &sink_gen});
+      batch_out->reset();
+      StopProducing();
     }
-  } else {
-    MakeExecNodeOrStop("sink", plan.get(), {final_node.get()},
-                       compute::SinkNodeOptions{&sink_gen});
+
+    return arrow::Status::OK();
   }
 
-  // End of chunk used in ExecPlan_BuildAndShow
+  arrow::Status Close() {
+    StopProducing();
+    return arrow::Status::OK();
+  }
 
-  StopIfNotOk(plan->Validate());
+  const std::shared_ptr<arrow::compute::ExecPlan>& Plan() { return plan_; }
 
-  // If the generator is destroyed before being completely drained, inform plan
-  std::shared_ptr<void> stop_producing{nullptr, [plan](...) {
-                                         bool not_finished_yet =
-                                             
plan->finished().TryAddCallback([&plan] {
-                                               return [plan](const 
arrow::Status&) {};
-                                             });
+  ~ExecPlanReader() { StopProducing(); }
 
-                                         if (not_finished_yet) {
-                                           plan->StopProducing();
-                                         }
-                                       }};
+ private:
+  std::shared_ptr<arrow::Schema> schema_;
+  std::shared_ptr<arrow::compute::ExecPlan> plan_;
+  arrow::AsyncGenerator<arrow::util::optional<compute::ExecBatch>> sink_gen_;
+  int status_;
 
-  // Attach metadata to the schema
-  auto out_schema = final_node->output_schema();
-  if (metadata.size() > 0) {
-    auto kv = strings_to_kvm(metadata);
-    out_schema = out_schema->WithMetadata(kv);
+  arrow::Status StartProducing() {
+    ARROW_RETURN_NOT_OK(plan_->StartProducing());
+    status_ = PLAN_RUNNING;
+    return arrow::Status::OK();
   }
 
-  std::pair<std::shared_ptr<compute::ExecPlan>, 
std::shared_ptr<arrow::RecordBatchReader>>
-      out;
-  out.first = plan;
-  out.second = compute::MakeGeneratorReader(
-      out_schema, [stop_producing, plan, sink_gen] { return sink_gen(); },
-      gc_memory_pool());
-  return out;
-}
+  void StopProducing() {
+    if (status_ == PLAN_RUNNING) {
+      std::shared_ptr<arrow::compute::ExecPlan> plan(plan_);
+      bool not_finished_yet = plan_->finished().TryAddCallback(
+          [&plan] { return [plan](const arrow::Status&) {}; });

Review Comment:
   This is copied from the original implementation
(https://github.com/apache/arrow/blob/master/r/src/compute-exec.cpp#L99-L109).
I would absolutely love to make this simpler and will try a few things.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to