This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 916d4dbcf7 Issue-9767 - Extract array_dims, array_ndims and flatten functions from functions-array subcrate's kernels and udf containers (#9786)
916d4dbcf7 is described below

commit 916d4dbcf7e9d70d722b8fc662aef738f61b1409
Author: Eren Avsarogullari <[email protected]>
AuthorDate: Sun Mar 24 15:37:57 2024 -0700

    Issue-9767 - Extract array_dims, array_ndims and flatten functions from functions-array subcrate's kernels and udf containers (#9786)
---
 .../functions-array/src/{udf.rs => dimension.rs}   | 137 +++++++--------
 datafusion/functions-array/src/except.rs           |   2 +-
 .../functions-array/src/{kernels.rs => flatten.rs} | 195 +++++++++++----------
 datafusion/functions-array/src/lib.rs              |  16 +-
 4 files changed, 175 insertions(+), 175 deletions(-)

diff --git a/datafusion/functions-array/src/udf.rs b/datafusion/functions-array/src/dimension.rs
similarity index 56%
rename from datafusion/functions-array/src/udf.rs
rename to datafusion/functions-array/src/dimension.rs
index bdc11155b6..569eff66f7 100644
--- a/datafusion/functions-array/src/udf.rs
+++ b/datafusion/functions-array/src/dimension.rs
@@ -15,17 +15,22 @@
 // specific language governing permissions and limitations
 // under the License.
 
-//! [`ScalarUDFImpl`] definitions for array functions.
+//! [`ScalarUDFImpl`] definitions for array_dims and array_ndims functions.
 
-use arrow::datatypes::DataType;
-use arrow::datatypes::Field;
-use datafusion_common::exec_err;
-use datafusion_common::plan_err;
-use datafusion_common::Result;
-use datafusion_expr::expr::ScalarFunction;
-use datafusion_expr::Expr;
-use datafusion_expr::{ColumnarValue, ScalarUDFImpl, Signature, Volatility};
+use arrow::array::{
+    Array, ArrayRef, GenericListArray, ListArray, OffsetSizeTrait, UInt64Array,
+};
+use arrow::datatypes::{DataType, UInt64Type};
 use std::any::Any;
