This is an automated email from the ASF dual-hosted git repository.
github-merge-queue[bot] pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new d91dcb7737 feat(functions-nested): add array_filter higher-order function (#21895)
d91dcb7737 is described below
commit d91dcb7737f03b3c92fcc29f275a3ae8b71a2c7d
Author: Lavkesh Lahngir <[email protected]>
AuthorDate: Wed May 20 17:16:21 2026 +0200
feat(functions-nested): add array_filter higher-order function (#21895)
## Which issue does this PR close?
Partially addresses #14509 — implements `array_filter` / `list_filter`.
## Rationale for this change
`array_transform` (#21679) added the first `HigherOrderUDF`.
`array_filter` is the natural companion: filter array elements with a
boolean lambda, matching Spark `filter` / DuckDB `list_filter`
semantics.
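
As a rough illustration of these semantics (a sketch only, not the code in this commit), a single row can be modelled with plain Rust options, where `None` stands in for SQL NULL. The name `filter_row` is hypothetical and does not exist in DataFusion.

```rust
/// Hypothetical model of one `array_filter` row; `None` stands for SQL NULL.
fn filter_row<T: Clone>(
    row: Option<&[T]>,
    predicate: impl Fn(&T) -> Option<bool>,
) -> Option<Vec<T>> {
    // A null list stays null; the predicate is never evaluated for it.
    let items = row?;
    Some(
        items
            .iter()
            // Keep only elements whose predicate is exactly `true`;
            // `false` and null (`None`) both drop the element.
            .filter(|item| predicate(*item) == Some(true))
            .cloned()
            .collect(),
    )
}
```

With this model, a predicate that yields null for `2` and `v > 1` otherwise turns `[1, 2, 3]` into `[3]`, which mirrors the null-handling case in `array_filter.slt`.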
## What changes are included in this PR?
- New `HigherOrderUDF` `ArrayFilter` (`array_filter` / `list_filter`
alias)
- Boolean lambda per element; `true` keeps, `false`/null drops (matches
Spark semantics)
- Handles `List`, `LargeList`, sliced arrays, null sublists
- Scalar predicate short-circuit (`x -> true` / `x -> false`)
- No-copy fast path when nothing is filtered (skips
`arrow::compute::filter`)
- Shared HOF helpers extracted from `array_transform` into a common
module (`value_lambda_pair`, `coerce_single_list_arg`,
`single_list_lambda_parameters`, `extract_list_values`)
- Shared unit test helpers for higher-order function tests
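
The offset recomputation and the no-copy fast path listed above can be sketched without Arrow. This is a simplified model under stated assumptions: `Arc<[T]>` stands in for Arrow's `ArrayRef`, the name `filter_flat` is hypothetical, `offsets` is assumed to start at 0 and end at `values.len()`, and null sublists are not modelled.

```rust
use std::sync::Arc;

/// Hypothetical stand-in for a list column: `offsets[i]..offsets[i + 1]`
/// delimits sublist `i` inside the flat `values` buffer.
/// Assumes `offsets` starts at 0 and `predicate.len() == values.len()`.
fn filter_flat<T: Clone>(
    values: &Arc<[T]>,
    predicate: &[Option<bool>],
    offsets: &[usize],
) -> (Arc<[T]>, Vec<usize>) {
    // Only an explicit `true` keeps an element; `false` and null drop it.
    let keep = |j: usize| predicate[j] == Some(true);

    // Recompute offsets by counting the survivors of each sublist.
    let mut new_offsets = Vec::with_capacity(offsets.len());
    let mut total = 0usize;
    new_offsets.push(total);
    for window in offsets.windows(2) {
        total += (window[0]..window[1]).filter(|&j| keep(j)).count();
        new_offsets.push(total);
    }

    // Fast path: an unchanged total means nothing was dropped, so the
    // original buffer is shared instead of copied.
    if new_offsets.last() == offsets.last() {
        return (Arc::clone(values), offsets.to_vec());
    }

    // Slow path: copy only the surviving elements.
    let filtered: Arc<[T]> = values
        .iter()
        .enumerate()
        .filter(|(j, _)| keep(*j))
        .map(|(_, v)| v.clone())
        .collect();
    (filtered, new_offsets)
}
```

Comparing only the last offset is enough for the fast path because offsets are cumulative: if the total number of kept elements equals the original total, no sublist can have lost an element.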
## Are these changes tested?
- Unit tests: basic filter, multiple sublists, sliced arrays, null
sublists, all-filtered-out, nothing-filtered (fast path), scalar
true/false predicates
- SQL logic tests in `array_filter.slt`: filter variants, `array_filter`
+ `array_transform` combinations, error cases
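
The sliced-array test depends on offsets being rebased before filtering. The sketch below assumes that `adjust_offsets_for_slice` shifts a sliced list's offsets so they start at zero; that is an inference from how the helper is used in this commit, not a statement of its actual implementation. The name `rebase_offsets` is hypothetical.

```rust
/// Hypothetical helper: rebases the offsets of a sliced list so that they
/// index into a values buffer that starts at the first remaining sublist.
fn rebase_offsets(offsets: &[usize]) -> Vec<usize> {
    // Subtract the first offset from every entry; empty input stays empty.
    let first = offsets.first().copied().unwrap_or(0);
    offsets.iter().map(|o| o - first).collect()
}
```

In the sliced unit test, sublist lengths `[1, 3, 3]` give offsets `[0, 1, 4, 7]`. After `slice(1, 2)` the remaining offsets are `[1, 4, 7]`, which this sketch rebases to `[0, 3, 6]`.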
## Are there any user-facing changes?
Yes — `array_filter(array, lambda)` and alias `list_filter(array,
lambda)` are now available as SQL functions.
---
datafusion/functions-nested/src/array_filter.rs | 464 +++++++++++++++++++++
datafusion/functions-nested/src/array_transform.rs | 145 ++-----
datafusion/functions-nested/src/lambda_utils.rs | 191 +++++++++
datafusion/functions-nested/src/lib.rs | 5 +
.../sqllogictest/test_files/array/array_filter.slt | 217 ++++++++++
docs/source/user-guide/sql/scalar_functions.md | 34 ++
6 files changed, 937 insertions(+), 119 deletions(-)
diff --git a/datafusion/functions-nested/src/array_filter.rs b/datafusion/functions-nested/src/array_filter.rs
new file mode 100644
index 0000000000..f8b7fc3540
--- /dev/null
+++ b/datafusion/functions-nested/src/array_filter.rs
@@ -0,0 +1,464 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! [`HigherOrderUDF`] definitions for array_filter function.
+
+use arrow::{
+ array::{
+ Array, ArrayRef, AsArray, BooleanArray, LargeListArray, ListArray,
+ OffsetBufferBuilder, OffsetSizeTrait, new_empty_array,
+ },
+ buffer::{OffsetBuffer, ScalarBuffer},
+ compute::{filter as arrow_filter, take_arrays},
+ datatypes::{DataType, Field, FieldRef},
+};
+use datafusion_common::{
+ Result, ScalarValue, exec_err,
+ utils::{adjust_offsets_for_slice, list_values_row_number},
+};
+use datafusion_expr::{
+ ColumnarValue, Documentation, HigherOrderFunctionArgs, HigherOrderReturnFieldArgs,
+ HigherOrderSignature, HigherOrderUDF, LambdaParametersProgress, ValueOrLambda,
+ Volatility,
+};
+use datafusion_macros::user_doc;
+use std::sync::Arc;
+
+use crate::lambda_utils::{
+ ListValuesResult, coerce_single_list_arg, extract_list_values,
+ single_list_lambda_parameters, value_lambda_pair,
+};
+
+make_higher_order_function_expr_and_func!(
+ ArrayFilter,
+ array_filter,
+ array lambda,
+ "filters the values of an array using a boolean lambda",
+ array_filter_higher_order_function
+);
+
+#[user_doc(
+ doc_section(label = "Array Functions"),
+ description = "filters the values of an array using a boolean lambda",
+ syntax_example = "array_filter(array, x -> x > 2)",
+ sql_example = r#"```sql
+> select array_filter([1, 2, 3, 4, 5], x -> x > 2);
++--------------------------------------------+
+| array_filter([1, 2, 3, 4, 5], x -> x > 2) |
++--------------------------------------------+
+| [3, 4, 5] |
++--------------------------------------------+
+```"#,
+ argument(
+ name = "array",
+ description = "Array expression. Can be a constant, column, or function, and any combination of array operators."
+ ),
+ argument(
+ name = "lambda",
+ description = "Lambda that returns a boolean. Elements for which the lambda returns true are kept."
+ )
+)]
+#[derive(Debug, PartialEq, Eq, Hash)]
+pub struct ArrayFilter {
+ signature: HigherOrderSignature,
+ aliases: Vec<String>,
+}
+
+impl Default for ArrayFilter {
+ fn default() -> Self {
+ Self::new()
+ }
+}
+
+impl ArrayFilter {
+ pub fn new() -> Self {
+ Self {
+ signature: HigherOrderSignature::exact(
+ vec![ValueOrLambda::Value(()), ValueOrLambda::Lambda(())],
+ Volatility::Immutable,
+ ),
+ aliases: vec![String::from("list_filter")],
+ }
+ }
+}
+
+impl HigherOrderUDF for ArrayFilter {
+ fn name(&self) -> &str {
+ "array_filter"
+ }
+
+ fn aliases(&self) -> &[String] {
+ &self.aliases
+ }
+
+ fn signature(&self) -> &HigherOrderSignature {
+ &self.signature
+ }
+
+ fn lambda_parameters(
+ &self,
+ _step: usize,
+ fields: &[ValueOrLambda<FieldRef, Option<FieldRef>>],
+ ) -> Result<LambdaParametersProgress> {
+ single_list_lambda_parameters(self.name(), fields)
+ }
+
+ fn return_field_from_args(
+ &self,
+ args: HigherOrderReturnFieldArgs,
+ ) -> Result<Arc<Field>> {
+ let (list, _lambda) = value_lambda_pair(self.name(), args.arg_fields)?;
+ Ok(Arc::new(Field::new(
+ "",
+ list.data_type().clone(),
+ list.is_nullable(),
+ )))
+ }
+
+ fn invoke_with_args(&self, args: HigherOrderFunctionArgs) -> Result<ColumnarValue> {
+ let (list, lambda) = value_lambda_pair(self.name(), &args.args)?;
+ let list_array = list.to_array(args.number_rows)?;
+
+ let list_values = match extract_list_values(&list_array, args.return_type())? {
+ ListValuesResult::EarlyReturn(v) => return Ok(v),
+ ListValuesResult::Values(v) => v,
+ };
+
+ let field = match args.return_field.data_type() {
+ DataType::List(field) | DataType::LargeList(field) => Arc::clone(field),
+ _ => {
+ return exec_err!(
+ "{} expected return_field to be a list, got {}",
+ self.name(),
+ args.return_field
+ );
+ }
+ };
+
+ let values_param = || Ok(Arc::clone(&list_values));
+ let predicate_output = lambda.evaluate(&[&values_param], |arrays| {
+ let indices = list_values_row_number(&list_array)?;
+ Ok(take_arrays(arrays, &indices, None)?)
+ })?;
+
+ // Scalar predicate short-circuit: x -> true or x -> false/null
+ if let ColumnarValue::Scalar(ScalarValue::Boolean(b)) = &predicate_output {
+ return match b {
+ Some(true) => Ok(ColumnarValue::Array(list_array)),
+ _ => Ok(ColumnarValue::Array(empty_filtered_list(
+ &list_array,
+ field,
+ )?)),
+ };
+ }
+
+ let predicate = predicate_output.into_array(list_values.len())?;
+ let Some(predicate) = predicate.as_any().downcast_ref::<BooleanArray>() else {
+ return exec_err!(
+ "{} lambda must return boolean, got {}",
+ self.name(),
+ predicate.data_type()
+ );
+ };
+
+ // ListView and LargeListView are coerced to List/LargeList by coerce_value_types.
+ let filtered_list = match list_array.data_type() {
+ DataType::List(_) => {
+ let list = list_array.as_list::<i32>();
+ let adjusted_offsets = adjust_offsets_for_slice(list);
+ let (filtered_values, new_offsets) =
+ filter_list_values(&list_values, predicate, &adjusted_offsets)?;
+ Arc::new(ListArray::new(
+ field,
+ new_offsets,
+ filtered_values,
+ list.nulls().cloned(),
+ )) as ArrayRef
+ }
+ DataType::LargeList(_) => {
+ let large_list = list_array.as_list::<i64>();
+ let adjusted_offsets = adjust_offsets_for_slice(large_list);
+ let (filtered_values, new_offsets) =
+ filter_list_values(&list_values, predicate, &adjusted_offsets)?;
+ Arc::new(LargeListArray::new(
+ field,
+ new_offsets,
+ filtered_values,
+ large_list.nulls().cloned(),
+ ))
+ }
+ other => exec_err!("expected list, got {other}")?,
+ };
+
+ Ok(ColumnarValue::Array(filtered_list))
+ }
+
+ fn coerce_value_types(&self, arg_types: &[DataType]) -> Result<Vec<DataType>> {
+ coerce_single_list_arg(self.name(), arg_types)
+ }
+
+ fn documentation(&self) -> Option<&Documentation> {
+ self.doc()
+ }
+}
+
+/// Returns a list array with every non-null sublist emptied, preserving the null buffer.
+/// Used for the `x -> false` / `x -> null` scalar predicate short-circuit.
+fn empty_filtered_list(list_array: &ArrayRef, field: FieldRef) -> Result<ArrayRef> {
+ let n = list_array.len();
+ let empty_values = new_empty_array(field.data_type());
+ Ok(match list_array.data_type() {
+ DataType::List(_) => {
+ let list = list_array.as_list::<i32>();
+ Arc::new(ListArray::new(
+ field,
+ OffsetBuffer::new(ScalarBuffer::from(vec![0i32; n + 1])),
+ empty_values,
+ list.nulls().cloned(),
+ ))
+ }
+ DataType::LargeList(_) => {
+ let list = list_array.as_list::<i64>();
+ Arc::new(LargeListArray::new(
+ field,
+ OffsetBuffer::new(ScalarBuffer::from(vec![0i64; n + 1])),
+ empty_values,
+ list.nulls().cloned(),
+ ))
+ }
+ other => return exec_err!("expected list, got {other}"),
+ })
+}
+
+/// Filters flat list values using a boolean predicate, returning filtered values and
+/// recomputed per-sublist offsets. Null predicate values are treated as false.
+fn filter_list_values<O: OffsetSizeTrait>(
+ values: &ArrayRef,
+ predicate: &BooleanArray,
+ offsets: &OffsetBuffer<O>,
+) -> Result<(ArrayRef, OffsetBuffer<O>)> {
+ let num_sublists = offsets.len().saturating_sub(1);
+ let mut builder = OffsetBufferBuilder::<O>::new(num_sublists);
+
+ let has_nulls = predicate.null_count() > 0;
+ for i in 0..num_sublists {
+ let start = offsets[i].as_usize();
+ let end = offsets[i + 1].as_usize();
+ let count = if has_nulls {
+ (start..end)
+ .filter(|&j| predicate.is_valid(j) && predicate.value(j))
+ .count()
+ } else {
+ predicate
+ .values()
+ .slice(start, end - start)
+ .count_set_bits()
+ };
+ builder.push_length(count);
+ }
+
+ let new_offsets = builder.finish();
+
+ if new_offsets.last() == offsets.last() {
+ return Ok((Arc::clone(values), offsets.clone()));
+ }
+
+ // arrow_filter treats null predicate values as false
+ let filtered_values = arrow_filter(values.as_ref(), predicate)?;
+ Ok((filtered_values, new_offsets))
+}
+
+#[cfg(test)]
+mod tests {
+ use arrow::{
+ array::{Array, AsArray},
+ buffer::{NullBuffer, OffsetBuffer},
+ };
+
+ use crate::array_filter::array_filter_higher_order_function;
+ use crate::lambda_utils::test_utils::{create_i32_list, eval_hof_on_i32_list, v};
+ use datafusion_expr::lit;
+
+ fn keep_greater_than_two(
+ list: impl Array + Clone + 'static,
+ ) -> datafusion_common::Result<arrow::array::ArrayRef> {
+ eval_hof_on_i32_list(
+ array_filter_higher_order_function(),
+ list,
+ v().gt(lit(2i32)),
+ )
+ }
+
+ #[test]
+ fn filter_basic() {
+ let list = create_i32_list(
+ vec![1, 2, 3, 4, 5],
+ OffsetBuffer::<i32>::from_lengths(vec![5]),
+ None,
+ );
+
+ let res = keep_greater_than_two(list).unwrap();
+ let actual = res.as_list::<i32>();
+
+ let expected = create_i32_list(
+ vec![3, 4, 5],
+ OffsetBuffer::<i32>::from_lengths(vec![3]),
+ None,
+ );
+
+ assert_eq!(actual, &expected);
+ }
+
+ #[test]
+ fn filter_multiple_sublists() {
+ let list = create_i32_list(
+ vec![1, 5, 2, 4, 3],
+ OffsetBuffer::<i32>::from_lengths(vec![2, 3]),
+ None,
+ );
+
+ let res = keep_greater_than_two(list).unwrap();
+ let actual = res.as_list::<i32>();
+
+ // [1,5] -> [5], [2,4,3] -> [4,3]
+ let expected = create_i32_list(
+ vec![5, 4, 3],
+ OffsetBuffer::<i32>::from_lengths(vec![1, 2]),
+ None,
+ );
+
+ assert_eq!(actual, &expected);
+ }
+
+ #[test]
+ fn filter_on_sliced_list_should_not_evaluate_on_unreachable_values() {
+ // First sublist [0] is sliced away; sliced array covers sublists [1..3]
+ let list = create_i32_list(
+ vec![
+ 0, // unreachable after slice — if evaluated, it would appear in output
+ 1, 5, 2, 4, 3, 7,
+ ],
+ OffsetBuffer::<i32>::from_lengths(vec![1, 3, 3]),
+ None,
+ )
+ .slice(1, 2);
+
+ let res = keep_greater_than_two(list).unwrap();
+ let actual = res.as_list::<i32>();
+
+ // [1,5,2] -> [5], [4,3,7] -> [4,3,7]
+ let expected = create_i32_list(
+ vec![5, 4, 3, 7],
+ OffsetBuffer::<i32>::from_lengths(vec![1, 3]),
+ None,
+ );
+
+ assert_eq!(actual, &expected);
+ }
+
+ #[test]
+ fn filter_should_not_be_evaluated_on_values_underlying_null() {
+ // The null sublist (index 1) contains values that would pass the predicate
+ // if evaluated. We verify they do NOT appear in the output.
+ let list = create_i32_list(
+ vec![1, 5, 99, 100, 3, 7],
+ OffsetBuffer::<i32>::from_lengths(vec![2, 2, 2]),
+ Some(NullBuffer::from(vec![true, false, true])),
+ );
+
+ let res = keep_greater_than_two(list).unwrap();
+ let actual = res.as_list::<i32>();
+
+ // sublist 0: [1,5] -> [5]
+ // sublist 1: null -> null (empty range, null bit)
+ // sublist 2: [3,7] -> [3,7]
+ let expected = create_i32_list(
+ vec![5, 3, 7],
+ OffsetBuffer::<i32>::from_lengths(vec![1, 0, 2]),
+ Some(NullBuffer::from(vec![true, false, true])),
+ );
+
+ assert_eq!(actual.data_type(), expected.data_type());
+ assert_eq!(actual, &expected);
+ }
+
+ #[test]
+ fn filter_all_filtered_out() {
+ let list =
+ create_i32_list(vec![1, 2], OffsetBuffer::<i32>::from_lengths(vec![2]), None);
+
+ let res = keep_greater_than_two(list).unwrap();
+ let actual = res.as_list::<i32>();
+
+ let expected = create_i32_list(
+ vec![0i32; 0],
+ OffsetBuffer::<i32>::from_lengths(vec![0]),
+ None,
+ );
+
+ assert_eq!(actual, &expected);
+ }
+
+ #[test]
+ fn filter_nothing_filtered_reuses_values() {
+ let list = create_i32_list(
+ vec![3, 4, 5],
+ OffsetBuffer::<i32>::from_lengths(vec![3]),
+ None,
+ );
+ // all elements > 2, so nothing is filtered — values buffer should be reused
+ let res = keep_greater_than_two(list.clone()).unwrap();
+ assert_eq!(res.as_list::<i32>(), &list);
+ }
+
+ #[test]
+ fn scalar_true_predicate_returns_original_list() {
+ let list = create_i32_list(
+ vec![1, 2, 3],
+ OffsetBuffer::<i32>::from_lengths(vec![3]),
+ None,
+ );
+ // x -> true: every element kept, should return list unchanged
+ let res = eval_hof_on_i32_list(
+ array_filter_higher_order_function(),
+ list.clone(),
+ lit(true),
+ )
+ .unwrap();
+ assert_eq!(res.as_list::<i32>(), &list);
+ }
+
+ #[test]
+ fn scalar_false_predicate_returns_empty_sublists() {
+ let list = create_i32_list(
+ vec![1, 2, 3, 4],
+ OffsetBuffer::<i32>::from_lengths(vec![2, 2]),
+ None,
+ );
+ // x -> false: every sublist emptied
+ let res =
+ eval_hof_on_i32_list(array_filter_higher_order_function(), list, lit(false))
+ .unwrap();
+ let actual = res.as_list::<i32>();
+ let expected = create_i32_list(
+ vec![0i32; 0],
+ OffsetBuffer::<i32>::from_lengths(vec![0, 0]),
+ None,
+ );
+ assert_eq!(actual, &expected);
+ }
+}
diff --git a/datafusion/functions-nested/src/array_transform.rs b/datafusion/functions-nested/src/array_transform.rs
index bfec3613b6..a0415749f4 100644
--- a/datafusion/functions-nested/src/array_transform.rs
+++ b/datafusion/functions-nested/src/array_transform.rs
@@ -23,10 +23,8 @@ use arrow::{
datatypes::{DataType, Field, FieldRef},
};
use datafusion_common::{
- Result, ScalarValue, exec_err, plan_err,
- utils::{
- adjust_offsets_for_slice, list_values, list_values_row_number, take_function_args,
- },
+ Result, exec_err, plan_err,
+ utils::{adjust_offsets_for_slice, list_values_row_number, take_function_args},
};
use datafusion_expr::{
ColumnarValue, Documentation, HigherOrderFunctionArgs, HigherOrderReturnFieldArgs,
@@ -34,7 +32,12 @@ use datafusion_expr::{
Volatility,
};
use datafusion_macros::user_doc;
-use std::{fmt::Debug, sync::Arc};
+use std::sync::Arc;
+
+use crate::lambda_utils::{
+ ListValuesResult, coerce_single_list_arg, extract_list_values,
+ single_list_lambda_parameters,
+};
make_higher_order_function_expr_and_func!(
ArrayTransform,
@@ -100,30 +103,7 @@ impl HigherOrderUDF for ArrayTransform {
}
fn coerce_value_types(&self, arg_types: &[DataType]) -> Result<Vec<DataType>> {
- let [list] = arg_types else {
- return plan_err!(
- "{} function requires 1 value argument, got {}",
- self.name(),
- arg_types.len()
- );
- };
-
- let coerced = match list {
- DataType::List(_) | DataType::LargeList(_) => list.clone(),
- DataType::ListView(field) | DataType::FixedSizeList(field, _) => {
- DataType::List(Arc::clone(field))
- }
- DataType::LargeListView(field) => DataType::LargeList(Arc::clone(field)),
- _ => {
- return plan_err!(
- "{} expected a list as first argument, got {}",
- self.name(),
- list
- );
- }
- };
-
- Ok(vec![coerced])
+ coerce_single_list_arg(self.name(), arg_types)
}
fn lambda_parameters(
@@ -131,22 +111,7 @@ impl HigherOrderUDF for ArrayTransform {
_step: usize,
fields: &[ValueOrLambda<FieldRef, Option<FieldRef>>],
) -> Result<LambdaParametersProgress> {
- let [list, _] = take_function_args(self.name(), fields)?;
- let ValueOrLambda::Value(list) = list else {
- return plan_err!("{} expects a value as first argument", self.name());
- };
-
- let field = match list.data_type() {
- DataType::List(field) => field,
- DataType::LargeList(field) => field,
- _ => return plan_err!("expected list, got {list}"),
- };
-
- // we don't need to check whether the lambda contains more than two parameters,
- // e.g. array_transform([], (v, i, j) -> v+i+j), as datafusion will do that for us
- Ok(LambdaParametersProgress::Complete(vec![vec![Arc::clone(
- field,
- )]]))
+ single_list_lambda_parameters(self.name(), fields)
}
fn return_field_from_args(
@@ -187,31 +152,10 @@ impl HigherOrderUDF for ArrayTransform {
let list_array = list.to_array(args.number_rows)?;
- // Fast path for fully null input array
- if list_array.null_count() == list_array.len() {
- return Ok(ColumnarValue::Scalar(ScalarValue::try_new_null(
- args.return_type(),
- )?));
- }
-
- // as per list_values docs, if list_array is sliced, list_values will be sliced too,
- // so before constructing the transformed array below, we must adjust the list offsets with
- // adjust_offsets_for_slice
- let list_values = list_values(&list_array)?;
-
- // fast path: when every sublist is empty and non-null we can return a scalar of an non-null empty sublist.
- // If every sublist is null have already been handled above
- if list_values.is_empty()
- && list_array.null_count() == 0
- && matches!(
- args.return_type(),
- DataType::List(_) | DataType::LargeList(_)
- )
- {
- return Ok(ColumnarValue::Scalar(ScalarValue::new_default(
- args.return_type(),
- )?));
- }
+ let list_values = match extract_list_values(&list_array, args.return_type())? {
+ ListValuesResult::EarlyReturn(v) => return Ok(v),
+ ListValuesResult::Values(v) => v,
+ };
// by passing closures, lambda.evaluate can evaluate only those actually needed
let values_param = || Ok(Arc::clone(&list_values));
@@ -279,60 +223,23 @@ impl HigherOrderUDF for ArrayTransform {
#[cfg(test)]
mod tests {
- use std::{collections::HashMap, sync::Arc};
-
use arrow::{
- array::{Array, ArrayRef, AsArray, Int32Array, ListArray, RecordBatch},
+ array::{Array, AsArray},
buffer::{NullBuffer, OffsetBuffer},
- datatypes::{DataType, Field},
};
- use datafusion_common::{DFSchema, Result};
- use datafusion_expr::{
- Expr, col, execution_props::ExecutionProps, expr::HigherOrderFunction, lambda,
- lambda_var, lit,
- };
- use datafusion_physical_expr::create_physical_expr;
use crate::array_transform::array_transform_higher_order_function;
-
- fn create_i32_list(
- values: impl Into<Int32Array>,
- offsets: OffsetBuffer<i32>,
- nulls: Option<NullBuffer>,
- ) -> ListArray {
- let list_field = Arc::new(Field::new_list_field(DataType::Int32, true));
-
- ListArray::new(list_field, offsets, Arc::new(values.into()), nulls)
- }
-
- fn divide_100_by(list: impl Array + Clone + 'static) -> Result<ArrayRef> {
- let array_transform = array_transform_higher_order_function();
-
- let schema = DFSchema::from_unqualified_fields(
- vec![Field::new(
- "list",
- list.data_type().clone(),
- list.is_nullable(),
- )]
- .into(),
- HashMap::new(),
- )?;
-
- create_physical_expr(
- &Expr::HigherOrderFunction(HigherOrderFunction::new(
- array_transform,
- vec![col("list"), lambda(["v"], lit(100i32) / lambda_var("v"))],
- ))
- .resolve_lambda_variables(&schema)?
- .data,
- &schema,
- &ExecutionProps::new(),
- )?
- .evaluate(&RecordBatch::try_new(
- Arc::clone(schema.inner()),
- vec![Arc::new(list.clone())],
- )?)?
- .into_array(list.len())
+ use crate::lambda_utils::test_utils::{create_i32_list, eval_hof_on_i32_list, v};
+ use datafusion_expr::lit;
+
+ fn divide_100_by(
+ list: impl Array + Clone + 'static,
+ ) -> datafusion_common::Result<arrow::array::ArrayRef> {
+ eval_hof_on_i32_list(
+ array_transform_higher_order_function(),
+ list,
+ lit(100i32) / v(),
+ )
}
#[test]
diff --git a/datafusion/functions-nested/src/lambda_utils.rs b/datafusion/functions-nested/src/lambda_utils.rs
new file mode 100644
index 0000000000..cb8682d4bd
--- /dev/null
+++ b/datafusion/functions-nested/src/lambda_utils.rs
@@ -0,0 +1,191 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Shared utilities for `(array, lambda)` style higher-order functions.
+
+use arrow::array::ArrayRef;
+use arrow::datatypes::{DataType, FieldRef};
+use datafusion_common::{
+ Result, ScalarValue, plan_err,
+ utils::{list_values, take_function_args},
+};
+use datafusion_expr::{ColumnarValue, LambdaParametersProgress, ValueOrLambda};
+use std::sync::Arc;
+
+/// Extracts a `(value, lambda)` pair from a [`ValueOrLambda`] slice.
+pub(crate) fn value_lambda_pair<'a, V: std::fmt::Debug, L: std::fmt::Debug>(
+ name: &str,
+ args: &'a [ValueOrLambda<V, L>],
+) -> Result<(&'a V, &'a L)> {
+ let [value, lambda] = take_function_args(name, args)?;
+
+ let (ValueOrLambda::Value(value), ValueOrLambda::Lambda(lambda)) = (value, lambda)
+ else {
+ return plan_err!(
+ "{name} expects a value followed by a lambda, got {value:?} and {lambda:?}"
+ );
+ };
+
+ Ok((value, lambda))
+}
+
+/// Coerces a single list argument for `(array, lambda)` style higher-order functions.
+///
+/// Normalises `ListView`/`FixedSizeList` → `List` and `LargeListView` → `LargeList`.
+pub(crate) fn coerce_single_list_arg(
+ name: &str,
+ arg_types: &[DataType],
+) -> Result<Vec<DataType>> {
+ let list = if arg_types.len() == 1 {
+ &arg_types[0]
+ } else {
+ return plan_err!(
+ "{name} function requires 1 value argument, got {}",
+ arg_types.len()
+ );
+ };
+
+ let coerced = match list {
+ DataType::List(_) | DataType::LargeList(_) => list.clone(),
+ DataType::ListView(field) | DataType::FixedSizeList(field, _) => {
+ DataType::List(Arc::clone(field))
+ }
+ DataType::LargeListView(field) => DataType::LargeList(Arc::clone(field)),
+ _ => return plan_err!("{name} expected a list as first argument, got {list}"),
+ };
+
+ Ok(vec![coerced])
+}
+
+/// Returns the single lambda parameter set for `(array, v -> body)` style HOFs.
+pub(crate) fn single_list_lambda_parameters(
+ name: &str,
+ fields: &[ValueOrLambda<FieldRef, Option<FieldRef>>],
+) -> Result<LambdaParametersProgress> {
+ let (list, _lambda) = value_lambda_pair(name, fields)?;
+
+ let field = match list.data_type() {
+ DataType::List(field) | DataType::LargeList(field) => field,
+ _ => return plan_err!("expected list, got {list}"),
+ };
+
+ Ok(LambdaParametersProgress::Complete(vec![vec![Arc::clone(
+ field,
+ )]]))
+}
+
+/// Result of extracting flat list values, with fast-path short-circuits handled.
+pub(crate) enum ListValuesResult {
+ /// Caller should return this value immediately.
+ EarlyReturn(ColumnarValue),
+ /// Flat values extracted from the list; continue with execution.
+ Values(ArrayRef),
+}
+
+/// Extracts flat list values, handling all fast-path short-circuits.
+///
+/// - All-null input → `EarlyReturn(null scalar)`
+/// - All sublists empty and non-null → `EarlyReturn(default empty-list scalar)`
+/// - Otherwise → `Values(flat_values)`
+pub(crate) fn extract_list_values(
+ list_array: &ArrayRef,
+ return_type: &DataType,
+) -> Result<ListValuesResult> {
+ if list_array.null_count() == list_array.len() {
+ return Ok(ListValuesResult::EarlyReturn(ColumnarValue::Scalar(
+ ScalarValue::try_new_null(return_type)?,
+ )));
+ }
+
+ let values = list_values(list_array)?;
+
+ if values.is_empty()
+ && list_array.null_count() == 0
+ && matches!(return_type, DataType::List(_) | DataType::LargeList(_))
+ {
+ return Ok(ListValuesResult::EarlyReturn(ColumnarValue::Scalar(
+ ScalarValue::new_default(return_type)?,
+ )));
+ }
+
+ Ok(ListValuesResult::Values(values))
+}
+
+#[cfg(test)]
+pub(crate) mod test_utils {
+ use std::{collections::HashMap, sync::Arc};
+
+ use arrow::{
+ array::{Array, ArrayRef, Int32Array, ListArray, RecordBatch},
+ buffer::{NullBuffer, OffsetBuffer},
+ datatypes::{DataType, Field},
+ };
+ use datafusion_common::{DFSchema, Result};
+ use datafusion_expr::{
+ Expr, HigherOrderUDF, col,
+ execution_props::ExecutionProps,
+ expr::{HigherOrderFunction, LambdaVariable},
+ lambda,
+ };
+ use datafusion_physical_expr::create_physical_expr;
+
+ pub(crate) fn create_i32_list(
+ values: impl Into<Int32Array>,
+ offsets: OffsetBuffer<i32>,
+ nulls: Option<NullBuffer>,
+ ) -> ListArray {
+ let list_field = Arc::new(Field::new_list_field(DataType::Int32, true));
+ ListArray::new(list_field, offsets, Arc::new(values.into()), nulls)
+ }
+
+ pub(crate) fn eval_hof_on_i32_list(
+ func: Arc<dyn HigherOrderUDF>,
+ list: impl Array + Clone + 'static,
+ lambda_body: Expr,
+ ) -> Result<ArrayRef> {
+ let schema = DFSchema::from_unqualified_fields(
+ vec![Field::new(
+ "list",
+ list.data_type().clone(),
+ list.is_nullable(),
+ )]
+ .into(),
+ HashMap::new(),
+ )?;
+
+ create_physical_expr(
+ &Expr::HigherOrderFunction(HigherOrderFunction::new(
+ func,
+ vec![col("list"), lambda(["v"], lambda_body)],
+ )),
+ &schema,
+ &ExecutionProps::new(),
+ )?
+ .evaluate(&RecordBatch::try_new(
+ Arc::clone(schema.inner()),
+ vec![Arc::new(list.clone())],
+ )?)?
+ .into_array(list.len())
+ }
+
+ pub(crate) fn v() -> Expr {
+ Expr::LambdaVariable(LambdaVariable::new(
+ "v".to_string(),
+ Some(Arc::new(Field::new("v", DataType::Int32, true))),
+ ))
+ }
+}
diff --git a/datafusion/functions-nested/src/lib.rs b/datafusion/functions-nested/src/lib.rs
index 6c4556902b..1e6dc68cb2 100644
--- a/datafusion/functions-nested/src/lib.rs
+++ b/datafusion/functions-nested/src/lib.rs
@@ -41,7 +41,10 @@ pub mod macros;
pub mod macros_lambda;
pub mod array_any_match;
+pub(crate) mod lambda_utils;
+
pub mod array_compact;
+pub mod array_filter;
pub mod array_has;
pub mod array_normalize;
pub mod array_transform;
@@ -88,6 +91,7 @@ use std::sync::Arc;
pub mod expr_fn {
pub use super::array_any_match::array_any_match;
pub use super::array_compact::array_compact;
+ pub use super::array_filter::array_filter;
pub use super::array_has::array_has;
pub use super::array_has::array_has_all;
pub use super::array_has::array_has_any;
@@ -200,6 +204,7 @@ pub fn all_default_nested_functions() -> Vec<Arc<ScalarUDF>> {
pub fn all_default_higher_order_functions() -> Vec<Arc<dyn HigherOrderUDF>> {
vec![
array_any_match::array_any_match_higher_order_function(),
+ array_filter::array_filter_higher_order_function(),
array_transform::array_transform_higher_order_function(),
]
}
diff --git a/datafusion/sqllogictest/test_files/array/array_filter.slt b/datafusion/sqllogictest/test_files/array/array_filter.slt
new file mode 100644
index 0000000000..f22cfb2198
--- /dev/null
+++ b/datafusion/sqllogictest/test_files/array/array_filter.slt
@@ -0,0 +1,217 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+#############
+## array_filter Tests
+#############
+
+statement ok
+set datafusion.sql_parser.dialect = databricks;
+
+statement ok
+CREATE TABLE t (list array<int>, number int)
+AS VALUES
+([1, 50], 10),
+([4, 50], 40),
+([7, 50], 60);
+
+statement ok
+CREATE TABLE with_null_list (list array<int>)
+AS VALUES
+([1, 2]),
+(NULL);
+
+statement ok
+CREATE TABLE fully_null_list (list array<int>)
+AS VALUES
+(NULL),
+(NULL);
+
+query ?
+SELECT array_filter([1, 2, 3, 4, 5], v -> v > 2);
+----
+[3, 4, 5]
+
+# alias list_filter works
+query ?
+SELECT list_filter([1, 2, 3, 4, 5], v -> v > 2);
+----
+[3, 4, 5]
+
+# multiple sublists — t.list rows are [1,50], [4,50], [7,50]
+query ?
+SELECT array_filter(list, v -> v > 40) from t;
+----
+[50]
+[50]
+[50]
+
+# filter with column capture in predicate
+query ?
+SELECT array_filter(list, v -> v > t.number) from t;
+----
+[50]
+[50]
+[]
+
+# null sublist is preserved
+query ?
+SELECT array_filter(list, v -> v > 1) from with_null_list;
+----
+[2]
+NULL
+
+# all null fast path
+query ?
+SELECT array_filter(list, v -> v > 1) from fully_null_list;
+----
+NULL
+NULL
+
+# empty array input
+query ?
+SELECT array_filter([], v -> v > 1);
+----
+[]
+
+# scalar true: return list unchanged
+query ?
+SELECT array_filter([1, 2, 3], v -> true);
+----
+[1, 2, 3]
+
+# scalar false: return empty sublists
+query ?
+SELECT array_filter([1, 2, 3], v -> false);
+----
+[]
+
+# all filtered out
+query ?
+SELECT array_filter([1, 2], v -> v > 10);
+----
+[]
+
+# nothing filtered — all elements pass predicate
+query ?
+SELECT array_filter([3, 4, 5], v -> v > 2);
+----
+[3, 4, 5]
+
+# coercion: ListView input is coerced to List during planning
+query ?
+SELECT array_filter(arrow_cast(list, 'ListView(Int32)'), v -> v > 2) from t;
+----
+[50]
+[4, 50]
+[7, 50]
+
+# null array argument returns null
+query ?
+SELECT array_filter(arrow_cast(NULL, 'List(Int32)'), v -> v > 0);
+----
+NULL
+
+# lambda returns null for some elements — null treated as false (element dropped)
+query ?
+SELECT array_filter([1, 2, 3], v -> CASE WHEN v = 2 THEN NULL ELSE v > 1 END);
+----
+[3]
+
+# lambda always returns null — scalar null short-circuit returns empty list
+query ?
+SELECT array_filter([1, 2, 3], v -> CAST(NULL AS BOOLEAN));
+----
+[]
+
+query error DataFusion error: Error during planning: The function 'array_filter' expected 2 argument\(s\) but received 0
+SELECT array_filter();
+
+query error DataFusion error: Error during planning: array_filter expected a list as first argument, got Int64
+SELECT array_filter(1, v -> v > 0);
+
+query error DataFusion error: Error during planning: The function 'array_filter' expected a value at position 0 but received a lambda
+SELECT array_filter(v -> v > 0, [1, 2, 3]);
+
+##############
+## array_filter + array_transform combinations
+##############
+
+# filter then transform on literal
+query ?
+SELECT array_transform(array_filter([1, 2, 3, 4, 5], v -> v > 2), v -> v * 10);
+----
+[30, 40, 50]
+
+# transform then filter on literal
+query ?
+SELECT array_filter(array_transform([1, 2, 3, 4, 5], v -> v * 2), v -> v > 4);
+----
+[6, 8, 10]
+
+# filter evens then square
+query ?
+SELECT array_transform(array_filter([1, 2, 3, 4, 5, 6], v -> v % 2 = 0), v -> v * v);
+----
+[4, 16, 36]
+
+# double filter
+query ?
+SELECT array_filter(array_filter([1, 2, 3, 4, 5, 6, 7, 8, 9, 10], v -> v % 2 = 0), v -> v > 5);
+----
+[6, 8, 10]
+
+# double transform
+query ?
+SELECT array_transform(array_transform([1, 2, 3], v -> v + 1), v -> v * 2);
+----
+[4, 6, 8]
+
+# filter then transform on table column
+query ?
+SELECT array_transform(array_filter(list, v -> v > 3), v -> v * 2) FROM t ORDER BY 1;
+----
+[8, 100]
+[14, 100]
+[100]
+
+# transform then filter on table column
+query ?
+SELECT array_filter(array_transform(list, v -> v * 2), v -> v > 10) FROM t ORDER BY 1;
+----
+[14, 100]
+[100]
+[100]
+
+# filter then transform with null list
+query ?
+SELECT array_transform(array_filter(list, v -> v > 1), v -> v * 3) FROM with_null_list;
+----
+[6]
+NULL
+
+statement ok
+drop table t;
+
+statement ok
+drop table with_null_list;
+
+statement ok
+drop table fully_null_list;
+
+statement ok
+set datafusion.sql_parser.dialect = generic;
diff --git a/docs/source/user-guide/sql/scalar_functions.md b/docs/source/user-guide/sql/scalar_functions.md
index 70216e3e9b..6bf61391eb 100644
--- a/docs/source/user-guide/sql/scalar_functions.md
+++ b/docs/source/user-guide/sql/scalar_functions.md
@@ -3258,6 +3258,7 @@ _Alias of [current_date](#current_date)._
- [array_empty](#array_empty)
- [array_except](#array_except)
- [array_extract](#array_extract)
+- [array_filter](#array_filter)
- [array_has](#array_has)
- [array_has_all](#array_has_all)
- [array_has_any](#array_has_any)
@@ -3313,6 +3314,7 @@ _Alias of [current_date](#current_date)._
- [list_empty](#list_empty)
- [list_except](#list_except)
- [list_extract](#list_extract)
+- [list_filter](#list_filter)
- [list_has](#list_has)
- [list_has_all](#list_has_all)
- [list_has_any](#list_has_any)
@@ -3660,6 +3662,34 @@ array_except(array1, array2)
_Alias of [array_element](#array_element)._
+### `array_filter`
+
+Filters the values of an array using a boolean lambda.
+
+```sql
+array_filter(array, x -> x > 2)
+```
+
+#### Arguments
+
+- **array**: Array expression. Can be a constant, column, or function, and any combination of array operators.
+- **lambda**: Lambda that returns a boolean. Elements for which the lambda returns true are kept; elements for which it returns false or null are dropped.
+
+#### Example
+
+```sql
+> select array_filter([1, 2, 3, 4, 5], x -> x > 2);
++--------------------------------------------+
+| array_filter([1, 2, 3, 4, 5], x -> x > 2) |
++--------------------------------------------+
+| [3, 4, 5]                                  |
++--------------------------------------------+
+```
+
+#### Aliases
+
+- list_filter
+
### `array_has`
Returns true if the array contains the element.
@@ -4771,6 +4801,10 @@ _Alias of [array_except](#array_except)._
_Alias of [array_element](#array_element)._
+### `list_filter`
+
+_Alias of [array_filter](#array_filter)._
+
### `list_has`
_Alias of [array_has](#array_has)._
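
As an informal reference for the semantics exercised by the tests and docs above, the sketch below models `array_filter` in plain Python. It is not DataFusion's implementation, which operates on Arrow arrays; the Python function names, the use of `None` for SQL NULL, and the simplified `array_transform` helper are all assumptions made for illustration.

```python
from typing import Callable, List, Optional

# Hypothetical reference model: a SQL list is a Python list, SQL NULL is None.
Elem = Optional[int]


def array_filter(
    values: Optional[List[Elem]],
    predicate: Callable[[Elem], Optional[bool]],
) -> Optional[List[Elem]]:
    """Keep elements whose predicate is exactly True; False and None drop them."""
    if values is None:
        # A null list stays null rather than becoming an empty list.
        return None
    return [v for v in values if predicate(v) is True]


def array_transform(
    values: Optional[List[Elem]],
    func: Callable[[Elem], Elem],
) -> Optional[List[Elem]]:
    """Simplified stand-in for array_transform, used only to show composition."""
    if values is None:
        return None
    return [func(v) for v in values]


# Mirrors: array_filter([1, 2, 3], v -> CASE WHEN v = 2 THEN NULL ELSE v > 1 END)
kept = array_filter([1, 2, 3], lambda v: None if v == 2 else v > 1)

# Mirrors: array_transform(array_filter([1, 2, 3, 4, 5], v -> v > 2), v -> v * 10)
scaled = array_transform(array_filter([1, 2, 3, 4, 5], lambda v: v > 2), lambda v: v * 10)
```

Comparing with `is True` rather than relying on truthiness is what makes a null predicate result behave like false, which matches the "null treated as false" test case.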