lidavidm commented on a change in pull request #10008:
URL: https://github.com/apache/arrow/pull/10008#discussion_r613321431
##########
File path: cpp/src/arrow/dataset/scanner.cc
##########
@@ -364,5 +365,228 @@ Future<std::shared_ptr<Table>> SyncScanner::ToTableInternal(
   });
 }
 
+namespace {
+
+inline Result<EnumeratedRecordBatch> DoFilterAndProjectRecordBatchAsync(
+    const std::shared_ptr<Scanner>& scanner, const EnumeratedRecordBatch& in) {
+  ARROW_ASSIGN_OR_RAISE(Expression simplified_filter,
+                        SimplifyWithGuarantee(scanner->options()->filter,
+                                              in.fragment.value->partition_expression()));
+
+  compute::ExecContext exec_context{scanner->options()->pool};
+  ARROW_ASSIGN_OR_RAISE(
+      Datum mask, ExecuteScalarExpression(simplified_filter, Datum(in.record_batch.value),
+                                          &exec_context));
+
+  Datum filtered;
+  if (mask.is_scalar()) {
+    const auto& mask_scalar = mask.scalar_as<BooleanScalar>();
+    if (mask_scalar.is_valid && mask_scalar.value) {
+      // filter matches entire table
+      filtered = in.record_batch.value;
+    } else {
+      // Filter matches nothing
+      filtered = in.record_batch.value->Slice(0, 0);
+    }
+  } else {
+    ARROW_ASSIGN_OR_RAISE(
+        filtered, compute::Filter(in.record_batch.value, mask,
+                                  compute::FilterOptions::Defaults(), &exec_context));
+  }
+
+  ARROW_ASSIGN_OR_RAISE(Expression simplified_projection,
+                        SimplifyWithGuarantee(scanner->options()->projection,
+                                              in.fragment.value->partition_expression()));
+  ARROW_ASSIGN_OR_RAISE(
+      Datum projected,
+      ExecuteScalarExpression(simplified_projection, filtered, &exec_context));
+
+  DCHECK_EQ(projected.type()->id(), Type::STRUCT);
+  if (projected.shape() == ValueDescr::SCALAR) {
+    // Only virtual columns are projected. Broadcast to an array
+    ARROW_ASSIGN_OR_RAISE(
+        projected,
+        MakeArrayFromScalar(*projected.scalar(), filtered.record_batch()->num_rows(),
+                            scanner->options()->pool));
+  }
+  ARROW_ASSIGN_OR_RAISE(auto out,
+                        RecordBatch::FromStructArray(projected.array_as<StructArray>()));
+  auto projected_batch =
+      out->ReplaceSchemaMetadata(in.record_batch.value->schema()->metadata());
+
+  return EnumeratedRecordBatch{
+      {std::move(projected_batch), in.record_batch.index, in.record_batch.last},
+      in.fragment};
+}
+
+inline EnumeratedRecordBatchGenerator FilterAndProjectRecordBatchAsync(
+    const std::shared_ptr<Scanner>& scanner, EnumeratedRecordBatchGenerator rbs) {
+  auto mapper = [scanner](const EnumeratedRecordBatch& in) {
+    return DoFilterAndProjectRecordBatchAsync(scanner, in);
+  };
+  return MakeMappedGenerator<EnumeratedRecordBatch>(std::move(rbs), mapper);
+}
+
+Result<EnumeratedRecordBatchGenerator> FragmentToBatches(
+    std::shared_ptr<AsyncScanner> scanner,
+    const Enumerated<std::shared_ptr<Fragment>>& fragment) {
+  ARROW_ASSIGN_OR_RAISE(auto batch_gen,
+                        fragment.value->ScanBatchesAsync(*scanner->options()));
+  auto enumerated_batch_gen = MakeEnumeratedGenerator(std::move(batch_gen));
+
+  auto combine_fn =
+      [fragment](const Enumerated<std::shared_ptr<RecordBatch>>& record_batch) {
+        return EnumeratedRecordBatch{record_batch, fragment};
+      };
+
+  auto combined_gen = MakeMappedGenerator<EnumeratedRecordBatch>(enumerated_batch_gen,
+                                                                 std::move(combine_fn));
+
+  return FilterAndProjectRecordBatchAsync(scanner, std::move(combined_gen));
+}
+
+Result<AsyncGenerator<EnumeratedRecordBatchGenerator>> FragmentsToBatches(
+    std::shared_ptr<AsyncScanner> scanner, FragmentGenerator fragment_gen) {
+  auto enumerated_fragment_gen = MakeEnumeratedGenerator(std::move(fragment_gen));
+  return MakeMappedGenerator<EnumeratedRecordBatchGenerator>(
+      std::move(enumerated_fragment_gen),
+      [scanner](const Enumerated<std::shared_ptr<Fragment>>& fragment) {
+        return FragmentToBatches(scanner, fragment);
+      });
+}
+
+}  // namespace
+
+Result<FragmentGenerator> AsyncScanner::GetFragments() const {
+  // TODO(ARROW-8163): Async fragment scanning will return AsyncGenerator<Fragment> here.
+  // Current iterator based versions are all fast & sync so we will just ToVector it
+  ARROW_ASSIGN_OR_RAISE(auto fragments_it, dataset_->GetFragments(scan_options_->filter));
+  ARROW_ASSIGN_OR_RAISE(auto fragments_vec, fragments_it.ToVector());
+  return MakeVectorGenerator(std::move(fragments_vec));
+}
+
+Result<TaggedRecordBatchIterator> AsyncScanner::ScanBatches() {
+  ARROW_ASSIGN_OR_RAISE(auto batches_gen, ScanBatchesAsync(scan_options_->cpu_executor));
+  return MakeGeneratorIterator(std::move(batches_gen));
+}
+
+Result<EnumeratedRecordBatchIterator> AsyncScanner::ScanBatchesUnordered() {
+  ARROW_ASSIGN_OR_RAISE(auto batches_gen,
+                        ScanBatchesUnorderedAsync(scan_options_->cpu_executor));
+  return MakeGeneratorIterator(std::move(batches_gen));
+}
+
+Result<std::shared_ptr<Table>> AsyncScanner::ToTable() {
+  auto table_fut = ToTableAsync(scan_options_->cpu_executor);
+  return table_fut.result();
+}
+
+Result<EnumeratedRecordBatchGenerator> AsyncScanner::ScanBatchesUnorderedAsync(
+    internal::Executor* cpu_executor) {
+  auto self = shared_from_this();
+  ARROW_ASSIGN_OR_RAISE(auto fragment_gen, GetFragments());
+  ARROW_ASSIGN_OR_RAISE(auto batch_gen_gen,
+                        FragmentsToBatches(self, std::move(fragment_gen)));
+  return MakeConcatenatedGenerator(std::move(batch_gen_gen));

Review comment:
       Could this be MakeMergedGenerator?
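
For context on the suggestion: as I understand the async_generator utilities, the concatenated combinator drains one inner generator (i.e. one fragment) at a time, while a merged combinator keeps several inner generators open at once and interleaves their results, trading batch ordering for parallelism across fragments. Below is a toy, synchronous sketch of that difference; the names, signatures, and round-robin "merge" are illustrative only, not Arrow's actual API.

// Toy stand-ins for generator combinators (NOT Arrow's API).
#include <cstddef>
#include <functional>
#include <iostream>
#include <memory>
#include <optional>
#include <utility>
#include <vector>

template <typename T>
using Gen = std::function<std::optional<T>()>;  // yields nullopt when exhausted

template <typename T>
Gen<T> MakeVectorGen(std::vector<T> items) {
  auto pos = std::make_shared<std::size_t>(0);
  auto data = std::make_shared<std::vector<T>>(std::move(items));
  return [pos, data]() -> std::optional<T> {
    if (*pos >= data->size()) return std::nullopt;
    return (*data)[(*pos)++];
  };
}

// "Concatenated": exhaust each inner generator in turn, preserving order.
template <typename T>
Gen<T> Concatenated(std::vector<Gen<T>> inners) {
  auto state =
      std::make_shared<std::pair<std::size_t, std::vector<Gen<T>>>>(0, std::move(inners));
  return [state]() -> std::optional<T> {
    while (state->first < state->second.size()) {
      if (auto v = state->second[state->first]()) return v;
      ++state->first;  // current inner exhausted, move on to the next
    }
    return std::nullopt;
  };
}

// "Merged" (round-robin stand-in for concurrent subscription): pull from each
// live inner generator in turn, so outputs from different inners interleave.
template <typename T>
Gen<T> Merged(std::vector<Gen<T>> inners) {
  auto gens = std::make_shared<std::vector<Gen<T>>>(std::move(inners));
  auto next = std::make_shared<std::size_t>(0);
  return [gens, next]() -> std::optional<T> {
    while (!gens->empty()) {
      *next %= gens->size();
      if (auto v = (*gens)[*next]()) {
        ++*next;
        return v;
      }
      gens->erase(gens->begin() + *next);  // drop exhausted inner generator
    }
    return std::nullopt;
  };
}

int main() {
  auto make_inners = [] {
    std::vector<Gen<int>> v;
    v.push_back(MakeVectorGen<int>({1, 2, 3}));     // "fragment" 1
    v.push_back(MakeVectorGen<int>({10, 20, 30}));  // "fragment" 2
    return v;
  };
  auto cat = Concatenated<int>(make_inners());  // prints 1 2 3 10 20 30
  auto mer = Merged<int>(make_inners());        // prints 1 10 2 20 3 30
  while (auto v = cat()) std::cout << *v << ' ';
  std::cout << '\n';
  while (auto v = mer()) std::cout << *v << ' ';
  std::cout << '\n';
}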

##########
File path: cpp/src/arrow/dataset/dataset.cc
##########
@@ -95,6 +95,55 @@ Result<ScanTaskIterator> InMemoryFragment::Scan(std::shared_ptr<ScanOptions> opt
   return MakeMapIterator(fn, std::move(batches_it));
 }
 
+Result<RecordBatchGenerator> InMemoryFragment::ScanBatchesAsync(
+    const ScanOptions& options) {
+  struct State {
+    State(std::shared_ptr<InMemoryFragment> fragment, int64_t batch_size)
+        : fragment(std::move(fragment)),
+          batch_index(0),
+          offset(0),
+          batch_size(batch_size) {}
+
+    std::shared_ptr<RecordBatch> Next() {
+      const auto& next_parent = fragment->record_batches_[batch_index];
+      if (offset < next_parent->num_rows()) {
+        auto next = next_parent->Slice(offset, batch_size);
+        offset += batch_size;
+        return next;
+      }
+      batch_index++;
+      offset = 0;
+      return nullptr;
+    }
+
+    bool Finished() { return batch_index >= fragment->record_batches_.size(); }
+
+    std::shared_ptr<InMemoryFragment> fragment;
+    std::size_t batch_index;
+    int64_t offset;
+    int64_t batch_size;
+  };
+
+  struct Generator {
+    Generator(std::shared_ptr<InMemoryFragment> fragment, int64_t batch_size)
+        : state(std::make_shared<State>(std::move(fragment), batch_size)) {}
+
+    Future<std::shared_ptr<RecordBatch>> operator()() {
+      while (!state->Finished()) {
+        auto next = state->Next();
+        if (next) {
+          return Future<std::shared_ptr<RecordBatch>>::MakeFinished(std::move(next));
+        }
+      }
+      return AsyncGeneratorEnd<std::shared_ptr<RecordBatch>>();
+    }
+
+    std::shared_ptr<State> state;
+  };
+  return Generator(std::dynamic_pointer_cast<InMemoryFragment>(shared_from_this()),

Review comment:
       nit: maybe `internal::checked_pointer_cast`? (though admittedly it's not used super consistently throughout the codebase)
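
For reference on the nit: a hand-rolled approximation of what a checked pointer cast usually does. This is not Arrow's `internal::checked_pointer_cast`, just a sketch of the idea — debug builds assert that the downcast is actually valid, while release builds pay only a static cast instead of an RTTI lookup, and there is no silent nullptr on mismatch the way dynamic_pointer_cast would give.

#include <cassert>
#include <memory>

// Approximation of a "checked" pointer cast (illustrative, not Arrow's code).
template <typename Derived, typename Base>
std::shared_ptr<Derived> CheckedPointerCast(std::shared_ptr<Base> ptr) {
#ifndef NDEBUG
  // Debug builds: fail loudly if the object is not really a Derived.
  assert(ptr == nullptr || dynamic_cast<Derived*>(ptr.get()) != nullptr);
#endif
  // Release builds: plain static cast, no RTTI lookup.
  return std::static_pointer_cast<Derived>(std::move(ptr));
}

// Usage sketch with hypothetical stand-in types:
struct Fragment {
  virtual ~Fragment() = default;
};
struct InMemoryFragment : Fragment {};

int main() {
  std::shared_ptr<Fragment> f = std::make_shared<InMemoryFragment>();
  auto in_memory = CheckedPointerCast<InMemoryFragment>(f);  // asserts in debug builds
  return in_memory != nullptr ? 0 : 1;
}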

##########
File path: cpp/src/arrow/dataset/file_base.cc
##########
@@ -102,6 +102,79 @@ Result<std::shared_ptr<FileFragment>> FileFormat::MakeFragment(
       std::move(partition_expression), std::move(physical_schema)));
 }
 
+// TODO(ARROW-12355[CSV], ARROW-11772[IPC], ARROW-11843[Parquet]) The following
+// implementation of ScanBatchesAsync is both ugly and terribly ineffecient. Each of the
+// formats should provide their own efficient implementation.
+Result<RecordBatchGenerator> FileFormat::ScanBatchesAsync(
+    const ScanOptions& options, const std::shared_ptr<FileFragment>& file) {
+  std::shared_ptr<ScanOptions> scan_options = std::make_shared<ScanOptions>(options);
+  ARROW_ASSIGN_OR_RAISE(auto scan_task_it, ScanFile(scan_options, file));
+  struct State {
+    State(std::shared_ptr<ScanOptions> scan_options, ScanTaskIterator scan_task_it)
+        : scan_options(std::move(scan_options)),
+          scan_task_it(std::move(scan_task_it)),
+          current_rb_it(),
+          current_rb_gen(),
+          finished(false) {}
+
+    std::shared_ptr<ScanOptions> scan_options;
+    ScanTaskIterator scan_task_it;
+    RecordBatchIterator current_rb_it;
+    RecordBatchGenerator current_rb_gen;
+    bool finished;
+  };
+  struct Generator {
+    Future<std::shared_ptr<RecordBatch>> operator()() {
+      if (state->finished) {
+        return AsyncGeneratorEnd<std::shared_ptr<RecordBatch>>();
+      }
+      if (!state->current_rb_it && !state->current_rb_gen) {
+        RETURN_NOT_OK(PumpScanTask());
+        if (state->finished) {
+          return AsyncGeneratorEnd<std::shared_ptr<RecordBatch>>();
+        }
+      }
+      if (state->current_rb_gen) {
+        return NextAsync();
+      }
+      return NextSync();
+    }
+    Future<std::shared_ptr<RecordBatch>> NextSync() {
+      ARROW_ASSIGN_OR_RAISE(auto next_sync, state->current_rb_it.Next());
+      if (IsIterationEnd(next_sync)) {

Review comment:
       Don't we need to check again if NextSync/NextAsync return the end marker? Otherwise, operator() will return a Future that resolves to the end marker and the consumer will stop early.

-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use
the URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
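
On the end-marker question in the last review comment (on file_base.cc): a rough, synchronous stand-in — hypothetical names, no Futures — for the shape the generator loop could take, so that exhausting one scan task pumps the next one instead of surfacing a per-task end marker to the consumer.

#include <cstddef>
#include <iostream>
#include <optional>
#include <vector>

using Batch = int;  // stand-in for std::shared_ptr<RecordBatch>

struct State {
  std::vector<std::vector<Batch>> scan_tasks;  // each inner vector = one scan task
  std::size_t task_index = 0;
  std::size_t batch_index = 0;

  bool Finished() const { return task_index >= scan_tasks.size(); }

  // Returns nullopt when the current scan task has no more batches.
  std::optional<Batch> NextFromCurrentTask() {
    if (Finished() || batch_index >= scan_tasks[task_index].size()) return std::nullopt;
    return scan_tasks[task_index][batch_index++];
  }

  // Move on to the next scan task.
  void PumpScanTask() {
    ++task_index;
    batch_index = 0;
  }
};

// operator()-equivalent: when the current scan task runs dry, pump the next one
// and check again, so the per-task end marker never reaches the consumer; only
// running out of scan tasks ends the iteration.
std::optional<Batch> Next(State& state) {
  while (!state.Finished()) {
    if (auto batch = state.NextFromCurrentTask()) return batch;
    state.PumpScanTask();
  }
  return std::nullopt;
}

int main() {
  State state{{{1, 2}, {}, {3}}};  // middle scan task yields no batches at all
  while (auto b = Next(state)) std::cout << *b << ' ';  // prints: 1 2 3
  std::cout << '\n';
}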