rok commented on code in PR #14663:
URL: https://github.com/apache/arrow/pull/14663#discussion_r1028048227
##########
cpp/src/arrow/compute/exec/exec_plan.cc:
##########
@@ -614,6 +614,106 @@ Result<std::vector<ExecBatch>>
DeclarationToExecBatches(Declaration declaration,
return DeclarationToExecBatchesAsync(std::move(declaration),
exec_context).result();
}
+namespace {
+struct BatchConverter {
+ explicit BatchConverter(::arrow::internal::Executor* executor)
+ : exec_context(std::make_shared<ExecContext>(default_memory_pool(),
executor)) {}
+
+ ~BatchConverter() {
+ if (!exec_plan) {
+ return;
+ }
+ if (exec_plan->finished().is_finished()) {
+ return;
+ }
+ exec_plan->StopProducing();
+ Status abandoned_status = exec_plan->finished().status();
+ if (!abandoned_status.ok()) {
+ abandoned_status.Warn();
+ }
+ }
+
+ Future<std::shared_ptr<RecordBatch>> operator()() {
+ return exec_batch_gen().Then(
+ [this](const std::optional<ExecBatch>& batch)
+ -> Future<std::shared_ptr<RecordBatch>> {
+ if (batch) {
+ return batch->ToRecordBatch(schema);
+ } else {
+ return exec_plan->finished().Then(
+ []() -> std::shared_ptr<RecordBatch> { return nullptr; });
+ }
+ },
+ [this](const Status& err) {
+ return exec_plan->finished().Then(
+ [err]() -> Result<std::shared_ptr<RecordBatch>> { return err; });
+ });
+ }
+
+ // TODO: Should use exec context owned by the plan
+ std::shared_ptr<ExecContext> exec_context;
+ AsyncGenerator<std::optional<ExecBatch>> exec_batch_gen;
+ std::shared_ptr<Schema> schema;
+ std::shared_ptr<ExecPlan> exec_plan;
+};
+
+Result<AsyncGenerator<std::shared_ptr<RecordBatch>>>
DeclarationToRecordBatchGenerator(
+ Declaration declaration, ::arrow::internal::Executor* executor,
+ std::shared_ptr<Schema>* out_schema) {
+ auto converter = std::make_shared<BatchConverter>(executor);
+ ARROW_ASSIGN_OR_RAISE(std::shared_ptr<ExecPlan> plan,
+ ExecPlan::Make(converter->exec_context.get()));
+ Declaration with_sink = Declaration::Sequence(
+ {declaration,
+ {"sink", SinkNodeOptions(&converter->exec_batch_gen,
&converter->schema)}});
+ ARROW_RETURN_NOT_OK(with_sink.AddToPlan(plan.get()));
+ ARROW_RETURN_NOT_OK(plan->StartProducing());
+ converter->exec_plan = std::move(plan);
+ *out_schema = converter->schema;
+ return [conv = std::move(converter)] { return (*conv)(); };
+}
+} // namespace
+
+Result<std::unique_ptr<RecordBatchReader>> DeclarationToReader(Declaration
declaration,
+ bool
use_threads) {
+ std::shared_ptr<Schema> schema;
+ auto batch_itr = std::make_unique<Iterator<std::shared_ptr<RecordBatch>>>(
Review Comment:
Nit:
```suggestion
auto batch_iterator =
std::make_unique<Iterator<std::shared_ptr<RecordBatch>>>(
```
##########
cpp/src/arrow/compute/exec/exec_plan.cc:
##########
@@ -614,6 +614,106 @@ Result<std::vector<ExecBatch>>
DeclarationToExecBatches(Declaration declaration,
return DeclarationToExecBatchesAsync(std::move(declaration),
exec_context).result();
}
+namespace {
+struct BatchConverter {
+ explicit BatchConverter(::arrow::internal::Executor* executor)
+ : exec_context(std::make_shared<ExecContext>(default_memory_pool(),
executor)) {}
+
+ ~BatchConverter() {
+ if (!exec_plan) {
+ return;
+ }
+ if (exec_plan->finished().is_finished()) {
+ return;
+ }
+ exec_plan->StopProducing();
+ Status abandoned_status = exec_plan->finished().status();
+ if (!abandoned_status.ok()) {
+ abandoned_status.Warn();
+ }
+ }
+
+ Future<std::shared_ptr<RecordBatch>> operator()() {
+ return exec_batch_gen().Then(
+ [this](const std::optional<ExecBatch>& batch)
+ -> Future<std::shared_ptr<RecordBatch>> {
+ if (batch) {
+ return batch->ToRecordBatch(schema);
+ } else {
+ return exec_plan->finished().Then(
+ []() -> std::shared_ptr<RecordBatch> { return nullptr; });
+ }
+ },
+ [this](const Status& err) {
+ return exec_plan->finished().Then(
+ [err]() -> Result<std::shared_ptr<RecordBatch>> { return err; });
+ });
+ }
+
+ // TODO: Should use exec context owned by the plan
Review Comment:
Does this TODO need to be addressed?
##########
cpp/src/arrow/compute/exec/exec_plan.cc:
##########
@@ -614,6 +614,106 @@ Result<std::vector<ExecBatch>>
DeclarationToExecBatches(Declaration declaration,
return DeclarationToExecBatchesAsync(std::move(declaration),
exec_context).result();
}
+namespace {
+struct BatchConverter {
+ explicit BatchConverter(::arrow::internal::Executor* executor)
+ : exec_context(std::make_shared<ExecContext>(default_memory_pool(),
executor)) {}
+
+ ~BatchConverter() {
+ if (!exec_plan) {
+ return;
+ }
+ if (exec_plan->finished().is_finished()) {
+ return;
+ }
+ exec_plan->StopProducing();
+ Status abandoned_status = exec_plan->finished().status();
+ if (!abandoned_status.ok()) {
+ abandoned_status.Warn();
+ }
+ }
+
+ Future<std::shared_ptr<RecordBatch>> operator()() {
+ return exec_batch_gen().Then(
+ [this](const std::optional<ExecBatch>& batch)
+ -> Future<std::shared_ptr<RecordBatch>> {
+ if (batch) {
+ return batch->ToRecordBatch(schema);
+ } else {
+ return exec_plan->finished().Then(
+ []() -> std::shared_ptr<RecordBatch> { return nullptr; });
+ }
+ },
+ [this](const Status& err) {
+ return exec_plan->finished().Then(
+ [err]() -> Result<std::shared_ptr<RecordBatch>> { return err; });
+ });
+ }
+
+ // TODO: Should use exec context owned by the plan
+ std::shared_ptr<ExecContext> exec_context;
+ AsyncGenerator<std::optional<ExecBatch>> exec_batch_gen;
+ std::shared_ptr<Schema> schema;
+ std::shared_ptr<ExecPlan> exec_plan;
+};
+
+Result<AsyncGenerator<std::shared_ptr<RecordBatch>>>
DeclarationToRecordBatchGenerator(
+ Declaration declaration, ::arrow::internal::Executor* executor,
+ std::shared_ptr<Schema>* out_schema) {
+ auto converter = std::make_shared<BatchConverter>(executor);
+ ARROW_ASSIGN_OR_RAISE(std::shared_ptr<ExecPlan> plan,
+ ExecPlan::Make(converter->exec_context.get()));
+ Declaration with_sink = Declaration::Sequence(
+ {declaration,
+ {"sink", SinkNodeOptions(&converter->exec_batch_gen,
&converter->schema)}});
+ ARROW_RETURN_NOT_OK(with_sink.AddToPlan(plan.get()));
+ ARROW_RETURN_NOT_OK(plan->StartProducing());
+ converter->exec_plan = std::move(plan);
+ *out_schema = converter->schema;
+ return [conv = std::move(converter)] { return (*conv)(); };
+}
+} // namespace
+
+Result<std::unique_ptr<RecordBatchReader>> DeclarationToReader(Declaration
declaration,
+ bool
use_threads) {
+ std::shared_ptr<Schema> schema;
+ auto batch_itr = std::make_unique<Iterator<std::shared_ptr<RecordBatch>>>(
+ ::arrow::internal::IterateSynchronously<std::shared_ptr<RecordBatch>>(
+ [&](::arrow::internal::Executor* executor)
+ -> Result<AsyncGenerator<std::shared_ptr<RecordBatch>>> {
+ return DeclarationToRecordBatchGenerator(declaration, executor,
&schema);
+ },
+ use_threads));
+
+ struct PlanReader : RecordBatchReader {
+ PlanReader(std::shared_ptr<Schema> schema,
+ std::unique_ptr<Iterator<std::shared_ptr<RecordBatch>>>
iterator)
+ : schema_(std::move(schema)), iterator_(std::move(iterator)) {}
+
+ std::shared_ptr<Schema> schema() const override { return schema_; }
+
+ Status ReadNext(std::shared_ptr<RecordBatch>* record_batch) override {
+ DCHECK(!!iterator_) << "call to ReadNext on already closed reader";
+ return iterator_->Next().Value(record_batch);
+ }
+
+ Status Close() override {
+ // End plan and read from generator until finished
+ std::shared_ptr<RecordBatch> batch;
+ do {
+ ARROW_RETURN_NOT_OK(ReadNext(&batch));
+ } while (batch != nullptr);
+ iterator_.reset();
+ return Status::OK();
+ }
+
+ std::shared_ptr<Schema> schema_;
+ std::unique_ptr<Iterator<std::shared_ptr<RecordBatch>>> iterator_;
+ };
+
+ return std::make_unique<PlanReader>(std::move(schema), std::move(batch_itr));
Review Comment:
Nit:
```suggestion
return std::make_unique<PlanReader>(std::move(schema),
std::move(batch_iterator));
```
##########
cpp/src/arrow/dataset/file_base.cc:
##########
@@ -108,6 +131,18 @@ Future<std::optional<int64_t>> FileFormat::CountRows(
return Future<std::optional<int64_t>>::MakeFinished(std::nullopt);
}
+Future<std::shared_ptr<InspectedFragment>> FileFormat::InspectFragment(
+ const FileSource& source, const FragmentScanOptions* format_options,
+ compute::ExecContext* exec_context) const {
+ return Status::NotImplemented("The new scanner is not yet supported on this
format");
Review Comment:
Which scanner does "the new scanner" refer to?
##########
cpp/src/arrow/dataset/file_csv.cc:
##########
@@ -52,9 +53,99 @@ using internal::SerialExecutor;
namespace dataset {
+struct CsvInspectedFragment : public InspectedFragment {
+ CsvInspectedFragment(std::vector<std::string> column_names,
+ std::shared_ptr<io::InputStream> input_stream, int64_t
num_bytes)
+ : InspectedFragment(std::move(column_names)),
+ input_stream(std::move(input_stream)),
+ num_bytes(num_bytes) {}
+ // We need to start reading the file in order to figure out the column names
and
+ // so we save off the input stream
+ std::shared_ptr<io::InputStream> input_stream;
+ int64_t num_bytes;
+};
+
+class CsvFileScanner : public FragmentScanner {
+ public:
+ CsvFileScanner(std::shared_ptr<csv::StreamingReader> reader, int num_batches,
+ int64_t best_guess_bytes_per_batch)
+ : reader_(std::move(reader)),
+ num_batches_(num_batches),
+ best_guess_bytes_per_batch_(best_guess_bytes_per_batch) {}
+
+ Future<std::shared_ptr<RecordBatch>> ScanBatch(int batch_number) override {
+ // We should be called in increasing order but let's verify that in case
it changes.
Review Comment:
Nit: do you mean:
```suggestion
// This should be called in increasing order but let's verify that in
case it changes.
```
##########
cpp/src/arrow/dataset/file_base.cc:
##########
@@ -108,6 +131,18 @@ Future<std::optional<int64_t>> FileFormat::CountRows(
return Future<std::optional<int64_t>>::MakeFinished(std::nullopt);
}
+Future<std::shared_ptr<InspectedFragment>> FileFormat::InspectFragment(
+ const FileSource& source, const FragmentScanOptions* format_options,
+ compute::ExecContext* exec_context) const {
+ return Status::NotImplemented("The new scanner is not yet supported on this
format");
+}
+
+Future<std::shared_ptr<FragmentScanner>> FileFormat::BeginScan(
+ const FragmentScanRequest& request, const InspectedFragment&
inspected_fragment,
+ const FragmentScanOptions* format_options, compute::ExecContext*
exec_context) const {
+ return Status::NotImplemented("The new scanner is not yet supported on this
format");
Review Comment:
As above, which scanner?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]