jayzhan211 commented on code in PR #7897:
URL: https://github.com/apache/arrow-datafusion/pull/7897#discussion_r1385786235


##########
datafusion/physical-expr/src/array_expressions.rs:
##########
@@ -1358,6 +1360,94 @@ macro_rules! to_string {
     }};
 }
 
+fn union_generic_lists<OffsetSize: OffsetSizeTrait>(
+    l: &GenericListArray<OffsetSize>,
+    r: &GenericListArray<OffsetSize>,
+) -> Result<GenericListArray<OffsetSize>, DataFusionError> {
+    let converter =
+        
RowConverter::new(vec![SortField::new(l.value_type().clone())]).unwrap();
+    
+    let nulls = NullBuffer::union(l.nulls(), r.nulls());
+    let field = Arc::new(Field::new(
+        "item",
+        l.value_type().to_owned(),
+        l.is_nullable(),
+    ));
+    let l_values = l.values().clone();
+    let r_values = r.values().clone();
+    let l_values = converter.convert_columns(&[l_values]).unwrap();
+    let r_values = converter.convert_columns(&[r_values]).unwrap();
+
+    // Might be worth adding an upstream OffsetBufferBuilder
+    let mut offsets = Vec::<OffsetSize>::with_capacity(l.len() + 1);
+    offsets.push(OffsetSize::usize_as(0));
+    let mut rows = Vec::with_capacity(l_values.num_rows() + 
r_values.num_rows());
+
+    for (l_w, r_w) in l.offsets().windows(2).zip(r.offsets().windows(2)) {
+        let mut dedup = HashSet::new();
+        // Needed to preserve ordering
+        let mut row_elements:Vec<Row<'_>> = vec![];
+        let l_slice = l_w[0].as_usize()..l_w[1].as_usize();
+        let r_slice = r_w[0].as_usize()..r_w[1].as_usize();
+        for i in l_slice {
+            let left_row = l_values.row(i);
+            if dedup.insert(left_row) {
+                row_elements.push(left_row);

Review Comment:
   I think we can just push the value into `rows` directly — there is no need for `row_elements`, since `dedup` ensures the value we push is already unique.



##########
datafusion/physical-expr/src/array_expressions.rs:
##########
@@ -1358,6 +1360,94 @@ macro_rules! to_string {
     }};
 }
 
+fn union_generic_lists<OffsetSize: OffsetSizeTrait>(
+    l: &GenericListArray<OffsetSize>,
+    r: &GenericListArray<OffsetSize>,
+) -> Result<GenericListArray<OffsetSize>, DataFusionError> {
+    let converter =
+        
RowConverter::new(vec![SortField::new(l.value_type().clone())]).unwrap();
+    
+    let nulls = NullBuffer::union(l.nulls(), r.nulls());
+    let field = Arc::new(Field::new(
+        "item",
+        l.value_type().to_owned(),
+        l.is_nullable(),
+    ));
+    let l_values = l.values().clone();
+    let r_values = r.values().clone();
+    let l_values = converter.convert_columns(&[l_values]).unwrap();
+    let r_values = converter.convert_columns(&[r_values]).unwrap();
+
+    // Might be worth adding an upstream OffsetBufferBuilder
+    let mut offsets = Vec::<OffsetSize>::with_capacity(l.len() + 1);
+    offsets.push(OffsetSize::usize_as(0));
+    let mut rows = Vec::with_capacity(l_values.num_rows() + 
r_values.num_rows());
+
+    for (l_w, r_w) in l.offsets().windows(2).zip(r.offsets().windows(2)) {
+        let mut dedup = HashSet::new();
+        // Needed to preserve ordering
+        let mut row_elements:Vec<Row<'_>> = vec![];
+        let l_slice = l_w[0].as_usize()..l_w[1].as_usize();
+        let r_slice = r_w[0].as_usize()..r_w[1].as_usize();
+        for i in l_slice {
+            let left_row = l_values.row(i);
+            if dedup.insert(left_row) {
+                row_elements.push(left_row);
+            }
+        }
+        for i in r_slice {
+            let right_row=r_values.row(i);
+            if dedup.insert(right_row){
+                row_elements.push(right_row);
+            }
+        }
+
+        rows.extend(row_elements.iter());
+        offsets.push(OffsetSize::usize_as(rows.len()));
+        dedup.clear();
+        row_elements.clear();

Review Comment:
   Since you declare a new vector in each loop iteration, there is no need to explicitly clear it here. The same applies to `dedup`.



##########
datafusion/physical-expr/src/array_expressions.rs:
##########
@@ -1358,6 +1360,94 @@ macro_rules! to_string {
     }};
 }
 
+fn union_generic_lists<OffsetSize: OffsetSizeTrait>(
+    l: &GenericListArray<OffsetSize>,
+    r: &GenericListArray<OffsetSize>,
+) -> Result<GenericListArray<OffsetSize>, DataFusionError> {
+    let converter =
+        
RowConverter::new(vec![SortField::new(l.value_type().clone())]).unwrap();
+    
+    let nulls = NullBuffer::union(l.nulls(), r.nulls());
+    let field = Arc::new(Field::new(
+        "item",
+        l.value_type().to_owned(),
+        l.is_nullable(),

Review Comment:
   Note that your current implementation can't pass column-wise test cases.
   
   Example test case
   ```text
   statement ok
   CREATE TABLE arrays_with_repeating_elements
   AS VALUES
     ([1], [2]),
     ([2, 3], [3]),
     ([3], [3, 4])
   ;
   
   query ?
   select array_union(column1, column2) from arrays_with_repeating_elements;
   ----
   [1, 2]
   [2, 3]
   [3, 4]
   
   statement ok
   drop table arrays_with_repeating_elements;
   ```
   
   The reason why your test fails is because 
   ```text
   Invalid argument error: column types must match schema types, expected 
List\(Field \{ name: "item", data_type: Int64, nullable: true, dict_id: 0, 
dict_is_ordered: false, metadata: \{\} \}\) but found List\(Field \{ name: 
"item", data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered: false, 
metadata: \{\} \}\) at column index 0
   ```
   
   `l.is_nullable()` counts the number of nulls, which is not always the same as the `nullable` flag in the List field. In this case it returns false while the expected value is true. The return type of `array_union` should be the same as the return type declared in `BuiltinScalarFunction`. You can directly pass the `&FieldRef` into `union_generic_lists`, since the field before and after `array_union` should be the same.
   
   
   



##########
datafusion/physical-expr/src/array_expressions.rs:
##########
@@ -1358,6 +1360,94 @@ macro_rules! to_string {
     }};
 }
 
+fn union_generic_lists<OffsetSize: OffsetSizeTrait>(
+    l: &GenericListArray<OffsetSize>,
+    r: &GenericListArray<OffsetSize>,
+) -> Result<GenericListArray<OffsetSize>, DataFusionError> {
+    let converter =
+        
RowConverter::new(vec![SortField::new(l.value_type().clone())]).unwrap();
+    
+    let nulls = NullBuffer::union(l.nulls(), r.nulls());
+    let field = Arc::new(Field::new(
+        "item",
+        l.value_type().to_owned(),
+        l.is_nullable(),
+    ));
+    let l_values = l.values().clone();
+    let r_values = r.values().clone();
+    let l_values = converter.convert_columns(&[l_values]).unwrap();
+    let r_values = converter.convert_columns(&[r_values]).unwrap();
+
+    // Might be worth adding an upstream OffsetBufferBuilder
+    let mut offsets = Vec::<OffsetSize>::with_capacity(l.len() + 1);
+    offsets.push(OffsetSize::usize_as(0));
+    let mut rows = Vec::with_capacity(l_values.num_rows() + 
r_values.num_rows());
+
+    for (l_w, r_w) in l.offsets().windows(2).zip(r.offsets().windows(2)) {
+        let mut dedup = HashSet::new();
+        // Needed to preserve ordering
+        let mut row_elements:Vec<Row<'_>> = vec![];
+        let l_slice = l_w[0].as_usize()..l_w[1].as_usize();
+        let r_slice = r_w[0].as_usize()..r_w[1].as_usize();
+        for i in l_slice {
+            let left_row = l_values.row(i);
+            if dedup.insert(left_row) {
+                row_elements.push(left_row);
+            }
+        }
+        for i in r_slice {
+            let right_row=r_values.row(i);
+            if dedup.insert(right_row){
+                row_elements.push(right_row);
+            }
+        }
+
+        rows.extend(row_elements.iter());
+        offsets.push(OffsetSize::usize_as(rows.len()));
+        dedup.clear();
+        row_elements.clear();
+    }
+
+    let values = converter.convert_rows(rows).unwrap();
+    let offsets = OffsetBuffer::new(offsets.into());
+    let result = values[0].clone();
+    Ok(GenericListArray::<OffsetSize>::new(
+        field, offsets, result, nulls,
+    ))
+}
+
+/// Array_union SQL function
+pub fn array_union(args: &[ArrayRef]) -> Result<ArrayRef> {
+    if args.len() != 2 {
+        return exec_err!("array_union needs two arguments");
+    }
+    let array1 = &args[0];
+    let array2 = &args[1];
+    match (array1.data_type(), array2.data_type()) {
+        (DataType::Null, _) => Ok(array2.clone()),
+        (_, DataType::Null) => Ok(array1.clone()),
+        (DataType::List(_), DataType::List(_)) => {
+            check_datatypes("array_union", &[&array1, &array2])?;
+            let list1 = array1.as_list::<i32>();

Review Comment:
   I prefer `datafusion::common::cast`'s `as_list_array` and `as_large_list_array`.



##########
datafusion/sqllogictest/test_files/array.slt:
##########
@@ -1752,6 +1752,34 @@ select array_to_string(make_array(), ',')
 ----
 (empty)
 
+
+## array_union (aliases: `list_union`)
+
+# array_union scalar function #1
+query ?
+select array_union([1, 2, 3, 4], [5, 6, 3, 4]);
+----
+[1, 2, 3, 4, 5, 6]
+
+# array_union scalar function #2
+query ?
+select array_union([1, 2, 3, 4], [5, 6, 7, 8]);
+----
+[1, 2, 3, 4, 5, 6, 7, 8]
+
+# array_union scalar function #3
+query ?
+select array_union([1,2,3], []);
+----
+[1, 2, 3]
+
+# array_union scalar function #4
+query ?
+select array_union([1, 2, 3, 4], [5, 4]);
+----
+[1, 2, 3, 4, 5]

Review Comment:
   Your implementation covers the column-wise cases, so it is better to add them to the tests. Also, you can add float types as well — I have tested them, and your code passes too.



##########
datafusion/expr/src/built_in_function.rs:
##########
@@ -597,6 +600,7 @@ impl BuiltinScalarFunction {
             BuiltinScalarFunction::ArrayReplaceAll => 
Ok(input_expr_types[0].clone()),
             BuiltinScalarFunction::ArraySlice => 
Ok(input_expr_types[0].clone()),
             BuiltinScalarFunction::ArrayToString => Ok(Utf8),
+            BuiltinScalarFunction::ArrayUnion => 
Ok(input_expr_types[0].clone()),

Review Comment:
   This is the return type I mentioned — this (logical expr schema) should match the return type of `array_union` (physical expr schema).



##########
datafusion/physical-expr/src/array_expressions.rs:
##########
@@ -1358,6 +1360,94 @@ macro_rules! to_string {
     }};
 }
 
+fn union_generic_lists<OffsetSize: OffsetSizeTrait>(
+    l: &GenericListArray<OffsetSize>,
+    r: &GenericListArray<OffsetSize>,
+) -> Result<GenericListArray<OffsetSize>, DataFusionError> {

Review Comment:
   I prefer `Result<GenericListArray<OffsetSize>>`



##########
datafusion/physical-expr/src/array_expressions.rs:
##########
@@ -1358,6 +1360,94 @@ macro_rules! to_string {
     }};
 }
 
+fn union_generic_lists<OffsetSize: OffsetSizeTrait>(
+    l: &GenericListArray<OffsetSize>,
+    r: &GenericListArray<OffsetSize>,
+) -> Result<GenericListArray<OffsetSize>, DataFusionError> {
+    let converter =
+        
RowConverter::new(vec![SortField::new(l.value_type().clone())]).unwrap();
+    
+    let nulls = NullBuffer::union(l.nulls(), r.nulls());
+    let field = Arc::new(Field::new(
+        "item",
+        l.value_type().to_owned(),
+        l.is_nullable(),
+    ));
+    let l_values = l.values().clone();
+    let r_values = r.values().clone();
+    let l_values = converter.convert_columns(&[l_values]).unwrap();
+    let r_values = converter.convert_columns(&[r_values]).unwrap();
+
+    // Might be worth adding an upstream OffsetBufferBuilder
+    let mut offsets = Vec::<OffsetSize>::with_capacity(l.len() + 1);
+    offsets.push(OffsetSize::usize_as(0));
+    let mut rows = Vec::with_capacity(l_values.num_rows() + 
r_values.num_rows());
+
+    for (l_w, r_w) in l.offsets().windows(2).zip(r.offsets().windows(2)) {
+        let mut dedup = HashSet::new();
+        // Needed to preserve ordering
+        let mut row_elements:Vec<Row<'_>> = vec![];
+        let l_slice = l_w[0].as_usize()..l_w[1].as_usize();
+        let r_slice = r_w[0].as_usize()..r_w[1].as_usize();
+        for i in l_slice {
+            let left_row = l_values.row(i);
+            if dedup.insert(left_row) {
+                row_elements.push(left_row);
+            }
+        }
+        for i in r_slice {
+            let right_row=r_values.row(i);
+            if dedup.insert(right_row){
+                row_elements.push(right_row);
+            }
+        }
+
+        rows.extend(row_elements.iter());
+        offsets.push(OffsetSize::usize_as(rows.len()));
+        dedup.clear();
+        row_elements.clear();
+    }
+
+    let values = converter.convert_rows(rows).unwrap();

Review Comment:
   Use `?` to avoid a panic.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to