This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 0ed369e925 Handle one-element array return value in ScalarFunctionExpr (#12965)
0ed369e925 is described below
commit 0ed369e925ae8856e36b166bfcea8601019c6967
Author: Georgi Krastev <[email protected]>
AuthorDate: Thu Oct 17 20:24:07 2024 +0300
Handle one-element array return value in ScalarFunctionExpr (#12965)
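Converting a one-element array result back into a scalar when all arguments are scalars was done in #12922 only for math functions (via ColumnarValue::from_args_and_result).
We now generalize this fallback to all scalar UDFs by handling it in ScalarFunctionExpr, and remove the helper.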
---
datafusion/expr-common/src/columnar_value.rs | 11 -----------
datafusion/functions/src/macros.rs | 12 ++++++------
datafusion/physical-expr/src/scalar_function.rs | 18 +++++++++++++++---
3 files changed, 21 insertions(+), 20 deletions(-)
diff --git a/datafusion/expr-common/src/columnar_value.rs
b/datafusion/expr-common/src/columnar_value.rs
index 1ee90eb4b4..57056d0806 100644
--- a/datafusion/expr-common/src/columnar_value.rs
+++ b/datafusion/expr-common/src/columnar_value.rs
@@ -217,17 +217,6 @@ impl ColumnarValue {
}
}
}
-
- /// Converts an [`ArrayRef`] to a [`ColumnarValue`] based on the supplied
arguments.
- /// This is useful for scalar UDF implementations to fulfil their contract:
- /// if all arguments are scalar values, the result should also be a scalar
value.
- pub fn from_args_and_result(args: &[Self], result: ArrayRef) ->
Result<Self> {
- if result.len() == 1 && args.iter().all(|arg| matches!(arg,
Self::Scalar(_))) {
- Ok(Self::Scalar(ScalarValue::try_from_array(&result, 0)?))
- } else {
- Ok(Self::Array(result))
- }
- }
}
#[cfg(test)]
diff --git a/datafusion/functions/src/macros.rs
b/datafusion/functions/src/macros.rs
index 85ffaa868f..744a018912 100644
--- a/datafusion/functions/src/macros.rs
+++ b/datafusion/functions/src/macros.rs
@@ -228,8 +228,8 @@ macro_rules! make_math_unary_udf {
$EVALUATE_BOUNDS(inputs)
}
- fn invoke(&self, col_args: &[ColumnarValue]) ->
Result<ColumnarValue> {
- let args = ColumnarValue::values_to_arrays(col_args)?;
+ fn invoke(&self, args: &[ColumnarValue]) ->
Result<ColumnarValue> {
+ let args = ColumnarValue::values_to_arrays(args)?;
let arr: ArrayRef = match args[0].data_type() {
DataType::Float64 => {
Arc::new(make_function_scalar_inputs_return_type!(
@@ -257,7 +257,7 @@ macro_rules! make_math_unary_udf {
}
};
- ColumnarValue::from_args_and_result(col_args, arr)
+ Ok(ColumnarValue::Array(arr))
}
fn documentation(&self) -> Option<&Documentation> {
@@ -344,8 +344,8 @@ macro_rules! make_math_binary_udf {
$OUTPUT_ORDERING(input)
}
- fn invoke(&self, col_args: &[ColumnarValue]) ->
Result<ColumnarValue> {
- let args = ColumnarValue::values_to_arrays(col_args)?;
+ fn invoke(&self, args: &[ColumnarValue]) ->
Result<ColumnarValue> {
+ let args = ColumnarValue::values_to_arrays(args)?;
let arr: ArrayRef = match args[0].data_type() {
DataType::Float64 => Arc::new(make_function_inputs2!(
&args[0],
@@ -372,7 +372,7 @@ macro_rules! make_math_binary_udf {
}
};
- ColumnarValue::from_args_and_result(col_args, arr)
+ Ok(ColumnarValue::Array(arr))
}
fn documentation(&self) -> Option<&Documentation> {
diff --git a/datafusion/physical-expr/src/scalar_function.rs
b/datafusion/physical-expr/src/scalar_function.rs
index 130c335d1c..4d3db96ceb 100644
--- a/datafusion/physical-expr/src/scalar_function.rs
+++ b/datafusion/physical-expr/src/scalar_function.rs
@@ -39,7 +39,8 @@ use crate::PhysicalExpr;
use arrow::datatypes::{DataType, Schema};
use arrow::record_batch::RecordBatch;
-use datafusion_common::{internal_err, DFSchema, Result};
+use arrow_array::Array;
+use datafusion_common::{internal_err, DFSchema, Result, ScalarValue};
use datafusion_expr::interval_arithmetic::Interval;
use datafusion_expr::sort_properties::ExprProperties;
use datafusion_expr::type_coercion::functions::data_types_with_scalar_udf;
@@ -147,8 +148,19 @@ impl PhysicalExpr for ScalarFunctionExpr {
if let ColumnarValue::Array(array) = &output {
if array.len() != batch.num_rows() {
- return internal_err!("UDF returned a different number of rows
than expected. Expected: {}, Got: {}",
- batch.num_rows(), array.len());
+ // If the arguments are a non-empty slice of scalar values, we
can assume that
+ // returning a one-element array is equivalent to returning a
scalar.
+ let preserve_scalar = array.len() == 1
+ && !inputs.is_empty()
+ && inputs
+ .iter()
+ .all(|arg| matches!(arg, ColumnarValue::Scalar(_)));
+ return if preserve_scalar {
+ ScalarValue::try_from_array(array,
0).map(ColumnarValue::Scalar)
+ } else {
+ internal_err!("UDF returned a different number of rows
than expected. Expected: {}, Got: {}",
+ batch.num_rows(), array.len())
+ };
}
}
Ok(output)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]