lidavidm commented on a change in pull request #10664:
URL: https://github.com/apache/arrow/pull/10664#discussion_r664679484



##########
File path: cpp/src/arrow/dataset/scanner.cc
##########
@@ -604,11 +654,93 @@ Result<EnumeratedRecordBatchGenerator> 
AsyncScanner::ScanBatchesUnorderedAsync()
   return ScanBatchesUnorderedAsync(internal::GetCpuThreadPool());
 }
 
+namespace {
+Result<EnumeratedRecordBatch> ToEnumeratedRecordBatch(
+    const util::optional<compute::ExecBatch>& batch, const ScanOptions& 
options,
+    const FragmentVector& fragments) {
+  int num_fields = options.projected_schema->num_fields();
+
+  ArrayVector columns(num_fields);
+  for (size_t i = 0; i < columns.size(); ++i) {
+    const Datum& value = batch->values[i];
+    if (value.is_array()) {
+      columns[i] = value.make_array();
+      continue;
+    }
+    ARROW_ASSIGN_OR_RAISE(
+        columns[i], MakeArrayFromScalar(*value.scalar(), batch->length, 
options.pool));
+  }
+
+  EnumeratedRecordBatch out;
+  out.fragment.index = 
batch->values[num_fields].scalar_as<Int32Scalar>().value;
+  out.fragment.value = fragments[out.fragment.index];
+  out.fragment.last = false;  // ignored during reordering
+
+  out.record_batch.index = batch->values[num_fields + 
1].scalar_as<Int32Scalar>().value;
+  out.record_batch.value =
+      RecordBatch::Make(options.projected_schema, batch->length, 
std::move(columns));
+  out.record_batch.last = batch->values[num_fields + 
2].scalar_as<BooleanScalar>().value;
+
+  return out;
+}
+}  // namespace
+
 Result<EnumeratedRecordBatchGenerator> AsyncScanner::ScanBatchesUnorderedAsync(
     internal::Executor* cpu_executor) {
-  ARROW_ASSIGN_OR_RAISE(auto fragment_gen, GetFragments());
-  return ScanBatchesUnorderedAsyncImpl(scan_options_, std::move(fragment_gen),
-                                       cpu_executor);
+  if (!scan_options_->use_threads) {
+    cpu_executor = nullptr;
+  }
+
+  auto exec_context =
+      std::make_shared<compute::ExecContext>(scan_options_->pool, 
cpu_executor);
+
+  ARROW_ASSIGN_OR_RAISE(auto plan, 
compute::ExecPlan::Make(exec_context.get()));
+
+  // Ensure plan, exec_context outlive usage of the returned generator
+  plans_.emplace(plan, std::move(exec_context));

Review comment:
       In other places we wrap the generator with a generator that holds a 
reference to whatever resources are necessary - why doesn't that work here?

##########
File path: cpp/src/arrow/dataset/scanner.cc
##########
@@ -428,6 +428,11 @@ class ARROW_DS_EXPORT AsyncScanner : public Scanner,
   Result<FragmentGenerator> GetFragments() const;
 
   std::shared_ptr<Dataset> dataset_;
+
+  // XXX if Scanner were truly single-use this would be a 1:1 relationship

Review comment:
       Is this worth filing a follow-up for? I'd guess a lot of scanner usage 
is already single-use.

##########
File path: cpp/src/arrow/dataset/scanner.cc
##########
@@ -428,6 +428,11 @@ class ARROW_DS_EXPORT AsyncScanner : public Scanner,
   Result<FragmentGenerator> GetFragments() const;
 
   std::shared_ptr<Dataset> dataset_;
+
+  // XXX if Scanner were truly single-use this would be a 1:1 relationship
+  std::unordered_map<std::shared_ptr<compute::ExecPlan>,
+                     std::shared_ptr<compute::ExecContext>>
+      plans_;

Review comment:
       This is a little questionable in that I don't think the lifetime of 
these generators was tied to the lifetime of the scanner before. Maybe we 
should also keep a reference to the scanner from the generators?

##########
File path: cpp/src/arrow/dataset/scanner.cc
##########
@@ -525,6 +530,64 @@ Result<AsyncGenerator<EnumeratedRecordBatchGenerator>> 
FragmentsToBatches(
                              });
 }
 
