bkietz commented on a change in pull request #10664:
URL: https://github.com/apache/arrow/pull/10664#discussion_r666202502
##########
File path: cpp/src/arrow/dataset/scanner.cc
##########
@@ -604,11 +585,90 @@ Result<EnumeratedRecordBatchGenerator> AsyncScanner::ScanBatchesUnorderedAsync()
   return ScanBatchesUnorderedAsync(internal::GetCpuThreadPool());
 }
+namespace {
+Result<EnumeratedRecordBatch> ToEnumeratedRecordBatch(
+    const util::optional<compute::ExecBatch>& batch, const ScanOptions& options,
+    const FragmentVector& fragments) {
+  int num_fields = options.projected_schema->num_fields();
+
+  ArrayVector columns(num_fields);
+  for (size_t i = 0; i < columns.size(); ++i) {
+    const Datum& value = batch->values[i];
+    if (value.is_array()) {
+      columns[i] = value.make_array();
+      continue;
+    }
+    ARROW_ASSIGN_OR_RAISE(
+        columns[i], MakeArrayFromScalar(*value.scalar(), batch->length, options.pool));
+  }
+
+  EnumeratedRecordBatch out;
+  out.fragment.index = batch->values[num_fields].scalar_as<Int32Scalar>().value;
+  out.fragment.value = fragments[out.fragment.index];
+  out.fragment.last = false;  // ignored during reordering
+
+  out.record_batch.index =
+      batch->values[num_fields + 1].scalar_as<Int32Scalar>().value;
+  out.record_batch.value =
+      RecordBatch::Make(options.projected_schema, batch->length, std::move(columns));
+  out.record_batch.last =
+      batch->values[num_fields + 2].scalar_as<BooleanScalar>().value;
+
+  return out;
+}
+}  // namespace
+
 Result<EnumeratedRecordBatchGenerator> AsyncScanner::ScanBatchesUnorderedAsync(
     internal::Executor* cpu_executor) {
-  ARROW_ASSIGN_OR_RAISE(auto fragment_gen, GetFragments());
-  return ScanBatchesUnorderedAsyncImpl(scan_options_, std::move(fragment_gen),
-                                       cpu_executor);
+  if (!scan_options_->use_threads) {
+    cpu_executor = nullptr;
+  }
+
+  auto exec_context =
+      std::make_shared<compute::ExecContext>(scan_options_->pool, cpu_executor);
+
+  ARROW_ASSIGN_OR_RAISE(auto plan, compute::ExecPlan::Make(exec_context.get()));
+
+  ARROW_ASSIGN_OR_RAISE(auto scan, MakeScanNode(plan.get(), dataset_, scan_options_));
+
+  ARROW_ASSIGN_OR_RAISE(auto filter,
+                        compute::MakeFilterNode(scan, "filter", scan_options_->filter));
+
+  auto exprs = scan_options_->projection.call()->arguments;
+  exprs.push_back(compute::field_ref("__fragment_index"));
+  exprs.push_back(compute::field_ref("__batch_index"));
+  exprs.push_back(compute::field_ref("__last_in_fragment"));
+  ARROW_ASSIGN_OR_RAISE(auto project,
+                        compute::MakeProjectNode(filter, "project", std::move(exprs)));
+
+  AsyncGenerator<util::optional<compute::ExecBatch>> sink_gen =
+      compute::MakeSinkNode(project, "sink");
+
+  RETURN_NOT_OK(plan->StartProducing());
+
+  auto options = scan_options_;
+  ARROW_ASSIGN_OR_RAISE(auto fragments_it,
+                        dataset_->GetFragments(scan_options_->filter));
+  ARROW_ASSIGN_OR_RAISE(auto fragments, fragments_it.ToVector());
+  auto shared_fragments = std::make_shared<FragmentVector>(std::move(fragments));
+
+  // If the generator is destroyed before being completely drained, inform plan
+  std::shared_ptr<void> stop_producing{
+      nullptr, [plan, exec_context](...) {
+        bool not_finished_yet = plan->finished().TryAddCallback([&] {
Review comment:
   StopProducing is idempotent, and the goal here is only to save unnecessary calls/keepalives if the plan has already finished.
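
   To illustrate the guard pattern this refers to, here is a minimal standalone sketch. It is not the Arrow API: MockPlan and MakeStopProducingToken are hypothetical names, and a plain atomic flag stands in for plan->finished() and TryAddCallback. Destroying the token stops a still-running plan exactly once, and skips the call (and the keepalive it implies) when the plan has already finished.

   // Minimal sketch only: MockPlan is a hypothetical stand-in for compute::ExecPlan.
   #include <atomic>
   #include <iostream>
   #include <memory>

   struct MockPlan {
     std::atomic<bool> finished{false};
     std::atomic<int> stop_calls{0};

     // Idempotent: extra calls are harmless, but we still prefer to avoid
     // them once the plan is done.
     void StopProducing() { ++stop_calls; }
   };

   // Returns an opaque token; destroying it stops the plan unless it already
   // finished. The captured shared_ptr keeps the plan alive while the token lives.
   std::shared_ptr<void> MakeStopProducingToken(std::shared_ptr<MockPlan> plan) {
     return std::shared_ptr<void>(nullptr, [plan](void*) {
       if (!plan->finished.load()) {
         plan->StopProducing();
       }
     });
   }

   int main() {
     // Case 1: the consumer abandons the generator while the plan is running.
     auto running = std::make_shared<MockPlan>();
     { auto token = MakeStopProducingToken(running); }
     std::cout << "abandoned: stop_calls=" << running->stop_calls.load() << "\n";  // 1

     // Case 2: the plan finished before the token was destroyed; no extra call.
     auto done = std::make_shared<MockPlan>();
     {
       auto token = MakeStopProducingToken(done);
       done->finished = true;
     }
     std::cout << "finished:  stop_calls=" << done->stop_calls.load() << "\n";  // 0
   }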