westonpace commented on a change in pull request #9589:
URL: https://github.com/apache/arrow/pull/9589#discussion_r612679288



##########
File path: cpp/src/arrow/dataset/scanner.cc
##########
@@ -364,5 +478,80 @@ Future<std::shared_ptr<Table>> 
SyncScanner::ToTableInternal(
           });
 }
 
+Result<std::shared_ptr<Table>> Scanner::TakeRows(const Array& indices) {
+  if (indices.null_count() != 0) {
+    return Status::NotImplemented("null take indices");
+  }
+
+  compute::ExecContext ctx(scan_options_->pool);
+
+  const Array* original_indices;
+  // If we have to cast, this is the backing reference
+  std::shared_ptr<Array> original_indices_ptr;
+  if (indices.type_id() != Type::INT64) {
+    ARROW_ASSIGN_OR_RAISE(
+        original_indices_ptr,
+        compute::Cast(indices, int64(), compute::CastOptions::Safe(), &ctx));
+    original_indices = original_indices_ptr.get();
+  } else {
+    original_indices = &indices;
+  }
+
+  std::shared_ptr<Array> unsort_indices;
+  {
+    ARROW_ASSIGN_OR_RAISE(
+        auto sort_indices,
+        compute::SortIndices(*original_indices, compute::SortOrder::Ascending, 
&ctx));
+    ARROW_ASSIGN_OR_RAISE(original_indices_ptr,
+                          compute::Take(*original_indices, *sort_indices,
+                                        compute::TakeOptions::Defaults(), 
&ctx));
+    original_indices = original_indices_ptr.get();
+    ARROW_ASSIGN_OR_RAISE(
+        unsort_indices,
+        compute::SortIndices(*sort_indices, compute::SortOrder::Ascending, 
&ctx));
+  }
+
+  RecordBatchVector out_batches;
+
+  auto raw_indices = static_cast<const 
Int64Array&>(*original_indices).raw_values();
+  int64_t offset = 0, row_begin = 0;
+
+  ARROW_ASSIGN_OR_RAISE(auto batch_it, ScanBatches());

Review comment:
       Is there a JIRA for pushing down the index predicate into the scan?

##########
File path: cpp/src/arrow/dataset/scanner.cc
##########
@@ -182,7 +273,30 @@ Result<FragmentIterator> SyncScanner::GetFragments() {
   return GetFragmentsFromDatasets({dataset_}, scan_options_->filter);
 }
 
-Result<ScanTaskIterator> SyncScanner::Scan() {
+Result<ScanTaskIterator> SyncScanner::Scan() { return ScanInternal(); }
+
+Status SyncScanner::Scan(std::function<Status(TaggedRecordBatch)> visitor) {
+  ARROW_ASSIGN_OR_RAISE(auto scan_task_it, ScanInternal());
+
+  auto task_group = scan_options_->TaskGroup();
+
+  for (auto maybe_scan_task : scan_task_it) {
+    ARROW_ASSIGN_OR_RAISE(auto scan_task, maybe_scan_task);
+    task_group->Append([scan_task, visitor] {
+      ARROW_ASSIGN_OR_RAISE(auto batch_it, scan_task->Execute());

Review comment:
       I think with CSV this could lead to deadlock 
(https://github.com/apache/arrow/pull/9947#discussion_r611937558) since 
`Execute` (as opposed to `ExecuteAsync`) will call `MakeGeneratorIterator` 
which blocks on the next item (thus nested parallelism).  You could maybe come 
up with some way to work around it but at this point I feel like it would be 
much cleaner to revert out the async streaming CSV reader.  The patch added in 
ARROW-12161 isn't really compatible with where the new AsyncScanner ended up 
going either.  I can add it back in with ARROW-12355 as a mirror API instead of 
replacing the existing.

##########
File path: cpp/src/arrow/dataset/scanner.cc
##########
@@ -364,5 +478,80 @@ Future<std::shared_ptr<Table>> 
SyncScanner::ToTableInternal(
           });
 }
 
+Result<std::shared_ptr<Table>> Scanner::TakeRows(const Array& indices) {
+  if (indices.null_count() != 0) {
+    return Status::NotImplemented("null take indices");
+  }
+
+  compute::ExecContext ctx(scan_options_->pool);
+
+  const Array* original_indices;
+  // If we have to cast, this is the backing reference
+  std::shared_ptr<Array> original_indices_ptr;
+  if (indices.type_id() != Type::INT64) {
+    ARROW_ASSIGN_OR_RAISE(
+        original_indices_ptr,
+        compute::Cast(indices, int64(), compute::CastOptions::Safe(), &ctx));
+    original_indices = original_indices_ptr.get();
+  } else {
+    original_indices = &indices;
+  }
+
+  std::shared_ptr<Array> unsort_indices;
+  {
+    ARROW_ASSIGN_OR_RAISE(
+        auto sort_indices,
+        compute::SortIndices(*original_indices, compute::SortOrder::Ascending, 
&ctx));
+    ARROW_ASSIGN_OR_RAISE(original_indices_ptr,
+                          compute::Take(*original_indices, *sort_indices,
+                                        compute::TakeOptions::Defaults(), 
&ctx));
+    original_indices = original_indices_ptr.get();
+    ARROW_ASSIGN_OR_RAISE(
+        unsort_indices,
+        compute::SortIndices(*sort_indices, compute::SortOrder::Ascending, 
&ctx));
+  }
+
+  RecordBatchVector out_batches;
+
+  auto raw_indices = static_cast<const 
Int64Array&>(*original_indices).raw_values();
+  int64_t offset = 0, row_begin = 0;
+
+  ARROW_ASSIGN_OR_RAISE(auto batch_it, ScanBatches());
+  while (true) {
+    ARROW_ASSIGN_OR_RAISE(auto batch, batch_it.Next());
+    if (IsIterationEnd(batch)) break;
+    if (offset == original_indices->length()) break;
+    DCHECK_LT(offset, original_indices->length());
+
+    int64_t length = 0;
+    while (offset + length < original_indices->length()) {
+      auto rel_index = raw_indices[offset + length] - row_begin;
+      if (rel_index >= batch.record_batch->num_rows()) break;
+      ++length;
+    }
+    DCHECK_LE(offset + length, original_indices->length());

Review comment:
       Would you not want to skip empty arrays (where `length == 0`)?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to