This is an automated email from the ASF dual-hosted git repository. alamb pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push: new 62000b4e16 perf(array-agg): add fast path for array agg for `merge_batch` (#14299) 62000b4e16 is described below commit 62000b4e169fac85cd71715b59e0e796b8e694ec Author: Raz Luvaton <16746759+rluva...@users.noreply.github.com> AuthorDate: Wed Jan 29 17:06:37 2025 +0200 perf(array-agg): add fast path for array agg for `merge_batch` (#14299) * perf(array-agg): add fast path for array agg for `merge_batch` * update comment * fix slice length * fix: make sure we are not inserting empty lists --- datafusion/functions-aggregate/src/array_agg.rs | 79 ++++++++++++++++++++++++- 1 file changed, 76 insertions(+), 3 deletions(-) diff --git a/datafusion/functions-aggregate/src/array_agg.rs b/datafusion/functions-aggregate/src/array_agg.rs index b75de83f6a..9fff059991 100644 --- a/datafusion/functions-aggregate/src/array_agg.rs +++ b/datafusion/functions-aggregate/src/array_agg.rs @@ -17,7 +17,7 @@ //! `ARRAY_AGG` aggregate implementation: [`ArrayAgg`] -use arrow::array::{new_empty_array, Array, ArrayRef, AsArray, StructArray}; +use arrow::array::{new_empty_array, Array, ArrayRef, AsArray, ListArray, StructArray}; use arrow::datatypes::DataType; use arrow_schema::{Field, Fields}; @@ -177,6 +177,67 @@ impl ArrayAggAccumulator { datatype: datatype.clone(), }) } + + /// This function will return the underlying list array values if all valid values are consecutive without gaps (i.e. 
no null value points to a non-empty list) + /// If there are gaps but only at the end of the list array, the function will return the values without the null values at the end + fn get_optional_values_to_merge_as_is(list_array: &ListArray) -> Option<ArrayRef> { + let offsets = list_array.value_offsets(); + // Offsets always have at least 1 value + let initial_offset = offsets[0]; + let null_count = list_array.null_count(); + + // If there are no nulls then just use the fast path + // This is ok as the state is a ListArray rather than a ListViewArray, so all the values are consecutive + if null_count == 0 { + // According to Arrow specification, the first offset can be non-zero + let list_values = list_array.values().slice( + initial_offset as usize, + (offsets[offsets.len() - 1] - initial_offset) as usize, + ); + return Some(list_values); + } + + // If all the values are null then just return an empty values array + if list_array.null_count() == list_array.len() { + return Some(list_array.values().slice(0, 0)); + } + + // According to the Arrow spec, null values can point to non-empty lists + // So this will check if all null values starting from the first valid value to the last one point to a 0-length list, so we can just slice the underlying values + + // Unwrapping is safe as we just checked if there is a null value + let nulls = list_array.nulls().unwrap(); + + let mut valid_slices_iter = nulls.valid_slices(); + + // This is safe as we validated that there is at least 1 valid value in the array + let (start, end) = valid_slices_iter.next().unwrap(); + + let start_offset = offsets[start]; + + // End is exclusive, so it already points to the last offset value + // This is valid as the length of the array is always 1 less than the length of the offsets + let mut end_offset_of_last_valid_value = offsets[end]; + + for (start, end) in valid_slices_iter { + // If there is a null value that points to a non-empty list, then the start offset of the valid value + // will be different 
than the end offset of the last valid value + if offsets[start] != end_offset_of_last_valid_value { + return None; + } + + // End is exclusive, so it already points to the last offset value + // This is valid as the length of the array is always 1 less than the length of the offsets + end_offset_of_last_valid_value = offsets[end]; + } + + let consecutive_valid_values = list_array.values().slice( + start_offset as usize, + (end_offset_of_last_valid_value - start_offset) as usize, + ); + + Some(consecutive_valid_values) + } } impl Accumulator for ArrayAggAccumulator { @@ -208,9 +269,21 @@ impl Accumulator for ArrayAggAccumulator { } let list_arr = as_list_array(&states[0])?; - for arr in list_arr.iter().flatten() { - self.values.push(arr); + + match Self::get_optional_values_to_merge_as_is(list_arr) { + Some(values) => { + // Make sure we don't insert empty lists + if values.len() > 0 { + self.values.push(values); + } + } + None => { + for arr in list_arr.iter().flatten() { + self.values.push(arr); + } + } } + Ok(()) } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@datafusion.apache.org For additional commands, e-mail: commits-h...@datafusion.apache.org