alamb commented on code in PR #8268:
URL: https://github.com/apache/arrow-datafusion/pull/8268#discussion_r1420828089
##########
datafusion/physical-expr/src/array_expressions.rs:
##########
@@ -1991,6 +1992,71 @@ pub fn array_intersect(args: &[ArrayRef]) ->
Result<ArrayRef> {
}
}
+pub fn general_array_distinct<OffsetSize: OffsetSizeTrait>(
+ array: &GenericListArray<OffsetSize>,
+ field: &FieldRef,
+) -> Result<ArrayRef> {
+ let dt = array.value_type();
+ let mut offsets = vec![OffsetSize::usize_as(0)];
+ let mut new_arrays = vec![];
+ let converter = RowConverter::new(vec![SortField::new(dt.clone())])?;
+ // distinct for each list in ListArray
+ for arr in array.iter().flatten() {
+ let values = converter.convert_columns(&[arr])?;
Review Comment:
> I also don't have another idea other than downcast arr, I was just
wondering if it is worth to downcast to exact arr.
Downcasting to the exact array type can result in faster code in many cases,
as the rust compiler can make specialized implemenations for each type.
However, [there are a lot of
DataTypes](https://docs.rs/arrow/latest/arrow/datatypes/enum.DataType.html),
including nested ones like Dict, List, Struct, etc so making specialized
implementations often requires a lot of work
The row converter handles all the types internally.
What we have typically done in the past with DataFusion is to use non type
specific code like `RowConverter` for the general case, and then if we find a
particular usecase needs faster performance we make special implementations.
For example, we do so for grouing by single primtive columns (`GROUP BY int32`)
for example
##########
datafusion/physical-expr/src/array_expressions.rs:
##########
@@ -2111,6 +2111,66 @@ pub fn array_intersect(args: &[ArrayRef]) ->
Result<ArrayRef> {
}
}
+pub fn general_array_distinct<OffsetSize: OffsetSizeTrait>(
+ array: &GenericListArray<OffsetSize>,
+ field: &FieldRef,
+) -> Result<ArrayRef> {
+ let dt = array.value_type();
+ let mut offsets = Vec::with_capacity(array.len());
+ offsets.push(OffsetSize::usize_as(0));
+ let mut new_arrays = Vec::with_capacity(array.len());
+ let converter = RowConverter::new(vec![SortField::new(dt.clone())])?;
+ // distinct for each list in ListArray
+ for arr in array.iter().flatten() {
+ let values = converter.convert_columns(&[arr])?;
+ // sort elements in list and remove duplicates
+ let rows = values.iter().sorted().dedup().collect::<Vec<_>>();
+ let last_offset: OffsetSize = offsets.last().copied().unwrap();
+ offsets.push(last_offset + OffsetSize::usize_as(rows.len()));
+ let arrays = converter.convert_rows(rows)?;
+ let array = match arrays.get(0) {
+ Some(array) => array.clone(),
+ None => {
+ return internal_err!("array_distinct: failed to get array from
rows")
+ }
+ };
+ new_arrays.push(array);
+ }
+ let offsets = OffsetBuffer::new(offsets.into());
+ let new_arrays_ref = new_arrays.iter().map(|v|
v.as_ref()).collect::<Vec<_>>();
+ let values = compute::concat(&new_arrays_ref)?;
+ Ok(Arc::new(GenericListArray::<OffsetSize>::try_new(
+ field.clone(),
+ offsets,
+ values,
+ None,
+ )?))
+}
+
+/// array_distinct SQL function
+/// example: from list [1, 3, 2, 3, 1, 2, 4] to [1, 2, 3, 4]
+pub fn array_distinct(args: &[ArrayRef]) -> Result<ArrayRef> {
+ assert_eq!(args.len(), 1);
+
+ // handle null
+ if args[0].data_type() == &DataType::Null {
+ return Ok(args[0].clone());
+ }
+
+ // handle for list & largelist
+ match args[0].data_type() {
+ DataType::List(field) => {
+ let array = as_list_array(&args[0])?;
+ general_array_distinct(array, field)
Review Comment:
I think we can merge this PR as is and then add support for `LargeList`
(using the `OffsetSize` trait) as a follow on PR
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]