jayzhan211 commented on code in PR #7897:
URL: https://github.com/apache/arrow-datafusion/pull/7897#discussion_r1385786235
##########
datafusion/physical-expr/src/array_expressions.rs:
##########
@@ -1358,6 +1360,94 @@ macro_rules! to_string {
}};
}
+fn union_generic_lists<OffsetSize: OffsetSizeTrait>(
+ l: &GenericListArray<OffsetSize>,
+ r: &GenericListArray<OffsetSize>,
+) -> Result<GenericListArray<OffsetSize>, DataFusionError> {
+ let converter =
+
RowConverter::new(vec![SortField::new(l.value_type().clone())]).unwrap();
+
+ let nulls = NullBuffer::union(l.nulls(), r.nulls());
+ let field = Arc::new(Field::new(
+ "item",
+ l.value_type().to_owned(),
+ l.is_nullable(),
+ ));
+ let l_values = l.values().clone();
+ let r_values = r.values().clone();
+ let l_values = converter.convert_columns(&[l_values]).unwrap();
+ let r_values = converter.convert_columns(&[r_values]).unwrap();
+
+ // Might be worth adding an upstream OffsetBufferBuilder
+ let mut offsets = Vec::<OffsetSize>::with_capacity(l.len() + 1);
+ offsets.push(OffsetSize::usize_as(0));
+ let mut rows = Vec::with_capacity(l_values.num_rows() +
r_values.num_rows());
+
+ for (l_w, r_w) in l.offsets().windows(2).zip(r.offsets().windows(2)) {
+ let mut dedup = HashSet::new();
+ // Needed to preserve ordering
+ let mut row_elements:Vec<Row<'_>> = vec![];
+ let l_slice = l_w[0].as_usize()..l_w[1].as_usize();
+ let r_slice = r_w[0].as_usize()..r_w[1].as_usize();
+ for i in l_slice {
+ let left_row = l_values.row(i);
+ if dedup.insert(left_row) {
+ row_elements.push(left_row);
Review Comment:
I think we can just push the value directly into `rows`; there is no need for `row_elements`, since
`dedup` ensures that each value we push is unique.
##########
datafusion/physical-expr/src/array_expressions.rs:
##########
@@ -1358,6 +1360,94 @@ macro_rules! to_string {
}};
}
+fn union_generic_lists<OffsetSize: OffsetSizeTrait>(
+ l: &GenericListArray<OffsetSize>,
+ r: &GenericListArray<OffsetSize>,
+) -> Result<GenericListArray<OffsetSize>, DataFusionError> {
+ let converter =
+
RowConverter::new(vec![SortField::new(l.value_type().clone())]).unwrap();
+
+ let nulls = NullBuffer::union(l.nulls(), r.nulls());
+ let field = Arc::new(Field::new(
+ "item",
+ l.value_type().to_owned(),
+ l.is_nullable(),
+ ));
+ let l_values = l.values().clone();
+ let r_values = r.values().clone();
+ let l_values = converter.convert_columns(&[l_values]).unwrap();
+ let r_values = converter.convert_columns(&[r_values]).unwrap();
+
+ // Might be worth adding an upstream OffsetBufferBuilder
+ let mut offsets = Vec::<OffsetSize>::with_capacity(l.len() + 1);
+ offsets.push(OffsetSize::usize_as(0));
+ let mut rows = Vec::with_capacity(l_values.num_rows() +
r_values.num_rows());
+
+ for (l_w, r_w) in l.offsets().windows(2).zip(r.offsets().windows(2)) {
+ let mut dedup = HashSet::new();
+ // Needed to preserve ordering
+ let mut row_elements:Vec<Row<'_>> = vec![];
+ let l_slice = l_w[0].as_usize()..l_w[1].as_usize();
+ let r_slice = r_w[0].as_usize()..r_w[1].as_usize();
+ for i in l_slice {
+ let left_row = l_values.row(i);
+ if dedup.insert(left_row) {
+ row_elements.push(left_row);
+ }
+ }
+ for i in r_slice {
+ let right_row=r_values.row(i);
+ if dedup.insert(right_row){
+ row_elements.push(right_row);
+ }
+ }
+
+ rows.extend(row_elements.iter());
+ offsets.push(OffsetSize::usize_as(rows.len()));
+ dedup.clear();
+ row_elements.clear();
Review Comment:
Since you declare a new allocation in each loop iteration, there is no need to explicitly clear it
here. The same applies to `dedup`.
##########
datafusion/physical-expr/src/array_expressions.rs:
##########
@@ -1358,6 +1360,94 @@ macro_rules! to_string {
}};
}
+fn union_generic_lists<OffsetSize: OffsetSizeTrait>(
+ l: &GenericListArray<OffsetSize>,
+ r: &GenericListArray<OffsetSize>,
+) -> Result<GenericListArray<OffsetSize>, DataFusionError> {
+ let converter =
+
RowConverter::new(vec![SortField::new(l.value_type().clone())]).unwrap();
+
+ let nulls = NullBuffer::union(l.nulls(), r.nulls());
+ let field = Arc::new(Field::new(
+ "item",
+ l.value_type().to_owned(),
+ l.is_nullable(),
Review Comment:
Note that your current implementation can't pass column-wise test cases.
Example test case
```text
statement ok
CREATE TABLE arrays_with_repeating_elements
AS VALUES
([1], [2]),
([2, 3], [3]),
([3], [3, 4])
;
query ?
select array_union(column1, column2) from arrays_with_repeating_elements;
----
[1, 2]
[2, 3]
[3, 4]
statement ok
drop table arrays_with_repeating_elements;
```
The reason why your test fails is because
```text
Invalid argument error: column types must match schema types, expected
List\(Field \{ name: "item", data_type: Int64, nullable: true, dict_id: 0,
dict_is_ordered: false, metadata: \{\} \}\) but found List\(Field \{ name:
"item", data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered: false,
metadata: \{\} \}\) at column index 0
```
`l.is_nullable()` counts the number of nulls, which is not always the same as the
`nullable` flag in the List field. In this case it returns false while the expected value
is true. The return type of `array_union` should be the same as the return type
in `BuiltinScalarFunction`. You can directly pass the `&FieldRef` into
`union_generic_lists`, since the field before and after `array_union` should be
the same.
##########
datafusion/physical-expr/src/array_expressions.rs:
##########
@@ -1358,6 +1360,94 @@ macro_rules! to_string {
}};
}
+fn union_generic_lists<OffsetSize: OffsetSizeTrait>(
+ l: &GenericListArray<OffsetSize>,
+ r: &GenericListArray<OffsetSize>,
+) -> Result<GenericListArray<OffsetSize>, DataFusionError> {
+ let converter =
+
RowConverter::new(vec![SortField::new(l.value_type().clone())]).unwrap();
+
+ let nulls = NullBuffer::union(l.nulls(), r.nulls());
+ let field = Arc::new(Field::new(
+ "item",
+ l.value_type().to_owned(),
+ l.is_nullable(),
+ ));
+ let l_values = l.values().clone();
+ let r_values = r.values().clone();
+ let l_values = converter.convert_columns(&[l_values]).unwrap();
+ let r_values = converter.convert_columns(&[r_values]).unwrap();
+
+ // Might be worth adding an upstream OffsetBufferBuilder
+ let mut offsets = Vec::<OffsetSize>::with_capacity(l.len() + 1);
+ offsets.push(OffsetSize::usize_as(0));
+ let mut rows = Vec::with_capacity(l_values.num_rows() +
r_values.num_rows());
+
+ for (l_w, r_w) in l.offsets().windows(2).zip(r.offsets().windows(2)) {
+ let mut dedup = HashSet::new();
+ // Needed to preserve ordering
+ let mut row_elements:Vec<Row<'_>> = vec![];
+ let l_slice = l_w[0].as_usize()..l_w[1].as_usize();
+ let r_slice = r_w[0].as_usize()..r_w[1].as_usize();
+ for i in l_slice {
+ let left_row = l_values.row(i);
+ if dedup.insert(left_row) {
+ row_elements.push(left_row);
+ }
+ }
+ for i in r_slice {
+ let right_row=r_values.row(i);
+ if dedup.insert(right_row){
+ row_elements.push(right_row);
+ }
+ }
+
+ rows.extend(row_elements.iter());
+ offsets.push(OffsetSize::usize_as(rows.len()));
+ dedup.clear();
+ row_elements.clear();
+ }
+
+ let values = converter.convert_rows(rows).unwrap();
+ let offsets = OffsetBuffer::new(offsets.into());
+ let result = values[0].clone();
+ Ok(GenericListArray::<OffsetSize>::new(
+ field, offsets, result, nulls,
+ ))
+}
+
+/// Array_union SQL function
+pub fn array_union(args: &[ArrayRef]) -> Result<ArrayRef> {
+ if args.len() != 2 {
+ return exec_err!("array_union needs two arguments");
+ }
+ let array1 = &args[0];
+ let array2 = &args[1];
+ match (array1.data_type(), array2.data_type()) {
+ (DataType::Null, _) => Ok(array2.clone()),
+ (_, DataType::Null) => Ok(array1.clone()),
+ (DataType::List(_), DataType::List(_)) => {
+ check_datatypes("array_union", &[&array1, &array2])?;
+ let list1 = array1.as_list::<i32>();
Review Comment:
I prefer `datafusion::common::cast::as_list_array` and `as_large_list_array`.
##########
datafusion/sqllogictest/test_files/array.slt:
##########
@@ -1752,6 +1752,34 @@ select array_to_string(make_array(), ',')
----
(empty)
+
+## array_union (aliases: `list_union`)
+
+# array_union scalar function #1
+query ?
+select array_union([1, 2, 3, 4], [5, 6, 3, 4]);
+----
+[1, 2, 3, 4, 5, 6]
+
+# array_union scalar function #2
+query ?
+select array_union([1, 2, 3, 4], [5, 6, 7, 8]);
+----
+[1, 2, 3, 4, 5, 6, 7, 8]
+
+# array_union scalar function #3
+query ?
+select array_union([1,2,3], []);
+----
+[1, 2, 3]
+
+# array_union scalar function #4
+query ?
+select array_union([1, 2, 3, 4], [5, 4]);
+----
+[1, 2, 3, 4, 5]
Review Comment:
Your implementation covers the column-wise cases, so it is better to add them
to the tests. Also, you can add float types; I have tested them, and your code passes
them too.
##########
datafusion/expr/src/built_in_function.rs:
##########
@@ -597,6 +600,7 @@ impl BuiltinScalarFunction {
BuiltinScalarFunction::ArrayReplaceAll =>
Ok(input_expr_types[0].clone()),
BuiltinScalarFunction::ArraySlice =>
Ok(input_expr_types[0].clone()),
BuiltinScalarFunction::ArrayToString => Ok(Utf8),
+ BuiltinScalarFunction::ArrayUnion =>
Ok(input_expr_types[0].clone()),
Review Comment:
This is the return type I mentioned; this (logical expr schema) should match
the return type of `array_union` (physical expr schema).
##########
datafusion/physical-expr/src/array_expressions.rs:
##########
@@ -1358,6 +1360,94 @@ macro_rules! to_string {
}};
}
+fn union_generic_lists<OffsetSize: OffsetSizeTrait>(
+ l: &GenericListArray<OffsetSize>,
+ r: &GenericListArray<OffsetSize>,
+) -> Result<GenericListArray<OffsetSize>, DataFusionError> {
Review Comment:
I prefer `Result<GenericListArray<OffsetSize>>`
##########
datafusion/physical-expr/src/array_expressions.rs:
##########
@@ -1358,6 +1360,94 @@ macro_rules! to_string {
}};
}
+fn union_generic_lists<OffsetSize: OffsetSizeTrait>(
+ l: &GenericListArray<OffsetSize>,
+ r: &GenericListArray<OffsetSize>,
+) -> Result<GenericListArray<OffsetSize>, DataFusionError> {
+ let converter =
+
RowConverter::new(vec![SortField::new(l.value_type().clone())]).unwrap();
+
+ let nulls = NullBuffer::union(l.nulls(), r.nulls());
+ let field = Arc::new(Field::new(
+ "item",
+ l.value_type().to_owned(),
+ l.is_nullable(),
+ ));
+ let l_values = l.values().clone();
+ let r_values = r.values().clone();
+ let l_values = converter.convert_columns(&[l_values]).unwrap();
+ let r_values = converter.convert_columns(&[r_values]).unwrap();
+
+ // Might be worth adding an upstream OffsetBufferBuilder
+ let mut offsets = Vec::<OffsetSize>::with_capacity(l.len() + 1);
+ offsets.push(OffsetSize::usize_as(0));
+ let mut rows = Vec::with_capacity(l_values.num_rows() +
r_values.num_rows());
+
+ for (l_w, r_w) in l.offsets().windows(2).zip(r.offsets().windows(2)) {
+ let mut dedup = HashSet::new();
+ // Needed to preserve ordering
+ let mut row_elements:Vec<Row<'_>> = vec![];
+ let l_slice = l_w[0].as_usize()..l_w[1].as_usize();
+ let r_slice = r_w[0].as_usize()..r_w[1].as_usize();
+ for i in l_slice {
+ let left_row = l_values.row(i);
+ if dedup.insert(left_row) {
+ row_elements.push(left_row);
+ }
+ }
+ for i in r_slice {
+ let right_row=r_values.row(i);
+ if dedup.insert(right_row){
+ row_elements.push(right_row);
+ }
+ }
+
+ rows.extend(row_elements.iter());
+ offsets.push(OffsetSize::usize_as(rows.len()));
+ dedup.clear();
+ row_elements.clear();
+ }
+
+ let values = converter.convert_rows(rows).unwrap();
Review Comment:
Use `?` to avoid a panic.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]