This is an automated email from the ASF dual-hosted git repository.
github-bot pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new d2df7a57c4 perf: Optimize `array_concat` using `MutableArrayData` (#20620)
d2df7a57c4 is described below
commit d2df7a57c417e1e0534a60e4c283513604008763
Author: Neil Conway <[email protected]>
AuthorDate: Tue Mar 3 12:03:24 2026 -0500
perf: Optimize `array_concat` using `MutableArrayData` (#20620)
## Which issue does this PR close?
- Closes #20619.
## Rationale for this change
The current implementation of `array_concat` creates an `ArrayRef` for
each row, uses Arrow's `concat` kernel to merge the elements together,
and then uses `concat` again to produce the final results. This does a
lot of unnecessary allocation and copying.
Instead, we can use `MutableArrayData::extend` to copy element ranges in
bulk, which avoids much of this intermediate copying and allocation.
This approach is 5-15x faster on a microbenchmark.
## What changes are included in this PR?
* Add benchmark
* Improve SLT test coverage for `array_concat`
* Implement optimization
## Are these changes tested?
Yes, and benchmarked.
## Are there any user-facing changes?
No.
---
datafusion/functions-nested/Cargo.toml | 4 +
.../functions-nested/benches/array_concat.rs | 94 +++++++++++++++++++
datafusion/functions-nested/src/concat.rs | 101 +++++++++++----------
datafusion/sqllogictest/test_files/array.slt | 16 ++++
4 files changed, 168 insertions(+), 47 deletions(-)
diff --git a/datafusion/functions-nested/Cargo.toml b/datafusion/functions-nested/Cargo.toml
index 0b26170dbb..0fdb69e6e7 100644
--- a/datafusion/functions-nested/Cargo.toml
+++ b/datafusion/functions-nested/Cargo.toml
@@ -67,6 +67,10 @@ paste = { workspace = true }
criterion = { workspace = true, features = ["async_tokio"] }
rand = { workspace = true }
+[[bench]]
+harness = false
+name = "array_concat"
+
[[bench]]
harness = false
name = "array_expression"
diff --git a/datafusion/functions-nested/benches/array_concat.rs b/datafusion/functions-nested/benches/array_concat.rs
new file mode 100644
index 0000000000..75dcc88f14
--- /dev/null
+++ b/datafusion/functions-nested/benches/array_concat.rs
@@ -0,0 +1,94 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::hint::black_box;
+use std::sync::Arc;
+
+use arrow::array::{ArrayRef, Int32Array, ListArray};
+use arrow::buffer::{NullBuffer, OffsetBuffer, ScalarBuffer};
+use arrow::datatypes::{DataType, Field};
+use criterion::{BenchmarkId, Criterion, criterion_group, criterion_main};
+use rand::rngs::StdRng;
+use rand::{Rng, SeedableRng};
+
+use datafusion_functions_nested::concat::array_concat_inner;
+
+const SEED: u64 = 42;
+
+/// Build a `ListArray<i32>` with `num_lists` rows, each containing
+/// `elements_per_list` random i32 values. Every 10th row is null.
+fn make_list_array(
+ rng: &mut StdRng,
+ num_lists: usize,
+ elements_per_list: usize,
+) -> ArrayRef {
+ let total_values = num_lists * elements_per_list;
+ let values: Vec<i32> = (0..total_values).map(|_| rng.random()).collect();
+ let values = Arc::new(Int32Array::from(values));
+
+ let offsets: Vec<i32> = (0..=num_lists)
+ .map(|i| (i * elements_per_list) as i32)
+ .collect();
+ let offsets = OffsetBuffer::new(ScalarBuffer::from(offsets));
+
+ let nulls: Vec<bool> = (0..num_lists).map(|i| i % 10 != 0).collect();
+ let nulls = Some(NullBuffer::from(nulls));
+
+ Arc::new(ListArray::new(
+ Arc::new(Field::new("item", DataType::Int32, false)),
+ offsets,
+ values,
+ nulls,
+ ))
+}
+
+fn criterion_benchmark(c: &mut Criterion) {
+ let mut group = c.benchmark_group("array_concat");
+
+ // Benchmark: varying number of rows, 20 elements per list
+ for num_rows in [100, 1000, 10000] {
+ let mut rng = StdRng::seed_from_u64(SEED);
+ let list_a = make_list_array(&mut rng, num_rows, 20);
+ let list_b = make_list_array(&mut rng, num_rows, 20);
+ let args: Vec<ArrayRef> = vec![list_a, list_b];
+
+        group.bench_with_input(BenchmarkId::new("rows", num_rows), &args, |b, args| {
+ b.iter(|| black_box(array_concat_inner(args).unwrap()));
+ });
+ }
+
+ // Benchmark: 1000 rows, varying element counts per list
+ for elements_per_list in [5, 50, 500] {
+ let mut rng = StdRng::seed_from_u64(SEED);
+ let list_a = make_list_array(&mut rng, 1000, elements_per_list);
+ let list_b = make_list_array(&mut rng, 1000, elements_per_list);
+ let args: Vec<ArrayRef> = vec![list_a, list_b];
+
+ group.bench_with_input(
+ BenchmarkId::new("elements_per_list", elements_per_list),
+ &args,
+ |b, args| {
+ b.iter(|| black_box(array_concat_inner(args).unwrap()));
+ },
+ );
+ }
+
+ group.finish();
+}
+
+criterion_group!(benches, criterion_benchmark);
+criterion_main!(benches);
diff --git a/datafusion/functions-nested/src/concat.rs b/datafusion/functions-nested/src/concat.rs
index 0a7402060a..78519d2de2 100644
--- a/datafusion/functions-nested/src/concat.rs
+++ b/datafusion/functions-nested/src/concat.rs
@@ -24,9 +24,9 @@ use crate::make_array::make_array_inner;
use crate::utils::{align_array_dimensions, check_datatypes,
make_scalar_function};
use arrow::array::{
Array, ArrayData, ArrayRef, Capacities, GenericListArray, MutableArrayData,
- NullBufferBuilder, OffsetSizeTrait,
+ OffsetSizeTrait,
};
-use arrow::buffer::OffsetBuffer;
+use arrow::buffer::{NullBuffer, OffsetBuffer};
use arrow::datatypes::{DataType, Field};
use datafusion_common::Result;
use datafusion_common::utils::{
@@ -352,7 +352,7 @@ impl ScalarUDFImpl for ArrayConcat {
}
}
-fn array_concat_inner(args: &[ArrayRef]) -> Result<ArrayRef> {
+pub fn array_concat_inner(args: &[ArrayRef]) -> Result<ArrayRef> {
if args.is_empty() {
return exec_err!("array_concat expects at least one argument");
}
@@ -396,58 +396,65 @@ fn concat_internal<O: OffsetSizeTrait>(args: &[ArrayRef]) -> Result<ArrayRef> {
.iter()
.map(|arg| as_generic_list_array::<O>(arg))
.collect::<Result<Vec<_>>>()?;
- // Assume number of rows is the same for all arrays
let row_count = list_arrays[0].len();
- let mut array_lengths = vec![];
- let mut arrays = vec![];
- let mut valid = NullBufferBuilder::new(row_count);
- for i in 0..row_count {
- let nulls = list_arrays
+    // Extract underlying values ArrayData from each list array for MutableArrayData.
+ let values_data: Vec<ArrayData> =
+ list_arrays.iter().map(|la| la.values().to_data()).collect();
+ let values_data_refs: Vec<&ArrayData> = values_data.iter().collect();
+
+ // Estimate capacity as the sum of all values arrays' lengths.
+ let total_capacity: usize = values_data.iter().map(|d| d.len()).sum();
+
+ let mut mutable = MutableArrayData::with_capacities(
+ values_data_refs,
+ false,
+ Capacities::Array(total_capacity),
+ );
+ let mut offsets: Vec<O> = Vec::with_capacity(row_count + 1);
+ offsets.push(O::zero());
+
+ // Compute the output null buffer: a row is null only if null in ALL input
+ // arrays. This is the bitwise OR of validity bits (valid if valid in ANY
+    // input). If any array has no null buffer (all valid), no output row can be
+ // null.
+ let nulls = list_arrays
+ .iter()
+ .filter_map(|la| la.nulls())
+ .collect::<Vec<_>>();
+ let valid = if nulls.len() == list_arrays.len() {
+ nulls
.iter()
- .map(|arr| arr.is_null(i))
- .collect::<Vec<_>>();
-
- // If all the arrays are null, the concatenated array is null
- let is_null = nulls.iter().all(|&x| x);
- if is_null {
- array_lengths.push(0);
- valid.append_null();
- } else {
- // Get all the arrays on i-th row
- let values = list_arrays
- .iter()
- .map(|arr| arr.value(i))
- .collect::<Vec<_>>();
-
- let elements = values
- .iter()
- .map(|a| a.as_ref())
- .collect::<Vec<&dyn Array>>();
-
- // Concatenated array on i-th row
- let concatenated_array = arrow::compute::concat(elements.as_slice())?;
- array_lengths.push(concatenated_array.len());
- arrays.push(concatenated_array);
- valid.append_non_null();
+ .map(|n| n.inner().clone())
+ .reduce(|a, b| &a | &b)
+ .map(NullBuffer::new)
+ } else {
+ None
+ };
+
+ for row_idx in 0..row_count {
+ for (arr_idx, list_array) in list_arrays.iter().enumerate() {
+ if list_array.is_null(row_idx) {
+ continue;
+ }
+ let start = list_array.offsets()[row_idx].to_usize().unwrap();
+ let end = list_array.offsets()[row_idx + 1].to_usize().unwrap();
+ if start < end {
+ mutable.extend(arr_idx, start, end);
+ }
}
+ offsets.push(O::usize_as(mutable.len()));
}
- // Assume all arrays have the same data type
- let data_type = list_arrays[0].value_type();
- let elements = arrays
- .iter()
- .map(|a| a.as_ref())
- .collect::<Vec<&dyn Array>>();
+ let data_type = list_arrays[0].value_type();
+ let data = mutable.freeze();
- let list_arr = GenericListArray::<O>::new(
+ Ok(Arc::new(GenericListArray::<O>::try_new(
Arc::new(Field::new_list_field(data_type, true)),
- OffsetBuffer::from_lengths(array_lengths),
- Arc::new(arrow::compute::concat(elements.as_slice())?),
- valid.finish(),
- );
-
- Ok(Arc::new(list_arr))
+ OffsetBuffer::new(offsets.into()),
+ arrow::array::make_array(data),
+ valid,
+ )?))
}
// Kernel functions
diff --git a/datafusion/sqllogictest/test_files/array.slt b/datafusion/sqllogictest/test_files/array.slt
index 17475c6a11..00d28d38d6 100644
--- a/datafusion/sqllogictest/test_files/array.slt
+++ b/datafusion/sqllogictest/test_files/array.slt
@@ -3453,6 +3453,22 @@ select
----
[1, 2, 3] List(Utf8View)
+# array_concat with NULL elements inside arrays
+query ?
+select array_concat([1, NULL, 3], [NULL, 5]);
+----
+[1, NULL, 3, NULL, 5]
+
+query ?
+select array_concat([NULL, NULL], [1, 2], [NULL]);
+----
+[NULL, NULL, 1, 2, NULL]
+
+query ?
+select array_concat([NULL, NULL], [NULL, NULL]);
+----
+[NULL, NULL, NULL, NULL]
+
# array_concat error
query error DataFusion error: Error during planning: Execution error: Function 'array_concat' user-defined coercion failed with "Error during planning: array_concat does not support type Int64"
select array_concat(1, 2);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]