This is an automated email from the ASF dual-hosted git repository.
github-bot pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new fc514c2506 perf: Optimize set operations to avoid RowConverter
deserialization overhead (#20623)
fc514c2506 is described below
commit fc514c2506b4614771a72a3c3be34d98d4eb2935
Author: Neil Conway <[email protected]>
AuthorDate: Tue Mar 10 09:12:52 2026 -0700
perf: Optimize set operations to avoid RowConverter deserialization
overhead (#20623)
## Which issue does this PR close?
- Closes #20622.
## Rationale for this change
Several array set operations (e.g., `array_distinct`, `array_union`,
`array_intersect`, `array_except`) share a similar structure:
* Convert the input(s) using `RowConverter`, ideally in bulk
* Apply the set operation as appropriate, which involves adding or
removing elements from the candidate set of result `Rows`
* Convert the final set of `Rows` back into `ArrayRef`
We can do better for the final step: instead of converting from `Rows`
back into `ArrayRef`, we can just track which indices in the input(s)
correspond to the values we want to return. We can then grab those
values with a single `take`, which avoids the `Row` -> `ArrayRef`
deserialization overhead. This is a 5-20% performance win, depending on
the set operation and the characteristics of the input.
The only wrinkle is that for `intersect` and `union`, because there are
multiple inputs, we need to concatenate the inputs together so that we
have a single index space. It turns out that this optimization is still
a win, even after incurring the `concat` overhead.
## What changes are included in this PR?
* Add a benchmark for `array_except`
* Implement this optimization for `array_distinct`, `array_union`,
`array_intersect`, `array_except`
## Are these changes tested?
Yes, and benchmarked.
## Are there any user-facing changes?
No.
---
.../functions-nested/benches/array_set_ops.rs | 21 ++++++
datafusion/functions-nested/src/except.rs | 42 ++++++++----
datafusion/functions-nested/src/set_ops.rs | 80 +++++++++++++++-------
3 files changed, 106 insertions(+), 37 deletions(-)
diff --git a/datafusion/functions-nested/benches/array_set_ops.rs
b/datafusion/functions-nested/benches/array_set_ops.rs
index e3146921d7..087e48d076 100644
--- a/datafusion/functions-nested/benches/array_set_ops.rs
+++ b/datafusion/functions-nested/benches/array_set_ops.rs
@@ -23,6 +23,7 @@ use criterion::{
};
use datafusion_common::config::ConfigOptions;
use datafusion_expr::{ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl};
+use datafusion_functions_nested::except::ArrayExcept;
use datafusion_functions_nested::set_ops::{ArrayDistinct, ArrayIntersect,
ArrayUnion};
use rand::SeedableRng;
use rand::prelude::SliceRandom;
@@ -38,6 +39,7 @@ const SEED: u64 = 42;
fn criterion_benchmark(c: &mut Criterion) {
bench_array_union(c);
bench_array_intersect(c);
+ bench_array_except(c);
bench_array_distinct(c);
}
@@ -98,6 +100,25 @@ fn bench_array_intersect(c: &mut Criterion) {
group.finish();
}
+fn bench_array_except(c: &mut Criterion) {
+ let mut group = c.benchmark_group("array_except");
+ let udf = ArrayExcept::new();
+
+ for (overlap_label, overlap_ratio) in &[("high_overlap", 0.8),
("low_overlap", 0.2)] {
+ for &array_size in ARRAY_SIZES {
+ let (array1, array2) =
+ create_arrays_with_overlap(NUM_ROWS, array_size,
*overlap_ratio);
+ group.bench_with_input(
+ BenchmarkId::new(*overlap_label, array_size),
+ &array_size,
+ |b, _| b.iter(|| invoke_udf(&udf, &array1, &array2)),
+ );
+ }
+ }
+
+ group.finish();
+}
+
fn bench_array_distinct(c: &mut Criterion) {
let mut group = c.benchmark_group("array_distinct");
let udf = ArrayDistinct::new();
diff --git a/datafusion/functions-nested/src/except.rs
b/datafusion/functions-nested/src/except.rs
index 19a4e9573e..932eecf4b8 100644
--- a/datafusion/functions-nested/src/except.rs
+++ b/datafusion/functions-nested/src/except.rs
@@ -19,8 +19,12 @@
use crate::utils::{check_datatypes, make_scalar_function};
use arrow::array::new_null_array;
-use arrow::array::{Array, ArrayRef, GenericListArray, OffsetSizeTrait,
cast::AsArray};
+use arrow::array::{
+ Array, ArrayRef, GenericListArray, OffsetSizeTrait, UInt32Array,
UInt64Array,
+ cast::AsArray,
+};
use arrow::buffer::{NullBuffer, OffsetBuffer};
+use arrow::compute::take;
use arrow::datatypes::{DataType, FieldRef};
use arrow::row::{RowConverter, SortField};
use datafusion_common::utils::{ListCoercion, take_function_args};
@@ -179,7 +183,7 @@ fn general_except<OffsetSize: OffsetSizeTrait>(
let mut offsets = Vec::<OffsetSize>::with_capacity(l.len() + 1);
offsets.push(OffsetSize::usize_as(0));
- let mut rows = Vec::with_capacity(l_values.num_rows());
+ let mut indices: Vec<usize> = Vec::with_capacity(l_values.num_rows());
let mut dedup = HashSet::new();
let nulls = NullBuffer::union(l.nulls(), r.nulls());
@@ -193,7 +197,7 @@ fn general_except<OffsetSize: OffsetSizeTrait>(
.as_ref()
.is_some_and(|nulls| nulls.is_null(list_index))
{
- offsets.push(OffsetSize::usize_as(rows.len()));
+ offsets.push(OffsetSize::usize_as(indices.len()));
continue;
}
@@ -204,22 +208,32 @@ fn general_except<OffsetSize: OffsetSizeTrait>(
for element_index in l_start.as_usize()..l_end.as_usize() {
let left_row = l_values.row(element_index);
if dedup.insert(left_row) {
- rows.push(left_row);
+ indices.push(element_index);
}
}
- offsets.push(OffsetSize::usize_as(rows.len()));
+ offsets.push(OffsetSize::usize_as(indices.len()));
dedup.clear();
}
- if let Some(values) = converter.convert_rows(rows)?.first() {
- Ok(GenericListArray::<OffsetSize>::new(
- field.to_owned(),
- OffsetBuffer::new(offsets.into()),
- values.to_owned(),
- nulls,
- ))
+ // Gather distinct left-side values by index.
+ // Use UInt64Array for LargeList to support values arrays exceeding
u32::MAX.
+ let values = if indices.is_empty() {
+ arrow::array::new_empty_array(&l.value_type())
+ } else if OffsetSize::IS_LARGE {
+ let indices =
+ UInt64Array::from(indices.into_iter().map(|i| i as
u64).collect::<Vec<_>>());
+ take(l.values().as_ref(), &indices, None)?
} else {
- internal_err!("array_except failed to convert rows")
- }
+ let indices =
+ UInt32Array::from(indices.into_iter().map(|i| i as
u32).collect::<Vec<_>>());
+ take(l.values().as_ref(), &indices, None)?
+ };
+
+ Ok(GenericListArray::<OffsetSize>::new(
+ field.to_owned(),
+ OffsetBuffer::new(offsets.into()),
+ values,
+ nulls,
+ ))
}
diff --git a/datafusion/functions-nested/src/set_ops.rs
b/datafusion/functions-nested/src/set_ops.rs
index 150559111f..a3d2573747 100644
--- a/datafusion/functions-nested/src/set_ops.rs
+++ b/datafusion/functions-nested/src/set_ops.rs
@@ -19,9 +19,11 @@
use crate::utils::make_scalar_function;
use arrow::array::{
- Array, ArrayRef, GenericListArray, OffsetSizeTrait, new_empty_array,
new_null_array,
+ Array, ArrayRef, GenericListArray, OffsetSizeTrait, UInt32Array,
UInt64Array,
+ new_empty_array, new_null_array,
};
use arrow::buffer::{NullBuffer, OffsetBuffer};
+use arrow::compute::{concat, take};
use arrow::datatypes::DataType::{LargeList, List, Null};
use arrow::datatypes::{DataType, Field, FieldRef};
use arrow::row::{RowConverter, SortField};
@@ -373,12 +375,28 @@ fn generic_set_lists<OffsetSize: OffsetSizeTrait>(
let rows_l = converter.convert_columns(&[Arc::clone(l.values())])?;
let rows_r = converter.convert_columns(&[Arc::clone(r.values())])?;
+ // Combine value arrays so indices from both sides share a single index
space.
+ let combined_values = concat(&[l.values().as_ref(), r.values().as_ref()])?;
+ let r_offset = l.values().len();
+
match set_op {
SetOp::Union => generic_set_loop::<OffsetSize, true>(
- l, r, &rows_l, &rows_r, field, &converter,
+ l,
+ r,
+ &rows_l,
+ &rows_r,
+ field,
+ &combined_values,
+ r_offset,
),
SetOp::Intersect => generic_set_loop::<OffsetSize, false>(
- l, r, &rows_l, &rows_r, field, &converter,
+ l,
+ r,
+ &rows_l,
+ &rows_r,
+ field,
+ &combined_values,
+ r_offset,
),
}
}
@@ -391,7 +409,8 @@ fn generic_set_loop<OffsetSize: OffsetSizeTrait, const
IS_UNION: bool>(
rows_l: &arrow::row::Rows,
rows_r: &arrow::row::Rows,
field: Arc<Field>,
- converter: &RowConverter,
+ combined_values: &ArrayRef,
+ r_offset: usize,
) -> Result<ArrayRef> {
let l_offsets = l.value_offsets();
let r_offsets = r.value_offsets();
@@ -406,7 +425,7 @@ fn generic_set_loop<OffsetSize: OffsetSizeTrait, const
IS_UNION: bool>(
rows_l.num_rows().min(rows_r.num_rows())
};
- let mut final_rows = Vec::with_capacity(initial_capacity);
+ let mut indices: Vec<usize> = Vec::with_capacity(initial_capacity);
// Reuse hash sets across iterations
let mut seen = HashSet::new();
@@ -430,25 +449,27 @@ fn generic_set_loop<OffsetSize: OffsetSizeTrait, const
IS_UNION: bool>(
for idx in l_start..l_end {
let row = rows_l.row(idx);
if seen.insert(row) {
- final_rows.push(row);
+ indices.push(idx);
}
}
for idx in r_start..r_end {
let row = rows_r.row(idx);
if seen.insert(row) {
- final_rows.push(row);
+ indices.push(idx + r_offset);
}
}
} else {
let l_len = l_end - l_start;
let r_len = r_end - r_start;
- // Select shorter side for lookup, longer side for probing
- let (lookup_rows, lookup_range, probe_rows, probe_range) = if
l_len < r_len {
- (rows_l, l_start..l_end, rows_r, r_start..r_end)
- } else {
- (rows_r, r_start..r_end, rows_l, l_start..l_end)
- };
+ // Select shorter side for lookup, longer side for probing.
+ // Track the probe side's offset into the combined values array.
+ let (lookup_rows, lookup_range, probe_rows, probe_range,
probe_offset) =
+ if l_len < r_len {
+ (rows_l, l_start..l_end, rows_r, r_start..r_end, r_offset)
+ } else {
+ (rows_r, r_start..r_end, rows_l, l_start..l_end, 0)
+ };
lookup_set.clear();
lookup_set.reserve(lookup_range.len());
@@ -461,18 +482,25 @@ fn generic_set_loop<OffsetSize: OffsetSizeTrait, const
IS_UNION: bool>(
for idx in probe_range {
let row = probe_rows.row(idx);
if lookup_set.contains(&row) && seen.insert(row) {
- final_rows.push(row);
+ indices.push(idx + probe_offset);
}
}
}
result_offsets.push(last_offset + OffsetSize::usize_as(seen.len()));
}
- let final_values = if final_rows.is_empty() {
+ // Gather distinct values by index from the combined values array.
+ // Use UInt64Array for LargeList to support values arrays exceeding
u32::MAX.
+ let final_values = if indices.is_empty() {
new_empty_array(&l.value_type())
+ } else if OffsetSize::IS_LARGE {
+ let indices =
+ UInt64Array::from(indices.into_iter().map(|i| i as
u64).collect::<Vec<_>>());
+ take(combined_values.as_ref(), &indices, None)?
} else {
- let arrays = converter.convert_rows(final_rows)?;
- Arc::clone(&arrays[0])
+ let indices =
+ UInt32Array::from(indices.into_iter().map(|i| i as
u32).collect::<Vec<_>>());
+ take(combined_values.as_ref(), &indices, None)?
};
let arr = GenericListArray::<OffsetSize>::try_new(
@@ -539,7 +567,7 @@ fn general_array_distinct<OffsetSize: OffsetSizeTrait>(
// Convert all values to row format in a single batch for performance
let converter = RowConverter::new(vec![SortField::new(dt.clone())])?;
let rows = converter.convert_columns(&[Arc::clone(array.values())])?;
- let mut final_rows = Vec::with_capacity(rows.num_rows());
+ let mut indices: Vec<usize> = Vec::with_capacity(rows.num_rows());
let mut seen = HashSet::new();
for i in 0..array.len() {
let last_offset = *offsets.last().unwrap();
@@ -559,18 +587,24 @@ fn general_array_distinct<OffsetSize: OffsetSizeTrait>(
for idx in start..end {
let row = rows.row(idx);
if seen.insert(row) {
- final_rows.push(row);
+ indices.push(idx);
}
}
offsets.push(last_offset + OffsetSize::usize_as(seen.len()));
}
- // Convert all collected distinct rows back
- let final_values = if final_rows.is_empty() {
+ // Gather distinct values in a single pass, using the computed `indices`.
+ // Use UInt64Array for LargeList to support values arrays exceeding
u32::MAX.
+ let final_values = if indices.is_empty() {
new_empty_array(&dt)
+ } else if OffsetSize::IS_LARGE {
+ let indices =
+ UInt64Array::from(indices.into_iter().map(|i| i as
u64).collect::<Vec<_>>());
+ take(array.values().as_ref(), &indices, None)?
} else {
- let arrays = converter.convert_rows(final_rows)?;
- Arc::clone(&arrays[0])
+ let indices =
+ UInt32Array::from(indices.into_iter().map(|i| i as
u32).collect::<Vec<_>>());
+ take(array.values().as_ref(), &indices, None)?
};
Ok(Arc::new(GenericListArray::<OffsetSize>::try_new(
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]