This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 7aa6b3659b Minor: Assert `streaming_merge` has non empty sort exprs (#7795)
7aa6b3659b is described below

commit 7aa6b3659b33510203778c3978f0830c1b3f7c2c
Author: Andrew Lamb <[email protected]>
AuthorDate: Mon Oct 16 11:18:08 2023 -0400

    Minor: Assert `streaming_merge` has non empty sort exprs (#7795)
    
    * Minor: Assert streaming_merge has non empty sort exprs
    
    * clippy
    
    * Add test
    
    * Apply suggestions from code review
    
    Co-authored-by: jakevin <[email protected]>
    Co-authored-by: Liang-Chi Hsieh <[email protected]>
    
    ---------
    
    Co-authored-by: jakevin <[email protected]>
    Co-authored-by: Liang-Chi Hsieh <[email protected]>
---
 datafusion/physical-plan/src/sorts/cursor.rs        | 18 +++++++++++++-----
 .../src/sorts/sort_preserving_merge.rs              | 21 ++++++++++++++++++++-
 .../physical-plan/src/sorts/streaming_merge.rs      |  7 ++++++-
 3 files changed, 39 insertions(+), 7 deletions(-)

diff --git a/datafusion/physical-plan/src/sorts/cursor.rs b/datafusion/physical-plan/src/sorts/cursor.rs
index baa417649f..52de880bae 100644
--- a/datafusion/physical-plan/src/sorts/cursor.rs
+++ b/datafusion/physical-plan/src/sorts/cursor.rs
@@ -48,16 +48,17 @@ impl std::fmt::Debug for RowCursor {
 
 impl RowCursor {
     /// Create a new SortKeyCursor from `rows` and a `reservation`
-    /// that tracks its memory.
+    /// that tracks its memory. There must be at least one row
     ///
-    /// Panic's if the reservation is not for exactly `rows.size()`
-    /// bytes
+    /// Panics if the reservation is not for exactly `rows.size()`
+    /// bytes or if `rows` is empty.
     pub fn new(rows: Rows, reservation: MemoryReservation) -> Self {
         assert_eq!(
             rows.size(),
             reservation.size(),
             "memory reservation mismatch"
         );
+        assert!(rows.num_rows() > 0);
         Self {
             cur_row: 0,
             num_rows: rows.num_rows(),
@@ -92,7 +93,10 @@ impl Ord for RowCursor {
     }
 }
 
-/// A cursor into a sorted batch of rows
+/// A cursor into a sorted batch of rows.
+///
+/// Each cursor must have at least one row so `advance` can be called at least
+/// once prior to calling `is_finished`.
 pub trait Cursor: Ord {
     /// Returns true if there are no more rows in this cursor
     fn is_finished(&self) -> bool;
@@ -207,8 +211,12 @@ pub struct FieldCursor<T: FieldValues> {
 }
 
 impl<T: FieldValues> FieldCursor<T> {
-    /// Create a new [`FieldCursor`] from the provided `values` sorted according to `options`
+    /// Create a new [`FieldCursor`] from the provided `values` sorted according
+    /// to `options`.
+    ///
+    /// Panics if the array is empty
     pub fn new<A: FieldArray<Values = T>>(options: SortOptions, array: &A) -> Self {
+        assert!(array.len() > 0, "Empty array passed to FieldCursor");
         let null_threshold = match options.nulls_first {
             true => array.null_count(),
             false => array.len() - array.null_count(),
diff --git a/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs b/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs
index 597b59f776..f7a9e193ee 100644
--- a/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs
+++ b/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs
@@ -290,7 +290,7 @@ mod tests {
     use crate::test::{self, assert_is_pending, make_partition};
     use crate::{collect, common};
     use arrow::array::{Int32Array, StringArray, TimestampNanosecondArray};
-    use datafusion_common::assert_batches_eq;
+    use datafusion_common::{assert_batches_eq, assert_contains};
 
     use super::*;
 
@@ -342,6 +342,25 @@ mod tests {
         .await;
     }
 
+    #[tokio::test]
+    async fn test_merge_no_exprs() {
+        let task_ctx = Arc::new(TaskContext::default());
+        let a: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 7, 9, 3]));
+        let batch = RecordBatch::try_from_iter(vec![("a", a)]).unwrap();
+
+        let schema = batch.schema();
+        let sort = vec![]; // no sort expressions
+        let exec = MemoryExec::try_new(&[vec![batch.clone()], vec![batch]], schema, None)
+            .unwrap();
+        let merge = Arc::new(SortPreservingMergeExec::new(sort, Arc::new(exec)));
+
+        let res = collect(merge, task_ctx).await.unwrap_err();
+        assert_contains!(
+            res.to_string(),
+            "Internal error: Sort expressions cannot be empty for streaming merge"
+        );
+    }
+
     #[tokio::test]
     async fn test_merge_some_overlap() {
         let task_ctx = Arc::new(TaskContext::default());
diff --git a/datafusion/physical-plan/src/sorts/streaming_merge.rs b/datafusion/physical-plan/src/sorts/streaming_merge.rs
index 96d180027e..4f8d806385 100644
--- a/datafusion/physical-plan/src/sorts/streaming_merge.rs
+++ b/datafusion/physical-plan/src/sorts/streaming_merge.rs
@@ -26,7 +26,7 @@ use crate::sorts::{
 use crate::{PhysicalSortExpr, SendableRecordBatchStream};
 use arrow::datatypes::{DataType, SchemaRef};
 use arrow_array::*;
-use datafusion_common::Result;
+use datafusion_common::{internal_err, DataFusionError, Result};
 use datafusion_execution::memory_pool::MemoryReservation;
 
 macro_rules! primitive_merge_helper {
@@ -60,6 +60,11 @@ pub fn streaming_merge(
     fetch: Option<usize>,
     reservation: MemoryReservation,
 ) -> Result<SendableRecordBatchStream> {
+    // If there are no sort expressions, preserving the order
+    // doesn't mean anything (and result in infinite loops)
+    if expressions.is_empty() {
+        return internal_err!("Sort expressions cannot be empty for streaming merge");
+    }
     // Special case single column comparisons with optimized cursor implementations
     if expressions.len() == 1 {
         let sort = expressions[0].clone();

Reply via email to