alamb commented on code in PR #7855:
URL: https://github.com/apache/arrow-datafusion/pull/7855#discussion_r1365461718


##########
datafusion/physical-plan/src/sorts/cursor.rs:
##########
@@ -20,124 +20,118 @@ use std::cmp::Ordering;
 use arrow::buffer::ScalarBuffer;
 use arrow::compute::SortOptions;
 use arrow::datatypes::ArrowNativeTypeOp;
-use arrow::row::{Row, Rows};
+use arrow::row::Rows;
 use arrow_array::types::ByteArrayType;
-use arrow_array::{Array, ArrowPrimitiveType, GenericByteArray, PrimitiveArray};
+use arrow_array::{
+    Array, ArrowPrimitiveType, GenericByteArray, OffsetSizeTrait, 
PrimitiveArray,
+};
+use arrow_buffer::{Buffer, OffsetBuffer};
 use datafusion_execution::memory_pool::MemoryReservation;
 
-/// A [`Cursor`] for [`Rows`]
-pub struct RowCursor {
-    cur_row: usize,
-    num_rows: usize,
+/// A comparable collection of values for use with [`Cursor`]
+pub trait CursorValues {
+    fn len(&self) -> usize;
 
-    rows: Rows,
+    fn eq(l: &Self, l_idx: usize, r: &Self, r_idx: usize) -> bool;

Review Comment:
   Could we please document what these functions mean (even though you could 
argue they are obvious -- it might help to explain what is expected if `l_idx > 
len`, for example. 



##########
datafusion/physical-plan/src/sorts/cursor.rs:
##########
@@ -20,124 +20,118 @@ use std::cmp::Ordering;
 use arrow::buffer::ScalarBuffer;
 use arrow::compute::SortOptions;
 use arrow::datatypes::ArrowNativeTypeOp;
-use arrow::row::{Row, Rows};
+use arrow::row::Rows;
 use arrow_array::types::ByteArrayType;
-use arrow_array::{Array, ArrowPrimitiveType, GenericByteArray, PrimitiveArray};
+use arrow_array::{
+    Array, ArrowPrimitiveType, GenericByteArray, OffsetSizeTrait, 
PrimitiveArray,
+};
+use arrow_buffer::{Buffer, OffsetBuffer};
 use datafusion_execution::memory_pool::MemoryReservation;
 
-/// A [`Cursor`] for [`Rows`]
-pub struct RowCursor {
-    cur_row: usize,
-    num_rows: usize,
+/// A comparable collection of values for use with [`Cursor`]
+pub trait CursorValues {
+    fn len(&self) -> usize;
 
-    rows: Rows,
+    fn eq(l: &Self, l_idx: usize, r: &Self, r_idx: usize) -> bool;
 
-    /// Tracks for the memory used by in the `Rows` of this
-    /// cursor. Freed on drop
-    #[allow(dead_code)]
-    reservation: MemoryReservation,
+    fn compare(l: &Self, l_idx: usize, r: &Self, r_idx: usize) -> Ordering;
 }
 
-impl std::fmt::Debug for RowCursor {
-    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
-        f.debug_struct("SortKeyCursor")
-            .field("cur_row", &self.cur_row)
-            .field("num_rows", &self.num_rows)
-            .finish()
-    }
+/// A comparable cursor, used by sort operations
+#[derive(Debug)]
+pub struct Cursor<T: CursorValues> {
+    offset: usize,
+    values: T,
 }
 
-impl RowCursor {
-    /// Create a new SortKeyCursor from `rows` and a `reservation`
-    /// that tracks its memory. There must be at least one row
-    ///
-    /// Panics if the reservation is not for exactly `rows.size()`
-    /// bytes or if `rows` is empty.
-    pub fn new(rows: Rows, reservation: MemoryReservation) -> Self {
-        assert_eq!(
-            rows.size(),
-            reservation.size(),
-            "memory reservation mismatch"
-        );
-        assert!(rows.num_rows() > 0);
-        Self {
-            cur_row: 0,
-            num_rows: rows.num_rows(),
-            rows,
-            reservation,
-        }
+impl<T: CursorValues> Cursor<T> {
+    /// Create a [`Cursor`] from the given [`CursorValues`]
+    pub fn new(values: T) -> Self {
+        Self { offset: 0, values }
     }
 
-    /// Returns the current row
-    fn current(&self) -> Row<'_> {
-        self.rows.row(self.cur_row)
+    /// Returns true if there are no more rows in this cursor
+    pub fn is_finished(&self) -> bool {
+        self.offset == self.values.len()
+    }
+
+    /// Advance the cursor, returning the previous row index
+    pub fn advance(&mut self) -> usize {
+        let t = self.offset;
+        self.offset += 1;
+        t
     }
 }
 
-impl PartialEq for RowCursor {
+impl<T: CursorValues> PartialEq for Cursor<T> {
     fn eq(&self, other: &Self) -> bool {
-        self.current() == other.current()
+        T::eq(&self.values, self.offset, &other.values, other.offset)
     }
 }
 
-impl Eq for RowCursor {}
+impl<T: CursorValues> Eq for Cursor<T> {}
 
-impl PartialOrd for RowCursor {
+impl<T: CursorValues> PartialOrd for Cursor<T> {
     fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
         Some(self.cmp(other))
     }
 }
 
-impl Ord for RowCursor {
+impl<T: CursorValues> Ord for Cursor<T> {
     fn cmp(&self, other: &Self) -> Ordering {
-        self.current().cmp(&other.current())
+        T::compare(&self.values, self.offset, &other.values, other.offset)
     }
 }
 
-/// A cursor into a sorted batch of rows.
-///
-/// Each cursor must have at least one row so `advance` can be called at least
-/// once prior to calling `is_finished`.
-pub trait Cursor: Ord {
-    /// Returns true if there are no more rows in this cursor
-    fn is_finished(&self) -> bool;
+/// Implements [`CursorValues`] for [`Rows`]

Review Comment:
   ```suggestion
   /// Implements [`CursorValues`] for [`Rows`].
   ///
   /// Used for sorting when there are multiple columns in the sort key
   ```



##########
datafusion/physical-plan/src/sorts/cursor.rs:
##########
@@ -20,124 +20,118 @@ use std::cmp::Ordering;
 use arrow::buffer::ScalarBuffer;
 use arrow::compute::SortOptions;
 use arrow::datatypes::ArrowNativeTypeOp;
-use arrow::row::{Row, Rows};
+use arrow::row::Rows;
 use arrow_array::types::ByteArrayType;
-use arrow_array::{Array, ArrowPrimitiveType, GenericByteArray, PrimitiveArray};
+use arrow_array::{
+    Array, ArrowPrimitiveType, GenericByteArray, OffsetSizeTrait, 
PrimitiveArray,
+};
+use arrow_buffer::{Buffer, OffsetBuffer};
 use datafusion_execution::memory_pool::MemoryReservation;
 
-/// A [`Cursor`] for [`Rows`]
-pub struct RowCursor {
-    cur_row: usize,
-    num_rows: usize,
+/// A comparable collection of values for use with [`Cursor`]
+pub trait CursorValues {
+    fn len(&self) -> usize;
 
-    rows: Rows,
+    fn eq(l: &Self, l_idx: usize, r: &Self, r_idx: usize) -> bool;
 
-    /// Tracks for the memory used by in the `Rows` of this
-    /// cursor. Freed on drop
-    #[allow(dead_code)]
-    reservation: MemoryReservation,
+    fn compare(l: &Self, l_idx: usize, r: &Self, r_idx: usize) -> Ordering;
 }
 
-impl std::fmt::Debug for RowCursor {
-    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
-        f.debug_struct("SortKeyCursor")
-            .field("cur_row", &self.cur_row)
-            .field("num_rows", &self.num_rows)
-            .finish()
-    }
+/// A comparable cursor, used by sort operations
+#[derive(Debug)]
+pub struct Cursor<T: CursorValues> {
+    offset: usize,
+    values: T,
 }
 
-impl RowCursor {
-    /// Create a new SortKeyCursor from `rows` and a `reservation`
-    /// that tracks its memory. There must be at least one row
-    ///
-    /// Panics if the reservation is not for exactly `rows.size()`
-    /// bytes or if `rows` is empty.
-    pub fn new(rows: Rows, reservation: MemoryReservation) -> Self {
-        assert_eq!(
-            rows.size(),
-            reservation.size(),
-            "memory reservation mismatch"
-        );
-        assert!(rows.num_rows() > 0);
-        Self {
-            cur_row: 0,
-            num_rows: rows.num_rows(),
-            rows,
-            reservation,
-        }
+impl<T: CursorValues> Cursor<T> {
+    /// Create a [`Cursor`] from the given [`CursorValues`]
+    pub fn new(values: T) -> Self {
+        Self { offset: 0, values }
     }
 
-    /// Returns the current row
-    fn current(&self) -> Row<'_> {
-        self.rows.row(self.cur_row)
+    /// Returns true if there are no more rows in this cursor
+    pub fn is_finished(&self) -> bool {
+        self.offset == self.values.len()
+    }
+
+    /// Advance the cursor, returning the previous row index
+    pub fn advance(&mut self) -> usize {
+        let t = self.offset;
+        self.offset += 1;
+        t
     }
 }
 
-impl PartialEq for RowCursor {
+impl<T: CursorValues> PartialEq for Cursor<T> {
     fn eq(&self, other: &Self) -> bool {
-        self.current() == other.current()
+        T::eq(&self.values, self.offset, &other.values, other.offset)
     }
 }
 
-impl Eq for RowCursor {}
+impl<T: CursorValues> Eq for Cursor<T> {}
 
-impl PartialOrd for RowCursor {
+impl<T: CursorValues> PartialOrd for Cursor<T> {
     fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
         Some(self.cmp(other))
     }
 }
 
-impl Ord for RowCursor {
+impl<T: CursorValues> Ord for Cursor<T> {
     fn cmp(&self, other: &Self) -> Ordering {
-        self.current().cmp(&other.current())
+        T::compare(&self.values, self.offset, &other.values, other.offset)
     }
 }
 
-/// A cursor into a sorted batch of rows.
-///
-/// Each cursor must have at least one row so `advance` can be called at least
-/// once prior to calling `is_finished`.
-pub trait Cursor: Ord {
-    /// Returns true if there are no more rows in this cursor
-    fn is_finished(&self) -> bool;
+/// Implements [`CursorValues`] for [`Rows`]
+#[derive(Debug)]
+pub struct RowValues {
+    rows: Rows,
 
-    /// Advance the cursor, returning the previous row index
-    fn advance(&mut self) -> usize;
+    /// Tracks for the memory used by in the `Rows` of this
+    /// cursor. Freed on drop
+    #[allow(dead_code)]
+    reservation: MemoryReservation,
+}
+
+impl RowValues {
+    /// Create a new [`RowValues`] from `rows` and a `reservation`
+    /// that tracks its memory. There must be at least one row
+    ///
+    /// Panics if the reservation is not for exactly `rows.size()`
+    /// bytes or if `rows` is empty.
+    pub fn new(rows: Rows, reservation: MemoryReservation) -> Self {
+        assert_eq!(
+            rows.size(),
+            reservation.size(),
+            "memory reservation mismatch"
+        );
+        assert!(rows.num_rows() > 0);
+        Self { rows, reservation }
+    }
 }
 
-impl Cursor for RowCursor {
-    #[inline]
-    fn is_finished(&self) -> bool {
-        self.num_rows == self.cur_row
+impl CursorValues for RowValues {
+    fn len(&self) -> usize {
+        self.rows.num_rows()
     }
 
-    #[inline]
-    fn advance(&mut self) -> usize {
-        let t = self.cur_row;
-        self.cur_row += 1;
-        t
+    fn eq(l: &Self, l_idx: usize, r: &Self, r_idx: usize) -> bool {
+        l.rows.row(l_idx) == r.rows.row(r_idx)
+    }
+
+    fn compare(l: &Self, l_idx: usize, r: &Self, r_idx: usize) -> Ordering {
+        l.rows.row(l_idx).cmp(&r.rows.row(r_idx))
     }
 }
 
-/// An [`Array`] that can be converted into [`FieldValues`]
+/// An [`Array`] that can be converted into [`CursorValues`]
 pub trait FieldArray: Array + 'static {
-    type Values: FieldValues;
+    type Values: CursorValues;
 
     fn values(&self) -> Self::Values;

Review Comment:
   given the overload of the use of the term `values` (e.g. for Dictionary 
arrays) perhaps we could rename this to `cursor_values` 



##########
datafusion/physical-plan/src/sorts/cursor.rs:
##########
@@ -149,70 +143,73 @@ impl<T: ArrowPrimitiveType> FieldArray for 
PrimitiveArray<T> {
 #[derive(Debug)]
 pub struct PrimitiveValues<T: ArrowNativeTypeOp>(ScalarBuffer<T>);
 
-impl<T: ArrowNativeTypeOp> FieldValues for PrimitiveValues<T> {
-    type Value = T;
-
+impl<T: ArrowNativeTypeOp> CursorValues for PrimitiveValues<T> {
     fn len(&self) -> usize {
         self.0.len()
     }
 
-    #[inline]
-    fn compare(a: &Self::Value, b: &Self::Value) -> Ordering {
-        T::compare(*a, *b)
+    fn eq(l: &Self, l_idx: usize, r: &Self, r_idx: usize) -> bool {
+        l.0[l_idx].is_eq(r.0[r_idx])
     }
 
-    #[inline]
-    fn value(&self, idx: usize) -> &Self::Value {
-        &self.0[idx]
+    fn compare(l: &Self, l_idx: usize, r: &Self, r_idx: usize) -> Ordering {
+        l.0[l_idx].compare(r.0[r_idx])
     }
 }
 
-impl<T: ByteArrayType> FieldArray for GenericByteArray<T> {
-    type Values = Self;
+pub struct ByteArrayValues<T: OffsetSizeTrait> {
+    offsets: OffsetBuffer<T>,
+    values: Buffer,
+}
 
-    fn values(&self) -> Self::Values {
-        // Once https://github.com/apache/arrow-rs/pull/4048 is released
-        // Could potentially destructure array into buffers to reduce codegen,
-        // in a similar vein to what is done for PrimitiveArray
-        self.clone()
+impl<T: OffsetSizeTrait> ByteArrayValues<T> {
+    fn value(&self, idx: usize) -> &[u8] {
+        let end = self.offsets[idx + 1].as_usize();
+        let start = self.offsets[idx].as_usize();
+        // Safety: offsets are valid
+        unsafe { self.values.get_unchecked(start..end) }
     }
 }
 
-impl<T: ByteArrayType> FieldValues for GenericByteArray<T> {
-    type Value = T::Native;
-
+impl<T: OffsetSizeTrait> CursorValues for ByteArrayValues<T> {
     fn len(&self) -> usize {
-        Array::len(self)
+        self.offsets.len() - 1
     }
 
-    #[inline]
-    fn compare(a: &Self::Value, b: &Self::Value) -> Ordering {
-        let a: &[u8] = a.as_ref();
-        let b: &[u8] = b.as_ref();
-        a.cmp(b)
+    fn eq(l: &Self, l_idx: usize, r: &Self, r_idx: usize) -> bool {
+        l.value(l_idx) == r.value(r_idx)
     }
 
-    #[inline]
-    fn value(&self, idx: usize) -> &Self::Value {
-        self.value(idx)
+    fn compare(l: &Self, l_idx: usize, r: &Self, r_idx: usize) -> Ordering {
+        l.value(l_idx).cmp(r.value(r_idx))
     }
 }
 
-/// A cursor over sorted, nullable [`FieldValues`]
+impl<T: ByteArrayType> FieldArray for GenericByteArray<T> {
+    type Values = ByteArrayValues<T::Offset>;
+
+    fn values(&self) -> Self::Values {
+        ByteArrayValues {
+            offsets: self.offsets().clone(),
+            values: self.values().clone(),
+        }
+    }
+}
+
+/// A collection of sorted, nullable [`FieldArray`]

Review Comment:
   ```suggestion
   /// A collection of sorted, nullable [`CursorValues`]
   ```



##########
datafusion/physical-plan/src/sorts/merge.rs:
##########
@@ -89,7 +89,7 @@ pub(crate) struct SortPreservingMergeStream<C> {
     batch_size: usize,
 
     /// Vector that holds cursors for each non-exhausted input partition

Review Comment:
   ```suggestion
       /// Cursors for each input partition. `None` means the input is exhausted
   ```



##########
datafusion/physical-plan/src/sorts/cursor.rs:
##########
@@ -20,124 +20,118 @@ use std::cmp::Ordering;
 use arrow::buffer::ScalarBuffer;
 use arrow::compute::SortOptions;
 use arrow::datatypes::ArrowNativeTypeOp;
-use arrow::row::{Row, Rows};
+use arrow::row::Rows;
 use arrow_array::types::ByteArrayType;
-use arrow_array::{Array, ArrowPrimitiveType, GenericByteArray, PrimitiveArray};
+use arrow_array::{
+    Array, ArrowPrimitiveType, GenericByteArray, OffsetSizeTrait, 
PrimitiveArray,
+};
+use arrow_buffer::{Buffer, OffsetBuffer};
 use datafusion_execution::memory_pool::MemoryReservation;
 
-/// A [`Cursor`] for [`Rows`]
-pub struct RowCursor {
-    cur_row: usize,
-    num_rows: usize,
+/// A comparable collection of values for use with [`Cursor`]
+pub trait CursorValues {
+    fn len(&self) -> usize;
 
-    rows: Rows,
+    fn eq(l: &Self, l_idx: usize, r: &Self, r_idx: usize) -> bool;
 
-    /// Tracks for the memory used by in the `Rows` of this
-    /// cursor. Freed on drop
-    #[allow(dead_code)]
-    reservation: MemoryReservation,
+    fn compare(l: &Self, l_idx: usize, r: &Self, r_idx: usize) -> Ordering;
 }
 
-impl std::fmt::Debug for RowCursor {
-    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
-        f.debug_struct("SortKeyCursor")
-            .field("cur_row", &self.cur_row)
-            .field("num_rows", &self.num_rows)
-            .finish()
-    }
+/// A comparable cursor, used by sort operations

Review Comment:
   Given how tricky this code is to begin with, I recommend investing in more 
documentation. Here is a suggestion:
   
   ```suggestion
   /// A comparable cursor, used by sort operations.
   ///
   /// A `Cursor` is a pointer into a collection of rows, stored in
   /// [`CursorValues`] 
   ///
   /// ```text
   ///
   /// ┌───────────────────────┐                                   
   /// │                       │           ┌──────────────────────┐
   /// │ ┌─────────┐ ┌─────┐   │    ─ ─ ─ ─│      Cursor<T>       │
   /// │ │    1    │ │  A  │   │   │       └──────────────────────┘
   /// │ ├─────────┤ ├─────┤   │                                   
   /// │ │    2    │ │  A  │◀─ ┼ ─ ┘          Cursor<T> tracks an  
   /// │ └─────────┘ └─────┘   │                offset within a    
   /// │     ...       ...     │                  CursorValues     
   /// │                       │                                   
   /// │ ┌─────────┐ ┌─────┐   │                                   
   /// │ │    3    │ │  E  │   │                                   
   /// │ └─────────┘ └─────┘   │                                   
   /// │                       │                                   
   /// │     CursorValues      │                                   
   /// └───────────────────────┘                                   
   /// 
   ///                                                             
   /// Store logical rows using                                    
   /// one of several  formats,                                    
   /// with specialized                                            
   /// implementations                                             
   /// depending on the column                                     
   /// types                                                       
   /// ```                                                       
   ```
   
   
   



##########
datafusion/physical-plan/src/sorts/cursor.rs:
##########
@@ -149,70 +143,73 @@ impl<T: ArrowPrimitiveType> FieldArray for 
PrimitiveArray<T> {
 #[derive(Debug)]
 pub struct PrimitiveValues<T: ArrowNativeTypeOp>(ScalarBuffer<T>);
 
-impl<T: ArrowNativeTypeOp> FieldValues for PrimitiveValues<T> {
-    type Value = T;
-
+impl<T: ArrowNativeTypeOp> CursorValues for PrimitiveValues<T> {
     fn len(&self) -> usize {
         self.0.len()
     }
 
-    #[inline]
-    fn compare(a: &Self::Value, b: &Self::Value) -> Ordering {
-        T::compare(*a, *b)
+    fn eq(l: &Self, l_idx: usize, r: &Self, r_idx: usize) -> bool {
+        l.0[l_idx].is_eq(r.0[r_idx])
     }
 
-    #[inline]
-    fn value(&self, idx: usize) -> &Self::Value {
-        &self.0[idx]
+    fn compare(l: &Self, l_idx: usize, r: &Self, r_idx: usize) -> Ordering {
+        l.0[l_idx].compare(r.0[r_idx])
     }
 }
 
-impl<T: ByteArrayType> FieldArray for GenericByteArray<T> {
-    type Values = Self;
+pub struct ByteArrayValues<T: OffsetSizeTrait> {
+    offsets: OffsetBuffer<T>,
+    values: Buffer,
+}
 
-    fn values(&self) -> Self::Values {
-        // Once https://github.com/apache/arrow-rs/pull/4048 is released
-        // Could potentially destructure array into buffers to reduce codegen,
-        // in a similar vein to what is done for PrimitiveArray
-        self.clone()
+impl<T: OffsetSizeTrait> ByteArrayValues<T> {
+    fn value(&self, idx: usize) -> &[u8] {
+        let end = self.offsets[idx + 1].as_usize();

Review Comment:
   Could we also validate the bounds once and then use unchecked access to the 
offsets?
   
   like 
   ```
   assert (idx < self.offsets().len());
   let end = self.offsets.get_unchecked(idx + 1).as_usize(); 
   etc
   ```



##########
datafusion/physical-plan/src/sorts/cursor.rs:
##########
@@ -149,70 +143,73 @@ impl<T: ArrowPrimitiveType> FieldArray for 
PrimitiveArray<T> {
 #[derive(Debug)]
 pub struct PrimitiveValues<T: ArrowNativeTypeOp>(ScalarBuffer<T>);
 
-impl<T: ArrowNativeTypeOp> FieldValues for PrimitiveValues<T> {
-    type Value = T;
-
+impl<T: ArrowNativeTypeOp> CursorValues for PrimitiveValues<T> {
     fn len(&self) -> usize {
         self.0.len()
     }
 
-    #[inline]
-    fn compare(a: &Self::Value, b: &Self::Value) -> Ordering {
-        T::compare(*a, *b)
+    fn eq(l: &Self, l_idx: usize, r: &Self, r_idx: usize) -> bool {
+        l.0[l_idx].is_eq(r.0[r_idx])
     }
 
-    #[inline]
-    fn value(&self, idx: usize) -> &Self::Value {
-        &self.0[idx]
+    fn compare(l: &Self, l_idx: usize, r: &Self, r_idx: usize) -> Ordering {
+        l.0[l_idx].compare(r.0[r_idx])
     }
 }
 
-impl<T: ByteArrayType> FieldArray for GenericByteArray<T> {
-    type Values = Self;
+pub struct ByteArrayValues<T: OffsetSizeTrait> {
+    offsets: OffsetBuffer<T>,
+    values: Buffer,
+}
 
-    fn values(&self) -> Self::Values {
-        // Once https://github.com/apache/arrow-rs/pull/4048 is released
-        // Could potentially destructure array into buffers to reduce codegen,
-        // in a similar vein to what is done for PrimitiveArray
-        self.clone()
+impl<T: OffsetSizeTrait> ByteArrayValues<T> {
+    fn value(&self, idx: usize) -> &[u8] {
+        let end = self.offsets[idx + 1].as_usize();
+        let start = self.offsets[idx].as_usize();
+        // Safety: offsets are valid
+        unsafe { self.values.get_unchecked(start..end) }
     }
 }
 
-impl<T: ByteArrayType> FieldValues for GenericByteArray<T> {
-    type Value = T::Native;
-
+impl<T: OffsetSizeTrait> CursorValues for ByteArrayValues<T> {
     fn len(&self) -> usize {
-        Array::len(self)
+        self.offsets.len() - 1
     }
 
-    #[inline]
-    fn compare(a: &Self::Value, b: &Self::Value) -> Ordering {
-        let a: &[u8] = a.as_ref();
-        let b: &[u8] = b.as_ref();
-        a.cmp(b)
+    fn eq(l: &Self, l_idx: usize, r: &Self, r_idx: usize) -> bool {
+        l.value(l_idx) == r.value(r_idx)
     }
 
-    #[inline]
-    fn value(&self, idx: usize) -> &Self::Value {
-        self.value(idx)
+    fn compare(l: &Self, l_idx: usize, r: &Self, r_idx: usize) -> Ordering {
+        l.value(l_idx).cmp(r.value(r_idx))
     }
 }
 
-/// A cursor over sorted, nullable [`FieldValues`]
+impl<T: ByteArrayType> FieldArray for GenericByteArray<T> {
+    type Values = ByteArrayValues<T::Offset>;
+
+    fn values(&self) -> Self::Values {
+        ByteArrayValues {
+            offsets: self.offsets().clone(),
+            values: self.values().clone(),
+        }
+    }
+}
+
+/// A collection of sorted, nullable [`FieldArray`]
 ///
 /// Note: comparing cursors with different `SortOptions` will yield an 
arbitrary ordering
 #[derive(Debug)]
-pub struct FieldCursor<T: FieldValues> {
+pub struct ArrayValues<T: CursorValues> {

Review Comment:
   This is a nice naming change



##########
datafusion/physical-plan/src/sorts/cursor.rs:
##########
@@ -20,124 +20,118 @@ use std::cmp::Ordering;
 use arrow::buffer::ScalarBuffer;
 use arrow::compute::SortOptions;
 use arrow::datatypes::ArrowNativeTypeOp;
-use arrow::row::{Row, Rows};
+use arrow::row::Rows;
 use arrow_array::types::ByteArrayType;
-use arrow_array::{Array, ArrowPrimitiveType, GenericByteArray, PrimitiveArray};
+use arrow_array::{
+    Array, ArrowPrimitiveType, GenericByteArray, OffsetSizeTrait, 
PrimitiveArray,
+};
+use arrow_buffer::{Buffer, OffsetBuffer};
 use datafusion_execution::memory_pool::MemoryReservation;
 
-/// A [`Cursor`] for [`Rows`]
-pub struct RowCursor {
-    cur_row: usize,
-    num_rows: usize,
+/// A comparable collection of values for use with [`Cursor`]

Review Comment:
   ```suggestion
   /// A comparable collection of values for use with [`Cursor`].
   ///
   /// This is a trait as there are several specialized implementations, such 
as for single columns
   /// or for normalized multi column keys ([`Rows`])
   ```



##########
datafusion/physical-plan/src/sorts/cursor.rs:
##########
@@ -20,124 +20,118 @@ use std::cmp::Ordering;
 use arrow::buffer::ScalarBuffer;
 use arrow::compute::SortOptions;
 use arrow::datatypes::ArrowNativeTypeOp;
-use arrow::row::{Row, Rows};
+use arrow::row::Rows;
 use arrow_array::types::ByteArrayType;
-use arrow_array::{Array, ArrowPrimitiveType, GenericByteArray, PrimitiveArray};
+use arrow_array::{
+    Array, ArrowPrimitiveType, GenericByteArray, OffsetSizeTrait, 
PrimitiveArray,
+};
+use arrow_buffer::{Buffer, OffsetBuffer};
 use datafusion_execution::memory_pool::MemoryReservation;
 
-/// A [`Cursor`] for [`Rows`]
-pub struct RowCursor {
-    cur_row: usize,
-    num_rows: usize,
+/// A comparable collection of values for use with [`Cursor`]
+pub trait CursorValues {
+    fn len(&self) -> usize;
 
-    rows: Rows,
+    fn eq(l: &Self, l_idx: usize, r: &Self, r_idx: usize) -> bool;
 
-    /// Tracks for the memory used by in the `Rows` of this
-    /// cursor. Freed on drop
-    #[allow(dead_code)]
-    reservation: MemoryReservation,
+    fn compare(l: &Self, l_idx: usize, r: &Self, r_idx: usize) -> Ordering;
 }
 
-impl std::fmt::Debug for RowCursor {
-    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
-        f.debug_struct("SortKeyCursor")
-            .field("cur_row", &self.cur_row)
-            .field("num_rows", &self.num_rows)
-            .finish()
-    }
+/// A comparable cursor, used by sort operations
+#[derive(Debug)]
+pub struct Cursor<T: CursorValues> {
+    offset: usize,
+    values: T,
 }
 
-impl RowCursor {
-    /// Create a new SortKeyCursor from `rows` and a `reservation`
-    /// that tracks its memory. There must be at least one row
-    ///
-    /// Panics if the reservation is not for exactly `rows.size()`
-    /// bytes or if `rows` is empty.
-    pub fn new(rows: Rows, reservation: MemoryReservation) -> Self {
-        assert_eq!(
-            rows.size(),
-            reservation.size(),
-            "memory reservation mismatch"
-        );
-        assert!(rows.num_rows() > 0);
-        Self {
-            cur_row: 0,
-            num_rows: rows.num_rows(),
-            rows,
-            reservation,
-        }
+impl<T: CursorValues> Cursor<T> {
+    /// Create a [`Cursor`] from the given [`CursorValues`]
+    pub fn new(values: T) -> Self {
+        Self { offset: 0, values }
     }
 
-    /// Returns the current row
-    fn current(&self) -> Row<'_> {
-        self.rows.row(self.cur_row)
+    /// Returns true if there are no more rows in this cursor
+    pub fn is_finished(&self) -> bool {
+        self.offset == self.values.len()
+    }
+
+    /// Advance the cursor, returning the previous row index
+    pub fn advance(&mut self) -> usize {
+        let t = self.offset;
+        self.offset += 1;
+        t
     }
 }
 
-impl PartialEq for RowCursor {
+impl<T: CursorValues> PartialEq for Cursor<T> {
     fn eq(&self, other: &Self) -> bool {
-        self.current() == other.current()
+        T::eq(&self.values, self.offset, &other.values, other.offset)
     }
 }
 
-impl Eq for RowCursor {}
+impl<T: CursorValues> Eq for Cursor<T> {}
 
-impl PartialOrd for RowCursor {
+impl<T: CursorValues> PartialOrd for Cursor<T> {
     fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
         Some(self.cmp(other))
     }
 }
 
-impl Ord for RowCursor {
+impl<T: CursorValues> Ord for Cursor<T> {
     fn cmp(&self, other: &Self) -> Ordering {
-        self.current().cmp(&other.current())
+        T::compare(&self.values, self.offset, &other.values, other.offset)
     }
 }
 
-/// A cursor into a sorted batch of rows.
-///
-/// Each cursor must have at least one row so `advance` can be called at least
-/// once prior to calling `is_finished`.
-pub trait Cursor: Ord {
-    /// Returns true if there are no more rows in this cursor
-    fn is_finished(&self) -> bool;
+/// Implements [`CursorValues`] for [`Rows`]
+#[derive(Debug)]
+pub struct RowValues {
+    rows: Rows,
 
-    /// Advance the cursor, returning the previous row index
-    fn advance(&mut self) -> usize;
+    /// Tracks for the memory used by in the `Rows` of this
+    /// cursor. Freed on drop
+    #[allow(dead_code)]
+    reservation: MemoryReservation,
+}
+
+impl RowValues {
+    /// Create a new [`RowValues`] from `rows` and a `reservation`
+    /// that tracks its memory. There must be at least one row
+    ///
+    /// Panics if the reservation is not for exactly `rows.size()`
+    /// bytes or if `rows` is empty.
+    pub fn new(rows: Rows, reservation: MemoryReservation) -> Self {
+        assert_eq!(
+            rows.size(),
+            reservation.size(),
+            "memory reservation mismatch"
+        );
+        assert!(rows.num_rows() > 0);
+        Self { rows, reservation }
+    }
 }
 
-impl Cursor for RowCursor {
-    #[inline]
-    fn is_finished(&self) -> bool {
-        self.num_rows == self.cur_row
+impl CursorValues for RowValues {
+    fn len(&self) -> usize {
+        self.rows.num_rows()
     }
 
-    #[inline]
-    fn advance(&mut self) -> usize {
-        let t = self.cur_row;
-        self.cur_row += 1;
-        t
+    fn eq(l: &Self, l_idx: usize, r: &Self, r_idx: usize) -> bool {
+        l.rows.row(l_idx) == r.rows.row(r_idx)
+    }
+
+    fn compare(l: &Self, l_idx: usize, r: &Self, r_idx: usize) -> Ordering {
+        l.rows.row(l_idx).cmp(&r.rows.row(r_idx))
     }
 }
 
-/// An [`Array`] that can be converted into [`FieldValues`]
+/// An [`Array`] that can be converted into [`CursorValues`]
 pub trait FieldArray: Array + 'static {

Review Comment:
   I found the use of `Field` confusing here -- what about renaming this  
`CursorArray` or `IntoCursorValues`?



##########
datafusion/physical-plan/src/sorts/cursor.rs:
##########
@@ -20,124 +20,118 @@ use std::cmp::Ordering;
 use arrow::buffer::ScalarBuffer;
 use arrow::compute::SortOptions;
 use arrow::datatypes::ArrowNativeTypeOp;
-use arrow::row::{Row, Rows};
+use arrow::row::Rows;
 use arrow_array::types::ByteArrayType;
-use arrow_array::{Array, ArrowPrimitiveType, GenericByteArray, PrimitiveArray};
+use arrow_array::{
+    Array, ArrowPrimitiveType, GenericByteArray, OffsetSizeTrait, 
PrimitiveArray,
+};
+use arrow_buffer::{Buffer, OffsetBuffer};
 use datafusion_execution::memory_pool::MemoryReservation;
 
-/// A [`Cursor`] for [`Rows`]
-pub struct RowCursor {
-    cur_row: usize,
-    num_rows: usize,
+/// A comparable collection of values for use with [`Cursor`]
+pub trait CursorValues {
+    fn len(&self) -> usize;
 
-    rows: Rows,
+    fn eq(l: &Self, l_idx: usize, r: &Self, r_idx: usize) -> bool;
 
-    /// Tracks for the memory used by in the `Rows` of this
-    /// cursor. Freed on drop
-    #[allow(dead_code)]
-    reservation: MemoryReservation,
+    fn compare(l: &Self, l_idx: usize, r: &Self, r_idx: usize) -> Ordering;
 }
 
-impl std::fmt::Debug for RowCursor {
-    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
-        f.debug_struct("SortKeyCursor")
-            .field("cur_row", &self.cur_row)
-            .field("num_rows", &self.num_rows)
-            .finish()
-    }
+/// A comparable cursor, used by sort operations

Review Comment:
   Given how tricky this code is to begin with, I recommend investing in more 
documentation. Here is a suggestion:
   
   ```suggestion
   /// A comparable cursor, used by sort operations.
   ///
   /// A `Cursor` is a pointer into a collection of rows, stored in
   /// [`CursorValues`] 
   ///
   /// ```text
   ///
   /// ┌───────────────────────┐                                   
   /// │                       │           ┌──────────────────────┐
   /// │ ┌─────────┐ ┌─────┐   │    ─ ─ ─ ─│      Cursor<T>       │
   /// │ │    1    │ │  A  │   │   │       └──────────────────────┘
   /// │ ├─────────┤ ├─────┤   │                                   
   /// │ │    2    │ │  A  │◀─ ┼ ─ ┘          Cursor<T> tracks an  
   /// │ └─────────┘ └─────┘   │                offset within a    
   /// │     ...       ...     │                  CursorValues     
   /// │                       │                                   
   /// │ ┌─────────┐ ┌─────┐   │                                   
   /// │ │    3    │ │  E  │   │                                   
   /// │ └─────────┘ └─────┘   │                                   
   /// │                       │                                   
   /// │     CursorValues      │                                   
   /// └───────────────────────┘                                   
   /// 
   ///                                                             
   /// Store logical rows using                                    
   /// one of several  formats,                                    
   /// with specialized                                            
   /// implementations                                             
   /// depending on the column                                     
   /// types                                                       
   /// ```                                                       
   ```
   
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to