This is an automated email from the ASF dual-hosted git repository.

jeffreyvo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/main by this push:
     new 92a239a54e Implement min, max, sum for run-end-encoded arrays. (#9409)
92a239a54e is described below

commit 92a239a54e33043f05fef98d81d3c7bd2b926467
Author: Bruno <[email protected]>
AuthorDate: Thu Mar 12 07:31:45 2026 +0100

    Implement min, max, sum for run-end-encoded arrays. (#9409)
    
    Efficient implementations:
    * min & max work directly on the values child array.
    * sum folds over run lengths & values, without decompressing the array.
    
    In particular, these implementations take care of the logical offset and
    length of the run-end-encoded arrays. This is non-trivial:
    * We get the physical start & end indices in O(log(#runs)), but those
    are incorrect for empty arrays.
    * Slicing can happen in the middle of a run. For sum, we need to track
    the logical start & end and reduce the run length accordingly.
    
    Finally, one caveat: the aggregation functions only work when the child
    values array is a primitive array. That covers almost every case, but some
    clients might store the values in an unexpected type. They will get either
    `None` or an `Err`, depending on the aggregation function used.
    
    This feature is tracked in
    https://github.com/apache/arrow-rs/issues/3520.
---
 arrow-arith/src/aggregate.rs | 296 ++++++++++++++++++++++++++++++++++++++++++-
 1 file changed, 292 insertions(+), 4 deletions(-)

diff --git a/arrow-arith/src/aggregate.rs b/arrow-arith/src/aggregate.rs
index a043259694..59792d0c5b 100644
--- a/arrow-arith/src/aggregate.rs
+++ b/arrow-arith/src/aggregate.rs
@@ -540,7 +540,7 @@ pub fn min_string_view(array: &StringViewArray) -> 
Option<&str> {
 /// Returns the sum of values in the array.
 ///
 /// This doesn't detect overflow. Once overflowing, the result will wrap 
around.
-/// For an overflow-checking variant, use `sum_array_checked` instead.
+/// For an overflow-checking variant, use [`sum_array_checked`] instead.
 pub fn sum_array<T, A: ArrayAccessor<Item = T::Native>>(array: A) -> 
Option<T::Native>
 where
     T: ArrowNumericType,
@@ -567,6 +567,12 @@ where
 
             Some(sum)
         }
+        DataType::RunEndEncoded(run_ends, _) => match run_ends.data_type() {
+            DataType::Int16 => ree::sum_wrapping::<types::Int16Type, 
T>(&array),
+            DataType::Int32 => ree::sum_wrapping::<types::Int32Type, 
T>(&array),
+            DataType::Int64 => ree::sum_wrapping::<types::Int64Type, 
T>(&array),
+            _ => unreachable!(),
+        },
         _ => sum::<T>(as_primitive_array(&array)),
     }
 }
@@ -574,7 +580,9 @@ where
 /// Returns the sum of values in the array.
 ///
 /// This detects overflow and returns an `Err` for that. For an 
non-overflow-checking variant,
-/// use `sum_array` instead.
+/// use [`sum_array`] instead.
+/// Additionally returns an `Err` on run-end-encoded arrays with a provided
+/// values type parameter that is incorrect.
 pub fn sum_array_checked<T, A: ArrayAccessor<Item = T::Native>>(
     array: A,
 ) -> Result<Option<T::Native>, ArrowError>
@@ -603,10 +611,110 @@ where
 
             Ok(Some(sum))
         }
+        DataType::RunEndEncoded(run_ends, _) => match run_ends.data_type() {
+            DataType::Int16 => ree::sum_checked::<types::Int16Type, T>(&array),
+            DataType::Int32 => ree::sum_checked::<types::Int32Type, T>(&array),
+            DataType::Int64 => ree::sum_checked::<types::Int64Type, T>(&array),
+            _ => unreachable!(),
+        },
         _ => sum_checked::<T>(as_primitive_array(&array)),
     }
 }
 
