westonpace commented on code in PR #14663:
URL: https://github.com/apache/arrow/pull/14663#discussion_r1032886054
##########
cpp/src/arrow/compute/exec/exec_plan.cc:
##########
@@ -614,6 +614,106 @@ Result<std::vector<ExecBatch>>
DeclarationToExecBatches(Declaration declaration,
return DeclarationToExecBatchesAsync(std::move(declaration),
exec_context).result();
}
+namespace {
+struct BatchConverter {
+ explicit BatchConverter(::arrow::internal::Executor* executor)
+ : exec_context(std::make_shared<ExecContext>(default_memory_pool(),
executor)) {}
+
+ ~BatchConverter() {
+ if (!exec_plan) {
+ return;
+ }
+ if (exec_plan->finished().is_finished()) {
+ return;
+ }
+ exec_plan->StopProducing();
+ Status abandoned_status = exec_plan->finished().status();
+ if (!abandoned_status.ok()) {
+ abandoned_status.Warn();
+ }
+ }
+
+ Future<std::shared_ptr<RecordBatch>> operator()() {
+ return exec_batch_gen().Then(
+ [this](const std::optional<ExecBatch>& batch)
+ -> Future<std::shared_ptr<RecordBatch>> {
+ if (batch) {
+ return batch->ToRecordBatch(schema);
+ } else {
+ return exec_plan->finished().Then(
+ []() -> std::shared_ptr<RecordBatch> { return nullptr; });
+ }
+ },
+ [this](const Status& err) {
+ return exec_plan->finished().Then(
+ [err]() -> Result<std::shared_ptr<RecordBatch>> { return err; });
+ });
+ }
+
+ // TODO: Should use exec context owned by the plan
Review Comment:
Ah, no, this no longer needs to be addressed. The plan now uses the batch
converter's exec context, so there is only a single exec context, which is
what the TODO comment was referring to. The TODO can be removed.
##########
cpp/src/arrow/compute/exec/exec_plan.cc:
##########
@@ -614,6 +614,106 @@ Result<std::vector<ExecBatch>>
DeclarationToExecBatches(Declaration declaration,
return DeclarationToExecBatchesAsync(std::move(declaration),
exec_context).result();
}
+namespace {
+struct BatchConverter {
+ explicit BatchConverter(::arrow::internal::Executor* executor)
+ : exec_context(std::make_shared<ExecContext>(default_memory_pool(),
executor)) {}
+
+ ~BatchConverter() {
+ if (!exec_plan) {
+ return;
+ }
+ if (exec_plan->finished().is_finished()) {
+ return;
+ }
+ exec_plan->StopProducing();
+ Status abandoned_status = exec_plan->finished().status();
+ if (!abandoned_status.ok()) {
+ abandoned_status.Warn();
+ }
+ }
+
+ Future<std::shared_ptr<RecordBatch>> operator()() {
+ return exec_batch_gen().Then(
+ [this](const std::optional<ExecBatch>& batch)
+ -> Future<std::shared_ptr<RecordBatch>> {
+ if (batch) {
+ return batch->ToRecordBatch(schema);
+ } else {
+ return exec_plan->finished().Then(
+ []() -> std::shared_ptr<RecordBatch> { return nullptr; });
+ }
+ },
+ [this](const Status& err) {
+ return exec_plan->finished().Then(
+ [err]() -> Result<std::shared_ptr<RecordBatch>> { return err; });
+ });
+ }
+
+ // TODO: Should use exec context owned by the plan
+ std::shared_ptr<ExecContext> exec_context;
+ AsyncGenerator<std::optional<ExecBatch>> exec_batch_gen;
+ std::shared_ptr<Schema> schema;
+ std::shared_ptr<ExecPlan> exec_plan;
+};
+
+Result<AsyncGenerator<std::shared_ptr<RecordBatch>>> DeclarationToRecordBatchGenerator(
+ Declaration declaration, ::arrow::internal::Executor* executor,
+ std::shared_ptr<Schema>* out_schema) {
+ auto converter = std::make_shared<BatchConverter>(executor);
+ ARROW_ASSIGN_OR_RAISE(std::shared_ptr<ExecPlan> plan,
+ ExecPlan::Make(converter->exec_context.get()));
+ Declaration with_sink = Declaration::Sequence(
+ {declaration,
+ {"sink", SinkNodeOptions(&converter->exec_batch_gen,
&converter->schema)}});
+ ARROW_RETURN_NOT_OK(with_sink.AddToPlan(plan.get()));
+ ARROW_RETURN_NOT_OK(plan->StartProducing());
+ converter->exec_plan = std::move(plan);
+ *out_schema = converter->schema;
+ return [conv = std::move(converter)] { return (*conv)(); };
+}
+} // namespace
+
+Result<std::unique_ptr<RecordBatchReader>> DeclarationToReader(Declaration declaration,
+ bool use_threads) {
+ std::shared_ptr<Schema> schema;
+ auto batch_itr = std::make_unique<Iterator<std::shared_ptr<RecordBatch>>>(
Review Comment:
Added, thanks.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]