bkietz commented on a change in pull request #10664:
URL: https://github.com/apache/arrow/pull/10664#discussion_r664746139
##########
File path: cpp/src/arrow/dataset/scanner.cc
##########
@@ -525,6 +530,64 @@ Result<AsyncGenerator<EnumeratedRecordBatchGenerator>>
FragmentsToBatches(
});
}
+Result<compute::ExecNode*> MakeScanNode(compute::ExecPlan* plan,
+ FragmentGenerator fragment_gen,
+ std::shared_ptr<ScanOptions> options) {
+ if (!options->use_async) {
+ return Status::NotImplemented("ScanNodes without asynchrony");
+ }
+
+ ARROW_ASSIGN_OR_RAISE(
+ auto batch_gen_gen,
+ FragmentsToBatches(std::move(fragment_gen), options,
/*filter_and_project=*/false));
+
+ auto batch_gen_gen_readahead =
+ MakeSerialReadaheadGenerator(std::move(batch_gen_gen),
options->fragment_readahead);
+
+ auto merged_batch_gen =
MakeMergedGenerator(std::move(batch_gen_gen_readahead),
+ options->fragment_readahead);
+
+ auto batch_gen =
+ MakeReadaheadGenerator(std::move(merged_batch_gen),
options->fragment_readahead);
+
+ auto gen = MakeMappedGenerator(
+ std::move(batch_gen),
+ [options](const EnumeratedRecordBatch& partial)
+ -> Result<util::optional<compute::ExecBatch>> {
+ ARROW_ASSIGN_OR_RAISE(
+ util::optional<compute::ExecBatch> batch,
+ compute::MakeExecBatch(*options->dataset_schema,
partial.record_batch.value));
+ // TODO if a fragment failed to perform projection pushdown, there may
be
+ // unnecessarily materialized columns in batch. We can drop them now
instead of
+ // letting them coast through the rest of the plan.
+
+ // TODO fragments may be able to attach more guarantees to batches
than this,
+ // for example parquet's row group stats. Failing to do this leaves
perf on the
+ // table because row group stats could be used to skip kernel execs in
FilterNode
+ batch->guarantee = partial.fragment.value->partition_expression();
+
+ // tag rows with fragment- and batch-of-origin
+ batch->values.emplace_back(partial.fragment.index);
+ batch->values.emplace_back(partial.record_batch.index);
+ batch->values.emplace_back(partial.record_batch.last);
Review comment:
See also https://issues.apache.org/jira/browse/ARROW-8928
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org