westonpace commented on code in PR #14663:
URL: https://github.com/apache/arrow/pull/14663#discussion_r1032886099


##########
cpp/src/arrow/compute/exec/exec_plan.cc:
##########
@@ -614,6 +614,106 @@ Result<std::vector<ExecBatch>> 
DeclarationToExecBatches(Declaration declaration,
   return DeclarationToExecBatchesAsync(std::move(declaration), 
exec_context).result();
 }
 
+namespace {
+struct BatchConverter {
+  explicit BatchConverter(::arrow::internal::Executor* executor)
+      : exec_context(std::make_shared<ExecContext>(default_memory_pool(), 
executor)) {}
+
+  ~BatchConverter() {
+    if (!exec_plan) {
+      return;
+    }
+    if (exec_plan->finished().is_finished()) {
+      return;
+    }
+    exec_plan->StopProducing();
+    Status abandoned_status = exec_plan->finished().status();
+    if (!abandoned_status.ok()) {
+      abandoned_status.Warn();
+    }
+  }
+
+  Future<std::shared_ptr<RecordBatch>> operator()() {
+    return exec_batch_gen().Then(
+        [this](const std::optional<ExecBatch>& batch)
+            -> Future<std::shared_ptr<RecordBatch>> {
+          if (batch) {
+            return batch->ToRecordBatch(schema);
+          } else {
+            return exec_plan->finished().Then(
+                []() -> std::shared_ptr<RecordBatch> { return nullptr; });
+          }
+        },
+        [this](const Status& err) {
+          return exec_plan->finished().Then(
+              [err]() -> Result<std::shared_ptr<RecordBatch>> { return err; });
+        });
+  }
+
+  // TODO: Should use exec context owned by the plan
+  std::shared_ptr<ExecContext> exec_context;
+  AsyncGenerator<std::optional<ExecBatch>> exec_batch_gen;
+  std::shared_ptr<Schema> schema;
+  std::shared_ptr<ExecPlan> exec_plan;
+};
+
+Result<AsyncGenerator<std::shared_ptr<RecordBatch>>> 
DeclarationToRecordBatchGenerator(
+    Declaration declaration, ::arrow::internal::Executor* executor,
+    std::shared_ptr<Schema>* out_schema) {
+  auto converter = std::make_shared<BatchConverter>(executor);
+  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<ExecPlan> plan,
+                        ExecPlan::Make(converter->exec_context.get()));
+  Declaration with_sink = Declaration::Sequence(
+      {declaration,
+       {"sink", SinkNodeOptions(&converter->exec_batch_gen, 
&converter->schema)}});
+  ARROW_RETURN_NOT_OK(with_sink.AddToPlan(plan.get()));
+  ARROW_RETURN_NOT_OK(plan->StartProducing());
+  converter->exec_plan = std::move(plan);
+  *out_schema = converter->schema;
+  return [conv = std::move(converter)] { return (*conv)(); };
+}
+}  // namespace
+
+Result<std::unique_ptr<RecordBatchReader>> DeclarationToReader(Declaration 
declaration,
+                                                               bool 
use_threads) {
+  std::shared_ptr<Schema> schema;
+  auto batch_itr = std::make_unique<Iterator<std::shared_ptr<RecordBatch>>>(
+      ::arrow::internal::IterateSynchronously<std::shared_ptr<RecordBatch>>(
+          [&](::arrow::internal::Executor* executor)
+              -> Result<AsyncGenerator<std::shared_ptr<RecordBatch>>> {
+            return DeclarationToRecordBatchGenerator(declaration, executor, 
&schema);
+          },
+          use_threads));
+
+  struct PlanReader : RecordBatchReader {
+    PlanReader(std::shared_ptr<Schema> schema,
+               std::unique_ptr<Iterator<std::shared_ptr<RecordBatch>>> 
iterator)
+        : schema_(std::move(schema)), iterator_(std::move(iterator)) {}
+
+    std::shared_ptr<Schema> schema() const override { return schema_; }
+
+    Status ReadNext(std::shared_ptr<RecordBatch>* record_batch) override {
+      DCHECK(!!iterator_) << "call to ReadNext on already closed reader";
+      return iterator_->Next().Value(record_batch);
+    }
+
+    Status Close() override {
+      // End plan and read from generator until finished
+      std::shared_ptr<RecordBatch> batch;
+      do {
+        ARROW_RETURN_NOT_OK(ReadNext(&batch));
+      } while (batch != nullptr);
+      iterator_.reset();
+      return Status::OK();
+    }
+
+    std::shared_ptr<Schema> schema_;
+    std::unique_ptr<Iterator<std::shared_ptr<RecordBatch>>> iterator_;
+  };
+
+  return std::make_unique<PlanReader>(std::move(schema), std::move(batch_itr));

Review Comment:
   Added



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to