westonpace commented on a change in pull request #10008:
URL: https://github.com/apache/arrow/pull/10008#discussion_r613415855



##########
File path: cpp/src/arrow/dataset/scanner.cc
##########
@@ -364,5 +365,228 @@ Future<std::shared_ptr<Table>> SyncScanner::ToTableInternal(
           });
 }
 
+namespace {
+
+inline Result<EnumeratedRecordBatch> DoFilterAndProjectRecordBatchAsync(
+    const std::shared_ptr<Scanner>& scanner, const EnumeratedRecordBatch& in) {
+  ARROW_ASSIGN_OR_RAISE(Expression simplified_filter,
+                        SimplifyWithGuarantee(scanner->options()->filter,
+                                              in.fragment.value->partition_expression()));
+
+  compute::ExecContext exec_context{scanner->options()->pool};
+  ARROW_ASSIGN_OR_RAISE(
+      Datum mask, ExecuteScalarExpression(simplified_filter, Datum(in.record_batch.value),
+                                          &exec_context));
+
+  Datum filtered;
+  if (mask.is_scalar()) {
+    const auto& mask_scalar = mask.scalar_as<BooleanScalar>();
+    if (mask_scalar.is_valid && mask_scalar.value) {
+      // filter matches entire table
+      filtered = in.record_batch.value;
+    } else {
+      // Filter matches nothing
+      filtered = in.record_batch.value->Slice(0, 0);
+    }
+  } else {
+    ARROW_ASSIGN_OR_RAISE(
+        filtered, compute::Filter(in.record_batch.value, mask,
+                                  compute::FilterOptions::Defaults(), &exec_context));
+  }
+
+  ARROW_ASSIGN_OR_RAISE(Expression simplified_projection,
+                        SimplifyWithGuarantee(scanner->options()->projection,
+                                              in.fragment.value->partition_expression()));
+  ARROW_ASSIGN_OR_RAISE(
+      Datum projected,
+      ExecuteScalarExpression(simplified_projection, filtered, &exec_context));
+
+  DCHECK_EQ(projected.type()->id(), Type::STRUCT);
+  if (projected.shape() == ValueDescr::SCALAR) {
+    // Only virtual columns are projected. Broadcast to an array
+    ARROW_ASSIGN_OR_RAISE(
+        projected,
+        MakeArrayFromScalar(*projected.scalar(), filtered.record_batch()->num_rows(),
+                            scanner->options()->pool));
+  }
+  ARROW_ASSIGN_OR_RAISE(auto out,
+                        RecordBatch::FromStructArray(projected.array_as<StructArray>()));
+  auto projected_batch =
+      out->ReplaceSchemaMetadata(in.record_batch.value->schema()->metadata());
+
+  return EnumeratedRecordBatch{
+      {std::move(projected_batch), in.record_batch.index, in.record_batch.last},
+      in.fragment};
+}
+
+inline EnumeratedRecordBatchGenerator FilterAndProjectRecordBatchAsync(
+    const std::shared_ptr<Scanner>& scanner, EnumeratedRecordBatchGenerator rbs) {
+  auto mapper = [scanner](const EnumeratedRecordBatch& in) {
+    return DoFilterAndProjectRecordBatchAsync(scanner, in);
+  };
+  return MakeMappedGenerator<EnumeratedRecordBatch>(std::move(rbs), mapper);
+}
+
+Result<EnumeratedRecordBatchGenerator> FragmentToBatches(
+    std::shared_ptr<AsyncScanner> scanner,
+    const Enumerated<std::shared_ptr<Fragment>>& fragment) {
+  ARROW_ASSIGN_OR_RAISE(auto batch_gen,
+                        fragment.value->ScanBatchesAsync(*scanner->options()));
+  auto enumerated_batch_gen = MakeEnumeratedGenerator(std::move(batch_gen));
+
+  auto combine_fn =
+      [fragment](const Enumerated<std::shared_ptr<RecordBatch>>& record_batch) {
+        return EnumeratedRecordBatch{record_batch, fragment};
+      };
+
+  auto combined_gen = MakeMappedGenerator<EnumeratedRecordBatch>(enumerated_batch_gen,
+                                                                 std::move(combine_fn));
+
+  return FilterAndProjectRecordBatchAsync(scanner, std::move(combined_gen));
+}
+
+Result<AsyncGenerator<EnumeratedRecordBatchGenerator>> FragmentsToBatches(
+    std::shared_ptr<AsyncScanner> scanner, FragmentGenerator fragment_gen) {
+  auto enumerated_fragment_gen = MakeEnumeratedGenerator(std::move(fragment_gen));
+  return MakeMappedGenerator<EnumeratedRecordBatchGenerator>(
+      std::move(enumerated_fragment_gen),
+      [scanner](const Enumerated<std::shared_ptr<Fragment>>& fragment) {
+        return FragmentToBatches(scanner, fragment);
+      });
+}
+
+}  // namespace
+
+Result<FragmentGenerator> AsyncScanner::GetFragments() const {
+  // TODO(ARROW-8163): Async fragment scanning will return AsyncGenerator<Fragment> here.
+  // Current iterator based versions are all fast & sync so we will just ToVector it
+  ARROW_ASSIGN_OR_RAISE(auto fragments_it, dataset_->GetFragments(scan_options_->filter));
+  ARROW_ASSIGN_OR_RAISE(auto fragments_vec, fragments_it.ToVector());
+  return MakeVectorGenerator(std::move(fragments_vec));
+}
+
+Result<TaggedRecordBatchIterator> AsyncScanner::ScanBatches() {
+  ARROW_ASSIGN_OR_RAISE(auto batches_gen, ScanBatchesAsync(scan_options_->cpu_executor));
+  return MakeGeneratorIterator(std::move(batches_gen));
+}
+
+Result<EnumeratedRecordBatchIterator> AsyncScanner::ScanBatchesUnordered() {
+  ARROW_ASSIGN_OR_RAISE(auto batches_gen,
+                        ScanBatchesUnorderedAsync(scan_options_->cpu_executor));
+  return MakeGeneratorIterator(std::move(batches_gen));
+}
+
+Result<std::shared_ptr<Table>> AsyncScanner::ToTable() {
+  auto table_fut = ToTableAsync(scan_options_->cpu_executor);
+  return table_fut.result();
+}
+
+Result<EnumeratedRecordBatchGenerator> AsyncScanner::ScanBatchesUnorderedAsync(
+    internal::Executor* cpu_executor) {
+  auto self = shared_from_this();
+  ARROW_ASSIGN_OR_RAISE(auto fragment_gen, GetFragments());
+  ARROW_ASSIGN_OR_RAISE(auto batch_gen_gen,
+                        FragmentsToBatches(self, std::move(fragment_gen)));
+  return MakeConcatenatedGenerator(std::move(batch_gen_gen));

Review comment:
       It will need to be.  The problem is that `MakeMergedGenerator` immediately consumes `EnumeratingGenerator`, which is not async-reentrant.  `MakeMergedGenerator` (erroneously) pulls from the outer generator (the gen_gen) in an async-reentrant fashion.  I'll file a follow-up JIRA to keep this one simple.
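
       For context, here's a minimal, self-contained sketch of the reentrancy problem (it uses `std::future` as a stand-in for Arrow's `Future`/`AsyncGenerator`, and the `source`/`enumerating` names are illustrative only, not the real implementation): an enumerating wrapper keeps a shared counter, so a consumer that requests the next item before the previous one has completed can pair values with the wrong indices.

```cpp
// Illustrative only: std::future stands in for arrow::Future to show why an
// enumerating generator is unsafe to call re-entrantly.
#include <future>
#include <iostream>

template <typename T>
struct Enumerated {  // simplified stand-in for Arrow's Enumerated<T>
  T value;
  int index;
  bool last;
};

int main() {
  // Source "generator": each call yields a future for the next integer.
  auto source = [n = 0]() mutable {
    return std::async(std::launch::deferred, [v = n++] { return v; });
  };

  // Enumerating wrapper: tags each item with its position. The shared,
  // mutable `counter` is what makes overlapping (re-entrant) calls unsafe.
  int counter = 0;
  auto enumerating = [&]() {
    int index = counter++;
    return std::async(std::launch::deferred, [index, &source] {
      return Enumerated<int>{source().get(), index, /*last=*/false};
    });
  };

  // Sequential use is fine: each future completes before the next request.
  auto a = enumerating().get();  // value 0, index 0
  auto b = enumerating().get();  // value 1, index 1

  // Re-entrant use is not: both requests are issued before either completes,
  // and the later request resolves first, so values and indices mismatch.
  auto f1 = enumerating();  // claims index 2
  auto f2 = enumerating();  // claims index 3 before f1 has produced anything
  auto later = f2.get();    // pulls value 2 but carries index 3
  auto earlier = f1.get();  // pulls value 3 but carries index 2

  std::cout << a.index << ":" << a.value << " " << b.index << ":" << b.value
            << " " << earlier.index << ":" << earlier.value << " "
            << later.index << ":" << later.value << "\n";
  return 0;
}
```

       A consumer that waits for each pull to finish before issuing the next one never hits this, which is why consuming the non-reentrant generator sequentially is fine for now.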



