This is an automated email from the ASF dual-hosted git repository.

github-bot pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 3c56e5d77f perf: Use batched row conversion for `array_has_any`, 
`array_has_all` (#20588)
3c56e5d77f is described below

commit 3c56e5d77fd7d165ba22e46a975ce0fdeb8da228
Author: Neil Conway <[email protected]>
AuthorDate: Fri Mar 13 02:28:35 2026 -0700

    perf: Use batched row conversion for `array_has_any`, `array_has_all` 
(#20588)
    
    ## Which issue does this PR close?
    
    - Closes #20587.
    
    ## Rationale for this change
    
    `array_has_any` and `array_has_all` called
    `RowConverter::convert_columns` twice for every input row.
    `convert_columns` has a lot of per-call overhead: allocating a new
    `Rows` buffer, doing various schema checking, and so on.
    
    It is more efficient to invoke `RowConverter` on multiple rows at once. In this
    PR, we break the input batch into chunks of 512 rows. We do row
    conversion for the entire chunk in bulk, and then implement the
    `has_any` / `has_all` predicate comparison by indexing into the
    converted rows.
    
    `array_has_any` / `array_has_all` had a special-case for strings, but it
    had an analogous problem: it iterated over rows, materialized each row's
    inner list, and then called `string_array_to_vec` twice per row. That
    does a lot of per-row work; it is significantly faster to call
    `string_array_to_vec` on many rows at once, and then index into the
    results to implement the per-row comparisons.
    
    In both cases, we don't convert the entire batch in a single call
    because benchmarks suggested that it was inefficient to do whole-batch
    row conversion for large arrays (500 elements per row) on some machines.
    It seems plausible that this slowdown happens because a large batch full
    of large arrays results in a large working set, which falls outside the
    CPU cache. Splitting the batch into smaller chunks avoids this problem.
    
    ## What changes are included in this PR?
    
    * Implement optimization
    * Improve test coverage for sliced arrays; not strictly related to this
    PR but more coverage for this codepath made me feel more comfortable
    
    ## Are these changes tested?
    
    Yes.
    
    ## Are there any user-facing changes?
    
    No.
---
 datafusion/functions-nested/src/array_has.rs | 251 +++++++++++++++++++++++----
 1 file changed, 215 insertions(+), 36 deletions(-)

diff --git a/datafusion/functions-nested/src/array_has.rs 
b/datafusion/functions-nested/src/array_has.rs
index 76cf786c95..5e945cfe1d 100644
--- a/datafusion/functions-nested/src/array_has.rs
+++ b/datafusion/functions-nested/src/array_has.rs
@@ -42,6 +42,7 @@ use crate::utils::make_scalar_function;
 
 use hashbrown::HashSet;
 use std::any::Any;
+use std::ops::Range;
 use std::sync::Arc;
 
 // Create static instances of ScalarUDFs for each function
@@ -422,30 +423,66 @@ fn array_has_all_inner(args: &[ArrayRef]) -> 
Result<ArrayRef> {
     array_has_all_and_any_inner(args, ComparisonType::All)
 }
 
+/// Number of rows to process at a time when doing batched row conversion.  
This
+/// amortizes the row conversion overhead over more rows, but making this too
+/// large can cause cache pressure for large arrays. See
+/// <https://github.com/apache/datafusion/pull/20588> for context.
+const ROW_CONVERSION_CHUNK_SIZE: usize = 512;
+
 // General row comparison for array_has_all and array_has_any
 fn general_array_has_for_all_and_any<'a>(
     haystack: ArrayWrapper<'a>,
     needle: ArrayWrapper<'a>,
     comparison_type: ComparisonType,
 ) -> Result<ArrayRef> {
-    let mut boolean_builder = BooleanArray::builder(haystack.len());
+    let num_rows = haystack.len();
     let converter = 
RowConverter::new(vec![SortField::new(haystack.value_type())])?;
 
-    for (arr, sub_arr) in haystack.iter().zip(needle.iter()) {
-        if let (Some(arr), Some(sub_arr)) = (arr, sub_arr) {
-            let arr_values = converter.convert_columns(&[arr])?;
-            let sub_arr_values = converter.convert_columns(&[sub_arr])?;
-            boolean_builder.append_value(general_array_has_all_and_any_kernel(
-                &arr_values,
-                &sub_arr_values,
+    let h_offsets: Vec<usize> = haystack.offsets().collect();
+    let n_offsets: Vec<usize> = needle.offsets().collect();
+
+    let h_nulls = haystack.nulls();
+    let n_nulls = needle.nulls();
+    let mut builder = BooleanArray::builder(num_rows);
+
+    for chunk_start in (0..num_rows).step_by(ROW_CONVERSION_CHUNK_SIZE) {
+        let chunk_end = (chunk_start + 
ROW_CONVERSION_CHUNK_SIZE).min(num_rows);
+
+        // For efficiency with sliced arrays, only process the visible 
elements,
+        // not the entire underlying buffer.
+        let h_elem_start = h_offsets[chunk_start];
+        let h_elem_end = h_offsets[chunk_end];
+        let n_elem_start = n_offsets[chunk_start];
+        let n_elem_end = n_offsets[chunk_end];
+
+        let h_vals = haystack
+            .values()
+            .slice(h_elem_start, h_elem_end - h_elem_start);
+        let n_vals = needle
+            .values()
+            .slice(n_elem_start, n_elem_end - n_elem_start);
+
+        let chunk_h_rows = converter.convert_columns(&[h_vals])?;
+        let chunk_n_rows = converter.convert_columns(&[n_vals])?;
+
+        for i in chunk_start..chunk_end {
+            if h_nulls.is_some_and(|n| n.is_null(i))
+                || n_nulls.is_some_and(|n| n.is_null(i))
+            {
+                builder.append_null();
+                continue;
+            }
+            builder.append_value(general_array_has_all_and_any_kernel(
+                &chunk_h_rows,
+                (h_offsets[i] - h_elem_start)..(h_offsets[i + 1] - 
h_elem_start),
+                &chunk_n_rows,
+                (n_offsets[i] - n_elem_start)..(n_offsets[i + 1] - 
n_elem_start),
                 comparison_type,
             ));
-        } else {
-            boolean_builder.append_null();
         }
     }
 
-    Ok(Arc::new(boolean_builder.finish()))
+    Ok(Arc::new(builder.finish()))
 }
 
 // String comparison for array_has_all and array_has_any
@@ -454,25 +491,53 @@ fn array_has_all_and_any_string_internal<'a>(
     needle: ArrayWrapper<'a>,
     comparison_type: ComparisonType,
 ) -> Result<ArrayRef> {
-    let mut boolean_builder = BooleanArray::builder(haystack.len());
-    for (arr, sub_arr) in haystack.iter().zip(needle.iter()) {
-        match (arr, sub_arr) {
-            (Some(arr), Some(sub_arr)) => {
-                let haystack_array = string_array_to_vec(&arr);
-                let needle_array = string_array_to_vec(&sub_arr);
-                boolean_builder.append_value(array_has_string_kernel(
-                    &haystack_array,
-                    &needle_array,
-                    comparison_type,
-                ));
-            }
-            (_, _) => {
-                boolean_builder.append_null();
+    let num_rows = haystack.len();
+
+    let h_offsets: Vec<usize> = haystack.offsets().collect();
+    let n_offsets: Vec<usize> = needle.offsets().collect();
+
+    let h_nulls = haystack.nulls();
+    let n_nulls = needle.nulls();
+    let mut builder = BooleanArray::builder(num_rows);
+
+    for chunk_start in (0..num_rows).step_by(ROW_CONVERSION_CHUNK_SIZE) {
+        let chunk_end = (chunk_start + 
ROW_CONVERSION_CHUNK_SIZE).min(num_rows);
+
+        let h_elem_start = h_offsets[chunk_start];
+        let h_elem_end = h_offsets[chunk_end];
+        let n_elem_start = n_offsets[chunk_start];
+        let n_elem_end = n_offsets[chunk_end];
+
+        let h_vals = haystack
+            .values()
+            .slice(h_elem_start, h_elem_end - h_elem_start);
+        let n_vals = needle
+            .values()
+            .slice(n_elem_start, n_elem_end - n_elem_start);
+
+        let chunk_h_strings = string_array_to_vec(h_vals.as_ref());
+        let chunk_n_strings = string_array_to_vec(n_vals.as_ref());
+
+        for i in chunk_start..chunk_end {
+            if h_nulls.is_some_and(|n| n.is_null(i))
+                || n_nulls.is_some_and(|n| n.is_null(i))
+            {
+                builder.append_null();
+                continue;
             }
+            let h_start = h_offsets[i] - h_elem_start;
+            let h_end = h_offsets[i + 1] - h_elem_start;
+            let n_start = n_offsets[i] - n_elem_start;
+            let n_end = n_offsets[i + 1] - n_elem_start;
+            builder.append_value(array_has_string_kernel(
+                &chunk_h_strings[h_start..h_end],
+                &chunk_n_strings[n_start..n_end],
+                comparison_type,
+            ));
         }
     }
 
-    Ok(Arc::new(boolean_builder.finish()))
+    Ok(Arc::new(builder.finish()))
 }
 
 fn array_has_all_and_any_dispatch<'a>(
@@ -905,19 +970,22 @@ fn array_has_string_kernel(
 
 fn general_array_has_all_and_any_kernel(
     haystack_rows: &Rows,
+    h_range: Range<usize>,
     needle_rows: &Rows,
+    mut n_range: Range<usize>,
     comparison_type: ComparisonType,
 ) -> bool {
+    let h_start = h_range.start;
+    let h_end = h_range.end;
+
     match comparison_type {
-        ComparisonType::All => needle_rows.iter().all(|needle_row| {
-            haystack_rows
-                .iter()
-                .any(|haystack_row| haystack_row == needle_row)
+        ComparisonType::All => n_range.all(|ni| {
+            let needle_row = needle_rows.row(ni);
+            (h_start..h_end).any(|hi| haystack_rows.row(hi) == needle_row)
         }),
-        ComparisonType::Any => needle_rows.iter().any(|needle_row| {
-            haystack_rows
-                .iter()
-                .any(|haystack_row| haystack_row == needle_row)
+        ComparisonType::Any => n_range.any(|ni| {
+            let needle_row = needle_rows.row(ni);
+            (h_start..h_end).any(|hi| haystack_rows.row(hi) == needle_row)
         }),
     }
 }
@@ -928,7 +996,10 @@ mod tests {
 
     use arrow::datatypes::Int32Type;
     use arrow::{
-        array::{Array, ArrayRef, AsArray, Int32Array, ListArray, create_array},
+        array::{
+            Array, ArrayRef, AsArray, FixedSizeListArray, Int32Array, 
ListArray,
+            create_array,
+        },
         buffer::OffsetBuffer,
         datatypes::{DataType, Field},
     };
@@ -943,7 +1014,7 @@ mod tests {
 
     use crate::expr_fn::make_array;
 
-    use super::ArrayHas;
+    use super::{ArrayHas, ArrayHasAll, ArrayHasAny};
 
     #[test]
     fn test_simplify_array_has_to_in_list() {
@@ -1153,4 +1224,112 @@ mod tests {
 
         Ok(())
     }
+
+    /// Invoke a two-argument list UDF with the given arrays and assert the
+    /// boolean output matches `expected`.
+    fn invoke_and_assert(
+        udf: &dyn ScalarUDFImpl,
+        haystack: &ArrayRef,
+        needle: ArrayRef,
+        expected: &[Option<bool>],
+    ) {
+        let num_rows = haystack.len();
+        let list_type = haystack.data_type();
+        let result = udf
+            .invoke_with_args(ScalarFunctionArgs {
+                args: vec![
+                    ColumnarValue::Array(Arc::clone(haystack)),
+                    ColumnarValue::Array(needle),
+                ],
+                arg_fields: vec![
+                    Arc::new(Field::new("haystack", list_type.clone(), false)),
+                    Arc::new(Field::new("needle", list_type.clone(), false)),
+                ],
+                number_rows: num_rows,
+                return_field: Arc::new(Field::new("return", DataType::Boolean, 
true)),
+                config_options: Arc::new(ConfigOptions::default()),
+            })
+            .unwrap();
+        let output = result.into_array(num_rows).unwrap();
+        assert_eq!(output.as_boolean().iter().collect::<Vec<_>>(), expected);
+    }
+
+    #[test]
+    fn test_sliced_list_offsets() {
+        // Full rows:
+        //   row 0: [1, 2]   (not visible after slicing)
+        //   row 1: [11, 12] (visible row 0)
+        //   row 2: [21, 22] (visible row 1)
+        //   row 3: [31, 32] (not visible after slicing)
+        let field: Arc<Field> = Arc::new(Field::new("item", DataType::Int32, 
false));
+        let full_values = Arc::new(Int32Array::from(vec![1, 2, 11, 12, 21, 22, 
31, 32]));
+        let full_offsets = OffsetBuffer::new(vec![0, 2, 4, 6, 8].into());
+        let full = ListArray::new(Arc::clone(&field), full_offsets, 
full_values, None);
+        let sliced_haystack: ArrayRef = Arc::new(full.slice(1, 2));
+
+        // array_has_all: needle row 0 = [11], row 1 = [21]
+        let needle_all: ArrayRef = Arc::new(ListArray::new(
+            Arc::clone(&field),
+            OffsetBuffer::new(vec![0, 1, 2].into()),
+            Arc::new(Int32Array::from(vec![11, 21])),
+            None,
+        ));
+        invoke_and_assert(
+            &ArrayHasAll::new(),
+            &sliced_haystack,
+            needle_all,
+            &[Some(true), Some(true)],
+        );
+
+        // array_has_any: needle row 0 = [99, 11], row 1 = [99, 21]
+        let needle_any: ArrayRef = Arc::new(ListArray::new(
+            field,
+            OffsetBuffer::new(vec![0, 2, 4].into()),
+            Arc::new(Int32Array::from(vec![99, 11, 99, 21])),
+            None,
+        ));
+        invoke_and_assert(
+            &ArrayHasAny::new(),
+            &sliced_haystack,
+            needle_any,
+            &[Some(true), Some(true)],
+        );
+    }
+
+    #[test]
+    fn test_sliced_fixed_size_list_offsets() {
+        // Same logical data as test_sliced_list_offsets, but using 
FixedSizeListArray.
+        let field = Arc::new(Field::new("item", DataType::Int32, false));
+        let full_values = Arc::new(Int32Array::from(vec![1, 2, 11, 12, 21, 22, 
31, 32]));
+        let full = FixedSizeListArray::new(Arc::clone(&field), 2, full_values, 
None);
+        let sliced_haystack: ArrayRef = Arc::new(full.slice(1, 2));
+
+        // array_has_all: needle row 0 = [11, 12], row 1 = [21, 22]
+        let needle_all: ArrayRef = Arc::new(FixedSizeListArray::new(
+            Arc::clone(&field),
+            2,
+            Arc::new(Int32Array::from(vec![11, 12, 21, 22])),
+            None,
+        ));
+        invoke_and_assert(
+            &ArrayHasAll::new(),
+            &sliced_haystack,
+            needle_all,
+            &[Some(true), Some(true)],
+        );
+
+        // array_has_any: needle row 0 = [99, 12], row 1 = [99, 22]
+        let needle_any: ArrayRef = Arc::new(FixedSizeListArray::new(
+            field,
+            2,
+            Arc::new(Int32Array::from(vec![99, 12, 99, 22])),
+            None,
+        ));
+        invoke_and_assert(
+            &ArrayHasAny::new(),
+            &sliced_haystack,
+            needle_any,
+            &[Some(true), Some(true)],
+        );
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to