This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 916d4dbcf7 Issue-9767 - Extract array_dims, array_ndims and flatten
functions from functions-array subcrate's kernels and udf containers (#9786)
916d4dbcf7 is described below
commit 916d4dbcf7e9d70d722b8fc662aef738f61b1409
Author: Eren Avsarogullari <[email protected]>
AuthorDate: Sun Mar 24 15:37:57 2024 -0700
Issue-9767 - Extract array_dims, array_ndims and flatten functions from
functions-array subcrate's kernels and udf containers (#9786)
---
.../functions-array/src/{udf.rs => dimension.rs} | 137 +++++++--------
datafusion/functions-array/src/except.rs | 2 +-
.../functions-array/src/{kernels.rs => flatten.rs} | 195 +++++++++++----------
datafusion/functions-array/src/lib.rs | 16 +-
4 files changed, 175 insertions(+), 175 deletions(-)
diff --git a/datafusion/functions-array/src/udf.rs
b/datafusion/functions-array/src/dimension.rs
similarity index 56%
rename from datafusion/functions-array/src/udf.rs
rename to datafusion/functions-array/src/dimension.rs
index bdc11155b6..569eff66f7 100644
--- a/datafusion/functions-array/src/udf.rs
+++ b/datafusion/functions-array/src/dimension.rs
@@ -15,17 +15,22 @@
// specific language governing permissions and limitations
// under the License.
-//! [`ScalarUDFImpl`] definitions for array functions.
+//! [`ScalarUDFImpl`] definitions for array_dims and array_ndims functions.
-use arrow::datatypes::DataType;
-use arrow::datatypes::Field;
-use datafusion_common::exec_err;
-use datafusion_common::plan_err;
-use datafusion_common::Result;
-use datafusion_expr::expr::ScalarFunction;
-use datafusion_expr::Expr;
-use datafusion_expr::{ColumnarValue, ScalarUDFImpl, Signature, Volatility};
+use arrow::array::{
+ Array, ArrayRef, GenericListArray, ListArray, OffsetSizeTrait, UInt64Array,
+};
+use arrow::datatypes::{DataType, UInt64Type};
use std::any::Any;
+
+use datafusion_common::cast::{as_large_list_array, as_list_array};
+use datafusion_common::{exec_err, plan_err, Result};
+
+use crate::utils::{compute_array_dims, make_scalar_function};
+use arrow_schema::DataType::{FixedSizeList, LargeList, List, UInt64};
+use arrow_schema::Field;
+use datafusion_expr::expr::ScalarFunction;
+use datafusion_expr::{ColumnarValue, Expr, ScalarUDFImpl, Signature,
Volatility};
use std::sync::Arc;
make_udf_function!(
@@ -64,7 +69,6 @@ impl ScalarUDFImpl for ArrayDims {
}
fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
- use DataType::*;
Ok(match arg_types[0] {
List(_) | LargeList(_) | FixedSizeList(_, _) => {
List(Arc::new(Field::new("item", UInt64, true)))
@@ -76,8 +80,7 @@ impl ScalarUDFImpl for ArrayDims {
}
fn invoke(&self, args: &[ColumnarValue]) -> Result<ColumnarValue> {
- let args = ColumnarValue::values_to_arrays(args)?;
- crate::kernels::array_dims(&args).map(ColumnarValue::Array)
+ make_scalar_function(array_dims_inner)(args)
}
fn aliases(&self) -> &[String] {
@@ -120,7 +123,6 @@ impl ScalarUDFImpl for ArrayNdims {
}
fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
- use DataType::*;
Ok(match arg_types[0] {
List(_) | LargeList(_) | FixedSizeList(_, _) => UInt64,
_ => {
@@ -130,8 +132,7 @@ impl ScalarUDFImpl for ArrayNdims {
}
fn invoke(&self, args: &[ColumnarValue]) -> Result<ColumnarValue> {
- let args = ColumnarValue::values_to_arrays(args)?;
- crate::kernels::array_ndims(&args).map(ColumnarValue::Array)
+ make_scalar_function(array_ndims_inner)(args)
}
fn aliases(&self) -> &[String] {
@@ -139,70 +140,68 @@ impl ScalarUDFImpl for ArrayNdims {
}
}
-make_udf_function!(
- Flatten,
- flatten,
- array,
- "flattens an array of arrays into a single array.",
- flatten_udf
-);
+/// Array_dims SQL function
+pub fn array_dims_inner(args: &[ArrayRef]) -> Result<ArrayRef> {
+ if args.len() != 1 {
+ return exec_err!("array_dims needs one argument");
+ }
-#[derive(Debug)]
-pub(super) struct Flatten {
- signature: Signature,
- aliases: Vec<String>,
-}
-impl Flatten {
- pub fn new() -> Self {
- Self {
- signature: Signature::array(Volatility::Immutable),
- aliases: vec![String::from("flatten")],
+ let data = match args[0].data_type() {
+ List(_) => {
+ let array = as_list_array(&args[0])?;
+ array
+ .iter()
+ .map(compute_array_dims)
+ .collect::<Result<Vec<_>>>()?
}
- }
+ LargeList(_) => {
+ let array = as_large_list_array(&args[0])?;
+ array
+ .iter()
+ .map(compute_array_dims)
+ .collect::<Result<Vec<_>>>()?
+ }
+ array_type => {
+ return exec_err!("array_dims does not support type
'{array_type:?}'");
+ }
+ };
+
+ let result = ListArray::from_iter_primitive::<UInt64Type, _, _>(data);
+
+ Ok(Arc::new(result) as ArrayRef)
}
-impl ScalarUDFImpl for Flatten {
- fn as_any(&self) -> &dyn Any {
- self
- }
- fn name(&self) -> &str {
- "flatten"
+/// Array_ndims SQL function
+pub fn array_ndims_inner(args: &[ArrayRef]) -> Result<ArrayRef> {
+ if args.len() != 1 {
+ return exec_err!("array_ndims needs one argument");
}
- fn signature(&self) -> &Signature {
- &self.signature
- }
+ fn general_list_ndims<O: OffsetSizeTrait>(
+ array: &GenericListArray<O>,
+ ) -> Result<ArrayRef> {
+ let mut data = Vec::new();
+ let ndims = datafusion_common::utils::list_ndims(array.data_type());
- fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
- use DataType::*;
- fn get_base_type(data_type: &DataType) -> Result<DataType> {
- match data_type {
- List(field) | FixedSizeList(field, _)
- if matches!(field.data_type(), List(_) | FixedSizeList(_,
_)) =>
- {
- get_base_type(field.data_type())
- }
- LargeList(field) if matches!(field.data_type(), LargeList(_))
=> {
- get_base_type(field.data_type())
- }
- Null | List(_) | LargeList(_) => Ok(data_type.to_owned()),
- FixedSizeList(field, _) => Ok(List(field.clone())),
- _ => exec_err!(
- "Not reachable, data_type should be List, LargeList or
FixedSizeList"
- ),
+ for arr in array.iter() {
+ if arr.is_some() {
+ data.push(Some(ndims))
+ } else {
+ data.push(None)
}
}
- let data_type = get_base_type(&arg_types[0])?;
- Ok(data_type)
+ Ok(Arc::new(UInt64Array::from(data)) as ArrayRef)
}
-
- fn invoke(&self, args: &[ColumnarValue]) -> Result<ColumnarValue> {
- let args = ColumnarValue::values_to_arrays(args)?;
- crate::kernels::flatten(&args).map(ColumnarValue::Array)
- }
-
- fn aliases(&self) -> &[String] {
- &self.aliases
+ match args[0].data_type() {
+ List(_) => {
+ let array = as_list_array(&args[0])?;
+ general_list_ndims::<i32>(array)
+ }
+ LargeList(_) => {
+ let array = as_large_list_array(&args[0])?;
+ general_list_ndims::<i64>(array)
+ }
+ array_type => exec_err!("array_ndims does not support type
{array_type:?}"),
}
}
diff --git a/datafusion/functions-array/src/except.rs
b/datafusion/functions-array/src/except.rs
index 1faaf80e69..72932d530a 100644
--- a/datafusion/functions-array/src/except.rs
+++ b/datafusion/functions-array/src/except.rs
@@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.
-//! implementation kernel for array_except function
+//! [`ScalarUDFImpl`] definitions for array_except function.
use crate::utils::check_datatypes;
use arrow::row::{RowConverter, SortField};
diff --git a/datafusion/functions-array/src/kernels.rs
b/datafusion/functions-array/src/flatten.rs
similarity index 55%
rename from datafusion/functions-array/src/kernels.rs
rename to datafusion/functions-array/src/flatten.rs
index 1a08b64197..27d4b1d5f9 100644
--- a/datafusion/functions-array/src/kernels.rs
+++ b/datafusion/functions-array/src/flatten.rs
@@ -15,111 +15,124 @@
// specific language governing permissions and limitations
// under the License.
-//! implementation kernels for array functions
+//! [`ScalarUDFImpl`] definitions for flatten function.
-use arrow::array::{
- Array, ArrayRef, GenericListArray, ListArray, OffsetSizeTrait, UInt64Array,
-};
-use arrow::datatypes::{DataType, UInt64Type};
+use crate::utils::make_scalar_function;
+use arrow_array::{ArrayRef, GenericListArray, OffsetSizeTrait};
use arrow_buffer::OffsetBuffer;
-
+use arrow_schema::DataType;
+use arrow_schema::DataType::{FixedSizeList, LargeList, List, Null};
use datafusion_common::cast::{
as_generic_list_array, as_large_list_array, as_list_array,
};
-use datafusion_common::{exec_err, Result};
-
-use crate::utils::compute_array_dims;
+use datafusion_common::exec_err;
+use datafusion_expr::expr::ScalarFunction;
+use datafusion_expr::{ColumnarValue, Expr, ScalarUDFImpl, Signature,
Volatility};
+use std::any::Any;
use std::sync::Arc;
-/// Array_dims SQL function
-pub fn array_dims(args: &[ArrayRef]) -> Result<ArrayRef> {
- if args.len() != 1 {
- return exec_err!("array_dims needs one argument");
+make_udf_function!(
+ Flatten,
+ flatten,
+ array,
+ "flattens an array of arrays into a single array.",
+ flatten_udf
+);
+
+#[derive(Debug)]
+pub(super) struct Flatten {
+ signature: Signature,
+ aliases: Vec<String>,
+}
+impl Flatten {
+ pub fn new() -> Self {
+ Self {
+ signature: Signature::array(Volatility::Immutable),
+ aliases: vec![String::from("flatten")],
+ }
}
+}
- let data = match args[0].data_type() {
- DataType::List(_) => {
- let array = as_list_array(&args[0])?;
- array
- .iter()
- .map(compute_array_dims)
- .collect::<Result<Vec<_>>>()?
- }
- DataType::LargeList(_) => {
- let array = as_large_list_array(&args[0])?;
- array
- .iter()
- .map(compute_array_dims)
- .collect::<Result<Vec<_>>>()?
- }
- array_type => {
- return exec_err!("array_dims does not support type
'{array_type:?}'");
+impl ScalarUDFImpl for Flatten {
+ fn as_any(&self) -> &dyn Any {
+ self
+ }
+
+ fn name(&self) -> &str {
+ "flatten"
+ }
+
+ fn signature(&self) -> &Signature {
+ &self.signature
+ }
+
+ fn return_type(&self, arg_types: &[DataType]) ->
datafusion_common::Result<DataType> {
+ fn get_base_type(data_type: &DataType) ->
datafusion_common::Result<DataType> {
+ match data_type {
+ List(field) | FixedSizeList(field, _)
+ if matches!(field.data_type(), List(_) | FixedSizeList(_,
_)) =>
+ {
+ get_base_type(field.data_type())
+ }
+ LargeList(field) if matches!(field.data_type(), LargeList(_))
=> {
+ get_base_type(field.data_type())
+ }
+ Null | List(_) | LargeList(_) => Ok(data_type.to_owned()),
+ FixedSizeList(field, _) => Ok(List(field.clone())),
+ _ => exec_err!(
+ "Not reachable, data_type should be List, LargeList or
FixedSizeList"
+ ),
+ }
}
- };
- let result = ListArray::from_iter_primitive::<UInt64Type, _, _>(data);
+ let data_type = get_base_type(&arg_types[0])?;
+ Ok(data_type)
+ }
- Ok(Arc::new(result) as ArrayRef)
+ fn invoke(&self, args: &[ColumnarValue]) ->
datafusion_common::Result<ColumnarValue> {
+ make_scalar_function(flatten_inner)(args)
+ }
+
+ fn aliases(&self) -> &[String] {
+ &self.aliases
+ }
}
-/// Array_ndims SQL function
-pub fn array_ndims(args: &[ArrayRef]) -> Result<ArrayRef> {
+/// Flatten SQL function
+pub fn flatten_inner(args: &[ArrayRef]) -> datafusion_common::Result<ArrayRef>
{
if args.len() != 1 {
- return exec_err!("array_ndims needs one argument");
+ return exec_err!("flatten expects one argument");
}
- fn general_list_ndims<O: OffsetSizeTrait>(
- array: &GenericListArray<O>,
- ) -> Result<ArrayRef> {
- let mut data = Vec::new();
- let ndims = datafusion_common::utils::list_ndims(array.data_type());
-
- for arr in array.iter() {
- if arr.is_some() {
- data.push(Some(ndims))
- } else {
- data.push(None)
- }
+ let array_type = args[0].data_type();
+ match array_type {
+ List(_) => {
+ let list_arr = as_list_array(&args[0])?;
+ let flattened_array = flatten_internal::<i32>(list_arr.clone(),
None)?;
+ Ok(Arc::new(flattened_array) as ArrayRef)
}
-
- Ok(Arc::new(UInt64Array::from(data)) as ArrayRef)
- }
- match args[0].data_type() {
- DataType::List(_) => {
- let array = as_list_array(&args[0])?;
- general_list_ndims::<i32>(array)
+ LargeList(_) => {
+ let list_arr = as_large_list_array(&args[0])?;
+ let flattened_array = flatten_internal::<i64>(list_arr.clone(),
None)?;
+ Ok(Arc::new(flattened_array) as ArrayRef)
}
- DataType::LargeList(_) => {
- let array = as_large_list_array(&args[0])?;
- general_list_ndims::<i64>(array)
+ Null => Ok(args[0].clone()),
+ _ => {
+ exec_err!("flatten does not support type '{array_type:?}'")
}
- array_type => exec_err!("array_ndims does not support type
{array_type:?}"),
}
}
-// Create new offsets that are euqiavlent to `flatten` the array.
-fn get_offsets_for_flatten<O: OffsetSizeTrait>(
- offsets: OffsetBuffer<O>,
- indexes: OffsetBuffer<O>,
-) -> OffsetBuffer<O> {
- let buffer = offsets.into_inner();
- let offsets: Vec<O> = indexes
- .iter()
- .map(|i| buffer[i.to_usize().unwrap()])
- .collect();
- OffsetBuffer::new(offsets.into())
-}
-
fn flatten_internal<O: OffsetSizeTrait>(
list_arr: GenericListArray<O>,
indexes: Option<OffsetBuffer<O>>,
-) -> Result<GenericListArray<O>> {
+) -> datafusion_common::Result<GenericListArray<O>> {
let (field, offsets, values, _) = list_arr.clone().into_parts();
let data_type = field.data_type();
match data_type {
// Recursively get the base offsets for flattened array
- DataType::List(_) | DataType::LargeList(_) => {
+ List(_) | LargeList(_) => {
let sub_list = as_generic_list_array::<O>(&values)?;
if let Some(indexes) = indexes {
let offsets = get_offsets_for_flatten(offsets, indexes);
@@ -141,27 +154,15 @@ fn flatten_internal<O: OffsetSizeTrait>(
}
}
-/// Flatten SQL function
-pub fn flatten(args: &[ArrayRef]) -> Result<ArrayRef> {
- if args.len() != 1 {
- return exec_err!("flatten expects one argument");
- }
-
- let array_type = args[0].data_type();
- match array_type {
- DataType::List(_) => {
- let list_arr = as_list_array(&args[0])?;
- let flattened_array = flatten_internal::<i32>(list_arr.clone(),
None)?;
- Ok(Arc::new(flattened_array) as ArrayRef)
- }
- DataType::LargeList(_) => {
- let list_arr = as_large_list_array(&args[0])?;
- let flattened_array = flatten_internal::<i64>(list_arr.clone(),
None)?;
- Ok(Arc::new(flattened_array) as ArrayRef)
- }
- DataType::Null => Ok(args[0].clone()),
- _ => {
- exec_err!("flatten does not support type '{array_type:?}'")
- }
- }
+// Create new offsets that are equivalent to `flatten` the array.
+fn get_offsets_for_flatten<O: OffsetSizeTrait>(
+ offsets: OffsetBuffer<O>,
+ indexes: OffsetBuffer<O>,
+) -> OffsetBuffer<O> {
+ let buffer = offsets.into_inner();
+ let offsets: Vec<O> = indexes
+ .iter()
+ .map(|i| buffer[i.to_usize().unwrap()])
+ .collect();
+ OffsetBuffer::new(offsets.into())
}
diff --git a/datafusion/functions-array/src/lib.rs
b/datafusion/functions-array/src/lib.rs
index feecd18c2e..30a63deee0 100644
--- a/datafusion/functions-array/src/lib.rs
+++ b/datafusion/functions-array/src/lib.rs
@@ -32,10 +32,11 @@ mod array_has;
mod cardinality;
mod concat;
mod core;
+mod dimension;
mod empty;
mod except;
mod extract;
-mod kernels;
+mod flatten;
mod length;
mod position;
mod range;
@@ -48,7 +49,6 @@ mod rewrite;
mod set_ops;
mod sort;
mod string;
-mod udf;
mod utils;
use datafusion_common::Result;
@@ -67,12 +67,15 @@ pub mod expr_fn {
pub use super::concat::array_concat;
pub use super::concat::array_prepend;
pub use super::core::make_array;
+ pub use super::dimension::array_dims;
+ pub use super::dimension::array_ndims;
pub use super::empty::array_empty;
pub use super::except::array_except;
pub use super::extract::array_element;
pub use super::extract::array_pop_back;
pub use super::extract::array_pop_front;
pub use super::extract::array_slice;
+ pub use super::flatten::flatten;
pub use super::length::array_length;
pub use super::position::array_position;
pub use super::position::array_positions;
@@ -93,9 +96,6 @@ pub mod expr_fn {
pub use super::sort::array_sort;
pub use super::string::array_to_string;
pub use super::string::string_to_array;
- pub use super::udf::array_dims;
- pub use super::udf::array_ndims;
- pub use super::udf::flatten;
}
/// Registers all enabled packages with a [`FunctionRegistry`]
@@ -105,9 +105,9 @@ pub fn register_all(registry: &mut dyn FunctionRegistry) ->
Result<()> {
string::string_to_array_udf(),
range::range_udf(),
range::gen_series_udf(),
- udf::array_dims_udf(),
+ dimension::array_dims_udf(),
cardinality::cardinality_udf(),
- udf::array_ndims_udf(),
+ dimension::array_ndims_udf(),
concat::array_append_udf(),
concat::array_prepend_udf(),
concat::array_concat_udf(),
@@ -122,7 +122,7 @@ pub fn register_all(registry: &mut dyn FunctionRegistry) ->
Result<()> {
array_has::array_has_any_udf(),
empty::array_empty_udf(),
length::array_length_udf(),
- udf::flatten_udf(),
+ flatten::flatten_udf(),
sort::array_sort_udf(),
repeat::array_repeat_udf(),
resize::array_resize_udf(),