This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new a0a063d235 Perf: Support Utf8View datatype single column comparisons 
for SortPreservingMergeStream (#15348)
a0a063d235 is described below

commit a0a063d23599ff78fa2bd2a732dc74b862e8ad7d
Author: Qi Zhu <[email protected]>
AuthorDate: Tue Mar 25 23:44:14 2025 +0800

    Perf: Support Utf8View datatype single column comparisons for 
SortPreservingMergeStream (#15348)
    
    * Perf: Support Utf8View datatype single column comparisons for 
SortPreservingMergeStream
    
    * Add safety and bench sql
    
    * fix
    
    * Fix
    
    * Add benchmark testing
---
 benchmarks/src/sort_tpch.rs                        |   8 +-
 datafusion/core/benches/sort.rs                    | 111 ++++++++++++++++++++-
 datafusion/physical-plan/src/sorts/cursor.rs       |  57 ++++++++++-
 .../physical-plan/src/sorts/streaming_merge.rs     |   1 +
 4 files changed, 172 insertions(+), 5 deletions(-)

diff --git a/benchmarks/src/sort_tpch.rs b/benchmarks/src/sort_tpch.rs
index b1997b40e0..956bb92b6c 100644
--- a/benchmarks/src/sort_tpch.rs
+++ b/benchmarks/src/sort_tpch.rs
@@ -92,7 +92,7 @@ impl RunOpt {
     /// Payload Columns:
     /// - Thin variant: `l_partkey` column with `BIGINT` type (1 column)
     /// - Wide variant: all columns except for possible key columns (12 
columns)
-    const SORT_QUERIES: [&'static str; 10] = [
+    const SORT_QUERIES: [&'static str; 11] = [
         // Q1: 1 sort key (type: INTEGER, cardinality: 7) + 1 payload column
         r#"
         SELECT l_linenumber, l_partkey
@@ -159,6 +159,12 @@ impl RunOpt {
         FROM lineitem
         ORDER BY l_orderkey, l_suppkey, l_linenumber, l_comment
         "#,
+        // Q11: 1 sort key (type: VARCHAR, cardinality: 4.5M) + 1 payload 
column
+        r#"
+        SELECT l_shipmode, l_comment, l_partkey
+        FROM lineitem
+        ORDER BY l_shipmode;
+        "#,
     ];
 
     /// If query is specified from command line, run only that query.
diff --git a/datafusion/core/benches/sort.rs b/datafusion/core/benches/sort.rs
index 8f0b3753f6..85f456ce5d 100644
--- a/datafusion/core/benches/sort.rs
+++ b/datafusion/core/benches/sort.rs
@@ -68,13 +68,13 @@
 
 use std::sync::Arc;
 
+use arrow::array::StringViewArray;
 use arrow::{
     array::{DictionaryArray, Float64Array, Int64Array, StringArray},
     compute::SortOptions,
     datatypes::{Int32Type, Schema},
     record_batch::RecordBatch,
 };
-
 use datafusion::physical_plan::sorts::sort::SortExec;
 use datafusion::{
     execution::context::TaskContext,
@@ -114,11 +114,24 @@ fn criterion_benchmark(c: &mut Criterion) {
         ("f64", &f64_streams),
         ("utf8 low cardinality", &utf8_low_cardinality_streams),
         ("utf8 high cardinality", &utf8_high_cardinality_streams),
+        (
+            "utf8 view low cardinality",
+            &utf8_view_low_cardinality_streams,
+        ),
+        (
+            "utf8 view high cardinality",
+            &utf8_view_high_cardinality_streams,
+        ),
         ("utf8 tuple", &utf8_tuple_streams),
+        ("utf8 view tuple", &utf8_view_tuple_streams),
         ("utf8 dictionary", &dictionary_streams),
         ("utf8 dictionary tuple", &dictionary_tuple_streams),
         ("mixed dictionary tuple", &mixed_dictionary_tuple_streams),
         ("mixed tuple", &mixed_tuple_streams),
+        (
+            "mixed tuple with utf8 view",
+            &mixed_tuple_with_utf8_view_streams,
+        ),
     ];
 
     for (name, f) in cases {
@@ -308,6 +321,30 @@ fn utf8_low_cardinality_streams(sorted: bool) -> 
PartitionedBatches {
     })
 }
 
+/// Create streams of random low cardinality utf8_view values
+fn utf8_view_low_cardinality_streams(sorted: bool) -> PartitionedBatches {
+    let mut values = DataGenerator::new().utf8_low_cardinality_values();
+    if sorted {
+        values.sort_unstable();
+    }
+    split_tuples(values, |v| {
+        let array: StringViewArray = v.into_iter().collect();
+        RecordBatch::try_from_iter(vec![("utf_view_low", Arc::new(array) as 
_)]).unwrap()
+    })
+}
+
+/// Create streams of high  cardinality (~ no duplicates) utf8_view values
+fn utf8_view_high_cardinality_streams(sorted: bool) -> PartitionedBatches {
+    let mut values = DataGenerator::new().utf8_high_cardinality_values();
+    if sorted {
+        values.sort_unstable();
+    }
+    split_tuples(values, |v| {
+        let array: StringViewArray = v.into_iter().collect();
+        RecordBatch::try_from_iter(vec![("utf_view_high", Arc::new(array) as 
_)]).unwrap()
+    })
+}
+
 /// Create streams of high  cardinality (~ no duplicates) utf8 values
 fn utf8_high_cardinality_streams(sorted: bool) -> PartitionedBatches {
     let mut values = DataGenerator::new().utf8_high_cardinality_values();
@@ -353,6 +390,39 @@ fn utf8_tuple_streams(sorted: bool) -> PartitionedBatches {
     })
 }
 
+/// Create a batch of (utf8_view_low, utf8_view_low, utf8_view_high)
+fn utf8_view_tuple_streams(sorted: bool) -> PartitionedBatches {
+    let mut gen = DataGenerator::new();
+
+    // need to sort by the combined key, so combine them together
+    let mut tuples: Vec<_> = gen
+        .utf8_low_cardinality_values()
+        .into_iter()
+        .zip(gen.utf8_low_cardinality_values())
+        .zip(gen.utf8_high_cardinality_values())
+        .collect();
+
+    if sorted {
+        tuples.sort_unstable();
+    }
+
+    split_tuples(tuples, |tuples| {
+        let (tuples, utf8_high): (Vec<_>, Vec<_>) = tuples.into_iter().unzip();
+        let (utf8_low1, utf8_low2): (Vec<_>, Vec<_>) = 
tuples.into_iter().unzip();
+
+        let utf8_view_high: StringViewArray = utf8_high.into_iter().collect();
+        let utf8_view_low1: StringViewArray = utf8_low1.into_iter().collect();
+        let utf8_view_low2: StringViewArray = utf8_low2.into_iter().collect();
+
+        RecordBatch::try_from_iter(vec![
+            ("utf_view_low1", Arc::new(utf8_view_low1) as _),
+            ("utf_view_low2", Arc::new(utf8_view_low2) as _),
+            ("utf_view_high", Arc::new(utf8_view_high) as _),
+        ])
+        .unwrap()
+    })
+}
+
 /// Create a batch of (f64, utf8_low, utf8_low, i64)
 fn mixed_tuple_streams(sorted: bool) -> PartitionedBatches {
     let mut gen = DataGenerator::new();
@@ -391,6 +461,44 @@ fn mixed_tuple_streams(sorted: bool) -> PartitionedBatches 
{
     })
 }
 
+/// Create a batch of (f64, utf8_view_low, utf8_view_low, i64)
+fn mixed_tuple_with_utf8_view_streams(sorted: bool) -> PartitionedBatches {
+    let mut gen = DataGenerator::new();
+
+    // need to sort by the combined key, so combine them together
+    let mut tuples: Vec<_> = gen
+        .i64_values()
+        .into_iter()
+        .zip(gen.utf8_low_cardinality_values())
+        .zip(gen.utf8_low_cardinality_values())
+        .zip(gen.i64_values())
+        .collect();
+
+    if sorted {
+        tuples.sort_unstable();
+    }
+
+    split_tuples(tuples, |tuples| {
+        let (tuples, i64_values): (Vec<_>, Vec<_>) = 
tuples.into_iter().unzip();
+        let (tuples, utf8_low2): (Vec<_>, Vec<_>) = tuples.into_iter().unzip();
+        let (f64_values, utf8_low1): (Vec<_>, Vec<_>) = 
tuples.into_iter().unzip();
+
+        let f64_values: Float64Array = f64_values.into_iter().map(|v| v as 
f64).collect();
+
+        let utf8_view_low1: StringViewArray = utf8_low1.into_iter().collect();
+        let utf8_view_low2: StringViewArray = utf8_low2.into_iter().collect();
+        let i64_values: Int64Array = i64_values.into_iter().collect();
+
+        RecordBatch::try_from_iter(vec![
+            ("f64", Arc::new(f64_values) as _),
+            ("utf_view_low1", Arc::new(utf8_view_low1) as _),
+            ("utf_view_low2", Arc::new(utf8_view_low2) as _),
+            ("i64", Arc::new(i64_values) as _),
+        ])
+        .unwrap()
+    })
+}
+
 /// Create a batch of (utf8_dict)
 fn dictionary_streams(sorted: bool) -> PartitionedBatches {
     let mut gen = DataGenerator::new();
@@ -402,7 +510,6 @@ fn dictionary_streams(sorted: bool) -> PartitionedBatches {
     split_tuples(values, |v| {
         let dictionary: DictionaryArray<Int32Type> =
             v.iter().map(Option::as_deref).collect();
-
         RecordBatch::try_from_iter(vec![("dict", Arc::new(dictionary) as 
_)]).unwrap()
     })
 }
diff --git a/datafusion/physical-plan/src/sorts/cursor.rs 
b/datafusion/physical-plan/src/sorts/cursor.rs
index 8ea7c43d26..3d3bd81948 100644
--- a/datafusion/physical-plan/src/sorts/cursor.rs
+++ b/datafusion/physical-plan/src/sorts/cursor.rs
@@ -18,8 +18,8 @@
 use std::cmp::Ordering;
 
 use arrow::array::{
-    types::ByteArrayType, Array, ArrowPrimitiveType, GenericByteArray, 
OffsetSizeTrait,
-    PrimitiveArray,
+    types::ByteArrayType, Array, ArrowPrimitiveType, GenericByteArray,
+    GenericByteViewArray, OffsetSizeTrait, PrimitiveArray, StringViewArray,
 };
 use arrow::buffer::{Buffer, OffsetBuffer, ScalarBuffer};
 use arrow::compute::SortOptions;
@@ -281,6 +281,59 @@ impl<T: ByteArrayType> CursorArray for GenericByteArray<T> 
{
     }
 }
 
+impl CursorArray for StringViewArray {
+    type Values = StringViewArray;
+    fn values(&self) -> Self {
+        self.clone()
+    }
+}
+
+impl CursorValues for StringViewArray {
+    fn len(&self) -> usize {
+        self.views().len()
+    }
+
+    fn eq(l: &Self, l_idx: usize, r: &Self, r_idx: usize) -> bool {
+        // SAFETY: Both l_idx and r_idx are guaranteed to be within bounds,
+        // and any null-checks are handled in the outer layers.
+        // Fast path: Compare the lengths before full byte comparison.
+
+        let l_view = unsafe { l.views().get_unchecked(l_idx) };
+        let l_len = *l_view as u32;
+        let r_view = unsafe { r.views().get_unchecked(r_idx) };
+        let r_len = *r_view as u32;
+        if l_len != r_len {
+            return false;
+        }
+
+        unsafe { GenericByteViewArray::compare_unchecked(l, l_idx, r, 
r_idx).is_eq() }
+    }
+
+    fn eq_to_previous(cursor: &Self, idx: usize) -> bool {
+        // SAFETY: The caller guarantees that idx > 0 and the indices are 
valid.
+        // Already checked it in is_eq_to_prev_one function
+        // Fast path: Compare the lengths of the current and previous views.
+        let l_view = unsafe { cursor.views().get_unchecked(idx) };
+        let l_len = *l_view as u32;
+        let r_view = unsafe { cursor.views().get_unchecked(idx - 1) };
+        let r_len = *r_view as u32;
+        if l_len != r_len {
+            return false;
+        }
+
+        unsafe {
+            GenericByteViewArray::compare_unchecked(cursor, idx, cursor, idx - 
1).is_eq()
+        }
+    }
+
+    fn compare(l: &Self, l_idx: usize, r: &Self, r_idx: usize) -> Ordering {
+        // SAFETY: Prior assertions guarantee that l_idx and r_idx are valid 
indices.
+        // Null-checks are assumed to have been handled in the wrapper (e.g., 
ArrayValues).
+        // And the bound is checked in is_finished, it is safe to call 
get_unchecked
+        unsafe { GenericByteViewArray::compare_unchecked(l, l_idx, r, r_idx) }
+    }
+}
+
 /// A collection of sorted, nullable [`CursorValues`]
 ///
 /// Note: comparing cursors with different `SortOptions` will yield an 
arbitrary ordering
diff --git a/datafusion/physical-plan/src/sorts/streaming_merge.rs 
b/datafusion/physical-plan/src/sorts/streaming_merge.rs
index a541f79dc7..3f022ec609 100644
--- a/datafusion/physical-plan/src/sorts/streaming_merge.rs
+++ b/datafusion/physical-plan/src/sorts/streaming_merge.rs
@@ -177,6 +177,7 @@ impl<'a> StreamingMergeBuilder<'a> {
             downcast_primitive! {
                 data_type => (primitive_merge_helper, sort, streams, schema, 
metrics, batch_size, fetch, reservation, enable_round_robin_tie_breaker),
                 DataType::Utf8 => merge_helper!(StringArray, sort, streams, 
schema, metrics, batch_size, fetch, reservation, enable_round_robin_tie_breaker)
+                DataType::Utf8View => merge_helper!(StringViewArray, sort, 
streams, schema, metrics, batch_size, fetch, reservation, 
enable_round_robin_tie_breaker)
                 DataType::LargeUtf8 => merge_helper!(LargeStringArray, sort, 
streams, schema, metrics, batch_size, fetch, reservation, 
enable_round_robin_tie_breaker)
                 DataType::Binary => merge_helper!(BinaryArray, sort, streams, 
schema, metrics, batch_size, fetch, reservation, enable_round_robin_tie_breaker)
                 DataType::LargeBinary => merge_helper!(LargeBinaryArray, sort, 
streams, schema, metrics, batch_size, fetch, reservation, 
enable_round_robin_tie_breaker)


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to