This is an automated email from the ASF dual-hosted git repository.

jayzhan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 30c4fd77de Port `ArrayDistinct` to `functions-array` subcrate (#9549)
30c4fd77de is described below

commit 30c4fd77de4318695a7f0faf9dd1dab6716b5c8e
Author: Eren Avsarogullari <[email protected]>
AuthorDate: Tue Mar 12 01:59:32 2024 -0700

    Port `ArrayDistinct` to `functions-array` subcrate (#9549)
    
    * Issue-9545 - Port ArrayDistinct to function-arrays subcrate
    
    * Issue-9545 - Add test coverage on roundtrip_logical_plan
    
    * Issue-9545 - Address review comments
---
 datafusion/expr/src/built_in_function.rs           |  6 --
 datafusion/expr/src/expr_fn.rs                     |  6 --
 datafusion/functions-array/src/kernels.rs          | 70 +++++++++++++++++++++-
 datafusion/functions-array/src/lib.rs              |  2 +
 datafusion/functions-array/src/udf.rs              | 64 ++++++++++++++++++++
 datafusion/physical-expr/src/array_expressions.rs  | 26 --------
 datafusion/physical-expr/src/functions.rs          |  3 -
 datafusion/proto/proto/datafusion.proto            |  2 +-
 datafusion/proto/src/generated/pbjson.rs           |  3 -
 datafusion/proto/src/generated/prost.rs            |  4 +-
 datafusion/proto/src/logical_plan/from_proto.rs    | 16 ++---
 datafusion/proto/src/logical_plan/to_proto.rs      |  1 -
 .../proto/tests/cases/roundtrip_logical_plan.rs    |  1 +
 docs/source/user-guide/sql/scalar_functions.md     | 30 ++++++++++
 14 files changed, 174 insertions(+), 60 deletions(-)

diff --git a/datafusion/expr/src/built_in_function.rs b/datafusion/expr/src/built_in_function.rs
index ca216b151a..41a71711e2 100644
--- a/datafusion/expr/src/built_in_function.rs
+++ b/datafusion/expr/src/built_in_function.rs
@@ -114,8 +114,6 @@ pub enum BuiltinScalarFunction {
     ArrayPopFront,
     /// array_pop_back
     ArrayPopBack,
-    /// array_distinct
-    ArrayDistinct,
     /// array_element
     ArrayElement,
     /// array_position
@@ -325,7 +323,6 @@ impl BuiltinScalarFunction {
             BuiltinScalarFunction::Tan => Volatility::Immutable,
             BuiltinScalarFunction::Tanh => Volatility::Immutable,
             BuiltinScalarFunction::Trunc => Volatility::Immutable,
-            BuiltinScalarFunction::ArrayDistinct => Volatility::Immutable,
             BuiltinScalarFunction::ArrayElement => Volatility::Immutable,
             BuiltinScalarFunction::ArrayExcept => Volatility::Immutable,
             BuiltinScalarFunction::ArrayPopFront => Volatility::Immutable,
@@ -416,7 +413,6 @@ impl BuiltinScalarFunction {
         // the return type of the built in function.
         // Some built-in functions' return type depends on the incoming type.
         match self {
-            BuiltinScalarFunction::ArrayDistinct => Ok(input_expr_types[0].clone()),
             BuiltinScalarFunction::ArrayElement => match &input_expr_types[0] {
                 List(field)
                 | LargeList(field)
@@ -658,7 +654,6 @@ impl BuiltinScalarFunction {
                 Signature::array_and_index(self.volatility())
             }
             BuiltinScalarFunction::ArrayExcept => Signature::any(2, self.volatility()),
-            BuiltinScalarFunction::ArrayDistinct => Signature::array(self.volatility()),
             BuiltinScalarFunction::ArrayPosition => {
                 Signature::array_and_element_and_optional_index(self.volatility())
             }
@@ -1073,7 +1068,6 @@ impl BuiltinScalarFunction {
             BuiltinScalarFunction::SHA256 => &["sha256"],
             BuiltinScalarFunction::SHA384 => &["sha384"],
             BuiltinScalarFunction::SHA512 => &["sha512"],
-            BuiltinScalarFunction::ArrayDistinct => &["array_distinct", "list_distinct"],
             BuiltinScalarFunction::ArrayElement => &[
                 "array_element",
                 "array_extract",
diff --git a/datafusion/expr/src/expr_fn.rs b/datafusion/expr/src/expr_fn.rs
index 354816eb5f..fc094ffaa0 100644
--- a/datafusion/expr/src/expr_fn.rs
+++ b/datafusion/expr/src/expr_fn.rs
@@ -612,12 +612,6 @@ scalar_expr!(
     first_array second_array,
     "Returns an array of the elements that appear in the first array but not in the second."
 );
-scalar_expr!(
-    ArrayDistinct,
-    array_distinct,
-    array,
-    "return distinct values from the array after removing duplicates."
-);
 scalar_expr!(
     ArrayPosition,
     array_position,
diff --git a/datafusion/functions-array/src/kernels.rs b/datafusion/functions-array/src/kernels.rs
index 00d4294e83..e5231aa594 100644
--- a/datafusion/functions-array/src/kernels.rs
+++ b/datafusion/functions-array/src/kernels.rs
@@ -28,14 +28,20 @@ use arrow::compute;
 use arrow::datatypes::Field;
 use arrow::datatypes::UInt64Type;
 use arrow::datatypes::{DataType, Date32Type, IntervalMonthDayNanoType};
+use arrow::row::{RowConverter, SortField};
 use arrow_buffer::{BooleanBufferBuilder, NullBuffer};
+use arrow_schema::FieldRef;
 use arrow_schema::SortOptions;
+
 use datafusion_common::cast::{
     as_date32_array, as_generic_list_array, as_generic_string_array, as_int64_array,
     as_interval_mdn_array, as_large_list_array, as_list_array, as_null_array,
     as_string_array,
 };
-use datafusion_common::{exec_err, not_impl_datafusion_err, DataFusionError, Result};
+use datafusion_common::{
+    exec_err, internal_err, not_impl_datafusion_err, DataFusionError, Result,
+};
+use itertools::Itertools;
 use std::any::type_name;
 use std::sync::Arc;
 
@@ -865,3 +871,65 @@ pub fn flatten(args: &[ArrayRef]) -> Result<ArrayRef> {
         }
     }
 }
+
+/// array_distinct SQL function
+/// example: from list [1, 3, 2, 3, 1, 2, 4] to [1, 2, 3, 4]
+pub fn array_distinct(args: &[ArrayRef]) -> Result<ArrayRef> {
+    if args.len() != 1 {
+        return exec_err!("array_distinct needs one argument");
+    }
+
+    // handle null
+    if args[0].data_type() == &DataType::Null {
+        return Ok(args[0].clone());
+    }
+
+    // handle for list & largelist
+    match args[0].data_type() {
+        DataType::List(field) => {
+            let array = as_list_array(&args[0])?;
+            general_array_distinct(array, field)
+        }
+        DataType::LargeList(field) => {
+            let array = as_large_list_array(&args[0])?;
+            general_array_distinct(array, field)
+        }
+        array_type => exec_err!("array_distinct does not support type '{array_type:?}'"),
+    }
+}
+
+pub fn general_array_distinct<OffsetSize: OffsetSizeTrait>(
+    array: &GenericListArray<OffsetSize>,
+    field: &FieldRef,
+) -> Result<ArrayRef> {
+    let dt = array.value_type();
+    let mut offsets = Vec::with_capacity(array.len());
+    offsets.push(OffsetSize::usize_as(0));
+    let mut new_arrays = Vec::with_capacity(array.len());
+    let converter = RowConverter::new(vec![SortField::new(dt)])?;
+    // distinct for each list in ListArray
+    for arr in array.iter().flatten() {
+        let values = converter.convert_columns(&[arr])?;
+        // sort elements in list and remove duplicates
+        let rows = values.iter().sorted().dedup().collect::<Vec<_>>();
+        let last_offset: OffsetSize = offsets.last().copied().unwrap();
+        offsets.push(last_offset + OffsetSize::usize_as(rows.len()));
+        let arrays = converter.convert_rows(rows)?;
+        let array = match arrays.first() {
+            Some(array) => array.clone(),
+            None => {
+                return internal_err!("array_distinct: failed to get array from rows")
+            }
+        };
+        new_arrays.push(array);
+    }
+    let offsets = OffsetBuffer::new(offsets.into());
+    let new_arrays_ref = new_arrays.iter().map(|v| v.as_ref()).collect::<Vec<_>>();
+    let values = compute::concat(&new_arrays_ref)?;
+    Ok(Arc::new(GenericListArray::<OffsetSize>::try_new(
+        field.clone(),
+        offsets,
+        values,
+        None,
+    )?))
+}
diff --git a/datafusion/functions-array/src/lib.rs b/datafusion/functions-array/src/lib.rs
index 479b5c64e6..95143570cc 100644
--- a/datafusion/functions-array/src/lib.rs
+++ b/datafusion/functions-array/src/lib.rs
@@ -51,6 +51,7 @@ pub mod expr_fn {
     pub use super::concat::array_prepend;
     pub use super::make_array::make_array;
     pub use super::udf::array_dims;
+    pub use super::udf::array_distinct;
     pub use super::udf::array_empty;
     pub use super::udf::array_length;
     pub use super::udf::array_ndims;
@@ -84,6 +85,7 @@ pub fn register_all(registry: &mut dyn FunctionRegistry) -> Result<()> {
         udf::array_length_udf(),
         udf::flatten_udf(),
         udf::array_sort_udf(),
+        udf::array_distinct_udf(),
     ];
     functions.into_iter().try_for_each(|udf| {
         let existing_udf = registry.register_udf(udf)?;
diff --git a/datafusion/functions-array/src/udf.rs b/datafusion/functions-array/src/udf.rs
index edd104e862..02cacf86b0 100644
--- a/datafusion/functions-array/src/udf.rs
+++ b/datafusion/functions-array/src/udf.rs
@@ -709,3 +709,67 @@ impl ScalarUDFImpl for Flatten {
         &self.aliases
     }
 }
+
+make_udf_function!(
+    ArrayDistinct,
+    array_distinct,
+    array,
+    "return distinct values from the array after removing duplicates.",
+    array_distinct_udf
+);
+
+#[derive(Debug)]
+pub(super) struct ArrayDistinct {
+    signature: Signature,
+    aliases: Vec<String>,
+}
+
+impl crate::udf::ArrayDistinct {
+    pub fn new() -> Self {
+        Self {
+            signature: Signature::array(Volatility::Immutable),
+            aliases: vec!["array_distinct".to_string(), "list_distinct".to_string()],
+        }
+    }
+}
+
+impl ScalarUDFImpl for crate::udf::ArrayDistinct {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+    fn name(&self) -> &str {
+        "array_distinct"
+    }
+
+    fn signature(&self) -> &Signature {
+        &self.signature
+    }
+
+    fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
+        use DataType::*;
+        match &arg_types[0] {
+            List(field) | FixedSizeList(field, _) => Ok(List(Arc::new(Field::new(
+                "item",
+                field.data_type().clone(),
+                true,
+            )))),
+            LargeList(field) => Ok(LargeList(Arc::new(Field::new(
+                "item",
+                field.data_type().clone(),
+                true,
+            )))),
+            _ => exec_err!(
+                "Not reachable, data_type should be List, LargeList or FixedSizeList"
+            ),
+        }
+    }
+
+    fn invoke(&self, args: &[ColumnarValue]) -> Result<ColumnarValue> {
+        let args = ColumnarValue::values_to_arrays(args)?;
+        crate::kernels::array_distinct(&args).map(ColumnarValue::Array)
+    }
+
+    fn aliases(&self) -> &[String] {
+        &self.aliases
+    }
+}
diff --git a/datafusion/physical-expr/src/array_expressions.rs b/datafusion/physical-expr/src/array_expressions.rs
index 67136a280d..29ef9d10fa 100644
--- a/datafusion/physical-expr/src/array_expressions.rs
+++ b/datafusion/physical-expr/src/array_expressions.rs
@@ -1539,32 +1539,6 @@ pub fn general_array_distinct<OffsetSize: OffsetSizeTrait>(
     )?))
 }
 
-/// array_distinct SQL function
-/// example: from list [1, 3, 2, 3, 1, 2, 4] to [1, 2, 3, 4]
-pub fn array_distinct(args: &[ArrayRef]) -> Result<ArrayRef> {
-    if args.len() != 1 {
-        return exec_err!("array_distinct needs one argument");
-    }
-
-    // handle null
-    if args[0].data_type() == &DataType::Null {
-        return Ok(args[0].clone());
-    }
-
-    // handle for list & largelist
-    match args[0].data_type() {
-        DataType::List(field) => {
-            let array = as_list_array(&args[0])?;
-            general_array_distinct(array, field)
-        }
-        DataType::LargeList(field) => {
-            let array = as_large_list_array(&args[0])?;
-            general_array_distinct(array, field)
-        }
-        array_type => exec_err!("array_distinct does not support type '{array_type:?}'"),
-    }
-}
-
 /// array_resize SQL function
 pub fn array_resize(arg: &[ArrayRef]) -> Result<ArrayRef> {
     if arg.len() < 2 || arg.len() > 3 {
diff --git a/datafusion/physical-expr/src/functions.rs b/datafusion/physical-expr/src/functions.rs
index e0f0af2a09..2bce52bf78 100644
--- a/datafusion/physical-expr/src/functions.rs
+++ b/datafusion/physical-expr/src/functions.rs
@@ -302,9 +302,6 @@ pub fn create_physical_fun(
         }
 
         // array functions
-        BuiltinScalarFunction::ArrayDistinct => Arc::new(|args| {
-            make_scalar_function_inner(array_expressions::array_distinct)(args)
-        }),
         BuiltinScalarFunction::ArrayElement => Arc::new(|args| {
             make_scalar_function_inner(array_expressions::array_element)(args)
         }),
diff --git a/datafusion/proto/proto/datafusion.proto b/datafusion/proto/proto/datafusion.proto
index cbd1a16648..44a916af81 100644
--- a/datafusion/proto/proto/datafusion.proto
+++ b/datafusion/proto/proto/datafusion.proto
@@ -676,7 +676,7 @@ enum ScalarFunction {
   SubstrIndex = 126;
   FindInSet = 127;
   /// 128 was ArraySort
-  ArrayDistinct = 129;
+  /// 129 was ArrayDistinct
   ArrayResize = 130;
   EndsWith = 131;
   /// 132 was InStr
diff --git a/datafusion/proto/src/generated/pbjson.rs b/datafusion/proto/src/generated/pbjson.rs
index 3f7b97940c..08489673d5 100644
--- a/datafusion/proto/src/generated/pbjson.rs
+++ b/datafusion/proto/src/generated/pbjson.rs
@@ -22191,7 +22191,6 @@ impl serde::Serialize for ScalarFunction {
             Self::Levenshtein => "Levenshtein",
             Self::SubstrIndex => "SubstrIndex",
             Self::FindInSet => "FindInSet",
-            Self::ArrayDistinct => "ArrayDistinct",
             Self::ArrayResize => "ArrayResize",
             Self::EndsWith => "EndsWith",
             Self::MakeDate => "MakeDate",
@@ -22302,7 +22301,6 @@ impl<'de> serde::Deserialize<'de> for ScalarFunction {
             "Levenshtein",
             "SubstrIndex",
             "FindInSet",
-            "ArrayDistinct",
             "ArrayResize",
             "EndsWith",
             "MakeDate",
@@ -22442,7 +22440,6 @@ impl<'de> serde::Deserialize<'de> for ScalarFunction {
                     "Levenshtein" => Ok(ScalarFunction::Levenshtein),
                     "SubstrIndex" => Ok(ScalarFunction::SubstrIndex),
                     "FindInSet" => Ok(ScalarFunction::FindInSet),
-                    "ArrayDistinct" => Ok(ScalarFunction::ArrayDistinct),
                     "ArrayResize" => Ok(ScalarFunction::ArrayResize),
                     "EndsWith" => Ok(ScalarFunction::EndsWith),
                     "MakeDate" => Ok(ScalarFunction::MakeDate),
diff --git a/datafusion/proto/src/generated/prost.rs b/datafusion/proto/src/generated/prost.rs
index 73de2ee270..a8c9d3c8f6 100644
--- a/datafusion/proto/src/generated/prost.rs
+++ b/datafusion/proto/src/generated/prost.rs
@@ -2748,7 +2748,7 @@ pub enum ScalarFunction {
     SubstrIndex = 126,
     FindInSet = 127,
     /// / 128 was ArraySort
-    ArrayDistinct = 129,
+    /// / 129 was ArrayDistinct
     ArrayResize = 130,
     EndsWith = 131,
     /// / 132 was InStr
@@ -2861,7 +2861,6 @@ impl ScalarFunction {
             ScalarFunction::Levenshtein => "Levenshtein",
             ScalarFunction::SubstrIndex => "SubstrIndex",
             ScalarFunction::FindInSet => "FindInSet",
-            ScalarFunction::ArrayDistinct => "ArrayDistinct",
             ScalarFunction::ArrayResize => "ArrayResize",
             ScalarFunction::EndsWith => "EndsWith",
             ScalarFunction::MakeDate => "MakeDate",
@@ -2966,7 +2965,6 @@ impl ScalarFunction {
             "Levenshtein" => Some(Self::Levenshtein),
             "SubstrIndex" => Some(Self::SubstrIndex),
             "FindInSet" => Some(Self::FindInSet),
-            "ArrayDistinct" => Some(Self::ArrayDistinct),
             "ArrayResize" => Some(Self::ArrayResize),
             "EndsWith" => Some(Self::EndsWith),
             "MakeDate" => Some(Self::MakeDate),
diff --git a/datafusion/proto/src/logical_plan/from_proto.rs b/datafusion/proto/src/logical_plan/from_proto.rs
index 46ead2b6bd..131ac78847 100644
--- a/datafusion/proto/src/logical_plan/from_proto.rs
+++ b/datafusion/proto/src/logical_plan/from_proto.rs
@@ -47,12 +47,12 @@ use datafusion_common::{
 use datafusion_expr::expr::Unnest;
 use datafusion_expr::window_frame::{check_window_frame, regularize_window_order_by};
 use datafusion_expr::{
-    acosh, array_distinct, array_element, array_except, array_intersect, array_pop_back,
-    array_pop_front, array_position, array_positions, array_remove, array_remove_all,
-    array_remove_n, array_repeat, array_replace, array_replace_all, array_replace_n,
-    array_resize, array_slice, array_union, ascii, asinh, atan, atan2, atanh, bit_length,
-    btrim, cbrt, ceil, character_length, chr, coalesce, concat_expr, concat_ws_expr, cos,
-    cosh, cot, current_date, current_time, degrees, digest, ends_with, exp,
+    acosh, array_element, array_except, array_intersect, array_pop_back, array_pop_front,
+    array_position, array_positions, array_remove, array_remove_all, array_remove_n,
+    array_repeat, array_replace, array_replace_all, array_replace_n, array_resize,
+    array_slice, array_union, ascii, asinh, atan, atan2, atanh, bit_length, btrim, cbrt,
+    ceil, character_length, chr, coalesce, concat_expr, concat_ws_expr, cos, cosh, cot,
+    current_date, current_time, degrees, digest, ends_with, exp,
     expr::{self, InList, Sort, WindowFunction},
     factorial, find_in_set, floor, from_unixtime, gcd, initcap, iszero, lcm, left,
     levenshtein, ln, log, log10, log2,
@@ -475,7 +475,6 @@ impl From<&protobuf::ScalarFunction> for BuiltinScalarFunction {
             ScalarFunction::Ltrim => Self::Ltrim,
             ScalarFunction::Rtrim => Self::Rtrim,
             ScalarFunction::ArrayExcept => Self::ArrayExcept,
-            ScalarFunction::ArrayDistinct => Self::ArrayDistinct,
             ScalarFunction::ArrayElement => Self::ArrayElement,
             ScalarFunction::ArrayPopFront => Self::ArrayPopFront,
             ScalarFunction::ArrayPopBack => Self::ArrayPopBack,
@@ -1463,9 +1462,6 @@ pub fn parse_expr(
                     parse_expr(&args[2], registry, codec)?,
                     parse_expr(&args[3], registry, codec)?,
                 )),
-                ScalarFunction::ArrayDistinct => {
-                    Ok(array_distinct(parse_expr(&args[0], registry, codec)?))
-                }
                 ScalarFunction::ArrayElement => Ok(array_element(
                     parse_expr(&args[0], registry, codec)?,
                     parse_expr(&args[1], registry, codec)?,
diff --git a/datafusion/proto/src/logical_plan/to_proto.rs b/datafusion/proto/src/logical_plan/to_proto.rs
index 1e09923b86..6d0d81f61b 100644
--- a/datafusion/proto/src/logical_plan/to_proto.rs
+++ b/datafusion/proto/src/logical_plan/to_proto.rs
@@ -1457,7 +1457,6 @@ impl TryFrom<&BuiltinScalarFunction> for protobuf::ScalarFunction {
             BuiltinScalarFunction::Rtrim => Self::Rtrim,
             BuiltinScalarFunction::ToChar => Self::ToChar,
             BuiltinScalarFunction::ArrayExcept => Self::ArrayExcept,
-            BuiltinScalarFunction::ArrayDistinct => Self::ArrayDistinct,
             BuiltinScalarFunction::ArrayElement => Self::ArrayElement,
             BuiltinScalarFunction::ArrayPopFront => Self::ArrayPopFront,
             BuiltinScalarFunction::ArrayPopBack => Self::ArrayPopBack,
diff --git a/datafusion/proto/tests/cases/roundtrip_logical_plan.rs b/datafusion/proto/tests/cases/roundtrip_logical_plan.rs
index fcbe573723..faea202da0 100644
--- a/datafusion/proto/tests/cases/roundtrip_logical_plan.rs
+++ b/datafusion/proto/tests/cases/roundtrip_logical_plan.rs
@@ -613,6 +613,7 @@ async fn roundtrip_expr_api() -> Result<()> {
             lit("desc"),
             lit("NULLS LAST"),
         ),
+        array_distinct(make_array(vec![lit(1), lit(3), lit(3), lit(2), lit(2)])),
     ];
 
     // ensure expressions created with the expr api can be round tripped
diff --git a/docs/source/user-guide/sql/scalar_functions.md b/docs/source/user-guide/sql/scalar_functions.md
index e77ae41018..420de5f3fd 100644
--- a/docs/source/user-guide/sql/scalar_functions.md
+++ b/docs/source/user-guide/sql/scalar_functions.md
@@ -1949,6 +1949,7 @@ from_unixtime(expression)
 - [array_concat](#array_concat)
 - [array_contains](#array_contains)
 - [array_dims](#array_dims)
+- [array_distinct](#array_distinct)
 - [array_has](#array_has)
 - [array_has_all](#array_has_all)
 - [array_has_any](#array_has_any)
@@ -1987,6 +1988,7 @@ from_unixtime(expression)
 - [list_cat](#list_cat)
 - [list_concat](#list_concat)
 - [list_dims](#list_dims)
+- [list_distinct](#list_distinct)
 - [list_element](#list_element)
 - [list_extract](#list_extract)
 - [list_has](#list_has)
@@ -2204,6 +2206,34 @@ array_dims(array)
 
 - list_dims
 
+### `array_distinct`
+
+Returns distinct values from the array after removing duplicates.
+
+```
+array_distinct(array)
+```
+
+#### Arguments
+
+- **array**: Array expression.
+  Can be a constant, column, or function, and any combination of array operators.
+
+#### Example
+
+```
+❯ select array_distinct([1, 3, 2, 3, 1, 2, 4]);
++---------------------------------+
+| array_distinct(List([1,2,3,4])) |
++---------------------------------+
+| [1, 2, 3, 4]                    |
++---------------------------------+
+```
+
+#### Aliases
+
+- list_distinct
+
 ### `array_element`
 
 Extracts the element with the index n from the array.

Reply via email to