This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 7aa6b3659b Minor: Assert `streaming_merge` has non empty sort exprs (#7795)
7aa6b3659b is described below
commit 7aa6b3659b33510203778c3978f0830c1b3f7c2c
Author: Andrew Lamb <[email protected]>
AuthorDate: Mon Oct 16 11:18:08 2023 -0400
Minor: Assert `streaming_merge` has non empty sort exprs (#7795)
* Minor: Assert streaming_merge has non empty sort exprs
* clippy
* Add test
* Apply suggestions from code review
Co-authored-by: jakevin <[email protected]>
Co-authored-by: Liang-Chi Hsieh <[email protected]>
---------
Co-authored-by: jakevin <[email protected]>
Co-authored-by: Liang-Chi Hsieh <[email protected]>
---
datafusion/physical-plan/src/sorts/cursor.rs | 18 +++++++++++++-----
.../src/sorts/sort_preserving_merge.rs | 21 ++++++++++++++++++++-
.../physical-plan/src/sorts/streaming_merge.rs | 7 ++++++-
3 files changed, 39 insertions(+), 7 deletions(-)
diff --git a/datafusion/physical-plan/src/sorts/cursor.rs b/datafusion/physical-plan/src/sorts/cursor.rs
index baa417649f..52de880bae 100644
--- a/datafusion/physical-plan/src/sorts/cursor.rs
+++ b/datafusion/physical-plan/src/sorts/cursor.rs
@@ -48,16 +48,17 @@ impl std::fmt::Debug for RowCursor {
impl RowCursor {
/// Create a new SortKeyCursor from `rows` and a `reservation`
- /// that tracks its memory.
+ /// that tracks its memory. There must be at least one row
///
- /// Panic's if the reservation is not for exactly `rows.size()`
- /// bytes
+ /// Panics if the reservation is not for exactly `rows.size()`
+ /// bytes or if `rows` is empty.
pub fn new(rows: Rows, reservation: MemoryReservation) -> Self {
assert_eq!(
rows.size(),
reservation.size(),
"memory reservation mismatch"
);
+ assert!(rows.num_rows() > 0);
Self {
cur_row: 0,
num_rows: rows.num_rows(),
@@ -92,7 +93,10 @@ impl Ord for RowCursor {
}
}
-/// A cursor into a sorted batch of rows
+/// A cursor into a sorted batch of rows.
+///
+/// Each cursor must have at least one row so `advance` can be called at least
+/// once prior to calling `is_finished`.
pub trait Cursor: Ord {
/// Returns true if there are no more rows in this cursor
fn is_finished(&self) -> bool;
@@ -207,8 +211,12 @@ pub struct FieldCursor<T: FieldValues> {
}
impl<T: FieldValues> FieldCursor<T> {
- /// Create a new [`FieldCursor`] from the provided `values` sorted according to `options`
+ /// Create a new [`FieldCursor`] from the provided `values` sorted according
+ /// to `options`.
+ ///
+ /// Panics if the array is empty
pub fn new<A: FieldArray<Values = T>>(options: SortOptions, array: &A) ->
Self {
+ assert!(array.len() > 0, "Empty array passed to FieldCursor");
let null_threshold = match options.nulls_first {
true => array.null_count(),
false => array.len() - array.null_count(),
diff --git a/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs b/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs
index 597b59f776..f7a9e193ee 100644
--- a/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs
+++ b/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs
@@ -290,7 +290,7 @@ mod tests {
use crate::test::{self, assert_is_pending, make_partition};
use crate::{collect, common};
use arrow::array::{Int32Array, StringArray, TimestampNanosecondArray};
- use datafusion_common::assert_batches_eq;
+ use datafusion_common::{assert_batches_eq, assert_contains};
use super::*;
@@ -342,6 +342,25 @@ mod tests {
.await;
}
+ /// Merging with no sort expressions must be rejected with an internal error,
+ /// since preserving an empty ordering is meaningless (and would otherwise
+ /// loop forever).
+ #[tokio::test]
+ async fn test_merge_no_exprs() {
+ let task_ctx = Arc::new(TaskContext::default());
+ let a: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 7, 9, 3]));
+ let batch = RecordBatch::try_from_iter(vec![("a", a)]).unwrap();
+
+ let schema = batch.schema();
+ let sort = vec![]; // no sort expressions
+ let exec = MemoryExec::try_new(&[vec![batch.clone()], vec![batch]], schema, None)
+ .unwrap();
+ let merge = Arc::new(SortPreservingMergeExec::new(sort, Arc::new(exec)));
+
+ let res = collect(merge, task_ctx).await.unwrap_err();
+ // Keep the expected message on one line: a wrapped literal would embed a
+ // newline and never match the single-line error text.
+ assert_contains!(
+ res.to_string(),
+ "Internal error: Sort expressions cannot be empty for streaming merge"
+ );
+ }
+
#[tokio::test]
async fn test_merge_some_overlap() {
let task_ctx = Arc::new(TaskContext::default());
diff --git a/datafusion/physical-plan/src/sorts/streaming_merge.rs b/datafusion/physical-plan/src/sorts/streaming_merge.rs
index 96d180027e..4f8d806385 100644
--- a/datafusion/physical-plan/src/sorts/streaming_merge.rs
+++ b/datafusion/physical-plan/src/sorts/streaming_merge.rs
@@ -26,7 +26,7 @@ use crate::sorts::{
use crate::{PhysicalSortExpr, SendableRecordBatchStream};
use arrow::datatypes::{DataType, SchemaRef};
use arrow_array::*;
-use datafusion_common::Result;
+use datafusion_common::{internal_err, DataFusionError, Result};
use datafusion_execution::memory_pool::MemoryReservation;
macro_rules! primitive_merge_helper {
@@ -60,6 +60,11 @@ pub fn streaming_merge(
fetch: Option<usize>,
reservation: MemoryReservation,
) -> Result<SendableRecordBatchStream> {
+ // If there are no sort expressions, preserving the order
+ // doesn't mean anything (and would result in infinite loops)
+ if expressions.is_empty() {
+ return internal_err!("Sort expressions cannot be empty for streaming
merge");
+ }
// Special case single column comparisons with optimized cursor implementations
if expressions.len() == 1 {
let sort = expressions[0].clone();