+// Logic for summing run-end-encoded arrays.
+mod ree {
+    use std::convert::Infallible;
+
+    use arrow_array::cast::AsArray;
+    use arrow_array::types::RunEndIndexType;
+    use arrow_array::{Array, ArrowNativeTypeOp, ArrowNumericType, PrimitiveArray, TypedRunArray};
+    use arrow_buffer::ArrowNativeType;
+    use arrow_schema::ArrowError;
+
+    /// Downcasts `array` to a [`TypedRunArray`] whose values are a `PrimitiveArray<V>`.
+    ///
+    /// Returns `None` if `array` is not a run array with `I` run ends, or if its
+    /// values child is not a `PrimitiveArray<V>`.
+    fn downcast<'a, I: RunEndIndexType, V: ArrowNumericType>(
+        array: &'a dyn Array,
+    ) -> Option<TypedRunArray<'a, I, PrimitiveArray<V>>> {
+        let array = array.as_run_opt::<I>()?;
+        // We only support RunArray wrapping primitive types.
+        array.downcast::<PrimitiveArray<V>>()
+    }
+
+    /// Computes the sum (wrapping on overflow) of the logical values of a
+    /// run-end-encoded array.
+    ///
+    /// Returns `None` if `array` cannot be downcast to a run array of
+    /// `PrimitiveArray<V>`, or if its logical slice is empty or only contains nulls.
+    pub(super) fn sum_wrapping<I: RunEndIndexType, V: ArrowNumericType>(
+        array: &dyn Array,
+    ) -> Option<V::Native> {
+        let ree = downcast::<I, V>(array)?;
+        let Ok(sum) = fold(ree, |acc, val, len| -> Result<V::Native, Infallible> {
+            Ok(acc.add_wrapping(val.mul_wrapping(V::Native::usize_as(len))))
+        });
+        sum
+    }
+
+    /// Computes the sum of the logical values of a run-end-encoded array,
+    /// returning an error on overflow.
+    ///
+    /// # Errors
+    ///
+    /// * [`ArrowError::InvalidArgumentError`] if `array` cannot be downcast to a
+    ///   run array of `PrimitiveArray<V>`.
+    /// * [`ArrowError::ArithmeticOverflow`] if the sum overflows, or if a run of a
+    ///   non-zero value is longer than `V::Native` can represent.
+    pub(super) fn sum_checked<I: RunEndIndexType, V: ArrowNumericType>(
+        array: &dyn Array,
+    ) -> Result<Option<V::Native>, ArrowError> {
+        let Some(ree) = downcast::<I, V>(array) else {
+            return Err(ArrowError::InvalidArgumentError(
+                "Input run array values are not a PrimitiveArray".to_string(),
+            ));
+        };
+        fold(ree, |acc, val, len| -> Result<V::Native, ArrowError> {
+            let Some(len) = V::Native::from_usize(len) else {
+                // A zero value contributes nothing however long its run is, so an
+                // unrepresentable run length is only a problem for non-zero values.
+                if val.is_zero() {
+                    return Ok(acc);
+                }
+                return Err(ArrowError::ArithmeticOverflow(format!(
+                    "Cannot convert a run length ({:?}) to the value type ({})",
+                    len,
+                    std::any::type_name::<V::Native>()
+                )));
+            };
+            acc.add_checked(val.mul_checked(len)?)
+        })
+    }
+
+    /// Folds `f` over the non-null runs of the logical slice of `array`.
+    ///
+    /// `f` is called as `f(acc, value, run_length)`, where `run_length` is the
+    /// number of logical elements of that run lying inside the slice (at least 1).
+    /// Returns `Ok(None)` if the slice is empty or only contains nulls, and
+    /// propagates the first error returned by `f`.
+    fn fold<'a, I: RunEndIndexType, V: ArrowNumericType, F, E>(
+        array: TypedRunArray<'a, I, PrimitiveArray<V>>,
+        mut f: F,
+    ) -> Result<Option<V::Native>, E>
+    where
+        F: FnMut(V::Native, V::Native, usize) -> Result<V::Native, E>,
+    {
+        let run_ends = array.run_ends();
+        let logical_start = run_ends.offset();
+        let logical_end = logical_start + run_ends.len();
+
+        // Physical indices are not meaningful for empty arrays: bail out early.
+        if logical_start == logical_end {
+            return Ok(None);
+        }
+
+        // Index of the run containing the first logical element. Runs before it
+        // end at or before `logical_start` and contribute nothing.
+        let physical_start = run_ends.get_start_physical_index();
+        // `values()` holds the raw run ends (not adjusted for the logical offset),
+        // aligned index-by-index with the values child.
+        let raw_run_ends = &run_ends.values()[physical_start..];
+        let all_values = array.values();
+        let values = all_values.slice(physical_start, all_values.len() - physical_start);
+
+        // Absolute logical end of the previous run. Starting at the slice start
+        // means a slice beginning mid-run only counts the elements inside it.
+        let mut prev_end = logical_start;
+        let mut acc = V::Native::ZERO;
+        let mut has_non_null_value = false;
+
+        for (run_end, value) in raw_run_ends.iter().copied().zip(values.iter()) {
+            // The last run may extend past the end of the slice.
+            let current_run_end = run_end.as_usize().min(logical_end);
+            let run_length = current_run_end - prev_end;
+
+            if let Some(value) = value {
+                has_non_null_value = true;
+                acc = f(acc, value, run_length)?;
+            }
+
+            prev_end = current_run_end;
+            if current_run_end == logical_end {
+                break;
+            }
+        }
+
+        Ok(has_non_null_value.then_some(acc))
+    }
+
+    #[cfg(test)]
+    mod tests {
+        use super::*;
+        use arrow_array::builder::PrimitiveRunBuilder;
+        use arrow_array::types::{Int16Type, Int32Type, Int8Type};
+
+        #[test]
+        fn sum_of_slice_starting_mid_run() {
+            // Runs: [1 x4, 2 x1, 3 x1, 4 x4]; raw run ends are [4, 5, 6, 10].
+            let mut builder = PrimitiveRunBuilder::<Int16Type, Int32Type>::new();
+            for v in [1, 1, 1, 1, 2, 3, 4, 4, 4, 4] {
+                builder.append_value(v);
+            }
+            let run_array = builder.finish();
+
+            // Logical values [1, 2, 3, 4, 4]: starts inside the first run and ends
+            // inside the last one.
+            let sliced = run_array.slice(3, 5);
+            assert_eq!(sum_wrapping::<Int16Type, Int32Type>(&sliced), Some(14));
+            assert_eq!(sum_checked::<Int16Type, Int32Type>(&sliced).unwrap(), Some(14));
+
+            let empty = run_array.slice(3, 0);
+            assert_eq!(sum_wrapping::<Int16Type, Int32Type>(&empty), None);
+            assert_eq!(sum_checked::<Int16Type, Int32Type>(&empty).unwrap(), None);
+        }
+
+        #[test]
+        fn checked_sum_of_long_zero_run() {
+            // A run of 200 zeros: 200 does not fit in an i8, yet the sum is 0.
+            let mut builder = PrimitiveRunBuilder::<Int16Type, Int8Type>::new();
+            for _ in 0..200 {
+                builder.append_value(0);
+            }
+            let run_array = builder.finish();
+            assert_eq!(sum_checked::<Int16Type, Int8Type>(&run_array).unwrap(), Some(0));
+        }
+    }
+}
+
 /// Returns the min of values in the array of `ArrowNumericType` type, or 