+
+use datafusion_common::cast::{as_large_list_array, as_list_array};
+use datafusion_common::{exec_err, plan_err, Result};
+
+use crate::utils::{compute_array_dims, make_scalar_function};
+use arrow_schema::DataType::{FixedSizeList, LargeList, List, UInt64};
+use arrow_schema::Field;
+use datafusion_expr::expr::ScalarFunction;
+use datafusion_expr::{ColumnarValue, Expr, ScalarUDFImpl, Signature, 
Volatility};
 use std::sync::Arc;
 
 make_udf_function!(
@@ -64,7 +69,6 @@ impl ScalarUDFImpl for ArrayDims {
     }
 
     fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
-        use DataType::*;
         Ok(match arg_types[0] {
             List(_) | LargeList(_) | FixedSizeList(_, _) => {
                 List(Arc::new(Field::new("item", UInt64, true)))
@@ -76,8 +80,7 @@ impl ScalarUDFImpl for ArrayDims {
     }
 
     fn invoke(&self, args: &[ColumnarValue]) -> Result<ColumnarValue> {
-        let args = ColumnarValue::values_to_arrays(args)?;
-        crate::kernels::array_dims(&args).map(ColumnarValue::Array)
+        make_scalar_function(array_dims_inner)(args)
     }
 
     fn aliases(&self) -> &[String] {
@@ -120,7 +123,6 @@ impl ScalarUDFImpl for ArrayNdims {
     }
 
     fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
-        use DataType::*;
         Ok(match arg_types[0] {
             List(_) | LargeList(_) | FixedSizeList(_, _) => UInt64,
             _ => {
@@ -130,8 +132,7 @@ impl ScalarUDFImpl for ArrayNdims {
     }
 
     fn invoke(&self, args: &[ColumnarValue]) -> Result<ColumnarValue> {
-        let args = ColumnarValue::values_to_arrays(args)?;
-        crate::kernels::array_ndims(&args).map(ColumnarValue::Array)
+        make_scalar_function(array_ndims_inner)(args)
     }
 
     fn aliases(&self) -> &[String] {
@@ -139,70 +140,68 @@ impl ScalarUDFImpl for ArrayNdims {
     }
 }
 
-make_udf_function!(
-    Flatten,
-    flatten,
-    array,
-    "flattens an array of arrays into a single array.",
-    flatten_udf
-);
+/// Array_dims SQL function
+pub fn array_dims_inner(args: &[ArrayRef]) -> Result<ArrayRef> {
+    if args.len() != 1 {
+        return exec_err!("array_dims needs one argument");
+    }
 
-#[derive(Debug)]
-pub(super) struct Flatten {
-    signature: Signature,
-    aliases: Vec<String>,
-}
-impl Flatten {
-    pub fn new() -> Self {
-        Self {
-            signature: Signature::array(Volatility::Immutable),
-            aliases: vec![String::from("flatten")],
+    let data = match args[0].data_type() {
+        List(_) => {
+            let array = as_list_array(&args[0])?;
+            array
+                .iter()
+                .map(compute_array_dims)
+                .collect::<Result<Vec<_>>>()?
         }
-    }
+        LargeList(_) => {
+            let array = as_large_list_array(&args[0])?;
+            array
+                .iter()
+                .map(compute_array_dims)
+                .collect::<Result<Vec<_>>>()?
+        }
+        array_type => {
+            return exec_err!("array_dims does not support type 
'{array_type:?}'");
+        }
+    };
+
+    let result = ListArray::from_iter_primitive::<UInt64Type, _, _>(data);
+
+    Ok(Arc::new(result) as ArrayRef)
 }
 
-impl ScalarUDFImpl for Flatten {
-    fn as_any(&self) -> &dyn Any {
-        self
-    }
-    fn name(&self) -> &str {
-        "flatten"
+/// Array_ndims SQL function
+pub fn array_ndims_inner(args: &[ArrayRef]) -> Result<ArrayRef> {
+    if args.len() != 1 {
+        return exec_err!("array_ndims needs one argument");
     }
 
-    fn signature(&self) -> &Signature {
-        &self.signature
-    }
+    fn general_list_ndims<O: OffsetSizeTrait>(
+        array: &GenericListArray<O>,
+    ) -> Result<ArrayRef> {
+        let mut data = Vec::new();
+        let ndims = datafusion_common::utils::list_ndims(array.data_type());
 
-    fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
-        use DataType::*;
-        fn get_base_type(data_type: &DataType) -> Result<DataType> {
-            match data_type {
-                List(field) | FixedSizeList(field, _)
-                    if matches!(field.data_type(), List(_) | FixedSizeList(_, 
_)) =>
-                {
-                    get_base_type(field.data_type())
-                }
-                LargeList(field) if matches!(field.data_type(), LargeList(_)) 
=> {
-                    get_base_type(field.data_type())
-                }
-                Null | List(_) | LargeList(_) => Ok(data_type.to_owned()),
-                FixedSizeList(field, _) => Ok(List(field.clone())),
-                _ => exec_err!(
-                    "Not reachable, data_type should be List, LargeList or 
FixedSizeList"
-                ),
+        for arr in array.iter() {
+            if arr.is_some() {
+                data.push(Some(ndims))
+            } else {
+                data.push(None)
             }
         }
 
-        let data_type = get_base_type(&arg_types[0])?;
-        Ok(data_type)
+        Ok(Arc::new(UInt64Array::from(data)) as ArrayRef)
     }
-
-    fn invoke(&self, args: &[ColumnarValue]) -> Result<ColumnarValue> {
-        let args = ColumnarValue::values_to_arrays(args)?;
-        crate::kernels::flatten(&args).map(ColumnarValue::Array)
-    }
-
-    fn aliases(&self) -> &[String] {
-        &self.aliases
+    match args[0].data_type() {
+        List(_) => {
+            let array = as_list_array(&args[0])?;
+            general_list_ndims::<i32>(array)
+        }
+        LargeList(_) => {
+            let array = as_large_list_array(&args[0])?;
+            general_list_ndims::<i64>(array)
+        }
+        array_type => exec_err!("array_ndims does not support type 
{array_type:?}"),
     }
 }
diff --git a/datafusion/functions-array/src/except.rs b/datafusion/functions-array/src/except.rs
index 1faaf80e69..72932d530a 100644
--- a/datafusion/functions-array/src/except.rs
+++ b/datafusion/functions-array/src/except.rs
@@ -15,7 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
-//! implementation kernel for array_except function
+//! [`ScalarUDFImpl`] definitions for array_except function.
 
 use crate::utils::check_datatypes;
 use arrow::row::{RowConverter, SortField};
diff --git a/datafusion/functions-array/src/kernels.rs b/datafusion/functions-array/src/flatten.rs
similarity index 55%
rename from datafusion/functions-array/src/kernels.rs
rename to datafusion/functions-array/src/flatten.rs
index 1a08b64197..27d4b1d5f9 100644
--- a/datafusion/functions-array/src/kernels.rs
+++ b/datafusion/functions-array/src/flatten.rs
@@ -15,111 +15,124 @@
 // specific language governing permissions and limitations
 // under the License.
 
-//! implementation kernels for array functions
+//! [`ScalarUDFImpl`] definitions for flatten function.
 
-use arrow::array::{
-    Array, ArrayRef, GenericListArray, ListArray, OffsetSizeTrait, UInt64Array,
-};
-use arrow::datatypes::{DataType, UInt64Type};
+use crate::utils::make_scalar_function;
+use arrow_array::{ArrayRef, GenericListArray, OffsetSizeTrait};
 use arrow_buffer::OffsetBuffer;
-
+use arrow_schema::DataType;
+use arrow_schema::DataType::{FixedSizeList, LargeList, List, Null};
 use datafusion_common::cast::{
     as_generic_list_array, as_large_list_array, as_list_array,
 };
-use datafusion_common::{exec_err, Result};
-
-use crate::utils::compute_array_dims;
+use datafusion_common::exec_err;
+use datafusion_expr::expr::ScalarFunction;
+use datafusion_expr::{ColumnarValue, Expr, ScalarUDFImpl, Signature, 
Volatility};
+use std::any::Any;
 use std::sync::Arc;
 
-/// Array_dims SQL function
-pub fn array_dims(args: &[ArrayRef]) -> Result<ArrayRef> {
-    if args.len() != 1 {
-        return exec_err!("array_dims needs one argument");
+make_udf_function!(
+    Flatten,
+    flatten,
+    array,
+    "flattens an array of arrays into a single array.",
+    flatten_udf
+);
+
+#[derive(Debug)]
+pub(super) struct Flatten {
+    signature: Signature,
+    aliases: Vec<String>,
+}
+impl Flatten {
+    pub fn new() -> Self {
+        Self {
+            signature: Signature::array(Volatility::Immutable),
+            aliases: vec![String::from("flatten")],
+        }
     }
+}
 
-    let data = match args[0].data_type() {
-        DataType::List(_) => {
-            let array = as_list_array(&args[0])?;
-            array
-                .iter()
-                .map(compute_array_dims)
-                .collect::<Result<Vec<_>>>()?
-        }
-        DataType::LargeList(_) => {
-            let array = as_large_list_array(&args[0])?;
-            array
-                .iter()
-                .map(compute_array_dims)
-                .collect::<Result<Vec<_>>>()?
-        }
-        array_type => {
-            return exec_err!("array_dims does not support type 
'{array_type:?}'");
+impl ScalarUDFImpl for Flatten {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn name(&self) -> &str {
+        "flatten"
+    }
+
+    fn signature(&self) -> &Signature {
+        &self.signature
+    }
+
+    fn return_type(&self, arg_types: &[DataType]) -> 
datafusion_common::Result<DataType> {
+        fn get_base_type(data_type: &DataType) -> 
datafusion_common::Result<DataType> {
+            match data_type {
+                List(field) | FixedSizeList(field, _)
+                    if matches!(field.data_type(), List(_) | FixedSizeList(_, 
_)) =>
+                {
+                    get_base_type(field.data_type())
+                }
+                LargeList(field) if matches!(field.data_type(), LargeList(_)) 
=> {
+                    get_base_type(field.data_type())
+                }
+                Null | List(_) | LargeList(_) => Ok(data_type.to_owned()),
+                FixedSizeList(field, _) => Ok(List(field.clone())),
+                _ => exec_err!(
+                    "Not reachable, data_type should be List, LargeList or 
FixedSizeList"
+                ),
+            }
         }
-    };
 
-    let result = ListArray::from_iter_primitive::<UInt64Type, _, _>(data);
+        let data_type = get_base_type(&arg_types[0])?;
+        Ok(data_type)
+    }
 
-    Ok(Arc::new(result) as ArrayRef)
+    fn invoke(&self, args: &[ColumnarValue]) -> 
datafusion_common::Result<ColumnarValue> {
+        make_scalar_function(flatten_inner)(args)
+    }
+
+    fn aliases(&self) -> &[String] {
+        &self.aliases
+    }
 }
 
-/// Array_ndims SQL function
-pub fn array_ndims(args: &[ArrayRef]) -> Result<ArrayRef> {
+/// Flatten SQL function
+pub fn flatten_inner(args: &[ArrayRef]) -> datafusion_common::Result<ArrayRef> 
{
     if args.len() != 1 {
-        return exec_err!("array_ndims needs one argument");
+        return exec_err!("flatten expects one argument");
     }
 
-    fn general_list_ndims<O: OffsetSizeTrait>(
-        array: &GenericListArray<O>,
-    ) -> Result<ArrayRef> {
-        let mut data = Vec::new();
-        let ndims = datafusion_common::utils::list_ndims(array.data_type());
-
-        for arr in array.iter() {
-            if arr.is_some() {
-                data.push(Some(ndims))
-            } else {
-                data.push(None)
-            }
+    let array_type = args[0].data_type();
+    match array_type {
+        List(_) => {
+            let list_arr = as_list_array(&args[0])?;
+            let flattened_array = flatten_internal::<i32>(list_arr.clone(), 
None)?;
+            Ok(Arc::new(flattened_array) as ArrayRef)
         }
-
-        Ok(Arc::new(UInt64Array::from(data)) as ArrayRef)
-    }
-    match args[0].data_type() {
-        DataType::List(_) => {
-            let array = as_list_array(&args[0])?;
-            general_list_ndims::<i32>(array)
+        LargeList(_) => {
+            let list_arr = as_large_list_array(&args[0])?;
+            let flattened_array = flatten_internal::<i64>(list_arr.clone(), 
None)?;
+            Ok(Arc::new(flattened_array) as ArrayRef)
         }
-        DataType::LargeList(_) => {
-            let array = as_large_list_array(&args[0])?;
-            general_list_ndims::<i64>(array)
+        Null => Ok(args[0].clone()),
+        _ => {
+            exec_err!("flatten does not support type '{array_type:?}'")
         }
-        array_type => exec_err!("array_ndims does not support type 
{array_type:?}"),
     }
 }
 
-// Create new offsets that are euqiavlent to `flatten` the array.
-fn get_offsets_for_flatten<O: OffsetSizeTrait>(
-    offsets: OffsetBuffer<O>,
-    indexes: OffsetBuffer<O>,
-) -> OffsetBuffer<O> {
-    let buffer = offsets.into_inner();
-    let offsets: Vec<O> = indexes
-        .iter()
-        .map(|i| buffer[i.to_usize().unwrap()])
-        .collect();
-    OffsetBuffer::new(offsets.into())
-}
-
 fn flatten_internal<O: OffsetSizeTrait>(
     list_arr: GenericListArray<O>,
     indexes: Option<OffsetBuffer<O>>,
-) -> Result<GenericListArray<O>> {
+) -> datafusion_common::Result<GenericListArray<O>> {
     let (field, offsets, values, _) = list_arr.clone().into_parts();
     let data_type = field.data_type();
 
     match data_type {
         // Recursively get the base offsets for flattened array
-        DataType::List(_) | DataType::LargeList(_) => {
+        List(_) | LargeList(_) => {
             let sub_list = as_generic_list_array::<O>(&values)?;
             if let Some(indexes) = indexes {
                 let offsets = get_offsets_for_flatten(offsets, indexes);
@@ -141,27 +154,15 @@ fn flatten_internal<O: OffsetSizeTrait>(
     }
 }
 
-/// Flatten SQL function
-pub fn flatten(args: &[ArrayRef]) -> Result<ArrayRef> {
-    if args.len() != 1 {
-        return exec_err!("flatten expects one argument");
-    }
-
-    let array_type = args[0].data_type();
-    match array_type {
-        DataType::List(_) => {
-            let list_arr = as_list_array(&args[0])?;
-            let flattened_array = flatten_internal::<i32>(list_arr.clone(), 
None)?;
-            Ok(Arc::new(flattened_array) as ArrayRef)
-        }
-        DataType::LargeList(_) => {
-            let list_arr = as_large_list_array(&args[0])?;
-            let flattened_array = flatten_internal::<i64>(list_arr.clone(), 
None)?;
-            Ok(Arc::new(flattened_array) as ArrayRef)
-        }
-        DataType::Null => Ok(args[0].clone()),
-        _ => {
-            exec_err!("flatten does not support type '{array_type:?}'")
-        }
-    }
+// Create new offsets that are equivalent to `flatten` the array.
+fn get_offsets_for_flatten<O: OffsetSizeTrait>(
+    offsets: OffsetBuffer<O>,
+    indexes: OffsetBuffer<O>,
+) -> OffsetBuffer<O> {
+    let buffer = offsets.into_inner();
+    let offsets: Vec<O> = indexes
+        .iter()
+        .map(|i| buffer[i.to_usize().unwrap()])
+        .collect();
+    OffsetBuffer::new(offsets.into())
 }
diff --git a/datafusion/functions-array/src/lib.rs b/datafusion/functions-array/src/lib.rs
index feecd18c2e..30a63deee0 100644
--- a/datafusion/functions-array/src/lib.rs
+++ b/datafusion/functions-array/src/lib.rs
@@ -32,10 +32,11 @@ mod array_has;
 mod cardinality;
 mod concat;
 mod core;
+mod dimension;
 mod empty;
 mod except;
 mod extract;
-mod kernels;
+mod flatten;
 mod length;
 mod position;
 mod range;
@@ -48,7 +49,6 @@ mod rewrite;
 mod set_ops;
 mod sort;
 mod string;
-mod udf;
 mod utils;
 
 use datafusion_common::Result;
@@ -67,12 +67,15 @@ pub mod expr_fn {
     pub use super::concat::array_concat;
     pub use super::concat::array_prepend;
     pub use super::core::make_array;
+    pub use super::dimension::array_dims;
+    pub use super::dimension::array_ndims;
     pub use super::empty::array_empty;
     pub use super::except::array_except;
     pub use super::extract::array_element;
     pub use super::extract::array_pop_back;
     pub use super::extract::array_pop_front;
     pub use super::extract::array_slice;
+    pub use super::flatten::flatten;
     pub use super::length::array_length;
     pub use super::position::array_position;
     pub use super::position::array_positions;
@@ -93,9 +96,6 @@ pub mod expr_fn {
     pub use super::sort::array_sort;
     pub use super::string::array_to_string;
     pub use super::string::string_to_array;
-    pub use super::udf::array_dims;
-    pub use super::udf::array_ndims;
-    pub use super::udf::flatten;
 }
 
 /// Registers all enabled packages with a [`FunctionRegistry`]
@@ -105,9 +105,9 @@ pub fn register_all(registry: &mut dyn FunctionRegistry) -> Result<()> {
         string::string_to_array_udf(),
         range::range_udf(),
         range::gen_series_udf(),
-        udf::array_dims_udf(),
+        dimension::array_dims_udf(),
         cardinality::cardinality_udf(),
-        udf::array_ndims_udf(),
+        dimension::array_ndims_udf(),
         concat::array_append_udf(),
         concat::array_prepend_udf(),
         concat::array_concat_udf(),
@@ -122,7 +122,7 @@ pub fn register_all(registry: &mut dyn FunctionRegistry) -> Result<()> {
         array_has::array_has_any_udf(),
         empty::array_empty_udf(),
         length::array_length_udf(),
-        udf::flatten_udf(),
+        flatten::flatten_udf(),
         sort::array_sort_udf(),
         repeat::array_repeat_udf(),
         resize::array_resize_udf(),

Reply via email to