lidavidm commented on a change in pull request #9589:
URL: https://github.com/apache/arrow/pull/9589#discussion_r612722771
##########
File path: cpp/src/arrow/dataset/scanner.cc
##########
@@ -364,5 +478,80 @@ Future<std::shared_ptr<Table>>
SyncScanner::ToTableInternal(
});
}
+Result<std::shared_ptr<Table>> Scanner::TakeRows(const Array& indices) {
+ if (indices.null_count() != 0) {
+ return Status::NotImplemented("null take indices");
+ }
+
+ compute::ExecContext ctx(scan_options_->pool);
+
+ const Array* original_indices;
+ // If we have to cast, this is the backing reference
+ std::shared_ptr<Array> original_indices_ptr;
+ if (indices.type_id() != Type::INT64) {
+ ARROW_ASSIGN_OR_RAISE(
+ original_indices_ptr,
+ compute::Cast(indices, int64(), compute::CastOptions::Safe(), &ctx));
+ original_indices = original_indices_ptr.get();
+ } else {
+ original_indices = &indices;
+ }
+
+ std::shared_ptr<Array> unsort_indices;
+ {
+ ARROW_ASSIGN_OR_RAISE(
+ auto sort_indices,
+ compute::SortIndices(*original_indices, compute::SortOrder::Ascending,
&ctx));
+ ARROW_ASSIGN_OR_RAISE(original_indices_ptr,
+ compute::Take(*original_indices, *sort_indices,
+ compute::TakeOptions::Defaults(),
&ctx));
+ original_indices = original_indices_ptr.get();
+ ARROW_ASSIGN_OR_RAISE(
+ unsort_indices,
+ compute::SortIndices(*sort_indices, compute::SortOrder::Ascending,
&ctx));
+ }
+
+ RecordBatchVector out_batches;
+
+ auto raw_indices = static_cast<const
Int64Array&>(*original_indices).raw_values();
+ int64_t offset = 0, row_begin = 0;
+
+ ARROW_ASSIGN_OR_RAISE(auto batch_it, ScanBatches());
+ while (true) {
+ ARROW_ASSIGN_OR_RAISE(auto batch, batch_it.Next());
+ if (IsIterationEnd(batch)) break;
+ if (offset == original_indices->length()) break;
+ DCHECK_LT(offset, original_indices->length());
+
+ int64_t length = 0;
+ while (offset + length < original_indices->length()) {
+ auto rel_index = raw_indices[offset + length] - row_begin;
+ if (rel_index >= batch.record_batch->num_rows()) break;
+ ++length;
+ }
+ DCHECK_LE(offset + length, original_indices->length());
Review comment:
Thanks — that also exposed a bug (or rather, a poor error message) when all
indices were out of bounds.
##########
File path: cpp/src/arrow/dataset/scanner.cc
##########
@@ -364,5 +478,80 @@ Future<std::shared_ptr<Table>>
SyncScanner::ToTableInternal(
});
}
+Result<std::shared_ptr<Table>> Scanner::TakeRows(const Array& indices) {
+ if (indices.null_count() != 0) {
+ return Status::NotImplemented("null take indices");
+ }
+
+ compute::ExecContext ctx(scan_options_->pool);
+
+ const Array* original_indices;
+ // If we have to cast, this is the backing reference
+ std::shared_ptr<Array> original_indices_ptr;
+ if (indices.type_id() != Type::INT64) {
+ ARROW_ASSIGN_OR_RAISE(
+ original_indices_ptr,
+ compute::Cast(indices, int64(), compute::CastOptions::Safe(), &ctx));
+ original_indices = original_indices_ptr.get();
+ } else {
+ original_indices = &indices;
+ }
+
+ std::shared_ptr<Array> unsort_indices;
+ {
+ ARROW_ASSIGN_OR_RAISE(
+ auto sort_indices,
+ compute::SortIndices(*original_indices, compute::SortOrder::Ascending,
&ctx));
+ ARROW_ASSIGN_OR_RAISE(original_indices_ptr,
+ compute::Take(*original_indices, *sort_indices,
+ compute::TakeOptions::Defaults(),
&ctx));
+ original_indices = original_indices_ptr.get();
+ ARROW_ASSIGN_OR_RAISE(
+ unsort_indices,
+ compute::SortIndices(*sort_indices, compute::SortOrder::Ascending,
&ctx));
+ }
+
+ RecordBatchVector out_batches;
+
+ auto raw_indices = static_cast<const
Int64Array&>(*original_indices).raw_values();
+ int64_t offset = 0, row_begin = 0;
+
+ ARROW_ASSIGN_OR_RAISE(auto batch_it, ScanBatches());
Review comment:
I filed ARROW-12369.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org