westonpace commented on code in PR #14663: URL: https://github.com/apache/arrow/pull/14663#discussion_r1032886054
########## cpp/src/arrow/compute/exec/exec_plan.cc: ########## @@ -614,6 +614,106 @@ Result<std::vector<ExecBatch>> DeclarationToExecBatches(Declaration declaration, return DeclarationToExecBatchesAsync(std::move(declaration), exec_context).result(); } +namespace { +struct BatchConverter { + explicit BatchConverter(::arrow::internal::Executor* executor) + : exec_context(std::make_shared<ExecContext>(default_memory_pool(), executor)) {} + + ~BatchConverter() { + if (!exec_plan) { + return; + } + if (exec_plan->finished().is_finished()) { + return; + } + exec_plan->StopProducing(); + Status abandoned_status = exec_plan->finished().status(); + if (!abandoned_status.ok()) { + abandoned_status.Warn(); + } + } + + Future<std::shared_ptr<RecordBatch>> operator()() { + return exec_batch_gen().Then( + [this](const std::optional<ExecBatch>& batch) + -> Future<std::shared_ptr<RecordBatch>> { + if (batch) { + return batch->ToRecordBatch(schema); + } else { + return exec_plan->finished().Then( + []() -> std::shared_ptr<RecordBatch> { return nullptr; }); + } + }, + [this](const Status& err) { + return exec_plan->finished().Then( + [err]() -> Result<std::shared_ptr<RecordBatch>> { return err; }); + }); + } + + // TODO: Should use exec context owned by the plan Review Comment: Ah, no it does not need to be addressed any longer. The plan now uses the context of the batch converter so there is a single exec context, which was what the todo comment was referring to. ########## cpp/src/arrow/compute/exec/exec_plan.cc: ########## @@ -614,6 +614,106 @@ Result<std::vector<ExecBatch>> DeclarationToExecBatches(Declaration declaration, return DeclarationToExecBatchesAsync(std::move(declaration), exec_context).result(); } +namespace { +struct BatchConverter { + explicit BatchConverter(::arrow::internal::Executor* executor) + : exec_context(std::make_shared<ExecContext>(default_memory_pool(), executor)) {} + + ~BatchConverter() { + if (!exec_plan) { + return; + } + if (exec_plan->finished().is_finished()) { + return; + } + exec_plan->StopProducing(); + Status abandoned_status = exec_plan->finished().status(); + if (!abandoned_status.ok()) { + abandoned_status.Warn(); + } + } + + Future<std::shared_ptr<RecordBatch>> operator()() { + return exec_batch_gen().Then( + [this](const std::optional<ExecBatch>& batch) + -> Future<std::shared_ptr<RecordBatch>> { + if (batch) { + return batch->ToRecordBatch(schema); + } else { + return exec_plan->finished().Then( + []() -> std::shared_ptr<RecordBatch> { return nullptr; }); + } + }, + [this](const Status& err) { + return exec_plan->finished().Then( + [err]() -> Result<std::shared_ptr<RecordBatch>> { return err; }); + }); + } + + // TODO: Should use exec context owned by the plan + std::shared_ptr<ExecContext> exec_context; + AsyncGenerator<std::optional<ExecBatch>> exec_batch_gen; + std::shared_ptr<Schema> schema; + std::shared_ptr<ExecPlan> exec_plan; +}; + +Result<AsyncGenerator<std::shared_ptr<RecordBatch>>> DeclarationToRecordBatchGenerator( + Declaration declaration, ::arrow::internal::Executor* executor, + std::shared_ptr<Schema>* out_schema) { + auto converter = std::make_shared<BatchConverter>(executor); + ARROW_ASSIGN_OR_RAISE(std::shared_ptr<ExecPlan> plan, + ExecPlan::Make(converter->exec_context.get())); + Declaration with_sink = Declaration::Sequence( + {declaration, + {"sink", SinkNodeOptions(&converter->exec_batch_gen, &converter->schema)}}); + ARROW_RETURN_NOT_OK(with_sink.AddToPlan(plan.get())); + ARROW_RETURN_NOT_OK(plan->StartProducing()); + converter->exec_plan = std::move(plan); + *out_schema = converter->schema; + return [conv = std::move(converter)] { return (*conv)(); }; +} +} // namespace + +Result<std::unique_ptr<RecordBatchReader>> DeclarationToReader(Declaration declaration, + bool use_threads) { + std::shared_ptr<Schema> schema; + auto batch_itr = std::make_unique<Iterator<std::shared_ptr<RecordBatch>>>( Review Comment: Added, thanks. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@arrow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org