This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new dfb6435e16 Add `ColumnarValue::values_to_arrays`, deprecate
`columnar_values_to_array` (#9114)
dfb6435e16 is described below
commit dfb6435e16cf4cfd5245c84dd6e18fcf96ac72f2
Author: Andrew Lamb <[email protected]>
AuthorDate: Mon Feb 5 09:09:10 2024 -0500
Add `ColumnarValue::values_to_arrays`, deprecate `columnar_values_to_array`
(#9114)
* Add `ColumnarValue::values_to_arrays`
* Apply suggestions from code review
Co-authored-by: Liang-Chi Hsieh <[email protected]>
---------
Co-authored-by: Liang-Chi Hsieh <[email protected]>
---
datafusion-examples/examples/simple_udf.rs | 7 +-
datafusion/expr/src/columnar_value.rs | 164 ++++++++++++++++++++-
.../src/simplify_expressions/expr_simplifier.rs | 3 +-
datafusion/physical-expr/src/functions.rs | 44 +-----
4 files changed, 170 insertions(+), 48 deletions(-)
diff --git a/datafusion-examples/examples/simple_udf.rs
b/datafusion-examples/examples/simple_udf.rs
index dda6ba62e0..64cf7857e2 100644
--- a/datafusion-examples/examples/simple_udf.rs
+++ b/datafusion-examples/examples/simple_udf.rs
@@ -28,7 +28,6 @@ use datafusion::error::Result;
use datafusion::prelude::*;
use datafusion_common::cast::as_float64_array;
use datafusion_expr::ColumnarValue;
-use datafusion_physical_expr::functions::columnar_values_to_array;
use std::sync::Arc;
/// create local execution context with an in-memory table:
@@ -71,13 +70,15 @@ async fn main() -> Result<()> {
// this is guaranteed by DataFusion based on the function's signature.
assert_eq!(args.len(), 2);
- let args = columnar_values_to_array(args)?;
+ // Expand the arguments to arrays (this is simple, but inefficient for
+ // single constant values).
+ let args = ColumnarValue::values_to_arrays(args)?;
// 1. cast both arguments to f64. These casts MUST be aligned with the
signature or this function panics!
let base = as_float64_array(&args[0]).expect("cast failed");
let exponent = as_float64_array(&args[1]).expect("cast failed");
- // this is guaranteed by DataFusion. We place it just to make it
obvious.
+ // The array lengths are guaranteed by DataFusion. We assert here to
make it obvious.
assert_eq!(exponent.len(), base.len());
// 2. perform the computation
diff --git a/datafusion/expr/src/columnar_value.rs
b/datafusion/expr/src/columnar_value.rs
index 58c534b50a..585bee3b9b 100644
--- a/datafusion/expr/src/columnar_value.rs
+++ b/datafusion/expr/src/columnar_value.rs
@@ -20,7 +20,7 @@
use arrow::array::ArrayRef;
use arrow::array::NullArray;
use arrow::datatypes::DataType;
-use datafusion_common::{Result, ScalarValue};
+use datafusion_common::{internal_err, DataFusionError, Result, ScalarValue};
use std::sync::Arc;
/// Represents the result of evaluating an expression: either a single
@@ -75,4 +75,166 @@ impl ColumnarValue {
pub fn create_null_array(num_rows: usize) -> Self {
ColumnarValue::Array(Arc::new(NullArray::new(num_rows)))
}
+
+ /// Converts [`ColumnarValue`]s to [`ArrayRef`]s with the same length.
+ ///
+ /// # Performance Note
+ ///
+ /// This function expands any [`ScalarValue`] to an array. This expansion
+ /// permits using a single function in terms of arrays, but it can be
+ /// inefficient compared to handling the scalar value directly.
+ ///
+ /// Thus, it is recommended to provide specialized implementations for
+ /// scalar values if performance is a concern.
+ ///
+ /// # Errors
+ ///
+ /// Returns an error if multiple array arguments have different lengths.
+ pub fn values_to_arrays(args: &[ColumnarValue]) -> Result<Vec<ArrayRef>> {
+ if args.is_empty() {
+ return Ok(vec![]);
+ }
+
+ let mut array_len = None;
+ for arg in args {
+ array_len = match (arg, array_len) {
+ (ColumnarValue::Array(a), None) => Some(a.len()),
+ (ColumnarValue::Array(a), Some(array_len)) => {
+ if array_len == a.len() {
+ Some(array_len)
+ } else {
+ return internal_err!(
+ "Arguments has mixed length. Expected length:
{array_len}, found length: {}", a.len()
+ );
+ }
+ }
+ (ColumnarValue::Scalar(_), array_len) => array_len,
+ }
+ }
+
+ // If array_len is none, it means there are only scalars, so make a 1
element array
+ let inferred_length = array_len.unwrap_or(1);
+
+ let args = args
+ .iter()
+ .map(|arg| arg.clone().into_array(inferred_length))
+ .collect::<Result<Vec<_>>>()?;
+
+ Ok(args)
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+
+ #[test]
+ fn values_to_arrays() {
+ // (input, expected)
+ let cases = vec![
+ // empty
+ TestCase {
+ input: vec![],
+ expected: vec![],
+ },
+ // one array of length 3
+ TestCase {
+ input: vec![ColumnarValue::Array(make_array(1, 3))],
+ expected: vec![make_array(1, 3)],
+ },
+ // two arrays length 3
+ TestCase {
+ input: vec![
+ ColumnarValue::Array(make_array(1, 3)),
+ ColumnarValue::Array(make_array(2, 3)),
+ ],
+ expected: vec![make_array(1, 3), make_array(2, 3)],
+ },
+ // array and scalar
+ TestCase {
+ input: vec![
+ ColumnarValue::Array(make_array(1, 3)),
+ ColumnarValue::Scalar(ScalarValue::Int32(Some(100))),
+ ],
+ expected: vec![
+ make_array(1, 3),
+ make_array(100, 3), // scalar is expanded
+ ],
+ },
+ // scalar and array
+ TestCase {
+ input: vec![
+ ColumnarValue::Scalar(ScalarValue::Int32(Some(100))),
+ ColumnarValue::Array(make_array(1, 3)),
+ ],
+ expected: vec![
+ make_array(100, 3), // scalar is expanded
+ make_array(1, 3),
+ ],
+ },
+ // multiple scalars and array
+ TestCase {
+ input: vec![
+ ColumnarValue::Scalar(ScalarValue::Int32(Some(100))),
+ ColumnarValue::Array(make_array(1, 3)),
+ ColumnarValue::Scalar(ScalarValue::Int32(Some(200))),
+ ],
+ expected: vec![
+ make_array(100, 3), // scalar is expanded
+ make_array(1, 3),
+ make_array(200, 3), // scalar is expanded
+ ],
+ },
+ ];
+ for case in cases {
+ case.run();
+ }
+ }
+
+ #[test]
+ #[should_panic(
+ expected = "Arguments has mixed length. Expected length: 3, found
length: 4"
+ )]
+ fn values_to_arrays_mixed_length() {
+ ColumnarValue::values_to_arrays(&[
+ ColumnarValue::Array(make_array(1, 3)),
+ ColumnarValue::Array(make_array(2, 4)),
+ ])
+ .unwrap();
+ }
+
+ #[test]
+ #[should_panic(
+ expected = "Arguments has mixed length. Expected length: 3, found
length: 7"
+ )]
+ fn values_to_arrays_mixed_length_and_scalar() {
+ ColumnarValue::values_to_arrays(&[
+ ColumnarValue::Array(make_array(1, 3)),
+ ColumnarValue::Scalar(ScalarValue::Int32(Some(100))),
+ ColumnarValue::Array(make_array(2, 7)),
+ ])
+ .unwrap();
+ }
+
+ struct TestCase {
+ input: Vec<ColumnarValue>,
+ expected: Vec<ArrayRef>,
+ }
+
+ impl TestCase {
+ fn run(self) {
+ let Self { input, expected } = self;
+
+ assert_eq!(
+ ColumnarValue::values_to_arrays(&input).unwrap(),
+ expected,
+ "\ninput: {input:?}\nexpected: {expected:?}"
+ );
+ }
+ }
+
+ /// Makes an array of length `len` with all elements set to `val`
+ fn make_array(val: i32, len: usize) -> ArrayRef {
+ Arc::new(arrow::array::Int32Array::from(vec![val; len]))
+ }
}
diff --git a/datafusion/optimizer/src/simplify_expressions/expr_simplifier.rs
b/datafusion/optimizer/src/simplify_expressions/expr_simplifier.rs
index 87d81abc4b..ab62cf8646 100644
--- a/datafusion/optimizer/src/simplify_expressions/expr_simplifier.rs
+++ b/datafusion/optimizer/src/simplify_expressions/expr_simplifier.rs
@@ -1347,7 +1347,6 @@ mod tests {
use datafusion_physical_expr::execution_props::ExecutionProps;
use chrono::{DateTime, TimeZone, Utc};
- use datafusion_physical_expr::functions::columnar_values_to_array;
// ------------------------------
// --- ExprSimplifier tests -----
@@ -1461,7 +1460,7 @@ mod tests {
let return_type = Arc::new(DataType::Int32);
let fun = Arc::new(|args: &[ColumnarValue]| {
- let args = columnar_values_to_array(args)?;
+ let args = ColumnarValue::values_to_arrays(args)?;
let arg0 = as_int32_array(&args[0])?;
let arg1 = as_int32_array(&args[1])?;
diff --git a/datafusion/physical-expr/src/functions.rs
b/datafusion/physical-expr/src/functions.rs
index 21eaeab721..cbd780a8fb 100644
--- a/datafusion/physical-expr/src/functions.rs
+++ b/datafusion/physical-expr/src/functions.rs
@@ -173,49 +173,9 @@ pub(crate) enum Hint {
AcceptsSingular,
}
-/// A helper function used to infer the length of arguments of Scalar
functions and convert
-/// [`ColumnarValue`]s to [`ArrayRef`]s with the inferred length. Note that
this function
-/// only works for functions that accept either that all arguments are scalars
or all arguments
-/// are arrays with same length. Otherwise, it will return an error.
+#[deprecated(since = "36.0.0", note = "Use ColumnarValue::values_to_arrays
instead")]
pub fn columnar_values_to_array(args: &[ColumnarValue]) ->
Result<Vec<ArrayRef>> {
- if args.is_empty() {
- return Ok(vec![]);
- }
-
- let len = args
- .iter()
- .fold(Option::<usize>::None, |acc, arg| match arg {
- ColumnarValue::Scalar(_) if acc.is_none() => Some(1),
- ColumnarValue::Scalar(_) => {
- if let Some(1) = acc {
- acc
- } else {
- None
- }
- }
- ColumnarValue::Array(a) => {
- if let Some(l) = acc {
- if l == a.len() {
- acc
- } else {
- None
- }
- } else {
- Some(a.len())
- }
- }
- });
-
- let inferred_length = len.ok_or(DataFusionError::Internal(
- "Arguments has mixed length".to_string(),
- ))?;
-
- let args = args
- .iter()
- .map(|arg| arg.clone().into_array(inferred_length))
- .collect::<Result<Vec<_>>>()?;
-
- Ok(args)
+ ColumnarValue::values_to_arrays(args)
}
/// Decorates a function to handle [`ScalarValue`]s by converting them to
arrays before calling the function