This is an automated email from the ASF dual-hosted git repository.
github-bot pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 9c85ac608f perf: Fix quadratic behavior of `to_array_of_size` (#20459)
9c85ac608f is described below
commit 9c85ac608fced2eef0ede0c3d5defe860b3b9b2d
Author: Neil Conway <[email protected]>
AuthorDate: Tue Feb 24 08:53:10 2026 -0500
perf: Fix quadratic behavior of `to_array_of_size` (#20459)
## Which issue does this PR close?
- Closes #20458.
- Closes #18159.
## Rationale for this change
When `to_array_of_size(n)` was called on a `List`-like object containing a
`StringViewArray` with `b` data buffers, the previous implementation
returned a list containing a `StringViewArray` with `n*b` buffers, which
results in catastrophically bad performance if `b` grows even somewhat
large.
This issue was previously noticed causing poor nested loop join
performance. #18161 adjusted the NLJ code to avoid calling
`to_array_of_size` for this reason, but didn't attempt to fix the
underlying issue in `to_array_of_size`. This PR doesn't attempt to
revert the change to the NLJ code: the special-case code added in #18161
is still slightly faster than `to_array_of_size` after this
optimization. It might be possible to address that in a future PR.
## What changes are included in this PR?
* Instead of using `repeat_n` + `concat` to merge together `n` copies of
the `StringViewArray`, we now use `take`, which preserves the same
number of buffers as the input `StringViewArray`.
* Add a new benchmark for this situation
* Add more unit tests for `to_array_of_size`
## Are these changes tested?
Yes: covered by the new unit tests and measured with the new benchmark.
## Are there any user-facing changes?
No.
## AI usage
Iterated on the problem with Claude Code; I understand the problem and
the solution.
---
datafusion/common/Cargo.toml | 4 +
datafusion/common/benches/scalar_to_array.rs | 107 +++++++++++++++++++++
datafusion/common/src/scalar/mod.rs | 96 ++++++++++++++++--
.../physical-plan/src/joins/nested_loop_join.rs | 7 +-
4 files changed, 204 insertions(+), 10 deletions(-)
diff --git a/datafusion/common/Cargo.toml b/datafusion/common/Cargo.toml
index 82e7aafcee..e4ba71e45c 100644
--- a/datafusion/common/Cargo.toml
+++ b/datafusion/common/Cargo.toml
@@ -57,6 +57,10 @@ sql = ["sqlparser"]
harness = false
name = "with_hashes"
+[[bench]]
+harness = false
+name = "scalar_to_array"
+
[dependencies]
ahash = { workspace = true }
apache-avro = { workspace = true, features = [
diff --git a/datafusion/common/benches/scalar_to_array.rs b/datafusion/common/benches/scalar_to_array.rs
new file mode 100644
index 0000000000..90a152e515
--- /dev/null
+++ b/datafusion/common/benches/scalar_to_array.rs
@@ -0,0 +1,107 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Benchmarks for `ScalarValue::to_array_of_size`, focusing on List
+//! scalars.
+
+use arrow::array::{Array, ArrayRef, AsArray, StringViewBuilder};
+use arrow::datatypes::{DataType, Field};
+use criterion::{BenchmarkId, Criterion, criterion_group, criterion_main};
+use datafusion_common::ScalarValue;
+use datafusion_common::utils::SingleRowListArrayBuilder;
+use std::sync::Arc;
+
+/// Build a `ScalarValue::List` of `num_elements` Utf8View strings whose
+/// inner StringViewArray has `num_buffers` data buffers.
+fn make_list_scalar(num_elements: usize, num_buffers: usize) -> ScalarValue {
+ let elements_per_buffer = num_elements.div_ceil(num_buffers);
+
+ let mut small_arrays: Vec<ArrayRef> = Vec::new();
+ let mut remaining = num_elements;
+ for buf_idx in 0..num_buffers {
+ let count = remaining.min(elements_per_buffer);
+ if count == 0 {
+ break;
+ }
+ let start = buf_idx * elements_per_buffer;
+ let mut builder = StringViewBuilder::with_capacity(count);
+ for i in start..start + count {
+ builder.append_value(format!("{i:024x}"));
+ }
+ small_arrays.push(Arc::new(builder.finish()) as ArrayRef);
+ remaining -= count;
+ }
+
+ let refs: Vec<&dyn Array> = small_arrays.iter().map(|a| a.as_ref()).collect();
+ let concated = arrow::compute::concat(&refs).unwrap();
+
+ let list_array = SingleRowListArrayBuilder::new(concated)
+ .with_field(&Field::new_list_field(DataType::Utf8View, true))
+ .build_list_array();
+ ScalarValue::List(Arc::new(list_array))
+}
+
+/// We want to measure the cost of doing the conversion and then also accessing
+/// the results, to model what would happen during query evaluation.
+fn consume_list_array(arr: &ArrayRef) {
+ let list_arr = arr.as_list::<i32>();
+ let mut total_len: usize = 0;
+ for i in 0..list_arr.len() {
+ let inner = list_arr.value(i);
+ let sv = inner.as_string_view();
+ for j in 0..sv.len() {
+ total_len += sv.value(j).len();
+ }
+ }
+ std::hint::black_box(total_len);
+}
+
+fn bench_list_to_array_of_size(c: &mut Criterion) {
+ let mut group = c.benchmark_group("list_to_array_of_size");
+
+ let num_elements = 1245;
+ let scalar_1buf = make_list_scalar(num_elements, 1);
+ let scalar_50buf = make_list_scalar(num_elements, 50);
+
+ for batch_size in [256, 1024] {
+ group.bench_with_input(
+ BenchmarkId::new("1_buffer", batch_size),
+ &batch_size,
+ |b, &sz| {
+ b.iter(|| {
+ let arr = scalar_1buf.to_array_of_size(sz).unwrap();
+ consume_list_array(&arr);
+ });
+ },
+ );
+ group.bench_with_input(
+ BenchmarkId::new("50_buffers", batch_size),
+ &batch_size,
+ |b, &sz| {
+ b.iter(|| {
+ let arr = scalar_50buf.to_array_of_size(sz).unwrap();
+ consume_list_array(&arr);
+ });
+ },
+ );
+ }
+
+ group.finish();
+}
+
+criterion_group!(benches, bench_list_to_array_of_size);
+criterion_main!(benches);
diff --git a/datafusion/common/src/scalar/mod.rs b/datafusion/common/src/scalar/mod.rs
index f24df860c4..c21d3e21f0 100644
--- a/datafusion/common/src/scalar/mod.rs
+++ b/datafusion/common/src/scalar/mod.rs
@@ -3008,7 +3008,7 @@ impl ScalarValue {
///
/// Errors if `self` is
/// - a decimal that fails be converted to a decimal array of size
- /// - a `FixedsizeList` that fails to be concatenated into an array of size
+ /// - a `FixedSizeList` that fails to be concatenated into an array of size
/// - a `List` that fails to be concatenated into an array of size
/// - a `Dictionary` that fails be converted to a dictionary array of size
pub fn to_array_of_size(&self, size: usize) -> Result<ArrayRef> {
@@ -3434,13 +3434,22 @@ impl ScalarValue {
}
}
+ /// Repeats the rows of `arr` `size` times, producing an array with
+ /// `arr.len() * size` total rows.
fn list_to_array_of_size(arr: &dyn Array, size: usize) -> Result<ArrayRef> {
- let arrays = repeat_n(arr, size).collect::<Vec<_>>();
- let ret = match !arrays.is_empty() {
- true => arrow::compute::concat(arrays.as_slice())?,
- false => arr.slice(0, 0),
- };
- Ok(ret)
+ if size == 0 {
+ return Ok(arr.slice(0, 0));
+ }
+
+ // Examples: given `arr = [[A, B, C]]` and `size = 3`, `indices = [0, 0, 0]` and
+ // the result is `[[A, B, C], [A, B, C], [A, B, C]]`.
+ //
+ // Given `arr = [[A, B], [C]]` and `size = 2`, `indices = [0, 1, 0, 1]` and the
+ // result is `[[A, B], [C], [A, B], [C]]`. (But in practice, we are always called
+ // with `arr.len() == 1`.)
+ let n = arr.len() as u32;
+ let indices = UInt32Array::from_iter_values((0..size).flat_map(|_| 0..n));
+ Ok(arrow::compute::take(arr, &indices, None)?)
}
/// Retrieve ScalarValue for each row in `array`
@@ -5532,6 +5541,79 @@ mod tests {
assert_eq!(empty_array.len(), 0);
}
+ #[test]
+ fn test_to_array_of_size_list_size_one() {
+ // size=1 takes the fast path (Arc::clone)
+ let arr = ListArray::from_iter_primitive::<Int32Type, _, _>(vec![Some(vec![
+ Some(10),
+ Some(20),
+ ])]);
+ let sv = ScalarValue::List(Arc::new(arr.clone()));
+ let result = sv.to_array_of_size(1).unwrap();
+ assert_eq!(result.as_list::<i32>(), &arr);
+ }
+
+ #[test]
+ fn test_to_array_of_size_list_empty_inner() {
+ // A list scalar containing an empty list: [[]]
+ let arr = ListArray::from_iter_primitive::<Int32Type, _, _>(vec![Some(vec![])]);
+ let sv = ScalarValue::List(Arc::new(arr));
+ let result = sv.to_array_of_size(3).unwrap();
+ let result_list = result.as_list::<i32>();
+ assert_eq!(result_list.len(), 3);
+ for i in 0..3 {
+ assert_eq!(result_list.value(i).len(), 0);
+ }
+ }
+
+ #[test]
+ fn test_to_array_of_size_large_list() {
+ let arr =
+ LargeListArray::from_iter_primitive::<Int32Type, _, _>(vec![Some(vec![
+ Some(100),
+ Some(200),
+ ])]);
+ let sv = ScalarValue::LargeList(Arc::new(arr));
+ let result = sv.to_array_of_size(3).unwrap();
+ let expected = LargeListArray::from_iter_primitive::<Int32Type, _, _>(vec![
+ Some(vec![Some(100), Some(200)]),
+ Some(vec![Some(100), Some(200)]),
+ Some(vec![Some(100), Some(200)]),
+ ]);
+ assert_eq!(result.as_list::<i64>(), &expected);
+ }
+
+ #[test]
+ fn test_list_to_array_of_size_multi_row() {
+ // Call list_to_array_of_size directly with arr.len() > 1
+ let arr = Int32Array::from(vec![Some(10), None, Some(30)]);
+ let result = ScalarValue::list_to_array_of_size(&arr, 3).unwrap();
+ let result = result.as_primitive::<Int32Type>();
+ assert_eq!(
+ result.iter().collect::<Vec<_>>(),
+ vec![
+ Some(10),
+ None,
+ Some(30),
+ Some(10),
+ None,
+ Some(30),
+ Some(10),
+ None,
+ Some(30),
+ ]
+ );
+ }
+
+ #[test]
+ fn test_to_array_of_size_null_list() {
+ let dt = DataType::List(Arc::new(Field::new_list_field(DataType::Int32, true)));
+ let sv = ScalarValue::try_from(&dt).unwrap();
+ let result = sv.to_array_of_size(3).unwrap();
+ assert_eq!(result.len(), 3);
+ assert_eq!(result.null_count(), 3);
+ }
+
/// See https://github.com/apache/datafusion/issues/18870
#[test]
fn test_to_array_of_size_for_none_fsb() {
diff --git a/datafusion/physical-plan/src/joins/nested_loop_join.rs b/datafusion/physical-plan/src/joins/nested_loop_join.rs
index 5b2cebb360..33fec9e181 100644
--- a/datafusion/physical-plan/src/joins/nested_loop_join.rs
+++ b/datafusion/physical-plan/src/joins/nested_loop_join.rs
@@ -2011,9 +2011,10 @@ fn build_row_join_batch(
// Broadcast the single build-side row to match the filtered
// probe-side batch length
let original_left_array = build_side_batch.column(column_index.index);
- // Avoid using `ScalarValue::to_array_of_size()` for `List(Utf8View)` to avoid
- // deep copies for buffers inside `Utf8View` array. See below for details.
- // https://github.com/apache/datafusion/issues/18159
+
+ // Use `arrow::compute::take` directly for `List(Utf8View)` rather
+ // than going through `ScalarValue::to_array_of_size()`, which
+ // avoids some intermediate allocations.
//
// In other cases, `to_array_of_size()` is faster.
match original_left_array.data_type() {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]