WillAyd commented on code in PR #14505:
URL: https://github.com/apache/arrow/pull/14505#discussion_r1006264959
##########
cpp/src/arrow/compute/kernels/vector_sort.cc:
##########
@@ -2078,6 +2078,293 @@ class ArrayRanker : public TypeVisitor {
Datum* output_;
};
+class ChunkedArrayRanker : public TypeVisitor {
+ public:
+ // TODO: here we accept order / null_placement / tiebreaker as separate
arguments
+ // whereas the ArrayRanker accepts them as the RankOptions struct; this is
consistent
+ // with ArraySorter / ChunkedArraySorter, so likely should refactor
ArrayRanker
+ ChunkedArrayRanker(ExecContext* ctx, uint64_t* indices_begin, uint64_t*
indices_end,
+ const ChunkedArray& chunked_array, const SortOrder order,
+ const NullPlacement null_placement, const
RankOptions::Tiebreaker tiebreaker, Datum* output)
+ : TypeVisitor(),
+ ctx_(ctx),
+ indices_begin_(indices_begin),
+ indices_end_(indices_end),
+ chunked_array_(chunked_array),
+ physical_type_(GetPhysicalType(chunked_array.type())),
+ physical_chunks_(GetPhysicalChunks(chunked_array_, physical_type_)),
+ order_(order),
+ null_placement_(null_placement),
+ tiebreaker_(tiebreaker),
+ output_(output) {}
+
+ Status Run() { return physical_type_->Accept(this); }
+
+#define VISIT(TYPE) \
+ Status Visit(const TYPE& type) { return RankInternal<TYPE>(); }
+
+ VISIT_SORTABLE_PHYSICAL_TYPES(VISIT)
+
+#undef VISIT
+
+ template <typename InType>
+ Status RankInternal() {
+ using GetView = GetViewType<InType>;
+ using T = typename GetViewType<InType>::T;
+ using ArrayType = typename TypeTraits<InType>::ArrayType;
+
+ const auto num_chunks = chunked_array_.num_chunks();
+ if (num_chunks == 0) {
+ return Status::OK();
+ }
+ const auto arrays = GetArrayPointers(physical_chunks_);
+
+ ArraySortOptions array_options(order_, null_placement_);
+
+ ARROW_ASSIGN_OR_RAISE(auto array_sorter, GetArraySorter(*physical_type_));
+
+ // See related ChunkedArraySort method for comments
+ std::vector<NullPartitionResult> sorted(num_chunks);
+ int64_t begin_offset = 0;
+ int64_t end_offset = 0;
+ int64_t null_count = 0;
+ for (int i = 0; i < num_chunks; ++i) {
+ const auto array = checked_cast<const ArrayType*>(arrays[i]);
+ end_offset += array->length();
+ null_count += array->null_count();
+ sorted[i] = array_sorter(indices_begin_ + begin_offset, indices_begin_ +
end_offset,
+ *array, begin_offset, array_options);
+ begin_offset = end_offset;
+ }
+ DCHECK_EQ(end_offset, indices_end_ - indices_begin_);
+
+ if (sorted.size() > 1) {
+ auto merge_nulls = [&](uint64_t* nulls_begin, uint64_t* nulls_middle,
+ uint64_t* nulls_end, uint64_t* temp_indices,
+ int64_t null_count) {
+ if (has_null_like_values<typename ArrayType::TypeClass>::value) {
+ PartitionNullsOnly<StablePartitioner>(nulls_begin, nulls_end,
+ ChunkedArrayResolver(arrays),
null_count,
+ null_placement_);
+ }
+ };
+ auto merge_non_nulls = [&](uint64_t* range_begin, uint64_t* range_middle,
+ uint64_t* range_end, uint64_t* temp_indices) {
+ MergeNonNulls<ArrayType>(range_begin, range_middle, range_end, arrays,
+ temp_indices);
+ };
+
+ MergeImpl merge_impl{null_placement_, std::move(merge_nulls),
+ std::move(merge_non_nulls)};
+ // std::merge is only called on non-null values, so size temp indices
accordingly
+ RETURN_NOT_OK(merge_impl.Init(ctx_, indices_end_ - indices_begin_ -
null_count));
+
+ while (sorted.size() > 1) {
+ auto out_it = sorted.begin();
+ auto it = sorted.begin();
+ while (it < sorted.end() - 1) {
+ const auto& left = *it++;
+ const auto& right = *it++;
+ DCHECK_EQ(left.overall_end(), right.overall_begin());
+ const auto merged = merge_impl.Merge(left, right, null_count);
+ *out_it++ = merged;
+ }
+ if (it < sorted.end()) {
+ *out_it++ = *it++;
+ }
+ sorted.erase(out_it, sorted.end());
+ }
+ }
+
+ DCHECK_EQ(sorted.size(), 1);
+ DCHECK_EQ(sorted[0].overall_begin(), indices_begin_);
+ DCHECK_EQ(sorted[0].overall_end(), indices_end_);
+ // Note that "nulls" can also include NaNs, hence the >= check
+ DCHECK_GE(sorted[0].null_count(), null_count);
+
+ auto length = indices_end_ - indices_begin_;
+ ARROW_ASSIGN_OR_RAISE(auto rankings,
+ MakeMutableUInt64Array(uint64(), length,
ctx_->memory_pool()));
+ auto out_begin = rankings->GetMutableValues<uint64_t>(1);
+ uint64_t rank;
+
+ switch (tiebreaker_) {
+ case RankOptions::Dense: {
+ T curr_value, prev_value{};
+ rank = 0;
+
+ if (null_placement_ == NullPlacement::AtStart &&
sorted[0].null_count() > 0) {
+ rank++;
+ for (auto it = sorted[0].nulls_begin; it < sorted[0].nulls_end;
it++) {
+ out_begin[*it] = rank;
+ }
+ }
+
+ for (auto it = sorted[0].non_nulls_begin; it <
sorted[0].non_nulls_end; it++) {
+ // Below code wasn't working for string specialization as -> value
returned a buffer
+ // but T is a basic_string_view
+ // using ScalarType = typename TypeTraits<InType>::ScalarType;
+ // auto scalar = std::dynamic_pointer_cast<ScalarType>(
+ // chunked_array_.GetScalar(*it).ValueOrDie());
+ // curr_value = scalar->value;
+ // TODO: can we use chunk_resolver_ from chunked array externally?
+ if (*it >= 2) {
Review Comment:
My current best guess is to execute `take` with the indices and iterate over
that result. I don't know that I've seen that pattern internally yet so happy
to go a different route if that is too heavy handed
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]