+Result<compute::ExecNode*> MakeScanNode(compute::ExecPlan* plan,
+                                        FragmentGenerator fragment_gen,
+                                        std::shared_ptr<ScanOptions> options) {
+  if (!options->use_async) {
+    return Status::NotImplemented("ScanNodes without asynchrony");
+  }
+
+  ARROW_ASSIGN_OR_RAISE(
+      auto batch_gen_gen,
+      FragmentsToBatches(std::move(fragment_gen), options, 
/*filter_and_project=*/false));
+
+  auto batch_gen_gen_readahead =
+      MakeSerialReadaheadGenerator(std::move(batch_gen_gen), 
options->fragment_readahead);
+
+  auto merged_batch_gen = 
MakeMergedGenerator(std::move(batch_gen_gen_readahead),
+                                              options->fragment_readahead);
+
+  auto batch_gen =
+      MakeReadaheadGenerator(std::move(merged_batch_gen), 
options->fragment_readahead);
+
+  auto gen = MakeMappedGenerator(
+      std::move(batch_gen),
+      [options](const EnumeratedRecordBatch& partial)
+          -> Result<util::optional<compute::ExecBatch>> {
+        ARROW_ASSIGN_OR_RAISE(
+            util::optional<compute::ExecBatch> batch,
+            compute::MakeExecBatch(*options->dataset_schema, 
partial.record_batch.value));
+        // TODO if a fragment failed to perform projection pushdown, there may 
be
+        // unnecessarily materialized columns in batch. We can drop them now 
instead of
+        // letting them coast through the rest of the plan.
+
+        // TODO fragments may be able to attach more guarantees to batches 
than this,
+        // for example parquet's row group stats. Failing to do this leaves 
perf on the
+        // table because row group stats could be used to skip kernel execs in 
FilterNode
+        batch->guarantee = partial.fragment.value->partition_expression();
+
+        // tag rows with fragment- and batch-of-origin
+        batch->values.emplace_back(partial.fragment.index);
+        batch->values.emplace_back(partial.record_batch.index);
+        batch->values.emplace_back(partial.record_batch.last);

Review comment:
       It is a little unfortunate that we're making several heap allocations 
for every batch to carry effectively three integers (though I guess the 
overhead is probably not noticeable in the grand scheme of things and batches 
should be relatively large).

##########
File path: cpp/src/arrow/dataset/scanner.cc
##########
@@ -604,11 +654,93 @@ Result<EnumeratedRecordBatchGenerator> 
AsyncScanner::ScanBatchesUnorderedAsync()
   return ScanBatchesUnorderedAsync(internal::GetCpuThreadPool());
 }
 
+namespace {
+Result<EnumeratedRecordBatch> ToEnumeratedRecordBatch(
+    const util::optional<compute::ExecBatch>& batch, const ScanOptions& 
options,
+    const FragmentVector& fragments) {
+  int num_fields = options.projected_schema->num_fields();
+
+  ArrayVector columns(num_fields);
+  for (size_t i = 0; i < columns.size(); ++i) {
+    const Datum& value = batch->values[i];
+    if (value.is_array()) {
+      columns[i] = value.make_array();
+      continue;
+    }
+    ARROW_ASSIGN_OR_RAISE(
+        columns[i], MakeArrayFromScalar(*value.scalar(), batch->length, 
options.pool));
+  }
+
+  EnumeratedRecordBatch out;
+  out.fragment.index = 
batch->values[num_fields].scalar_as<Int32Scalar>().value;
+  out.fragment.value = fragments[out.fragment.index];
+  out.fragment.last = false;  // ignored during reordering
+
+  out.record_batch.index = batch->values[num_fields + 
1].scalar_as<Int32Scalar>().value;
+  out.record_batch.value =
+      RecordBatch::Make(options.projected_schema, batch->length, 
std::move(columns));
+  out.record_batch.last = batch->values[num_fields + 
2].scalar_as<BooleanScalar>().value;
+
+  return out;
+}
+}  // namespace
+
 Result<EnumeratedRecordBatchGenerator> AsyncScanner::ScanBatchesUnorderedAsync(
     internal::Executor* cpu_executor) {
-  ARROW_ASSIGN_OR_RAISE(auto fragment_gen, GetFragments());
-  return ScanBatchesUnorderedAsyncImpl(scan_options_, std::move(fragment_gen),
-                                       cpu_executor);
+  if (!scan_options_->use_threads) {
+    cpu_executor = nullptr;
+  }
+
+  auto exec_context =
+      std::make_shared<compute::ExecContext>(scan_options_->pool, 
cpu_executor);
+
+  ARROW_ASSIGN_OR_RAISE(auto plan, 
compute::ExecPlan::Make(exec_context.get()));
+
+  // Ensure plan, exec_context outlive usage of the returned generator
+  plans_.emplace(plan, std::move(exec_context));

Review comment:
       I guess - we might drop the outermost generator but not necessarily the 
whole graph of generators?

##########
File path: cpp/src/arrow/dataset/scanner_test.cc
##########
@@ -1254,8 +1261,32 @@ TEST(ScanNode, DeferredFilterOnPhysicalColumn) {
               ResultWith(UnorderedElementsAreArray(expected)));
 }
 
-TEST(ScanNode, ProjectionPushdown) {
-  // ensure non-projected columns are dropped
+TEST(ScanNode, DISABLED_ProjectionPushdown) {

Review comment:
       Disabled because of that TODO above?

##########
File path: cpp/src/arrow/dataset/scanner.cc
##########
@@ -525,6 +530,64 @@ Result<AsyncGenerator<EnumeratedRecordBatchGenerator>> 
FragmentsToBatches(
                              });
 }
 
+Result<compute::ExecNode*> MakeScanNode(compute::ExecPlan* plan,
+                                        FragmentGenerator fragment_gen,
+                                        std::shared_ptr<ScanOptions> options) {
+  if (!options->use_async) {
+    return Status::NotImplemented("ScanNodes without asynchrony");
+  }
+
+  ARROW_ASSIGN_OR_RAISE(
+      auto batch_gen_gen,
+      FragmentsToBatches(std::move(fragment_gen), options, 
/*filter_and_project=*/false));
+
+  auto batch_gen_gen_readahead =
+      MakeSerialReadaheadGenerator(std::move(batch_gen_gen), 
options->fragment_readahead);
+
+  auto merged_batch_gen = 
MakeMergedGenerator(std::move(batch_gen_gen_readahead),
+                                              options->fragment_readahead);
+
+  auto batch_gen =
+      MakeReadaheadGenerator(std::move(merged_batch_gen), 
options->fragment_readahead);
+
+  auto gen = MakeMappedGenerator(
+      std::move(batch_gen),
+      [options](const EnumeratedRecordBatch& partial)
+          -> Result<util::optional<compute::ExecBatch>> {
+        ARROW_ASSIGN_OR_RAISE(
+            util::optional<compute::ExecBatch> batch,
+            compute::MakeExecBatch(*options->dataset_schema, 
partial.record_batch.value));
+        // TODO if a fragment failed to perform projection pushdown, there may 
be
+        // unnecessarily materialized columns in batch. We can drop them now 
instead of
+        // letting them coast through the rest of the plan.
+
+        // TODO fragments may be able to attach more guarantees to batches 
than this,
+        // for example parquet's row group stats. Failing to do this leaves 
perf on the
+        // table because row group stats could be used to skip kernel execs in 
FilterNode
+        batch->guarantee = partial.fragment.value->partition_expression();
+
+        // tag rows with fragment- and batch-of-origin
+        batch->values.emplace_back(partial.fragment.index);
+        batch->values.emplace_back(partial.record_batch.index);
+        batch->values.emplace_back(partial.record_batch.last);

Review comment:
       I guess if it is ever noticeable overhead, we could make the booleans 
singletons and pool the integer allocations.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to