dictionary
 /// array with value of `ArrowNumericType` type.
 pub fn min_array<T, A: ArrayAccessor<Item = T::Native>>(array: A) -> 
Option<T::Native>
@@ -639,6 +747,20 @@ where
 {
     match array.data_type() {
         DataType::Dictionary(_, _) => min_max_helper::<T::Native, _, _>(array, 
cmp),
+        DataType::RunEndEncoded(run_ends, _) => {
+            // We can directly perform min/max on the values child array, as 
any
+            // run must have non-zero length.
+            let array: &dyn Array = &array;
+            let values = match run_ends.data_type() {
+                DataType::Int16 => 
array.as_run_opt::<types::Int16Type>()?.values_slice(),
+                DataType::Int32 => 
array.as_run_opt::<types::Int32Type>()?.values_slice(),
+                DataType::Int64 => 
array.as_run_opt::<types::Int64Type>()?.values_slice(),
+                _ => return None,
+            };
+            // We only support RunArray wrapping primitive types.
+            let values = values.as_any().downcast_ref::<PrimitiveArray<T>>()?;
+            m(values)
+        }
         _ => m(as_primitive_array(&array)),
     }
 }
@@ -751,7 +873,7 @@ pub fn bool_or(array: &BooleanArray) -> Option<bool> {
 /// Returns `Ok(None)` if the array is empty or only contains null values.
 ///
 /// This detects overflow and returns an `Err` for that. For an 
non-overflow-checking variant,
-/// use `sum` instead.
+/// use [`sum`] instead.
 pub fn sum_checked<T>(array: &PrimitiveArray<T>) -> Result<Option<T::Native>, 
ArrowError>
 where
     T: ArrowNumericType,
@@ -799,7 +921,7 @@ where
 /// Returns `None` if the array is empty or only contains null values.
 ///
 /// This doesn't detect overflow in release mode by default. Once overflowing, 
the result will
-/// wrap around. For an overflow-checking variant, use `sum_checked` instead.
+/// wrap around. For an overflow-checking variant, use [`sum_checked`] instead.
 pub fn sum<T: ArrowNumericType>(array: &PrimitiveArray<T>) -> Option<T::Native>
 where
     T::Native: ArrowNativeTypeOp,
@@ -1750,4 +1872,170 @@ mod tests {
         sum_checked(&a).expect_err("overflow should be detected");
         sum_array_checked::<Int32Type, _>(&a).expect_err("overflow should be 
detected");
     }
+
+    /// Test helper: appends every item (converted to `Option<V::Native>`) to a
+    /// `PrimitiveRunBuilder` with `I` run ends and returns the finished [`RunArray`].
+    fn make_run_array<'a, I: RunEndIndexType, V: ArrowNumericType, ItemType>(
+        values: impl IntoIterator<Item = &'a ItemType>,
+    ) -> RunArray<I>
+    where
+        ItemType: Clone + Into<Option<V::Native>> + 'static,
+    {
+        let mut builder = arrow_array::builder::PrimitiveRunBuilder::<I, V>::new();
+        for item in values {
+            let item: Option<V::Native> = item.clone().into();
+            builder.append_option(item);
+        }
+        builder.finish()
+    }
+
+    #[test]
+    fn test_ree_sum_array_basic() {
+        let runs = make_run_array::<Int16Type, Int32Type, _>(&[10, 10, 20, 30, 30, 30]);
+        let array = runs.downcast::<Int32Array>().unwrap();
+
+        // 10 * 2 + 20 * 1 + 30 * 3
+        assert_eq!(sum_array::<Int32Type, _>(array), Some(130));
+        assert_eq!(sum_array_checked::<Int32Type, _>(array).unwrap(), Some(130));
+    }
+
+    #[test]
+    fn test_ree_sum_array_empty() {
+        let runs = make_run_array::<Int16Type, Int32Type, i32>(&[]);
+        let array = runs.downcast::<Int32Array>().unwrap();
+
+        // An empty array has no sum.
+        assert_eq!(sum_array::<Int32Type, _>(array), None);
+        assert_eq!(sum_array_checked::<Int32Type, _>(array).unwrap(), None);
+    }
+
+    #[test]
+    fn test_ree_sum_array_with_nulls() {
+        let runs = make_run_array::<Int16Type, Int32Type, _>(&[
+            Some(10),
+            None,
+            Some(20),
+            None,
+            Some(30),
+        ]);
+        let array = runs.downcast::<Int32Array>().unwrap();
+
+        // Null runs are ignored.
+        assert_eq!(sum_array::<Int32Type, _>(array), Some(60));
+        assert_eq!(sum_array_checked::<Int32Type, _>(array).unwrap(), Some(60));
+    }
+
+    #[test]
+    fn test_ree_sum_array_with_only_nulls() {
+        let runs = make_run_array::<Int16Type, Int16Type, _>(&[None, None, None, None, None]);
+        let array = runs.downcast::<Int16Array>().unwrap();
+
+        // Without a single non-null value there is no sum.
+        assert_eq!(sum_array::<Int16Type, _>(array), None);
+        assert_eq!(sum_array_checked::<Int16Type, _>(array).unwrap(), None);
+    }
+
+    #[test]
+    fn test_ree_sum_array_overflow() {
+        let runs = make_run_array::<Int16Type, Int8Type, _>(&[126, 2]);
+        let array = runs.downcast::<Int8Array>().unwrap();
+
+        // i8 range is -128..=127: 126 + 2 wraps to -128, and errors when checked.
+        assert_eq!(sum_array::<Int8Type, _>(array), Some(-128));
+        assert!(sum_array_checked::<Int8Type, _>(array).is_err());
+    }
+
+    #[test]
+    fn test_ree_sum_array_sliced() {
+        let runs = make_run_array::<Int16Type, UInt8Type, _>(&[0, 10, 10, 10, 20, 30, 30, 30]);
+        // Skip 2 values at the start and 1 at the end: [10, 10, 20, 30, 30].
+        let sliced = runs.slice(2, 5);
+        let array = sliced.downcast::<UInt8Array>().unwrap();
+
+        assert_eq!(sum_array::<UInt8Type, _>(array), Some(100));
+        assert_eq!(sum_array_checked::<UInt8Type, _>(array).unwrap(), Some(100));
+    }
+
+    #[test]
+    fn test_ree_min_max_array_basic() {
+        let runs = make_run_array::<Int16Type, Int32Type, _>(&[30, 30, 10, 20, 20]);
+        let array = runs.downcast::<Int32Array>().unwrap();
+
+        assert_eq!(min_array::<Int32Type, _>(array), Some(10));
+        assert_eq!(max_array::<Int32Type, _>(array), Some(30));
+    }
+
+    #[test]
+    fn test_ree_min_max_array_empty() {
+        let runs = make_run_array::<Int16Type, Int32Type, i32>(&[]);
+        let array = runs.downcast::<Int32Array>().unwrap();
+
+        // No values means neither a minimum nor a maximum.
+        assert_eq!(min_array::<Int32Type, _>(array), None);
+        assert_eq!(max_array::<Int32Type, _>(array), None);
+    }
+
+    #[test]
+    fn test_ree_min_max_array_float() {
+        let runs = make_run_array::<Int16Type, Float64Type, _>(&[5.5, 5.5, 2.1, 8.9, 8.9]);
+        let array = runs.downcast::<Float64Array>().unwrap();
+
+        assert_eq!(min_array::<Float64Type, _>(array), Some(2.1));
+        assert_eq!(max_array::<Float64Type, _>(array), Some(8.9));
+    }
+
+    #[test]
+    fn test_ree_min_max_array_with_nulls() {
+        let runs = make_run_array::<Int16Type, UInt8Type, _>(&[None, Some(10)]);
+        let array = runs.downcast::<UInt8Array>().unwrap();
+
+        // The null run is ignored, leaving 10 as both extremes.
+        assert_eq!(min_array::<UInt8Type, _>(array), Some(10));
+        assert_eq!(max_array::<UInt8Type, _>(array), Some(10));
+    }
+
+    #[test]
+    fn test_ree_min_max_array_sliced() {
+        let runs = make_run_array::<Int16Type, Int32Type, _>(&[0, 30, 30, 10, 20, 20, 100]);
+        // Skip 1 value at the start and 1 at the end: [30, 30, 10, 20, 20].
+        let sliced = runs.slice(1, 5);
+        let array = sliced.downcast::<Int32Array>().unwrap();
+
+        assert_eq!(min_array::<Int32Type, _>(array), Some(10));
+        assert_eq!(max_array::<Int32Type, _>(array), Some(30));
+    }
+
+    #[test]
+    fn test_ree_min_max_array_sliced_mid_run() {
+        let runs = make_run_array::<Int16Type, Int32Type, _>(&[0, 0, 30, 10, 20, 100, 100]);
+        // Skip 1 value at the start and 1 at the end: [0, 30, 10, 20, 100].
+        let sliced = runs.slice(1, 5);
+        let array = sliced.downcast::<Int32Array>().unwrap();
+
+        assert_eq!(min_array::<Int32Type, _>(array), Some(0));
+        assert_eq!(max_array::<Int32Type, _>(array), Some(100));
+    }
 }

Reply via email to