lidavidm commented on a change in pull request #10008:
URL: https://github.com/apache/arrow/pull/10008#discussion_r613549219



##########
File path: cpp/src/arrow/dataset/file_base.cc
##########
@@ -102,6 +102,79 @@ Result<std::shared_ptr<FileFragment>> 
FileFormat::MakeFragment(
                        std::move(partition_expression), 
std::move(physical_schema)));
 }
 
+// TODO(ARROW-12355[CSV], ARROW-11772[IPC], ARROW-11843[Parquet]) The following
+// implementation of ScanBatchesAsync is both ugly and terribly ineffecient.  
Each of the
+// formats should provide their own efficient implementation.
+Result<RecordBatchGenerator> FileFormat::ScanBatchesAsync(
+    const ScanOptions& options, const std::shared_ptr<FileFragment>& file) {
+  std::shared_ptr<ScanOptions> scan_options = 
std::make_shared<ScanOptions>(options);
+  ARROW_ASSIGN_OR_RAISE(auto scan_task_it, ScanFile(scan_options, file));
+  struct State {
+    State(std::shared_ptr<ScanOptions> scan_options, ScanTaskIterator 
scan_task_it)
+        : scan_options(std::move(scan_options)),
+          scan_task_it(std::move(scan_task_it)),
+          current_rb_it(),
+          current_rb_gen(),
+          finished(false) {}
+
+    std::shared_ptr<ScanOptions> scan_options;
+    ScanTaskIterator scan_task_it;
+    RecordBatchIterator current_rb_it;
+    RecordBatchGenerator current_rb_gen;
+    bool finished;
+  };
+  struct Generator {
+    Future<std::shared_ptr<RecordBatch>> operator()() {
+      if (state->finished) {
+        return AsyncGeneratorEnd<std::shared_ptr<RecordBatch>>();
+      }
+      if (!state->current_rb_it && !state->current_rb_gen) {
+        RETURN_NOT_OK(PumpScanTask());
+        if (state->finished) {
+          return AsyncGeneratorEnd<std::shared_ptr<RecordBatch>>();
+        }
+      }
+      if (state->current_rb_gen) {
+        return NextAsync();
+      }
+      return NextSync();
+    }
+    Future<std::shared_ptr<RecordBatch>> NextSync() {
+      ARROW_ASSIGN_OR_RAISE(auto next_sync, state->current_rb_it.Next());
+      if (IsIterationEnd(next_sync)) {

Review comment:
       I see that there's now a parameter to generate multiple scan tasks per 
fragment in InMemoryDataset - however, is that necessary? For one, it doesn't 
affect this code path, since this only affects file fragments. For another, it 
doesn't affect the scanner, which doesn't use scan tasks (directly); it'll use 
ScanBatchesAsync on the Fragment, which flattens all the scan tasks itself 
anyways.
   
   So I think the issue pointed out here doesn't show up in test purely because 
only Parquet fragments expose multiple scan tasks per fragment right now.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to