This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new a0a063d235 Perf: Support Utf8View datatype single column comparisons for SortPreservingMergeStream (#15348)
a0a063d235 is described below
commit a0a063d23599ff78fa2bd2a732dc74b862e8ad7d
Author: Qi Zhu <[email protected]>
AuthorDate: Tue Mar 25 23:44:14 2025 +0800
Perf: Support Utf8View datatype single column comparisons for
SortPreservingMergeStream (#15348)
* Perf: Support Utf8View datatype single column comparisons for
SortPreservingMergeStream
* Add safety and bench sql
* fix
* Fix
* Add benchmark testing
---
benchmarks/src/sort_tpch.rs | 8 +-
datafusion/core/benches/sort.rs | 111 ++++++++++++++++++++-
datafusion/physical-plan/src/sorts/cursor.rs | 57 ++++++++++-
.../physical-plan/src/sorts/streaming_merge.rs | 1 +
4 files changed, 172 insertions(+), 5 deletions(-)
diff --git a/benchmarks/src/sort_tpch.rs b/benchmarks/src/sort_tpch.rs
index b1997b40e0..956bb92b6c 100644
--- a/benchmarks/src/sort_tpch.rs
+++ b/benchmarks/src/sort_tpch.rs
@@ -92,7 +92,7 @@ impl RunOpt {
/// Payload Columns:
/// - Thin variant: `l_partkey` column with `BIGINT` type (1 column)
/// - Wide variant: all columns except for possible key columns (12 columns)
- const SORT_QUERIES: [&'static str; 10] = [
+ const SORT_QUERIES: [&'static str; 11] = [
// Q1: 1 sort key (type: INTEGER, cardinality: 7) + 1 payload column
r#"
SELECT l_linenumber, l_partkey
@@ -159,6 +159,12 @@ impl RunOpt {
FROM lineitem
ORDER BY l_orderkey, l_suppkey, l_linenumber, l_comment
"#,
+ // Q11: 1 sort key (type: VARCHAR, cardinality: 4.5M) + 1 payload column
+ r#"
+ SELECT l_shipmode, l_comment, l_partkey
+ FROM lineitem
+ ORDER BY l_shipmode;
+ "#,
];
/// If query is specified from command line, run only that query.
diff --git a/datafusion/core/benches/sort.rs b/datafusion/core/benches/sort.rs
index 8f0b3753f6..85f456ce5d 100644
--- a/datafusion/core/benches/sort.rs
+++ b/datafusion/core/benches/sort.rs
@@ -68,13 +68,13 @@
use std::sync::Arc;
+use arrow::array::StringViewArray;
use arrow::{
array::{DictionaryArray, Float64Array, Int64Array, StringArray},
compute::SortOptions,
datatypes::{Int32Type, Schema},
record_batch::RecordBatch,
};
-
use datafusion::physical_plan::sorts::sort::SortExec;
use datafusion::{
execution::context::TaskContext,
@@ -114,11 +114,24 @@ fn criterion_benchmark(c: &mut Criterion) {
("f64", &f64_streams),
("utf8 low cardinality", &utf8_low_cardinality_streams),
("utf8 high cardinality", &utf8_high_cardinality_streams),
+ (
+ "utf8 view low cardinality",
+ &utf8_view_low_cardinality_streams,
+ ),
+ (
+ "utf8 view high cardinality",
+ &utf8_view_high_cardinality_streams,
+ ),
("utf8 tuple", &utf8_tuple_streams),
+ ("utf8 view tuple", &utf8_view_tuple_streams),
("utf8 dictionary", &dictionary_streams),
("utf8 dictionary tuple", &dictionary_tuple_streams),
("mixed dictionary tuple", &mixed_dictionary_tuple_streams),
("mixed tuple", &mixed_tuple_streams),
+ (
+ "mixed tuple with utf8 view",
+ &mixed_tuple_with_utf8_view_streams,
+ ),
];
for (name, f) in cases {
@@ -308,6 +321,30 @@ fn utf8_low_cardinality_streams(sorted: bool) -> PartitionedBatches {
})
}
+/// Create streams of random low cardinality utf8_view values
+fn utf8_view_low_cardinality_streams(sorted: bool) -> PartitionedBatches {
+ let mut values = DataGenerator::new().utf8_low_cardinality_values();
+ if sorted {
+ values.sort_unstable();
+ }
+ split_tuples(values, |v| {
+ let array: StringViewArray = v.into_iter().collect();
+ RecordBatch::try_from_iter(vec![("utf_view_low", Arc::new(array) as _)]).unwrap()
+ })
+}
+
+/// Create streams of high cardinality (~ no duplicates) utf8_view values
+fn utf8_view_high_cardinality_streams(sorted: bool) -> PartitionedBatches {
+ let mut values = DataGenerator::new().utf8_high_cardinality_values();
+ if sorted {
+ values.sort_unstable();
+ }
+ split_tuples(values, |v| {
+ let array: StringViewArray = v.into_iter().collect();
+ RecordBatch::try_from_iter(vec![("utf_view_high", Arc::new(array) as _)]).unwrap()
+ })
+}
+
/// Create streams of high cardinality (~ no duplicates) utf8 values
fn utf8_high_cardinality_streams(sorted: bool) -> PartitionedBatches {
let mut values = DataGenerator::new().utf8_high_cardinality_values();
@@ -353,6 +390,39 @@ fn utf8_tuple_streams(sorted: bool) -> PartitionedBatches {
})
}
+/// Create a batch of (utf8_view_low, utf8_view_low, utf8_view_high)
+fn utf8_view_tuple_streams(sorted: bool) -> PartitionedBatches {
+ let mut gen = DataGenerator::new();
+
+ // need to sort by the combined key, so combine them together
+ let mut tuples: Vec<_> = gen
+ .utf8_low_cardinality_values()
+ .into_iter()
+ .zip(gen.utf8_low_cardinality_values())
+ .zip(gen.utf8_high_cardinality_values())
+ .collect();
+
+ if sorted {
+ tuples.sort_unstable();
+ }
+
+ split_tuples(tuples, |tuples| {
+ let (tuples, utf8_high): (Vec<_>, Vec<_>) = tuples.into_iter().unzip();
+ let (utf8_low1, utf8_low2): (Vec<_>, Vec<_>) = tuples.into_iter().unzip();
+
+ let utf8_view_high: StringViewArray = utf8_high.into_iter().collect();
+ let utf8_view_low1: StringViewArray = utf8_low1.into_iter().collect();
+ let utf8_view_low2: StringViewArray = utf8_low2.into_iter().collect();
+
+ RecordBatch::try_from_iter(vec![
+ ("utf_view_low1", Arc::new(utf8_view_low1) as _),
+ ("utf_view_low2", Arc::new(utf8_view_low2) as _),
+ ("utf_view_high", Arc::new(utf8_view_high) as _),
+ ])
+ .unwrap()
+ })
+}
+
/// Create a batch of (f64, utf8_low, utf8_low, i64)
fn mixed_tuple_streams(sorted: bool) -> PartitionedBatches {
let mut gen = DataGenerator::new();
@@ -391,6 +461,44 @@ fn mixed_tuple_streams(sorted: bool) -> PartitionedBatches {
})
}
+/// Create a batch of (f64, utf8_view_low, utf8_view_low, i64)
+fn mixed_tuple_with_utf8_view_streams(sorted: bool) -> PartitionedBatches {
+ let mut gen = DataGenerator::new();
+
+ // need to sort by the combined key, so combine them together
+ let mut tuples: Vec<_> = gen
+ .i64_values()
+ .into_iter()
+ .zip(gen.utf8_low_cardinality_values())
+ .zip(gen.utf8_low_cardinality_values())
+ .zip(gen.i64_values())
+ .collect();
+
+ if sorted {
+ tuples.sort_unstable();
+ }
+
+ split_tuples(tuples, |tuples| {
+ let (tuples, i64_values): (Vec<_>, Vec<_>) = tuples.into_iter().unzip();
+ let (tuples, utf8_low2): (Vec<_>, Vec<_>) = tuples.into_iter().unzip();
+ let (f64_values, utf8_low1): (Vec<_>, Vec<_>) = tuples.into_iter().unzip();
+
+ let f64_values: Float64Array = f64_values.into_iter().map(|v| v as f64).collect();
+
+ let utf8_view_low1: StringViewArray = utf8_low1.into_iter().collect();
+ let utf8_view_low2: StringViewArray = utf8_low2.into_iter().collect();
+ let i64_values: Int64Array = i64_values.into_iter().collect();
+
+ RecordBatch::try_from_iter(vec![
+ ("f64", Arc::new(f64_values) as _),
+ ("utf_view_low1", Arc::new(utf8_view_low1) as _),
+ ("utf_view_low2", Arc::new(utf8_view_low2) as _),
+ ("i64", Arc::new(i64_values) as _),
+ ])
+ .unwrap()
+ })
+}
+
/// Create a batch of (utf8_dict)
fn dictionary_streams(sorted: bool) -> PartitionedBatches {
let mut gen = DataGenerator::new();
@@ -402,7 +510,6 @@ fn dictionary_streams(sorted: bool) -> PartitionedBatches {
split_tuples(values, |v| {
let dictionary: DictionaryArray<Int32Type> =
v.iter().map(Option::as_deref).collect();
-
RecordBatch::try_from_iter(vec![("dict", Arc::new(dictionary) as _)]).unwrap()
})
}
diff --git a/datafusion/physical-plan/src/sorts/cursor.rs b/datafusion/physical-plan/src/sorts/cursor.rs
index 8ea7c43d26..3d3bd81948 100644
--- a/datafusion/physical-plan/src/sorts/cursor.rs
+++ b/datafusion/physical-plan/src/sorts/cursor.rs
@@ -18,8 +18,8 @@
use std::cmp::Ordering;
use arrow::array::{
- types::ByteArrayType, Array, ArrowPrimitiveType, GenericByteArray, OffsetSizeTrait,
- PrimitiveArray,
+ types::ByteArrayType, Array, ArrowPrimitiveType, GenericByteArray,
+ GenericByteViewArray, OffsetSizeTrait, PrimitiveArray, StringViewArray,
};
use arrow::buffer::{Buffer, OffsetBuffer, ScalarBuffer};
use arrow::compute::SortOptions;
@@ -281,6 +281,59 @@ impl<T: ByteArrayType> CursorArray for GenericByteArray<T> {
}
}
+impl CursorArray for StringViewArray {
+ type Values = StringViewArray;
+ fn values(&self) -> Self {
+ self.clone()
+ }
+}
+
+impl CursorValues for StringViewArray {
+ fn len(&self) -> usize {
+ self.views().len()
+ }
+
+ fn eq(l: &Self, l_idx: usize, r: &Self, r_idx: usize) -> bool {
+ // SAFETY: Both l_idx and r_idx are guaranteed to be within bounds,
+ // and any null-checks are handled in the outer layers.
+ // Fast path: Compare the lengths before full byte comparison.
+
+ let l_view = unsafe { l.views().get_unchecked(l_idx) };
+ let l_len = *l_view as u32;
+ let r_view = unsafe { r.views().get_unchecked(r_idx) };
+ let r_len = *r_view as u32;
+ if l_len != r_len {
+ return false;
+ }
+
+ unsafe { GenericByteViewArray::compare_unchecked(l, l_idx, r, r_idx).is_eq() }
+ }
+
+ fn eq_to_previous(cursor: &Self, idx: usize) -> bool {
+ // SAFETY: The caller guarantees that idx > 0 and the indices are valid.
+ // Already checked it in is_eq_to_prev_one function
+ // Fast path: Compare the lengths of the current and previous views.
+ let l_view = unsafe { cursor.views().get_unchecked(idx) };
+ let l_len = *l_view as u32;
+ let r_view = unsafe { cursor.views().get_unchecked(idx - 1) };
+ let r_len = *r_view as u32;
+ if l_len != r_len {
+ return false;
+ }
+
+ unsafe {
GenericByteViewArray::compare_unchecked(cursor, idx, cursor, idx - 1).is_eq()
+ }
+ }
+
+ fn compare(l: &Self, l_idx: usize, r: &Self, r_idx: usize) -> Ordering {
+ // SAFETY: Prior assertions guarantee that l_idx and r_idx are valid indices.
+ // Null-checks are assumed to have been handled in the wrapper (e.g., ArrayValues).
+ // And the bound is checked in is_finished, it is safe to call get_unchecked
+ unsafe { GenericByteViewArray::compare_unchecked(l, l_idx, r, r_idx) }
+ }
+}
+
/// A collection of sorted, nullable [`CursorValues`]
///
/// Note: comparing cursors with different `SortOptions` will yield an
arbitrary ordering
diff --git a/datafusion/physical-plan/src/sorts/streaming_merge.rs b/datafusion/physical-plan/src/sorts/streaming_merge.rs
index a541f79dc7..3f022ec609 100644
--- a/datafusion/physical-plan/src/sorts/streaming_merge.rs
+++ b/datafusion/physical-plan/src/sorts/streaming_merge.rs
@@ -177,6 +177,7 @@ impl<'a> StreamingMergeBuilder<'a> {
downcast_primitive! {
data_type => (primitive_merge_helper, sort, streams, schema, metrics, batch_size, fetch, reservation, enable_round_robin_tie_breaker),
DataType::Utf8 => merge_helper!(StringArray, sort, streams, schema, metrics, batch_size, fetch, reservation, enable_round_robin_tie_breaker)
+ DataType::Utf8View => merge_helper!(StringViewArray, sort, streams, schema, metrics, batch_size, fetch, reservation, enable_round_robin_tie_breaker)
DataType::LargeUtf8 => merge_helper!(LargeStringArray, sort, streams, schema, metrics, batch_size, fetch, reservation, enable_round_robin_tie_breaker)
DataType::Binary => merge_helper!(BinaryArray, sort, streams, schema, metrics, batch_size, fetch, reservation, enable_round_robin_tie_breaker)
DataType::LargeBinary => merge_helper!(LargeBinaryArray, sort, streams, schema, metrics, batch_size, fetch, reservation, enable_round_robin_tie_breaker)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]