alamb commented on code in PR #5897:
URL: https://github.com/apache/arrow-datafusion/pull/5897#discussion_r1161935087


##########
datafusion/core/src/physical_plan/sorts/merge.rs:
##########
@@ -37,8 +52,16 @@ pub(crate) fn streaming_merge(
     tracking_metrics: MemTrackingMetrics,
     batch_size: usize,
 ) -> Result<SendableRecordBatchStream> {
-    let streams = RowCursorStream::try_new(schema.as_ref(), expressions, streams)?;
+    if expressions.len() == 1 {

Review Comment:
   I think it is worth adding a comment here explaining the rationale for this 
special case for future readers. Something like:
   
   ```suggestion
       // special case single column primitives to avoid overhead of runtime comparators
       if expressions.len() == 1 {
   ```



##########
datafusion/core/src/physical_plan/sorts/cursor.rs:
##########
@@ -93,3 +96,232 @@ impl Cursor for RowCursor {
         t
     }
 }
+
+/// A cursor over sorted, nullable [`ArrowNativeTypeOp`]
+///
+/// Note: comparing cursors with different `SortOptions` will yield an arbitrary ordering
+#[derive(Debug)]
+pub struct PrimitiveCursor<T: ArrowNativeTypeOp> {
+    values: ScalarBuffer<T>,
+    offset: usize,
+    // If nulls first, the first non-null index
+    // Otherwise, the first null index
+    null_threshold: usize,
+    options: SortOptions,
+}
+
+impl<T: ArrowNativeTypeOp> PrimitiveCursor<T> {
+    /// Create a new [`PrimitiveCursor`] from the provided `values` sorted according to `options`
+    pub fn new(options: SortOptions, values: ScalarBuffer<T>, null_count: usize) -> Self {
+        assert!(null_count <= values.len());
+
+        let null_threshold = match options.nulls_first {
+            true => null_count,
+            false => values.len() - null_count,
+        };
+
+        Self {
+            values,
+            offset: 0,
+            null_threshold,
+            options,
+        }
+    }
+
+    fn is_null(&self) -> bool {
+        (self.offset < self.null_threshold) == self.options.nulls_first
+    }
+
+    fn value(&self) -> T {
+        self.values[self.offset]
+    }
+}
+
+impl<T: ArrowNativeTypeOp> PartialEq for PrimitiveCursor<T> {
+    fn eq(&self, other: &Self) -> bool {
+        self.cmp(other).is_eq()
+    }
+}
+
+impl<T: ArrowNativeTypeOp> Eq for PrimitiveCursor<T> {}
+impl<T: ArrowNativeTypeOp> PartialOrd for PrimitiveCursor<T> {
+    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
+        Some(self.cmp(other))
+    }
+}
+
+impl<T: ArrowNativeTypeOp> Ord for PrimitiveCursor<T> {
+    fn cmp(&self, other: &Self) -> Ordering {
+        match (self.is_null(), other.is_null()) {
+            (true, true) => Ordering::Equal,
+            (true, false) => match self.options.nulls_first {
+                true => Ordering::Less,
+                false => Ordering::Greater,
+            },
+            (false, true) => match self.options.nulls_first {
+                true => Ordering::Greater,
+                false => Ordering::Less,
+            },
+            (false, false) => {
+                let s_v = self.value();
+                let o_v = other.value();
+
+                match self.options.descending {

Review Comment:
   I wonder if it would be worth specializing on consts `NULLS_FIRST` and 
`ASC/DESC` as well, to avoid what looks like runtime overhead in the hot path.
   
   Maybe it doesn't make any practical difference, though.



##########
datafusion/core/src/physical_plan/sorts/stream.rs:
##########
@@ -139,3 +142,67 @@ impl PartitionedStream for RowCursorStream {
         }))
     }
 }
+
+pub struct PrimitiveCursorStream<T: ArrowPrimitiveType> {

Review Comment:
   ```suggestion
   /// Specialized stream for sorts on single primitive columns
   pub struct PrimitiveCursorStream<T: ArrowPrimitiveType> {
   ```



##########
datafusion/core/src/physical_plan/sorts/cursor.rs:
##########
@@ -93,3 +96,232 @@ impl Cursor for RowCursor {
         t
     }
 }
+
+/// A cursor over sorted, nullable [`ArrowNativeTypeOp`]
+///
+/// Note: comparing cursors with different `SortOptions` will yield an arbitrary ordering
+#[derive(Debug)]
+pub struct PrimitiveCursor<T: ArrowNativeTypeOp> {
+    values: ScalarBuffer<T>,
+    offset: usize,
+    // If nulls first, the first non-null index
+    // Otherwise, the first null index
+    null_threshold: usize,

Review Comment:
   Is there any reason it is called `null_threshold` rather than `null_index`, 
given that it is a null index? 🤔



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to