pitrou commented on code in PR #46926:
URL: https://github.com/apache/arrow/pull/46926#discussion_r3396709034


##########
cpp/src/arrow/compute/api_vector.h:
##########
@@ -102,10 +103,15 @@ class ARROW_EXPORT ArraySortOptions : public 
FunctionOptions {
   NullPlacement null_placement;
 };
 
+ARROW_SUPPRESS_DEPRECATION_WARNING

Review Comment:
   Is it necessary to wrap the entire `class` declaration, or can this macro 
only apply to `GetSortKeys`?



##########
cpp/src/arrow/compute/ordering.h:
##########
@@ -56,12 +57,16 @@ class ARROW_EXPORT SortKey : public 
util::EqualityComparable<SortKey> {
   FieldRef target;
   /// How to order by this sort key.
   SortOrder order;
+  /// Null placement for this sort key.
+  NullPlacement null_placement;
 };
 
 class ARROW_EXPORT Ordering : public util::EqualityComparable<Ordering> {
  public:
-  Ordering(std::vector<SortKey> sort_keys,
-           NullPlacement null_placement = NullPlacement::AtStart)
+  explicit Ordering(std::vector<SortKey> sort_keys) : 
sort_keys_(std::move(sort_keys)) {}
+
+  // DEPRECATED(will be removed after removing null_placement from Ordering)
+  Ordering(std::vector<SortKey> sort_keys, std::optional<NullPlacement> 
null_placement)

Review Comment:
   Use `ARROW_DEPRECATED` as well?



##########
python/pyarrow/_compute.pyx:
##########
@@ -2256,18 +2266,24 @@ class SortOptions(_SortOptions):
 
     Parameters
     ----------
-    sort_keys : sequence of (name, order) tuples
+    sort_keys : sequence of (name, order, null_placement) tuples
         Names of field/column keys to sort the input on,
         along with the order each field/column is sorted in.
         Accepted values for `order` are "ascending", "descending".
+        Accepted values for `null_placement` are "at_start", "at_end".
         The field name can be a string column name or expression.
-    null_placement : str, default "at_end"
-        Where nulls in input should be sorted, only applying to
-        columns/fields mentioned in `sort_keys`.
+    null_placement : str | None, default None
+        Where nulls in input should be sorted, overwrites
+         `null_placement` in `sort_keys`.
         Accepted values are "at_start", "at_end".
     """
 
-    def __init__(self, sort_keys=(), *, null_placement="at_end"):
+    def __init__(self, sort_keys=(), *, null_placement=None):
+        if null_placement is not None:
+            warnings.warn(
+                "Specifying null_placement in SortOptions is deprecated "
+                "as of 24.0.0. Specify null_placement per sort_key instead."

Review Comment:
   Same here: make this a `FutureWarning`



##########
python/pyarrow/tests/test_table.py:
##########
@@ -3397,17 +3397,38 @@ def test_table_sort_by(cls):
         pa.array(["foo", "car", "bar", "foobar"])
     ], names=["a", "b"])
 
-    sorted_tab = tab.sort_by([("a", "descending")])
+    sorted_tab = tab.sort_by([("a", "descending", "at_end")])
     sorted_tab_dict = sorted_tab.to_pydict()
     assert sorted_tab_dict["a"] == [35, 7, 7, 5]
     assert sorted_tab_dict["b"] == ["foobar", "car", "bar", "foo"]
 
-    sorted_tab = tab.sort_by([("a", "ascending")])
+    sorted_tab = tab.sort_by([("a", "ascending", "at_end")])
     sorted_tab_dict = sorted_tab.to_pydict()
     assert sorted_tab_dict["a"] == [5, 7, 7, 35]
     assert sorted_tab_dict["b"] == ["foo", "car", "bar", "foobar"]
 
 
+def test_record_batch_sort():
+    rb = pa.RecordBatch.from_arrays([
+        pa.array([7, 35, 7, 5], type=pa.int64()),
+        pa.array([4, 1, 3, 2], type=pa.int64()),
+        pa.array(["foo", "car", "bar", "foobar"])
+    ], names=["a", "b", "c"])
+
+    sorted_rb = rb.sort_by([("a", "descending", "at_end"),
+                           ("b", "descending", "at_end")])

Review Comment:
   Can these two sort keys use different null placements, to make sure 
parameter unwrapping works as expected?



##########
cpp/src/arrow/compute/api_vector.cc:
##########
@@ -138,8 +140,7 @@ static auto kArraySortOptionsType = 
GetFunctionOptionsType<ArraySortOptions>(
     DataMember("order", &ArraySortOptions::order),
     DataMember("null_placement", &ArraySortOptions::null_placement));
 static auto kSortOptionsType = GetFunctionOptionsType<SortOptions>(
-    DataMember("sort_keys", &SortOptions::sort_keys),
-    DataMember("null_placement", &SortOptions::null_placement));
+    CoercedDataMember("sort_keys", &SortOptions::sort_keys, 
&SortOptions::GetSortKeys));

Review Comment:
   Can you add some new examples in 
https://github.com/apache/arrow/blob/4f3b0f01b51c8d596334027f939fa97f45b5ac05/cpp/src/arrow/compute/function_test.cc#L45
 to make sure that all SortOptions / SortKey fields are considered for 
comparison and serialization?



##########
cpp/src/arrow/compute/kernels/vector_select_k.cc:
##########
@@ -102,51 +218,45 @@ class ArraySelector : public TypeVisitor {
 
   template <typename InType, SortOrder sort_order>
   Status SelectKthInternal() {
-    using GetView = GetViewType<InType>;
     using ArrayType = typename TypeTraits<InType>::ArrayType;
 
     ArrayType arr(array_.data());
-    std::vector<uint64_t> indices(arr.length());
 
-    uint64_t* indices_begin = indices.data();
-    uint64_t* indices_end = indices_begin + indices.size();
-    std::iota(indices_begin, indices_end, 0);
     if (k_ > arr.length()) {
       k_ = arr.length();
     }
 
-    const auto p = PartitionNulls<ArrayType, NonStablePartitioner>(
-        indices_begin, indices_end, arr, 0, NullPlacement::AtEnd);
-    const auto end_iter = p.non_nulls_end;
+    std::vector<uint64_t> indices(arr.length());
 
-    auto kth_begin = std::min(indices_begin + k_, end_iter);
+    uint64_t* indices_begin = indices.data();
+    uint64_t* indices_end = indices_begin + indices.size();
+    std::iota(indices_begin, indices_end, 0);
 
-    SelectKComparator<sort_order> comparator;
-    auto cmp = [&arr, &comparator](uint64_t left, uint64_t right) {
-      const auto lval = GetView::LogicalValue(arr.GetView(left));
-      const auto rval = GetView::LogicalValue(arr.GetView(right));
-      return comparator(lval, rval);
-    };
-    using HeapContainer =
-        std::priority_queue<uint64_t, std::vector<uint64_t>, decltype(cmp)>;
-    HeapContainer heap(indices_begin, kth_begin, cmp);
-    for (auto iter = kth_begin; iter != end_iter && !heap.empty(); ++iter) {
-      uint64_t x_index = *iter;
-      if (cmp(x_index, heap.top())) {
-        heap.pop();
-        heap.push(x_index);
-      }
-    }
-    auto out_size = static_cast<int64_t>(heap.size());
     ARROW_ASSIGN_OR_RAISE(auto take_indices,
-                          MakeMutableUInt64Array(out_size, 
ctx_->memory_pool()));
+                          MakeMutableUInt64Array(k_, ctx_->memory_pool()));
+    auto* output_begin = take_indices->template GetMutableValues<uint64_t>(1);
+
+    const auto p = PartitionNullsAndNans<ArrayType, NonStablePartitioner>(
+        indices_begin, indices_end, arr, 0, null_placement_);
+
+    // From k, calculate
+    //   l = non_null_like elements to take from PartitionResult
+    //   m = nan elements to take from PartitionResult
+    //   n = null elements to take from PartitionResult
+    // k = l + m + n because k was clipped to arr.length()
+    // And directly compute the ranges in {output, output+k} where we will 
need to place
+    // the selected elements from each group -> no longer need to track 
null_placement

Review Comment:
   I don't think it's useful to refer to the previous implementation.
   
   ```suggestion
       // And directly compute the ranges in {output, output+k} where we will 
need to place
       // the selected elements from each group.
   ```



##########
cpp/src/arrow/compute/kernels/vector_select_k.cc:
##########
@@ -252,16 +418,17 @@ class ChunkedArraySelector : public TypeVisitor {
       offset += chunk->length();
     }
 
-    auto out_size = static_cast<int64_t>(heap.size());
-    ARROW_ASSIGN_OR_RAISE(auto take_indices,
-                          MakeMutableUInt64Array(out_size, 
ctx_->memory_pool()));
-    auto* out_cbegin = take_indices->GetMutableValues<uint64_t>(1) + out_size 
- 1;
-    while (heap.size() > 0) {
-      auto top_item = heap.top();
-      *out_cbegin = top_item.index + top_item.offset;
+    // We sized output.non_null_like_range to hold exactly sufficient indices,
+    // so the heap must have been completely filled
+    assert(heap.size() == output.non_null_like_range.size());

Review Comment:
   Nit: use `DCHECK_EQ`



##########
python/pyarrow/_acero.pyx:
##########
@@ -254,18 +262,24 @@ class OrderByNodeOptions(_OrderByNodeOptions):
 
     Parameters
     ----------
-    sort_keys : sequence of (name, order) tuples
+    sort_keys : sequence of (name, order, null_placement="at_end") tuples
         Names of field/column keys to sort the input on,
         along with the order each field/column is sorted in.
-        Accepted values for `order` are "ascending", "descending".
         Each field reference can be a string column name or expression.
-    null_placement : str, default "at_end"
+        Accepted values for `order` are "ascending", "descending".
+        Accepted values for `null_placement` are "at_start", "at_end".
+    null_placement : str, optional
         Where nulls in input should be sorted, only applying to
         columns/fields mentioned in `sort_keys`.
-        Accepted values are "at_start", "at_end".
+        Accepted values are "at_start", "at_end",
     """
 
-    def __init__(self, sort_keys=(), *, null_placement="at_end"):
+    def __init__(self, sort_keys=(), *, null_placement=None):
+        if null_placement is not None:
+            warnings.warn(
+                "Specifying null_placement in OrderByNodeOptions is deprecated 
"
+                "as of 24.0.0. Specify null_placement per sort_key instead."
+            )

Review Comment:
   We should make this a `FutureWarning` (instead of the default 
`UserWarning`), something like:
   
   ```suggestion
               warnings.warn(
                   "Specifying null_placement in OrderByNodeOptions is 
deprecated "
                   "as of 24.0.0. Specify null_placement per sort_key instead.",
                   FutureWarning
               )
   ```



##########
python/pyarrow/_acero.pyx:
##########
@@ -28,6 +28,7 @@ from pyarrow.includes.libarrow_acero cimport *
 from pyarrow.lib cimport (Table, pyarrow_unwrap_table, pyarrow_wrap_table,
                           RecordBatchReader)
 from pyarrow.lib import frombytes, tobytes
+import warnings

Review Comment:
   Can you move this standard library import above the `pyarrow` imports?



##########
cpp/src/arrow/compute/api_vector.h:
##########
@@ -175,43 +204,87 @@ class ARROW_EXPORT RankOptions : public FunctionOptions {
     Dense
   };
 
-  explicit RankOptions(std::vector<SortKey> sort_keys = {},
-                       NullPlacement null_placement = NullPlacement::AtEnd,
+  ARROW_DEPRECATED("Deprecated in arrow 24.0.0, use null_placement in 
sort_keys instead")
+  explicit RankOptions(std::vector<SortKey> sort_keys,
+                       std::optional<NullPlacement> null_placement = 
std::nullopt,
                        Tiebreaker tiebreaker = RankOptions::First);
   /// Convenience constructor for array inputs
-  explicit RankOptions(SortOrder order,
-                       NullPlacement null_placement = NullPlacement::AtEnd,
+  explicit RankOptions(SortOrder order, NullPlacement null_placement,
                        Tiebreaker tiebreaker = RankOptions::First)
-      : RankOptions({SortKey("", order)}, null_placement, tiebreaker) {}
+      : RankOptions({SortKey("", order, null_placement)}, tiebreaker) {}
+
+  explicit RankOptions(std::vector<SortKey> sort_keys = {},
+                       Tiebreaker tiebreaker = RankOptions::First);
+
+  /// Convenience constructor for array inputs
+  explicit RankOptions(SortOrder order, Tiebreaker tiebreaker = 
RankOptions::First)
+      : RankOptions({SortKey("", order)}, tiebreaker) {}
 
   static constexpr const char kTypeName[] = "RankOptions";
   static RankOptions Defaults() { return RankOptions(); }
 
+  ARROW_SUPPRESS_DEPRECATION_WARNING
+  /// Get sort_keys with overwritten null_placement
+  /// Will be removed after deprecated null_placement has been removed
+  std::vector<SortKey> GetSortKeys() const {
+    if (!null_placement.has_value()) {
+      return sort_keys;
+    }
+    auto overwritten_sort_keys = sort_keys;
+    for (auto& sort_key : overwritten_sort_keys) {
+      sort_key.null_placement = null_placement.value();
+    }
+    return overwritten_sort_keys;
+  }
+  ARROW_UNSUPPRESS_DEPRECATION_WARNING
+
   /// Column key(s) to order by and how to order by these sort keys.
   std::vector<SortKey> sort_keys;
+
+  // DEPRECATED(set null_placement in sort_keys instead)
   /// Whether nulls and NaNs are placed at the start or at the end
-  NullPlacement null_placement;
+  /// Will overwrite null ordering of sort keys
+  ARROW_DEPRECATED("Deprecated in arrow 24.0.0, use null_placement in 
sort_keys instead")
+  std::optional<NullPlacement> null_placement;
   /// Tiebreaker for dealing with equal values in ranks
   Tiebreaker tiebreaker;
 };
 
 /// \brief Quantile rank options
 class ARROW_EXPORT RankQuantileOptions : public FunctionOptions {
  public:
-  explicit RankQuantileOptions(std::vector<SortKey> sort_keys = {},
-                               NullPlacement null_placement = 
NullPlacement::AtEnd);
+  explicit RankQuantileOptions(
+      std::vector<SortKey> sort_keys = {},
+      std::optional<NullPlacement> null_placement = std::nullopt);
+
   /// Convenience constructor for array inputs
   explicit RankQuantileOptions(SortOrder order,
-                               NullPlacement null_placement = 
NullPlacement::AtEnd)
+                               std::optional<NullPlacement> null_placement = 
std::nullopt)
       : RankQuantileOptions({SortKey("", order)}, null_placement) {}
 
   static constexpr const char kTypeName[] = "RankQuantileOptions";
   static RankQuantileOptions Defaults() { return RankQuantileOptions(); }
 
+  /// Get sort_keys with overwritten null_placement
+  /// Will be removed after deprecated null_placement has been removed
+  std::vector<SortKey> GetSortKeys() const {
+    if (!null_placement.has_value()) {
+      return sort_keys;
+    }
+    auto overwritten_sort_keys = sort_keys;
+    for (auto& sort_key : overwritten_sort_keys) {
+      sort_key.null_placement = null_placement.value();
+    }
+    return overwritten_sort_keys;
+  }
+
   /// Column key(s) to order by and how to order by these sort keys.
   std::vector<SortKey> sort_keys;
+
+  // DEPRECATED(set null_placement in sort_keys instead)

Review Comment:
   Why not use `ARROW_DEPRECATED` as in other options classes?



##########
python/pyarrow/tests/test_table.py:
##########
@@ -3397,17 +3397,38 @@ def test_table_sort_by(cls):
         pa.array(["foo", "car", "bar", "foobar"])
     ], names=["a", "b"])
 
-    sorted_tab = tab.sort_by([("a", "descending")])
+    sorted_tab = tab.sort_by([("a", "descending", "at_end")])
     sorted_tab_dict = sorted_tab.to_pydict()
     assert sorted_tab_dict["a"] == [35, 7, 7, 5]
     assert sorted_tab_dict["b"] == ["foobar", "car", "bar", "foo"]
 
-    sorted_tab = tab.sort_by([("a", "ascending")])
+    sorted_tab = tab.sort_by([("a", "ascending", "at_end")])
     sorted_tab_dict = sorted_tab.to_pydict()
     assert sorted_tab_dict["a"] == [5, 7, 7, 35]
     assert sorted_tab_dict["b"] == ["foo", "car", "bar", "foobar"]
 
 
+def test_record_batch_sort():
+    rb = pa.RecordBatch.from_arrays([
+        pa.array([7, 35, 7, 5], type=pa.int64()),
+        pa.array([4, 1, 3, 2], type=pa.int64()),
+        pa.array(["foo", "car", "bar", "foobar"])
+    ], names=["a", "b", "c"])
+
+    sorted_rb = rb.sort_by([("a", "descending", "at_end"),
+                           ("b", "descending", "at_end")])
+    sorted_rb_dict = sorted_rb.to_pydict()
+    assert sorted_rb_dict["a"] == [35, 7, 7, 5]
+    assert sorted_rb_dict["b"] == [1, 4, 3, 2]
+    assert sorted_rb_dict["c"] == ["car", "foo", "bar", "foobar"]
+
+    sorted_rb = rb.sort_by([("a", "ascending", "at_end"), ("b", "ascending", 
"at_end")])

Review Comment:
   Let's also exercise different per-column sort orders?



##########
python/pyarrow/_compute.pyx:
##########
@@ -2499,17 +2527,26 @@ class RankOptions(_RankOptions):
                    number of distinct values in the input.
     """
 
-    def __init__(self, sort_keys="ascending", *, null_placement="at_end", 
tiebreaker="first"):
+    def __init__(self, sort_keys="ascending", *, null_placement=None, 
tiebreaker="first"):
         self._set_options(sort_keys, null_placement, tiebreaker)

Review Comment:
   Should a non-None null placement be deprecated here?



##########
python/pyarrow/tests/test_table.py:
##########
@@ -3397,17 +3397,38 @@ def test_table_sort_by(cls):
         pa.array(["foo", "car", "bar", "foobar"])
     ], names=["a", "b"])
 
-    sorted_tab = tab.sort_by([("a", "descending")])
+    sorted_tab = tab.sort_by([("a", "descending", "at_end")])
     sorted_tab_dict = sorted_tab.to_pydict()
     assert sorted_tab_dict["a"] == [35, 7, 7, 5]
     assert sorted_tab_dict["b"] == ["foobar", "car", "bar", "foo"]
 
-    sorted_tab = tab.sort_by([("a", "ascending")])
+    sorted_tab = tab.sort_by([("a", "ascending", "at_end")])
     sorted_tab_dict = sorted_tab.to_pydict()
     assert sorted_tab_dict["a"] == [5, 7, 7, 35]
     assert sorted_tab_dict["b"] == ["foo", "car", "bar", "foobar"]
 
 
+def test_record_batch_sort():
+    rb = pa.RecordBatch.from_arrays([
+        pa.array([7, 35, 7, 5], type=pa.int64()),
+        pa.array([4, 1, 3, 2], type=pa.int64()),
+        pa.array(["foo", "car", "bar", "foobar"])

Review Comment:
   Let's make sure these columns have nulls as well.



##########
cpp/src/arrow/compute/kernels/select_k_test.cc:
##########
@@ -283,6 +380,62 @@ TYPED_TEST(TestSelectKWithChunkedArray, 
RandomValuesWithSlices) {
   }
 }
 
+TYPED_TEST(TestSelectKWithChunkedArray, PartialSelectKNull) {
+  auto chunked_array = ChunkedArrayFromJSON(uint8(), {
+                                                         "[null, 1]",
+                                                         "[3, null, 2]",
+                                                         "[1]",
+                                                     });
+  std::vector<SortKey> sort_keys{SortKey("a", SortOrder::Ascending)};
+  auto options = SelectKOptions(3, sort_keys);
+  auto expected = ChunkedArrayFromJSON(uint8(), {"[1, 1, 2]"});
+  this->Check(chunked_array, options, expected);
+  options.sort_keys[0].null_placement = NullPlacement::AtStart;
+  expected = ChunkedArrayFromJSON(uint8(), {"[null, null, 1]"});
+  this->Check(chunked_array, options, expected);
+}
+
+TYPED_TEST(TestSelectKWithChunkedArray, FullSelectKNull) {
+  auto chunked_array = ChunkedArrayFromJSON(uint8(), {
+                                                         "[null, 1]",
+                                                         "[3, null, 2]",
+                                                         "[1]",
+                                                     });
+  std::vector<SortKey> sort_keys{SortKey("a", SortOrder::Ascending)};
+  auto options = SelectKOptions(10, sort_keys);
+  options.sort_keys[0].null_placement = NullPlacement::AtStart;
+  auto expected = ChunkedArrayFromJSON(uint8(), {"[null, null, 1, 1, 2, 3]"});
+  this->Check(chunked_array, options, expected);
+  options.sort_keys[0].null_placement = NullPlacement::AtEnd;
+  expected = ChunkedArrayFromJSON(uint8(), {"[1, 1, 2, 3, null, null]"});
+  this->Check(chunked_array, options, expected);
+}
+
+TYPED_TEST(TestSelectKWithChunkedArray, PartialSelectKNullNaN) {
+  auto chunked_array = ChunkedArrayFromJSON(
+      float64(), {"[null, 1]", "[3, null, NaN]", "[10, NaN, 2]", "[1]"});
+  std::vector<SortKey> sort_keys{SortKey("a", SortOrder::Descending)};
+  auto options = SelectKOptions(3, sort_keys);
+  options.sort_keys[0].null_placement = NullPlacement::AtStart;
+  auto expected = ChunkedArrayFromJSON(float64(), {"[null, null, NaN]"});
+  this->Check(chunked_array, options, expected);
+  options.sort_keys[0].null_placement = NullPlacement::AtEnd;
+  expected = ChunkedArrayFromJSON(float64(), {"[10, 3, 2]"});
+  this->Check(chunked_array, options, expected);
+}
+
+TYPED_TEST(TestSelectKWithChunkedArray, FullSelectKNullNaN) {
+  auto chunked_array = ChunkedArrayFromJSON(
+      float64(), {"[null, 1]", "[3, null, NaN]", "[10, NaN, 2]", "[1]"});
+  std::vector<SortKey> sort_keys{SortKey("a", SortOrder::Descending)};
+  auto options = SelectKOptions(10, sort_keys);
+  options.sort_keys[0].null_placement = NullPlacement::AtStart;
+  // These check that nulls and Nan are sorted in a stable way, but do we want 
that?

Review Comment:
   I don't think so? Same problem in `TestSelectKWithArray` by the way.



##########
cpp/src/arrow/compute/kernels/vector_sort_test.cc:
##########
@@ -1252,9 +1248,39 @@ TEST_F(TestRecordBatchSortIndices, Null) {
   const std::vector<SortKey> sort_keys{SortKey("a", SortOrder::Ascending),
                                        SortKey("b", SortOrder::Descending)};
 
-  SortOptions options(sort_keys, NullPlacement::AtEnd);
+  SortOptions options(sort_keys);
+  AssertSortIndices(batch, options, "[5, 1, 4, 6, 2, 0, 3]");
+  options.sort_keys[0].null_placement = NullPlacement::AtStart;
+  AssertSortIndices(batch, options, "[0, 3, 5, 1, 4, 6, 2]");
+  options.sort_keys[1].null_placement = NullPlacement::AtStart;
+  AssertSortIndices(batch, options, "[3, 0, 5, 1, 4, 2, 6]");
+}
+
+TEST_F(TestRecordBatchSortIndices, MixedNullOrdering) {

Review Comment:
   Is it the exact same test as above? If it is, then perhaps remove it?



##########
cpp/src/arrow/compute/kernels/vector_select_k.cc:
##########
@@ -205,19 +339,38 @@ class ChunkedArraySelector : public TypeVisitor {
     if (k_ > chunked_array_.length()) {
       k_ = chunked_array_.length();
     }
+
+    ARROW_ASSIGN_OR_RAISE(auto take_indices,
+                          MakeMutableUInt64Array(k_, ctx_->memory_pool()));
+    auto* output_begin = take_indices->GetMutableValues<uint64_t>(1);
+
+    int64_t null_count = chunked_array_.null_count();
+    int64_t nan_count = ComputeNanCount<InType>();
+    int64_t non_null_like_count = chunked_array_.length() - null_count - 
nan_count;
+
+    auto output = CalculateOutputRangesByNullLikeness(non_null_like_count, 
nan_count,
+                                                      null_count, 
null_placement_,
+                                                      {output_begin, 
output_begin + k_});
+
+    // Now we can independently fill the output with non_null, nan and null 
items.
+    // For non_null, we do a heap_sort, the others can just be copied until
+    // nan_taken == output.nan_range.size() and
+    // null_taken == output.null_range.size() respectively
+    size_t nan_taken = 0;
+    size_t null_taken = 0;

Review Comment:
   Can we use `int64_t` as for other counts and lengths?



##########
cpp/src/arrow/compute/kernels/vector_select_k.cc:
##########
@@ -74,6 +77,118 @@ class SelectKComparator<SortOrder::Descending> {
   }
 };
 
+struct OutputRangesByNullLikeness {
+  std::span<uint64_t> non_null_like_range;
+  std::span<uint64_t> nan_range;
+  std::span<uint64_t> null_range;
+};
+
+OutputRangesByNullLikeness CalculateOutputRangesByNullLikeness(
+    int64_t non_null_like_count, int64_t nan_count, int64_t null_count,
+    NullPlacement null_placement, std::span<uint64_t> output_indices) {
+  auto k = static_cast<int64_t>(output_indices.size());
+  int64_t non_null_like_to_take = 0;
+  int64_t nan_to_take = 0;
+  int64_t null_to_take = 0;
+  if (null_placement == NullPlacement::AtEnd) {
+    non_null_like_to_take = std::min(k, non_null_like_count);
+    nan_to_take = std::min(k - non_null_like_to_take, nan_count);
+    null_to_take = std::min(k - non_null_like_to_take - nan_to_take, 
null_count);
+    return OutputRangesByNullLikeness{
+        .non_null_like_range = output_indices.subspan(0, 
non_null_like_to_take),
+        .nan_range = output_indices.subspan(non_null_like_to_take, 
nan_to_take),
+        .null_range =
+            output_indices.subspan(non_null_like_to_take + nan_to_take, 
null_to_take)};
+  } else {
+    null_to_take = std::min(k, null_count);
+    nan_to_take = std::min(k - null_to_take, nan_count);
+    non_null_like_to_take = std::min(k - null_to_take - nan_to_take, 
non_null_like_count);
+    return OutputRangesByNullLikeness{
+        .non_null_like_range =
+            output_indices.subspan(null_to_take + nan_to_take, 
non_null_like_to_take),
+        .nan_range = output_indices.subspan(null_to_take, nan_to_take),
+        .null_range = output_indices.subspan(0, null_to_take)};
+  }
+}
+
+template <typename Comparator>
+void HeapSortNonNullsToOutput(std::span<uint64_t> non_null_input_range, 
Comparator cmp,
+                              std::span<uint64_t> output_range) {
+  if (output_range.empty()) {
+    return;
+  }
+  std::span<uint64_t> heap = non_null_input_range.subspan(0, 
output_range.size());
+  std::ranges::make_heap(heap, cmp);
+
+  std::span<uint64_t> remaining_input = 
non_null_input_range.subspan(output_range.size());
+  for (uint64_t x_index : remaining_input) {
+    if (cmp(x_index, heap.front())) {
+      std::ranges::pop_heap(heap, cmp);
+      heap.back() = x_index;
+      std::ranges::push_heap(heap, cmp);
+    }
+  }
+
+  // fill output in reverse when destructing,
+  // as the "worst" (next-to-would-have-been-replaced) element is at heap-top
+  for (auto& reverse_out_iter : std::ranges::reverse_view(output_range)) {
+    reverse_out_iter = heap.front();  // heap-top has the next element
+    std::ranges::pop_heap(heap, cmp);
+    // Decrease heap-size by one
+    heap = heap.first(heap.size() - 1);
+  }
+}
+
+template <typename InType, SortOrder sort_order>
+void HeapSortNonNullsToOutput(std::span<uint64_t> non_null_input_range,
+                              const typename TypeTraits<InType>::ArrayType& 
arr,
+                              std::span<uint64_t> output_range) {
+  using GetView = GetViewType<InType>;
+  SelectKComparator<sort_order> comparator;
+  auto cmp = [&arr, &comparator](uint64_t left, uint64_t right) {
+    const auto lval = GetView::LogicalValue(arr.GetView(left));
+    const auto rval = GetView::LogicalValue(arr.GetView(right));
+    return comparator(lval, rval);
+  };
+  HeapSortNonNullsToOutput(non_null_input_range, cmp, output_range);
+}
+
+template <typename InType>
+void HeapSortNonNullsToOutput(std::span<uint64_t> non_null_input_range,
+                              const typename TypeTraits<InType>::ArrayType& 
arr,
+                              SortOrder order, std::span<uint64_t> 
output_range) {
+  if (order == SortOrder::Ascending) {
+    HeapSortNonNullsToOutput<InType, 
SortOrder::Ascending>(non_null_input_range, arr,
+                                                           output_range);
+  } else {
+    HeapSortNonNullsToOutput<InType, 
SortOrder::Descending>(non_null_input_range, arr,
+                                                            output_range);
+  }
+}
+
+struct PartitionResultByNullLikeness {

Review Comment:
   Not for this PR, but I think it would good to consolidate this into 
`NullPartitionResult` instead of always having to write dedicated code for 
null-like values.



##########
cpp/src/arrow/compute/kernels/vector_select_k.cc:
##########
@@ -167,6 +278,9 @@ struct TypedHeapItem {
 };
 
 class ChunkedArraySelector : public TypeVisitor {
+  using ResolvedSortKey = ResolvedTableSortKey;
+  using Comparator = MultipleKeyComparator<ResolvedSortKey>;

Review Comment:
   Are these appropriate in `ChunkedArraySelector`?



##########
cpp/src/arrow/compute/kernels/vector_select_k.cc:
##########
@@ -192,6 +307,25 @@ class ChunkedArraySelector : public TypeVisitor {
   VISIT_SORTABLE_PHYSICAL_TYPES(VISIT)
 #undef VISIT
 
+  template <typename InType>
+  int64_t ComputeNanCount() {
+    using ArrayType = typename TypeTraits<InType>::ArrayType;
+    if constexpr (has_null_like_values<typename ArrayType::TypeClass>()) {
+      int64_t nan_count = 0;
+      for (const auto& chunk : physical_chunks_) {
+        auto values = std::make_shared<ArrayType>(chunk->data());
+        int64_t length = values->length();
+        for (int64_t index = 0; index < length; ++index) {
+          if (std::isnan(values->GetView(index))) {

Review Comment:
   We need to skip null values here, as a null value could have a NaN in its 
data slot, even though it's unlikely. You might call `IsValid`, or use the 
faster `VisitArraySpanInline`.



##########
cpp/src/arrow/compute/kernels/vector_select_k.cc:
##########
@@ -205,19 +339,38 @@ class ChunkedArraySelector : public TypeVisitor {
     if (k_ > chunked_array_.length()) {
       k_ = chunked_array_.length();
     }
+
+    ARROW_ASSIGN_OR_RAISE(auto take_indices,
+                          MakeMutableUInt64Array(k_, ctx_->memory_pool()));
+    auto* output_begin = take_indices->GetMutableValues<uint64_t>(1);
+
+    int64_t null_count = chunked_array_.null_count();
+    int64_t nan_count = ComputeNanCount<InType>();
+    int64_t non_null_like_count = chunked_array_.length() - null_count - 
nan_count;
+

Review Comment:
   Instead of having to separately compute the NaN count, can we call 
`PartitionNullsAndNans` on all chunks in advance, and then just sum the 
resulting NaN counts?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to