WillAyd commented on code in PR #14505:
URL: https://github.com/apache/arrow/pull/14505#discussion_r1004837884


##########
cpp/src/arrow/compute/kernels/vector_sort_test.cc:
##########
@@ -2182,5 +2185,24 @@ TEST(TestRankForFixedSizeBinary, RankFixedSizeBinary) {
       ArrayFromJSON(binary_type, R"(["aaa", "   ", "eee", null, "eee", null, " 
  "])"));
 }
 
+
+TEST(TestRankForReal, RankRealChunked) {

Review Comment:
   Here I have created the chunked test as a separate test case, which 
duplicates all of the looping logic. If we don't want to do that, we could 
also just construct the ChunkedArrays in the existing tests. 
   
   I think most tests follow this pattern, so I'm happy to continue and copy 
over the rest of the type tests if that's what we want.



##########
cpp/src/arrow/compute/kernels/vector_sort.cc:
##########
@@ -2078,6 +2078,293 @@ class ArrayRanker : public TypeVisitor {
   Datum* output_;
 };
 
+class ChunkedArrayRanker : public TypeVisitor {
+ public:
+  // TODO: here we accept order / null_placement / tiebreaker as separate 
arguments
+  // whereas the ArrayRanker accepts them as the RankOptions struct; this is 
consistent
+  // with ArraySorter / ChunkedArraySorter, so likely should refactor 
ArrayRanker
+  ChunkedArrayRanker(ExecContext* ctx, uint64_t* indices_begin, uint64_t* 
indices_end,
+                     const ChunkedArray& chunked_array, const SortOrder order,
+                     const NullPlacement null_placement, const 
RankOptions::Tiebreaker tiebreaker, Datum* output)
+      : TypeVisitor(),
+        ctx_(ctx),
+        indices_begin_(indices_begin),
+        indices_end_(indices_end),
+        chunked_array_(chunked_array),
+        physical_type_(GetPhysicalType(chunked_array.type())),
+        physical_chunks_(GetPhysicalChunks(chunked_array_, physical_type_)),
+        order_(order),
+        null_placement_(null_placement),
+        tiebreaker_(tiebreaker),
+        output_(output) {}
+
+  Status Run() { return physical_type_->Accept(this); }
+
+#define VISIT(TYPE) \
+  Status Visit(const TYPE& type) { return RankInternal<TYPE>(); }
+
+  VISIT_SORTABLE_PHYSICAL_TYPES(VISIT)
+
+#undef VISIT
+
+  template <typename InType>
+  Status RankInternal() {
+    using GetView = GetViewType<InType>;
+    using T = typename GetViewType<InType>::T;
+    using ArrayType = typename TypeTraits<InType>::ArrayType;
+
+    const auto num_chunks = chunked_array_.num_chunks();
+    if (num_chunks == 0) {
+      return Status::OK();
+    }
+    const auto arrays = GetArrayPointers(physical_chunks_);
+
+    ArraySortOptions array_options(order_, null_placement_);
+
+    ARROW_ASSIGN_OR_RAISE(auto array_sorter, GetArraySorter(*physical_type_));
+
+    // See related ChunkedArraySort method for comments

Review Comment:
   This and the next 20 lines of merging code are copied from the 
ChunkedArraySort implementation. I think we could refactor this into a 
function that each class calls instead.



##########
cpp/src/arrow/compute/kernels/vector_sort.cc:
##########
@@ -2078,6 +2078,293 @@ class ArrayRanker : public TypeVisitor {
   Datum* output_;
 };
 
+class ChunkedArrayRanker : public TypeVisitor {
+ public:
+  // TODO: here we accept order / null_placement / tiebreaker as separate 
arguments
+  // whereas the ArrayRanker accepts them as the RankOptions struct; this is 
consistent
+  // with ArraySorter / ChunkedArraySorter, so likely should refactor 
ArrayRanker
+  ChunkedArrayRanker(ExecContext* ctx, uint64_t* indices_begin, uint64_t* 
indices_end,
+                     const ChunkedArray& chunked_array, const SortOrder order,
+                     const NullPlacement null_placement, const 
RankOptions::Tiebreaker tiebreaker, Datum* output)
+      : TypeVisitor(),
+        ctx_(ctx),
+        indices_begin_(indices_begin),
+        indices_end_(indices_end),
+        chunked_array_(chunked_array),
+        physical_type_(GetPhysicalType(chunked_array.type())),
+        physical_chunks_(GetPhysicalChunks(chunked_array_, physical_type_)),
+        order_(order),
+        null_placement_(null_placement),
+        tiebreaker_(tiebreaker),
+        output_(output) {}
+
+  Status Run() { return physical_type_->Accept(this); }
+
+#define VISIT(TYPE) \
+  Status Visit(const TYPE& type) { return RankInternal<TYPE>(); }
+
+  VISIT_SORTABLE_PHYSICAL_TYPES(VISIT)
+
+#undef VISIT
+
+  template <typename InType>
+  Status RankInternal() {
+    using GetView = GetViewType<InType>;
+    using T = typename GetViewType<InType>::T;
+    using ArrayType = typename TypeTraits<InType>::ArrayType;
+
+    const auto num_chunks = chunked_array_.num_chunks();
+    if (num_chunks == 0) {
+      return Status::OK();
+    }
+    const auto arrays = GetArrayPointers(physical_chunks_);
+
+    ArraySortOptions array_options(order_, null_placement_);
+
+    ARROW_ASSIGN_OR_RAISE(auto array_sorter, GetArraySorter(*physical_type_));
+
+    // See related ChunkedArraySort method for comments
+    std::vector<NullPartitionResult> sorted(num_chunks);
+    int64_t begin_offset = 0;
+    int64_t end_offset = 0;
+    int64_t null_count = 0;
+    for (int i = 0; i < num_chunks; ++i) {
+      const auto array = checked_cast<const ArrayType*>(arrays[i]);
+      end_offset += array->length();
+      null_count += array->null_count();
+      sorted[i] = array_sorter(indices_begin_ + begin_offset, indices_begin_ + 
end_offset,
+                               *array, begin_offset, array_options);
+      begin_offset = end_offset;
+    }
+    DCHECK_EQ(end_offset, indices_end_ - indices_begin_);
+
+    if (sorted.size() > 1) {
+      auto merge_nulls = [&](uint64_t* nulls_begin, uint64_t* nulls_middle,
+                             uint64_t* nulls_end, uint64_t* temp_indices,
+                             int64_t null_count) {
+        if (has_null_like_values<typename ArrayType::TypeClass>::value) {
+          PartitionNullsOnly<StablePartitioner>(nulls_begin, nulls_end,
+                                                ChunkedArrayResolver(arrays), 
null_count,
+                                                null_placement_);
+        }
+      };
+      auto merge_non_nulls = [&](uint64_t* range_begin, uint64_t* range_middle,
+                                 uint64_t* range_end, uint64_t* temp_indices) {
+        MergeNonNulls<ArrayType>(range_begin, range_middle, range_end, arrays,
+                                 temp_indices);
+      };
+
+      MergeImpl merge_impl{null_placement_, std::move(merge_nulls),
+                           std::move(merge_non_nulls)};
+      // std::merge is only called on non-null values, so size temp indices 
accordingly
+      RETURN_NOT_OK(merge_impl.Init(ctx_, indices_end_ - indices_begin_ - 
null_count));
+
+      while (sorted.size() > 1) {
+        auto out_it = sorted.begin();
+        auto it = sorted.begin();
+        while (it < sorted.end() - 1) {
+          const auto& left = *it++;
+          const auto& right = *it++;
+          DCHECK_EQ(left.overall_end(), right.overall_begin());
+          const auto merged = merge_impl.Merge(left, right, null_count);
+          *out_it++ = merged;
+        }
+        if (it < sorted.end()) {
+          *out_it++ = *it++;
+        }
+        sorted.erase(out_it, sorted.end());
+      }
+    }
+
+    DCHECK_EQ(sorted.size(), 1);
+    DCHECK_EQ(sorted[0].overall_begin(), indices_begin_);
+    DCHECK_EQ(sorted[0].overall_end(), indices_end_);
+    // Note that "nulls" can also include NaNs, hence the >= check
+    DCHECK_GE(sorted[0].null_count(), null_count);
+
+    auto length = indices_end_ - indices_begin_;
+    ARROW_ASSIGN_OR_RAISE(auto rankings,
+                          MakeMutableUInt64Array(uint64(), length, 
ctx_->memory_pool()));
+    auto out_begin = rankings->GetMutableValues<uint64_t>(1);
+    uint64_t rank;
+
+    switch (tiebreaker_) {
+      case RankOptions::Dense: {
+        T curr_value, prev_value{};
+        rank = 0;
+
+        if (null_placement_ == NullPlacement::AtStart && 
sorted[0].null_count() > 0) {
+          rank++;
+          for (auto it = sorted[0].nulls_begin; it < sorted[0].nulls_end; 
it++) {
+            out_begin[*it] = rank;
+          }
+        }
+
+        for (auto it = sorted[0].non_nulls_begin; it < 
sorted[0].non_nulls_end; it++) {
+          // Below code wasn't working for string specialization as -> value 
returned a buffer
+          // but T is a basic_string_view
+          // using ScalarType = typename TypeTraits<InType>::ScalarType;
+          // auto scalar = std::dynamic_pointer_cast<ScalarType>(
+          //   chunked_array_.GetScalar(*it).ValueOrDie());
+          // curr_value = scalar->value;
+          // TODO: can we use chunk_resolver_ from chunked array externally?   
       
+          if (*it >= 2) {

Review Comment:
   This is hacked together just to work for the provided test case and 
obviously needs a better implementation. I wasn't sure what the best approach 
was to get the logical value at a particular index in a ChunkedArray. 
Alternatively, we could bypass the ChunkedArray and try to use the `arrays` 
variable, but I'm not sure that would be any cleaner.



##########
cpp/src/arrow/compute/kernels/vector_sort.cc:
##########
@@ -2078,6 +2078,293 @@ class ArrayRanker : public TypeVisitor {
   Datum* output_;
 };
 
+class ChunkedArrayRanker : public TypeVisitor {
+ public:
+  // TODO: here we accept order / null_placement / tiebreaker as separate 
arguments
+  // whereas the ArrayRanker accepts them as the RankOptions struct; this is 
consistent
+  // with ArraySorter / ChunkedArraySorter, so likely should refactor 
ArrayRanker
+  ChunkedArrayRanker(ExecContext* ctx, uint64_t* indices_begin, uint64_t* 
indices_end,
+                     const ChunkedArray& chunked_array, const SortOrder order,
+                     const NullPlacement null_placement, const 
RankOptions::Tiebreaker tiebreaker, Datum* output)
+      : TypeVisitor(),
+        ctx_(ctx),
+        indices_begin_(indices_begin),
+        indices_end_(indices_end),
+        chunked_array_(chunked_array),
+        physical_type_(GetPhysicalType(chunked_array.type())),
+        physical_chunks_(GetPhysicalChunks(chunked_array_, physical_type_)),
+        order_(order),
+        null_placement_(null_placement),
+        tiebreaker_(tiebreaker),
+        output_(output) {}
+
+  Status Run() { return physical_type_->Accept(this); }
+
+#define VISIT(TYPE) \
+  Status Visit(const TYPE& type) { return RankInternal<TYPE>(); }
+
+  VISIT_SORTABLE_PHYSICAL_TYPES(VISIT)
+
+#undef VISIT
+
+  template <typename InType>
+  Status RankInternal() {
+    using GetView = GetViewType<InType>;
+    using T = typename GetViewType<InType>::T;
+    using ArrayType = typename TypeTraits<InType>::ArrayType;
+
+    const auto num_chunks = chunked_array_.num_chunks();
+    if (num_chunks == 0) {
+      return Status::OK();
+    }
+    const auto arrays = GetArrayPointers(physical_chunks_);
+
+    ArraySortOptions array_options(order_, null_placement_);
+
+    ARROW_ASSIGN_OR_RAISE(auto array_sorter, GetArraySorter(*physical_type_));
+
+    // See related ChunkedArraySort method for comments
+    std::vector<NullPartitionResult> sorted(num_chunks);
+    int64_t begin_offset = 0;
+    int64_t end_offset = 0;
+    int64_t null_count = 0;
+    for (int i = 0; i < num_chunks; ++i) {
+      const auto array = checked_cast<const ArrayType*>(arrays[i]);
+      end_offset += array->length();
+      null_count += array->null_count();
+      sorted[i] = array_sorter(indices_begin_ + begin_offset, indices_begin_ + 
end_offset,
+                               *array, begin_offset, array_options);
+      begin_offset = end_offset;
+    }
+    DCHECK_EQ(end_offset, indices_end_ - indices_begin_);
+
+    if (sorted.size() > 1) {
+      auto merge_nulls = [&](uint64_t* nulls_begin, uint64_t* nulls_middle,
+                             uint64_t* nulls_end, uint64_t* temp_indices,
+                             int64_t null_count) {
+        if (has_null_like_values<typename ArrayType::TypeClass>::value) {
+          PartitionNullsOnly<StablePartitioner>(nulls_begin, nulls_end,
+                                                ChunkedArrayResolver(arrays), 
null_count,
+                                                null_placement_);
+        }
+      };
+      auto merge_non_nulls = [&](uint64_t* range_begin, uint64_t* range_middle,
+                                 uint64_t* range_end, uint64_t* temp_indices) {
+        MergeNonNulls<ArrayType>(range_begin, range_middle, range_end, arrays,
+                                 temp_indices);
+      };
+
+      MergeImpl merge_impl{null_placement_, std::move(merge_nulls),
+                           std::move(merge_non_nulls)};
+      // std::merge is only called on non-null values, so size temp indices 
accordingly
+      RETURN_NOT_OK(merge_impl.Init(ctx_, indices_end_ - indices_begin_ - 
null_count));
+
+      while (sorted.size() > 1) {
+        auto out_it = sorted.begin();
+        auto it = sorted.begin();
+        while (it < sorted.end() - 1) {
+          const auto& left = *it++;
+          const auto& right = *it++;
+          DCHECK_EQ(left.overall_end(), right.overall_begin());
+          const auto merged = merge_impl.Merge(left, right, null_count);
+          *out_it++ = merged;
+        }
+        if (it < sorted.end()) {
+          *out_it++ = *it++;
+        }
+        sorted.erase(out_it, sorted.end());
+      }
+    }
+
+    DCHECK_EQ(sorted.size(), 1);
+    DCHECK_EQ(sorted[0].overall_begin(), indices_begin_);
+    DCHECK_EQ(sorted[0].overall_end(), indices_end_);
+    // Note that "nulls" can also include NaNs, hence the >= check
+    DCHECK_GE(sorted[0].null_count(), null_count);
+
+    auto length = indices_end_ - indices_begin_;
+    ARROW_ASSIGN_OR_RAISE(auto rankings,
+                          MakeMutableUInt64Array(uint64(), length, 
ctx_->memory_pool()));
+    auto out_begin = rankings->GetMutableValues<uint64_t>(1);
+    uint64_t rank;
+
+    switch (tiebreaker_) {

Review Comment:
   Except for the nuance of how to retrieve a value from an array at a 
particular index in a performant manner, this rank algorithm is copied and 
pasted from the `ArrayRanker` class, so it can likely be refactored into a 
function outside of the classes.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to