This is an automated email from the ASF dual-hosted git repository.
jayzhan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 30c4fd77de Port `ArrayDistinct` to `functions-array` subcrate (#9549)
30c4fd77de is described below
commit 30c4fd77de4318695a7f0faf9dd1dab6716b5c8e
Author: Eren Avsarogullari <[email protected]>
AuthorDate: Tue Mar 12 01:59:32 2024 -0700
Port `ArrayDistinct` to `functions-array` subcrate (#9549)
* Issue-9545 - Port ArrayDistinct to functions-array subcrate
* Issue-9545 - Add test coverage on roundtrip_logical_plan
* Issue-9545 - Address review comments
---
datafusion/expr/src/built_in_function.rs | 6 --
datafusion/expr/src/expr_fn.rs | 6 --
datafusion/functions-array/src/kernels.rs | 70 +++++++++++++++++++++-
datafusion/functions-array/src/lib.rs | 2 +
datafusion/functions-array/src/udf.rs | 64 ++++++++++++++++++++
datafusion/physical-expr/src/array_expressions.rs | 26 --------
datafusion/physical-expr/src/functions.rs | 3 -
datafusion/proto/proto/datafusion.proto | 2 +-
datafusion/proto/src/generated/pbjson.rs | 3 -
datafusion/proto/src/generated/prost.rs | 4 +-
datafusion/proto/src/logical_plan/from_proto.rs | 16 ++---
datafusion/proto/src/logical_plan/to_proto.rs | 1 -
.../proto/tests/cases/roundtrip_logical_plan.rs | 1 +
docs/source/user-guide/sql/scalar_functions.md | 30 ++++++++++
14 files changed, 174 insertions(+), 60 deletions(-)
diff --git a/datafusion/expr/src/built_in_function.rs
b/datafusion/expr/src/built_in_function.rs
index ca216b151a..41a71711e2 100644
--- a/datafusion/expr/src/built_in_function.rs
+++ b/datafusion/expr/src/built_in_function.rs
@@ -114,8 +114,6 @@ pub enum BuiltinScalarFunction {
ArrayPopFront,
/// array_pop_back
ArrayPopBack,
- /// array_distinct
- ArrayDistinct,
/// array_element
ArrayElement,
/// array_position
@@ -325,7 +323,6 @@ impl BuiltinScalarFunction {
BuiltinScalarFunction::Tan => Volatility::Immutable,
BuiltinScalarFunction::Tanh => Volatility::Immutable,
BuiltinScalarFunction::Trunc => Volatility::Immutable,
- BuiltinScalarFunction::ArrayDistinct => Volatility::Immutable,
BuiltinScalarFunction::ArrayElement => Volatility::Immutable,
BuiltinScalarFunction::ArrayExcept => Volatility::Immutable,
BuiltinScalarFunction::ArrayPopFront => Volatility::Immutable,
@@ -416,7 +413,6 @@ impl BuiltinScalarFunction {
// the return type of the built in function.
// Some built-in functions' return type depends on the incoming type.
match self {
- BuiltinScalarFunction::ArrayDistinct =>
Ok(input_expr_types[0].clone()),
BuiltinScalarFunction::ArrayElement => match &input_expr_types[0] {
List(field)
| LargeList(field)
@@ -658,7 +654,6 @@ impl BuiltinScalarFunction {
Signature::array_and_index(self.volatility())
}
BuiltinScalarFunction::ArrayExcept => Signature::any(2,
self.volatility()),
- BuiltinScalarFunction::ArrayDistinct =>
Signature::array(self.volatility()),
BuiltinScalarFunction::ArrayPosition => {
Signature::array_and_element_and_optional_index(self.volatility())
}
@@ -1073,7 +1068,6 @@ impl BuiltinScalarFunction {
BuiltinScalarFunction::SHA256 => &["sha256"],
BuiltinScalarFunction::SHA384 => &["sha384"],
BuiltinScalarFunction::SHA512 => &["sha512"],
- BuiltinScalarFunction::ArrayDistinct => &["array_distinct",
"list_distinct"],
BuiltinScalarFunction::ArrayElement => &[
"array_element",
"array_extract",
diff --git a/datafusion/expr/src/expr_fn.rs b/datafusion/expr/src/expr_fn.rs
index 354816eb5f..fc094ffaa0 100644
--- a/datafusion/expr/src/expr_fn.rs
+++ b/datafusion/expr/src/expr_fn.rs
@@ -612,12 +612,6 @@ scalar_expr!(
first_array second_array,
"Returns an array of the elements that appear in the first array but not
in the second."
);
-scalar_expr!(
- ArrayDistinct,
- array_distinct,
- array,
- "return distinct values from the array after removing duplicates."
-);
scalar_expr!(
ArrayPosition,
array_position,
diff --git a/datafusion/functions-array/src/kernels.rs
b/datafusion/functions-array/src/kernels.rs
index 00d4294e83..e5231aa594 100644
--- a/datafusion/functions-array/src/kernels.rs
+++ b/datafusion/functions-array/src/kernels.rs
@@ -28,14 +28,20 @@ use arrow::compute;
use arrow::datatypes::Field;
use arrow::datatypes::UInt64Type;
use arrow::datatypes::{DataType, Date32Type, IntervalMonthDayNanoType};
+use arrow::row::{RowConverter, SortField};
use arrow_buffer::{BooleanBufferBuilder, NullBuffer};
+use arrow_schema::FieldRef;
use arrow_schema::SortOptions;
+
use datafusion_common::cast::{
as_date32_array, as_generic_list_array, as_generic_string_array,
as_int64_array,
as_interval_mdn_array, as_large_list_array, as_list_array, as_null_array,
as_string_array,
};
-use datafusion_common::{exec_err, not_impl_datafusion_err, DataFusionError,
Result};
+use datafusion_common::{
+ exec_err, internal_err, not_impl_datafusion_err, DataFusionError, Result,
+};
+use itertools::Itertools;
use std::any::type_name;
use std::sync::Arc;
@@ -865,3 +871,65 @@ pub fn flatten(args: &[ArrayRef]) -> Result<ArrayRef> {
}
}
}
+
+/// array_distinct SQL function
+/// example: from list [1, 3, 2, 3, 1, 2, 4] to [1, 2, 3, 4]
+pub fn array_distinct(args: &[ArrayRef]) -> Result<ArrayRef> {
+ if args.len() != 1 {
+ return exec_err!("array_distinct needs one argument");
+ }
+
+ // handle null
+ if args[0].data_type() == &DataType::Null {
+ return Ok(args[0].clone());
+ }
+
+ // handle for list & largelist
+ match args[0].data_type() {
+ DataType::List(field) => {
+ let array = as_list_array(&args[0])?;
+ general_array_distinct(array, field)
+ }
+ DataType::LargeList(field) => {
+ let array = as_large_list_array(&args[0])?;
+ general_array_distinct(array, field)
+ }
+ array_type => exec_err!("array_distinct does not support type
'{array_type:?}'"),
+ }
+}
+
+pub fn general_array_distinct<OffsetSize: OffsetSizeTrait>(
+ array: &GenericListArray<OffsetSize>,
+ field: &FieldRef,
+) -> Result<ArrayRef> {
+ let dt = array.value_type();
+ let mut offsets = Vec::with_capacity(array.len());
+ offsets.push(OffsetSize::usize_as(0));
+ let mut new_arrays = Vec::with_capacity(array.len());
+ let converter = RowConverter::new(vec![SortField::new(dt)])?;
+ // distinct for each list in ListArray
+ for arr in array.iter().flatten() {
+ let values = converter.convert_columns(&[arr])?;
+ // sort elements in list and remove duplicates
+ let rows = values.iter().sorted().dedup().collect::<Vec<_>>();
+ let last_offset: OffsetSize = offsets.last().copied().unwrap();
+ offsets.push(last_offset + OffsetSize::usize_as(rows.len()));
+ let arrays = converter.convert_rows(rows)?;
+ let array = match arrays.first() {
+ Some(array) => array.clone(),
+ None => {
+ return internal_err!("array_distinct: failed to get array from
rows")
+ }
+ };
+ new_arrays.push(array);
+ }
+ let offsets = OffsetBuffer::new(offsets.into());
+ let new_arrays_ref = new_arrays.iter().map(|v|
v.as_ref()).collect::<Vec<_>>();
+ let values = compute::concat(&new_arrays_ref)?;
+ Ok(Arc::new(GenericListArray::<OffsetSize>::try_new(
+ field.clone(),
+ offsets,
+ values,
+ None,
+ )?))
+}
diff --git a/datafusion/functions-array/src/lib.rs
b/datafusion/functions-array/src/lib.rs
index 479b5c64e6..95143570cc 100644
--- a/datafusion/functions-array/src/lib.rs
+++ b/datafusion/functions-array/src/lib.rs
@@ -51,6 +51,7 @@ pub mod expr_fn {
pub use super::concat::array_prepend;
pub use super::make_array::make_array;
pub use super::udf::array_dims;
+ pub use super::udf::array_distinct;
pub use super::udf::array_empty;
pub use super::udf::array_length;
pub use super::udf::array_ndims;
@@ -84,6 +85,7 @@ pub fn register_all(registry: &mut dyn FunctionRegistry) ->
Result<()> {
udf::array_length_udf(),
udf::flatten_udf(),
udf::array_sort_udf(),
+ udf::array_distinct_udf(),
];
functions.into_iter().try_for_each(|udf| {
let existing_udf = registry.register_udf(udf)?;
diff --git a/datafusion/functions-array/src/udf.rs
b/datafusion/functions-array/src/udf.rs
index edd104e862..02cacf86b0 100644
--- a/datafusion/functions-array/src/udf.rs
+++ b/datafusion/functions-array/src/udf.rs
@@ -709,3 +709,67 @@ impl ScalarUDFImpl for Flatten {
&self.aliases
}
}
+
+make_udf_function!(
+ ArrayDistinct,
+ array_distinct,
+ array,
+ "return distinct values from the array after removing duplicates.",
+ array_distinct_udf
+);
+
+#[derive(Debug)]
+pub(super) struct ArrayDistinct {
+ signature: Signature,
+ aliases: Vec<String>,
+}
+
+impl crate::udf::ArrayDistinct {
+ pub fn new() -> Self {
+ Self {
+ signature: Signature::array(Volatility::Immutable),
+ aliases: vec!["array_distinct".to_string(),
"list_distinct".to_string()],
+ }
+ }
+}
+
+impl ScalarUDFImpl for crate::udf::ArrayDistinct {
+ fn as_any(&self) -> &dyn Any {
+ self
+ }
+ fn name(&self) -> &str {
+ "array_distinct"
+ }
+
+ fn signature(&self) -> &Signature {
+ &self.signature
+ }
+
+ fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
+ use DataType::*;
+ match &arg_types[0] {
+ List(field) | FixedSizeList(field, _) =>
Ok(List(Arc::new(Field::new(
+ "item",
+ field.data_type().clone(),
+ true,
+ )))),
+ LargeList(field) => Ok(LargeList(Arc::new(Field::new(
+ "item",
+ field.data_type().clone(),
+ true,
+ )))),
+ _ => exec_err!(
+ "Not reachable, data_type should be List, LargeList or
FixedSizeList"
+ ),
+ }
+ }
+
+ fn invoke(&self, args: &[ColumnarValue]) -> Result<ColumnarValue> {
+ let args = ColumnarValue::values_to_arrays(args)?;
+ crate::kernels::array_distinct(&args).map(ColumnarValue::Array)
+ }
+
+ fn aliases(&self) -> &[String] {
+ &self.aliases
+ }
+}
diff --git a/datafusion/physical-expr/src/array_expressions.rs
b/datafusion/physical-expr/src/array_expressions.rs
index 67136a280d..29ef9d10fa 100644
--- a/datafusion/physical-expr/src/array_expressions.rs
+++ b/datafusion/physical-expr/src/array_expressions.rs
@@ -1539,32 +1539,6 @@ pub fn general_array_distinct<OffsetSize:
OffsetSizeTrait>(
)?))
}
-/// array_distinct SQL function
-/// example: from list [1, 3, 2, 3, 1, 2, 4] to [1, 2, 3, 4]
-pub fn array_distinct(args: &[ArrayRef]) -> Result<ArrayRef> {
- if args.len() != 1 {
- return exec_err!("array_distinct needs one argument");
- }
-
- // handle null
- if args[0].data_type() == &DataType::Null {
- return Ok(args[0].clone());
- }
-
- // handle for list & largelist
- match args[0].data_type() {
- DataType::List(field) => {
- let array = as_list_array(&args[0])?;
- general_array_distinct(array, field)
- }
- DataType::LargeList(field) => {
- let array = as_large_list_array(&args[0])?;
- general_array_distinct(array, field)
- }
- array_type => exec_err!("array_distinct does not support type
'{array_type:?}'"),
- }
-}
-
/// array_resize SQL function
pub fn array_resize(arg: &[ArrayRef]) -> Result<ArrayRef> {
if arg.len() < 2 || arg.len() > 3 {
diff --git a/datafusion/physical-expr/src/functions.rs
b/datafusion/physical-expr/src/functions.rs
index e0f0af2a09..2bce52bf78 100644
--- a/datafusion/physical-expr/src/functions.rs
+++ b/datafusion/physical-expr/src/functions.rs
@@ -302,9 +302,6 @@ pub fn create_physical_fun(
}
// array functions
- BuiltinScalarFunction::ArrayDistinct => Arc::new(|args| {
- make_scalar_function_inner(array_expressions::array_distinct)(args)
- }),
BuiltinScalarFunction::ArrayElement => Arc::new(|args| {
make_scalar_function_inner(array_expressions::array_element)(args)
}),
diff --git a/datafusion/proto/proto/datafusion.proto
b/datafusion/proto/proto/datafusion.proto
index cbd1a16648..44a916af81 100644
--- a/datafusion/proto/proto/datafusion.proto
+++ b/datafusion/proto/proto/datafusion.proto
@@ -676,7 +676,7 @@ enum ScalarFunction {
SubstrIndex = 126;
FindInSet = 127;
/// 128 was ArraySort
- ArrayDistinct = 129;
+ /// 129 was ArrayDistinct
ArrayResize = 130;
EndsWith = 131;
/// 132 was InStr
diff --git a/datafusion/proto/src/generated/pbjson.rs
b/datafusion/proto/src/generated/pbjson.rs
index 3f7b97940c..08489673d5 100644
--- a/datafusion/proto/src/generated/pbjson.rs
+++ b/datafusion/proto/src/generated/pbjson.rs
@@ -22191,7 +22191,6 @@ impl serde::Serialize for ScalarFunction {
Self::Levenshtein => "Levenshtein",
Self::SubstrIndex => "SubstrIndex",
Self::FindInSet => "FindInSet",
- Self::ArrayDistinct => "ArrayDistinct",
Self::ArrayResize => "ArrayResize",
Self::EndsWith => "EndsWith",
Self::MakeDate => "MakeDate",
@@ -22302,7 +22301,6 @@ impl<'de> serde::Deserialize<'de> for ScalarFunction {
"Levenshtein",
"SubstrIndex",
"FindInSet",
- "ArrayDistinct",
"ArrayResize",
"EndsWith",
"MakeDate",
@@ -22442,7 +22440,6 @@ impl<'de> serde::Deserialize<'de> for ScalarFunction {
"Levenshtein" => Ok(ScalarFunction::Levenshtein),
"SubstrIndex" => Ok(ScalarFunction::SubstrIndex),
"FindInSet" => Ok(ScalarFunction::FindInSet),
- "ArrayDistinct" => Ok(ScalarFunction::ArrayDistinct),
"ArrayResize" => Ok(ScalarFunction::ArrayResize),
"EndsWith" => Ok(ScalarFunction::EndsWith),
"MakeDate" => Ok(ScalarFunction::MakeDate),
diff --git a/datafusion/proto/src/generated/prost.rs
b/datafusion/proto/src/generated/prost.rs
index 73de2ee270..a8c9d3c8f6 100644
--- a/datafusion/proto/src/generated/prost.rs
+++ b/datafusion/proto/src/generated/prost.rs
@@ -2748,7 +2748,7 @@ pub enum ScalarFunction {
SubstrIndex = 126,
FindInSet = 127,
/// / 128 was ArraySort
- ArrayDistinct = 129,
+ /// / 129 was ArrayDistinct
ArrayResize = 130,
EndsWith = 131,
/// / 132 was InStr
@@ -2861,7 +2861,6 @@ impl ScalarFunction {
ScalarFunction::Levenshtein => "Levenshtein",
ScalarFunction::SubstrIndex => "SubstrIndex",
ScalarFunction::FindInSet => "FindInSet",
- ScalarFunction::ArrayDistinct => "ArrayDistinct",
ScalarFunction::ArrayResize => "ArrayResize",
ScalarFunction::EndsWith => "EndsWith",
ScalarFunction::MakeDate => "MakeDate",
@@ -2966,7 +2965,6 @@ impl ScalarFunction {
"Levenshtein" => Some(Self::Levenshtein),
"SubstrIndex" => Some(Self::SubstrIndex),
"FindInSet" => Some(Self::FindInSet),
- "ArrayDistinct" => Some(Self::ArrayDistinct),
"ArrayResize" => Some(Self::ArrayResize),
"EndsWith" => Some(Self::EndsWith),
"MakeDate" => Some(Self::MakeDate),
diff --git a/datafusion/proto/src/logical_plan/from_proto.rs
b/datafusion/proto/src/logical_plan/from_proto.rs
index 46ead2b6bd..131ac78847 100644
--- a/datafusion/proto/src/logical_plan/from_proto.rs
+++ b/datafusion/proto/src/logical_plan/from_proto.rs
@@ -47,12 +47,12 @@ use datafusion_common::{
use datafusion_expr::expr::Unnest;
use datafusion_expr::window_frame::{check_window_frame,
regularize_window_order_by};
use datafusion_expr::{
- acosh, array_distinct, array_element, array_except, array_intersect,
array_pop_back,
- array_pop_front, array_position, array_positions, array_remove,
array_remove_all,
- array_remove_n, array_repeat, array_replace, array_replace_all,
array_replace_n,
- array_resize, array_slice, array_union, ascii, asinh, atan, atan2, atanh,
bit_length,
- btrim, cbrt, ceil, character_length, chr, coalesce, concat_expr,
concat_ws_expr, cos,
- cosh, cot, current_date, current_time, degrees, digest, ends_with, exp,
+ acosh, array_element, array_except, array_intersect, array_pop_back,
array_pop_front,
+ array_position, array_positions, array_remove, array_remove_all,
array_remove_n,
+ array_repeat, array_replace, array_replace_all, array_replace_n,
array_resize,
+ array_slice, array_union, ascii, asinh, atan, atan2, atanh, bit_length,
btrim, cbrt,
+ ceil, character_length, chr, coalesce, concat_expr, concat_ws_expr, cos,
cosh, cot,
+ current_date, current_time, degrees, digest, ends_with, exp,
expr::{self, InList, Sort, WindowFunction},
factorial, find_in_set, floor, from_unixtime, gcd, initcap, iszero, lcm,
left,
levenshtein, ln, log, log10, log2,
@@ -475,7 +475,6 @@ impl From<&protobuf::ScalarFunction> for
BuiltinScalarFunction {
ScalarFunction::Ltrim => Self::Ltrim,
ScalarFunction::Rtrim => Self::Rtrim,
ScalarFunction::ArrayExcept => Self::ArrayExcept,
- ScalarFunction::ArrayDistinct => Self::ArrayDistinct,
ScalarFunction::ArrayElement => Self::ArrayElement,
ScalarFunction::ArrayPopFront => Self::ArrayPopFront,
ScalarFunction::ArrayPopBack => Self::ArrayPopBack,
@@ -1463,9 +1462,6 @@ pub fn parse_expr(
parse_expr(&args[2], registry, codec)?,
parse_expr(&args[3], registry, codec)?,
)),
- ScalarFunction::ArrayDistinct => {
- Ok(array_distinct(parse_expr(&args[0], registry, codec)?))
- }
ScalarFunction::ArrayElement => Ok(array_element(
parse_expr(&args[0], registry, codec)?,
parse_expr(&args[1], registry, codec)?,
diff --git a/datafusion/proto/src/logical_plan/to_proto.rs
b/datafusion/proto/src/logical_plan/to_proto.rs
index 1e09923b86..6d0d81f61b 100644
--- a/datafusion/proto/src/logical_plan/to_proto.rs
+++ b/datafusion/proto/src/logical_plan/to_proto.rs
@@ -1457,7 +1457,6 @@ impl TryFrom<&BuiltinScalarFunction> for
protobuf::ScalarFunction {
BuiltinScalarFunction::Rtrim => Self::Rtrim,
BuiltinScalarFunction::ToChar => Self::ToChar,
BuiltinScalarFunction::ArrayExcept => Self::ArrayExcept,
- BuiltinScalarFunction::ArrayDistinct => Self::ArrayDistinct,
BuiltinScalarFunction::ArrayElement => Self::ArrayElement,
BuiltinScalarFunction::ArrayPopFront => Self::ArrayPopFront,
BuiltinScalarFunction::ArrayPopBack => Self::ArrayPopBack,
diff --git a/datafusion/proto/tests/cases/roundtrip_logical_plan.rs
b/datafusion/proto/tests/cases/roundtrip_logical_plan.rs
index fcbe573723..faea202da0 100644
--- a/datafusion/proto/tests/cases/roundtrip_logical_plan.rs
+++ b/datafusion/proto/tests/cases/roundtrip_logical_plan.rs
@@ -613,6 +613,7 @@ async fn roundtrip_expr_api() -> Result<()> {
lit("desc"),
lit("NULLS LAST"),
),
+ array_distinct(make_array(vec![lit(1), lit(3), lit(3), lit(2),
lit(2)])),
];
// ensure expressions created with the expr api can be round tripped
diff --git a/docs/source/user-guide/sql/scalar_functions.md
b/docs/source/user-guide/sql/scalar_functions.md
index e77ae41018..420de5f3fd 100644
--- a/docs/source/user-guide/sql/scalar_functions.md
+++ b/docs/source/user-guide/sql/scalar_functions.md
@@ -1949,6 +1949,7 @@ from_unixtime(expression)
- [array_concat](#array_concat)
- [array_contains](#array_contains)
- [array_dims](#array_dims)
+- [array_distinct](#array_distinct)
- [array_has](#array_has)
- [array_has_all](#array_has_all)
- [array_has_any](#array_has_any)
@@ -1987,6 +1988,7 @@ from_unixtime(expression)
- [list_cat](#list_cat)
- [list_concat](#list_concat)
- [list_dims](#list_dims)
+- [list_distinct](#list_distinct)
- [list_element](#list_element)
- [list_extract](#list_extract)
- [list_has](#list_has)
@@ -2204,6 +2206,34 @@ array_dims(array)
- list_dims
+### `array_distinct`
+
+Returns distinct values from the array after removing duplicates.
+
+```
+array_distinct(array)
+```
+
+#### Arguments
+
+- **array**: Array expression.
+ Can be a constant, column, or function, and any combination of array operators.
+
+#### Example
+
+```
+❯ select array_distinct([1, 3, 2, 3, 1, 2, 4]);
++---------------------------------------+
+| array_distinct(List([1,3,2,3,1,2,4])) |
++---------------------------------------+
+| [1, 2, 3, 4]                          |
++---------------------------------------+
+```
+
+#### Aliases
+
+- list_distinct
+
### `array_element`
Extracts the element with the index n from the array.