westonpace commented on a change in pull request #10008:
URL: https://github.com/apache/arrow/pull/10008#discussion_r613414017
##########
File path: cpp/src/arrow/dataset/file_base.cc
##########
@@ -102,6 +102,79 @@ Result<std::shared_ptr<FileFragment>> FileFormat::MakeFragment(
std::move(partition_expression),
std::move(physical_schema)));
}
+// TODO(ARROW-12355[CSV], ARROW-11772[IPC], ARROW-11843[Parquet]) The following
+// implementation of ScanBatchesAsync is both ugly and terribly inefficient. Each of the
+// formats should provide their own efficient implementation.
+Result<RecordBatchGenerator> FileFormat::ScanBatchesAsync(
+ const ScanOptions& options, const std::shared_ptr<FileFragment>& file) {
+ std::shared_ptr<ScanOptions> scan_options = std::make_shared<ScanOptions>(options);
+ ARROW_ASSIGN_OR_RAISE(auto scan_task_it, ScanFile(scan_options, file));
+ struct State {
+ State(std::shared_ptr<ScanOptions> scan_options, ScanTaskIterator scan_task_it)
+ : scan_options(std::move(scan_options)),
+ scan_task_it(std::move(scan_task_it)),
+ current_rb_it(),
+ current_rb_gen(),
+ finished(false) {}
+
+ std::shared_ptr<ScanOptions> scan_options;
+ ScanTaskIterator scan_task_it;
+ RecordBatchIterator current_rb_it;
+ RecordBatchGenerator current_rb_gen;
+ bool finished;
+ };
+ struct Generator {
+ Future<std::shared_ptr<RecordBatch>> operator()() {
+ if (state->finished) {
+ return AsyncGeneratorEnd<std::shared_ptr<RecordBatch>>();
+ }
+ if (!state->current_rb_it && !state->current_rb_gen) {
+ RETURN_NOT_OK(PumpScanTask());
+ if (state->finished) {
+ return AsyncGeneratorEnd<std::shared_ptr<RecordBatch>>();
+ }
+ }
+ if (state->current_rb_gen) {
+ return NextAsync();
+ }
+ return NextSync();
+ }
+ Future<std::shared_ptr<RecordBatch>> NextSync() {
+ ARROW_ASSIGN_OR_RAISE(auto next_sync, state->current_rb_it.Next());
+ if (IsIterationEnd(next_sync)) {
Review comment:
There is a silent precondition here: every fragment scan must yield scan
tasks that each return at least one record batch (unless the entire fragment
is empty, in which case either zero scan tasks or one scan task with zero
batches should both be ok).
I know this precondition holds for IPC and CSV (by virtue of there being
only one scan task), but I wasn't sure about Parquet (i.e. can a push-down
filter cause a batch-less scan task to be emitted in the middle of a set of
scan tasks?)
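To make the concern concrete, here is a minimal sketch (not Arrow code; `Batch`, `FlattenAssumingNonEmpty`, and `FlattenSkippingEmpty` are hypothetical stand-ins for the scan-task/record-batch machinery) showing how a flattener that treats "first empty inner sequence" as "end of stream" silently truncates when an empty scan task appears mid-stream, versus one that skips empty tasks:

```cpp
#include <cassert>
#include <vector>

using Batch = int;  // stand-in for std::shared_ptr<RecordBatch>

// Relies on the silent precondition: mistakes an empty scan task for end of
// the whole stream, so any batches in later tasks are dropped.
std::vector<Batch> FlattenAssumingNonEmpty(
    const std::vector<std::vector<Batch>>& tasks) {
  std::vector<Batch> out;
  for (const auto& task : tasks) {
    if (task.empty()) break;  // treats "empty task" as "iteration end"
    out.insert(out.end(), task.begin(), task.end());
  }
  return out;
}

// Robust version: an empty scan task is skipped, not taken as termination.
std::vector<Batch> FlattenSkippingEmpty(
    const std::vector<std::vector<Batch>>& tasks) {
  std::vector<Batch> out;
  for (const auto& task : tasks) {
    out.insert(out.end(), task.begin(), task.end());
  }
  return out;
}
```

With input `{{1, 2}, {}, {3}}` the first version stops after `{1, 2}`, while the second yields all three batches, which is why a batch-less Parquet scan task in the middle would matter.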
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]