This is an automated email from the ASF dual-hosted git repository.

Jefffrey pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/main by this push:
     new 8afbea4b07 feat: Adds product aggregate compute kernel (#10151)
8afbea4b07 is described below

commit 8afbea4b0771f288cc082c0b133e06de037b6fbc
Author: WeblWabl <[email protected]>
AuthorDate: Mon Jun 22 09:46:51 2026 -0500

    feat: Adds product aggregate compute kernel (#10151)
    
    The C++ Arrow implementation has a product op for aggregate compute
    kernel. This commit adds parity to the C++ implementation.
    
    Closes #10150
---
 arrow-arith/src/aggregate.rs | 145 +++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 145 insertions(+)

diff --git a/arrow-arith/src/aggregate.rs b/arrow-arith/src/aggregate.rs
index 8745c779ce..2e5713dd7d 100644
--- a/arrow-arith/src/aggregate.rs
+++ b/arrow-arith/src/aggregate.rs
@@ -78,6 +78,36 @@ impl<T: ArrowNativeTypeOp> NumericAccumulator<T> for 
SumAccumulator<T> {
     }
 }
 
+#[derive(Clone, Copy)]
+struct ProductAccumulator<T: ArrowNativeTypeOp> {
+    product: T,
+}
+
+impl<T: ArrowNativeTypeOp> Default for ProductAccumulator<T> {
+    fn default() -> Self {
+        Self { product: T::ONE }
+    }
+}
+
+impl<T: ArrowNativeTypeOp> NumericAccumulator<T> for ProductAccumulator<T> {
+    fn accumulate(&mut self, value: T) {
+        self.product = self.product.mul_wrapping(value);
+    }
+
+    fn accumulate_nullable(&mut self, value: T, valid: bool) {
+        let product = self.product;
+        self.product = select(valid, product.mul_wrapping(value), product)
+    }
+
+    fn merge(&mut self, other: Self) {
+        self.product = self.product.mul_wrapping(other.product);
+    }
+
+    fn finish(&mut self) -> T {
+        self.product
+    }
+}
+
 #[derive(Clone, Copy)]
 struct MinAccumulator<T: ArrowNativeTypeOp> {
     min: T,
@@ -914,6 +944,60 @@ pub fn sum<T: ArrowNumericType>(array: &PrimitiveArray<T>) 
-> Option<T::Native>
     aggregate::<T::Native, T, SumAccumulator<T::Native>>(array)
 }
 
+/// Returns the product of values in the primitive array.
+///
+/// Returns `None` if the array is empty or only contains null values.
+///
+/// This doesn't detect overflow in release mode by default. Once overflowing, 
the result will
+/// wrap around. For an overflow-checking variant, use [`product_checked`] 
instead.
+pub fn product<T: ArrowNumericType>(array: &PrimitiveArray<T>) -> 
Option<T::Native> {
+    aggregate::<T::Native, T, ProductAccumulator<T::Native>>(array)
+}
+
+/// Returns the product of values in the primitive array.
+///
+/// Returns `Ok(None)` if the array is empty or only contains null values.
+///
+/// This detects overflow and returns an `Err` for that. For an 
non-overflow-checking variant,
+/// use [`product`] instead.
+pub fn product_checked<T: ArrowNumericType>(
+    array: &PrimitiveArray<T>,
+) -> Result<Option<T::Native>, ArrowError> {
+    let null_count = array.null_count();
+
+    if null_count == array.len() {
+        return Ok(None);
+    }
+
+    let data: &[T::Native] = array.values();
+
+    match array.nulls() {
+        None => {
+            let product = data.iter().try_fold(T::Native::ONE, |accumulator, 
value| {
+                accumulator.mul_checked(*value)
+            })?;
+
+            Ok(Some(product))
+        }
+        Some(nulls) => {
+            let mut product = T::Native::ONE;
+
+            try_for_each_valid_idx(
+                nulls.len(),
+                nulls.offset(),
+                nulls.null_count(),
+                Some(nulls.validity()),
+                |idx| {
+                    unsafe { product = 
product.mul_checked(array.value_unchecked(idx))? };
+                    Ok::<_, ArrowError>(())
+                },
+            )?;
+
+            Ok(Some(product))
+        }
+    }
+}
+
 /// Returns the minimum value in the array, according to the natural order.
 /// For floating point arrays any NaN values are considered to be greater than 
any other non-null value
 ///
@@ -963,6 +1047,67 @@ mod tests {
         assert_eq!(16.5, sum(&a).unwrap());
     }
 
+    #[test]
+    fn test_primitive_array_product() {
+        let a = Int32Array::from(vec![1, 2, 3, 4, 5]);
+        assert_eq!(120, product(&a).unwrap());
+    }
+
+    #[test]
+    fn test_primitive_array_float_product() {
+        let a = Float64Array::from(vec![1.0, 2.0, 3.0, 4.0, 5.0]);
+        assert_eq!(120.0, product(&a).unwrap());
+    }
+
+    #[test]
+    fn test_primitive_array_product_with_nulls() {
+        let a = Int32Array::from(vec![None, Some(2), Some(3), None, Some(5)]);
+        assert_eq!(30, product(&a).unwrap());
+    }
+
+    #[test]
+    fn test_primitive_array_product_all_nulls() {
+        let a = Int32Array::from(vec![None, None, None]);
+        assert_eq!(None, product(&a));
+    }
+
+    #[test]
+    fn test_primitive_array_product_empty() {
+        let a = Int32Array::from(Vec::<i32>::new());
+        assert_eq!(None, product(&a));
+    }
+
+    #[test]
+    fn test_primitive_array_product_checked() {
+        let a = Int32Array::from(vec![1, 2, 3, 4, 5]);
+        assert_eq!(120, product_checked(&a).unwrap().unwrap());
+    }
+
+    #[test]
+    fn test_primitive_array_product_checked_with_nulls() {
+        let a = Int32Array::from(vec![None, Some(2), Some(3), None, Some(5)]);
+        assert_eq!(30, product_checked(&a).unwrap().unwrap());
+    }
+
+    #[test]
+    fn test_primitive_array_product_checked_all_nulls() {
+        let a = Int32Array::from(vec![None, None, None]);
+        assert_eq!(None, product_checked(&a).unwrap());
+    }
+
+    #[test]
+    fn test_product_overflow() {
+        let a = Int32Array::from(vec![i32::MAX, 2]);
+        // wrapping variant silently overflows
+        assert_eq!(product(&a).unwrap(), -2);
+    }
+
+    #[test]
+    fn test_product_checked_overflow() {
+        let a = Int32Array::from(vec![i32::MAX, 2]);
+        product_checked(&a).expect_err("overflow should be detected");
+    }
+
     #[test]
     fn test_primitive_array_sum_with_nulls() {
         let a = Int32Array::from(vec![None, Some(2), Some(3), None, Some(5)]);

Reply via email to