westonpace commented on a change in pull request #10008: URL: https://github.com/apache/arrow/pull/10008#discussion_r615107215
########## File path: cpp/src/arrow/dataset/file_base.cc ########## @@ -102,6 +102,79 @@ Result<std::shared_ptr<FileFragment>> FileFormat::MakeFragment( std::move(partition_expression), std::move(physical_schema))); } +// TODO(ARROW-12355[CSV], ARROW-11772[IPC], ARROW-11843[Parquet]) The following +// implementation of ScanBatchesAsync is both ugly and terribly ineffecient. Each of the +// formats should provide their own efficient implementation. +Result<RecordBatchGenerator> FileFormat::ScanBatchesAsync( + const ScanOptions& options, const std::shared_ptr<FileFragment>& file) { + std::shared_ptr<ScanOptions> scan_options = std::make_shared<ScanOptions>(options); + ARROW_ASSIGN_OR_RAISE(auto scan_task_it, ScanFile(scan_options, file)); + struct State { + State(std::shared_ptr<ScanOptions> scan_options, ScanTaskIterator scan_task_it) + : scan_options(std::move(scan_options)), + scan_task_it(std::move(scan_task_it)), + current_rb_it(), + current_rb_gen(), + finished(false) {} + + std::shared_ptr<ScanOptions> scan_options; + ScanTaskIterator scan_task_it; + RecordBatchIterator current_rb_it; + RecordBatchGenerator current_rb_gen; + bool finished; + }; + struct Generator { + Future<std::shared_ptr<RecordBatch>> operator()() { + if (state->finished) { + return AsyncGeneratorEnd<std::shared_ptr<RecordBatch>>(); + } + if (!state->current_rb_it && !state->current_rb_gen) { + RETURN_NOT_OK(PumpScanTask()); + if (state->finished) { + return AsyncGeneratorEnd<std::shared_ptr<RecordBatch>>(); + } + } + if (state->current_rb_gen) { + return NextAsync(); + } + return NextSync(); + } + Future<std::shared_ptr<RecordBatch>> NextSync() { + ARROW_ASSIGN_OR_RAISE(auto next_sync, state->current_rb_it.Next()); + if (IsIterationEnd(next_sync)) { Review comment: I capitulated and removed the argument. Your comment about it just being a testing parameter is accurate. I created a test in `arrow-dataset-file-test` that does not rely on `InMemoryDataset` to test this logic here. I might in the future add some tests to `ScannerTest` that set a limit on scan options batch size to get coverage of the multiple batches per fragment case. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org