This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 62000b4e16 perf(array-agg): add fast path for array agg for `merge_batch` (#14299)
62000b4e16 is described below

commit 62000b4e169fac85cd71715b59e0e796b8e694ec
Author: Raz Luvaton <16746759+rluva...@users.noreply.github.com>
AuthorDate: Wed Jan 29 17:06:37 2025 +0200

    perf(array-agg): add fast path for array agg for `merge_batch` (#14299)
    
    * perf(array-agg): add fast path for array agg for `merge_batch`
    
    * update comment
    
    * fix slice length
    
    * fix: make sure we are not inserting empty lists
---
 datafusion/functions-aggregate/src/array_agg.rs | 79 ++++++++++++++++++++++++-
 1 file changed, 76 insertions(+), 3 deletions(-)

diff --git a/datafusion/functions-aggregate/src/array_agg.rs b/datafusion/functions-aggregate/src/array_agg.rs
index b75de83f6a..9fff059991 100644
--- a/datafusion/functions-aggregate/src/array_agg.rs
+++ b/datafusion/functions-aggregate/src/array_agg.rs
@@ -17,7 +17,7 @@
 
 //! `ARRAY_AGG` aggregate implementation: [`ArrayAgg`]
 
-use arrow::array::{new_empty_array, Array, ArrayRef, AsArray, StructArray};
+use arrow::array::{new_empty_array, Array, ArrayRef, AsArray, ListArray, StructArray};
 use arrow::datatypes::DataType;
 
 use arrow_schema::{Field, Fields};
@@ -177,6 +177,67 @@ impl ArrayAggAccumulator {
             datatype: datatype.clone(),
         })
     }
+
+    /// Returns the list array's underlying values as one slice if all valid lists are consecutive without gaps (i.e. no null entry between two valid entries points to a non-empty list), or `None` otherwise.
+    /// Null entries before the first or after the last valid entry may point to any range: their values are simply left out of the returned slice.
+    fn get_optional_values_to_merge_as_is(list_array: &ListArray) -> Option<ArrayRef> {
+        let offsets = list_array.value_offsets();
+        // Offsets always have at least 1 value
+        let initial_offset = offsets[0];
+        let null_count = list_array.null_count();
+
+        // If there are no nulls, just use the fast path
+        // This is ok as the state is a ListArray rather than a ListViewArray so all the values are consecutive
+        if null_count == 0 {
+            // According to the Arrow specification, the first offset can be non-zero
+            let list_values = list_array.values().slice(
+                initial_offset as usize,
+                (offsets[offsets.len() - 1] - initial_offset) as usize,
+            );
+            return Some(list_values);
+        }
+
+        // If all the values are null, just return an empty values array
+        if null_count == list_array.len() {
+            return Some(list_array.values().slice(0, 0));
+        }
+
+        // According to the Arrow spec, null values can point to non-empty lists
+        // So check that every null value between the first and the last valid value points to a 0 length list, so we can just slice the underlying values
+
+        // Unwrapping is safe as we already checked that there is at least one null value
+        let nulls = list_array.nulls().unwrap();
+
+        let mut valid_slices_iter = nulls.valid_slices();
+
+        // This is safe as we validated that there is at least 1 valid value in the array
+        let (start, end) = valid_slices_iter.next().unwrap();
+
+        let start_offset = offsets[start];
+
+        // End is exclusive, so offsets[end] is already the end offset of the last value in this run
+        // This is valid as the length of the array is always 1 less than the length of the offsets
+        let mut end_offset_of_last_valid_value = offsets[end];
+
+        for (start, end) in valid_slices_iter {
+            // If a null value between the previous run and this one points to a non-empty list, the start offset of this run
+            // will differ from the end offset of the last valid value
+            if offsets[start] != end_offset_of_last_valid_value {
+                return None;
+            }
+
+            // End is exclusive, so offsets[end] is already the end offset of the last value in this run
+            // This is valid as the length of the array is always 1 less than the length of the offsets
+            end_offset_of_last_valid_value = offsets[end];
+        }
+
+        let consecutive_valid_values = list_array.values().slice(
+            start_offset as usize,
+            (end_offset_of_last_valid_value - start_offset) as usize,
+        );
+
+        Some(consecutive_valid_values)
+    }
 }
 
 impl Accumulator for ArrayAggAccumulator {
@@ -208,9 +269,21 @@ impl Accumulator for ArrayAggAccumulator {
         }
 
         let list_arr = as_list_array(&states[0])?;
-        for arr in list_arr.iter().flatten() {
-            self.values.push(arr);
+
+        match Self::get_optional_values_to_merge_as_is(list_arr) {
+            Some(values) => {
+                // Fast path: push all values as one array, but never insert an empty list
+                if !values.is_empty() {
+                    self.values.push(values);
+                }
+            }
+            None => {
+                for arr in list_arr.iter().flatten() {
+                    self.values.push(arr);
+                }
+            }
         }
+
         Ok(())
     }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@datafusion.apache.org
For additional commands, e-mail: commits-h...@datafusion.apache.org

Reply via email to