jhorstmann commented on code in PR #5100:
URL: https://github.com/apache/arrow-rs/pull/5100#discussion_r1399130953


##########
arrow-arith/src/aggregate.rs:
##########
@@ -20,39 +20,298 @@
 use arrow_array::cast::*;
 use arrow_array::iterator::ArrayIter;
 use arrow_array::*;
-use arrow_buffer::ArrowNativeType;
+use arrow_buffer::{ArrowNativeType, NullBuffer};
 use arrow_data::bit_iterator::try_for_each_valid_idx;
 use arrow_schema::ArrowError;
 use arrow_schema::*;
+use std::borrow::BorrowMut;
 use std::ops::{BitAnd, BitOr, BitXor};
 
-/// Generic test for NaN, the optimizer should be able to remove this for 
integer types.
-#[inline]
-pub(crate) fn is_nan<T: ArrowNativeType + PartialOrd + Copy>(a: T) -> bool {
-    #[allow(clippy::eq_op)]
-    !(a == a)
+#[inline(always)]
+fn select<T: Copy>(m: bool, a: T, b: T) -> T {
+    if m {
+        a
+    } else {
+        b
+    }
 }
 
-/// Returns the minimum value in the array, according to the natural order.
-/// For floating point arrays any NaN values are considered to be greater than 
any other non-null value
-#[cfg(not(feature = "simd"))]
-pub fn min<T>(array: &PrimitiveArray<T>) -> Option<T::Native>
-where
-    T: ArrowNumericType,
-    T::Native: ArrowNativeType,
-{
-    min_max_helper::<T::Native, _, _>(array, |a, b| (is_nan(*a) & !is_nan(*b)) 
|| a > b)
+trait NumericAccumulator<T: ArrowNativeTypeOp>: Copy + Default {
+    fn accumulate(&mut self, value: T);
+    fn accumulate_nullable(&mut self, value: T, valid: bool);
+    fn merge(&mut self, other: Self);
+    fn finish(&mut self) -> T;
 }
 
-/// Returns the maximum value in the array, according to the natural order.
-/// For floating point arrays any NaN values are considered to be greater than 
any other non-null value
-#[cfg(not(feature = "simd"))]
-pub fn max<T>(array: &PrimitiveArray<T>) -> Option<T::Native>
-where
-    T: ArrowNumericType,
-    T::Native: ArrowNativeType,
-{
-    min_max_helper::<T::Native, _, _>(array, |a, b| (!is_nan(*a) & is_nan(*b)) 
|| a < b)
+#[derive(Clone, Copy)]
+struct SumAccumulator<T: ArrowNativeTypeOp> {
+    sum: T,
+}
+
+impl<T: ArrowNativeTypeOp> Default for SumAccumulator<T> {
+    fn default() -> Self {
+        Self { sum: T::ZERO }
+    }
+}
+
+impl<T: ArrowNativeTypeOp> NumericAccumulator<T> for SumAccumulator<T> {
+    fn accumulate(&mut self, value: T) {
+        self.sum = self.sum.add_wrapping(value);
+    }
+
+    fn accumulate_nullable(&mut self, value: T, valid: bool) {
+        let sum = self.sum;
+        self.sum = select(valid, sum.add_wrapping(value), sum)
+    }
+
+    fn merge(&mut self, other: Self) {
+        self.sum = self.sum.add_wrapping(other.sum);
+    }
+
+    fn finish(&mut self) -> T {
+        self.sum
+    }
+}
+
+#[derive(Clone, Copy)]
+struct MinAccumulator<T: ArrowNativeTypeOp> {
+    min: T,
+}
+
+impl<T: ArrowNativeTypeOp> Default for MinAccumulator<T> {
+    fn default() -> Self {
+        Self { min: T::MAX }
+    }
+}
+
+impl<T: ArrowNativeTypeOp> NumericAccumulator<T> for MinAccumulator<T> {
+    fn accumulate(&mut self, value: T) {
+        let min = self.min;
+        self.min = select(value.is_lt(min), value, min);
+    }
+
+    fn accumulate_nullable(&mut self, value: T, valid: bool) {
+        let min = self.min;
+        let is_lt = valid & value.is_lt(min);
+        self.min = select(is_lt, value, min);
+    }
+
+    fn merge(&mut self, other: Self) {
+        self.accumulate(other.min)
+    }
+
+    fn finish(&mut self) -> T {
+        self.min
+    }
+}
+
+#[derive(Clone, Copy)]
+struct MaxAccumulator<T: ArrowNativeTypeOp> {
+    max: T,
+}
+
+impl<T: ArrowNativeTypeOp> Default for MaxAccumulator<T> {
+    fn default() -> Self {
+        Self { max: T::MIN }
+    }
+}
+
+impl<T: ArrowNativeTypeOp> NumericAccumulator<T> for MaxAccumulator<T> {
+    fn accumulate(&mut self, value: T) {
+        let max = self.max;
+        self.max = select(value.is_gt(max), value, max);
+    }
+
+    fn accumulate_nullable(&mut self, value: T, valid: bool) {
+        let max = self.max;
+        let is_gt = value.is_gt(max) & valid;
+        self.max = select(is_gt, value, max);
+    }
+
+    fn merge(&mut self, other: Self) {
+        self.accumulate(other.max)
+    }
+
+    fn finish(&mut self) -> T {
+        self.max
+    }
+}
+
+fn reduce_accumulators<T: ArrowNativeTypeOp, A: NumericAccumulator<T>, const 
LANES: usize>(
+    mut acc: [A; LANES],
+) -> A {
+    assert!(LANES > 0 && LANES.is_power_of_two());
+    let mut len = LANES;
+
+    // attempt at tree reduction, unfortunately llvm does not fully recognize 
this pattern,
+    // but the generated code is still a little faster than purely sequential 
reduction for floats.
+    while len >= 2 {
+        let mid = len / 2;
+        let (h, t) = acc[..len].split_at_mut(mid);
+
+        for i in 0..mid {
+            h[i].merge(t[i]);
+        }
+        len /= 2;
+    }
+    acc[0]
+}
+
+#[inline(always)]
+fn aggregate_nonnull_chunk<T: ArrowNativeTypeOp, A: NumericAccumulator<T>, 
const LANES: usize>(
+    acc: &mut [A; LANES],
+    values: &[T; LANES],
+) {
+    for i in 0..LANES {
+        acc[i].accumulate(values[i]);
+    }
+}
+
+#[inline(always)]
+fn aggregate_nullable_chunk<T: ArrowNativeTypeOp, A: NumericAccumulator<T>, 
const LANES: usize>(
+    acc: &mut [A; LANES],
+    values: &[T; LANES],
+    validity: u64,
+) {
+    let mut bit = 1;
+    for i in 0..LANES {
+        acc[i].accumulate_nullable(values[i], (validity & bit) != 0);
+        bit <<= 1;
+    }
+}
+
+fn aggregate_nonnull_simple<T: ArrowNativeTypeOp, A: 
NumericAccumulator<T>>(values: &[T]) -> T {
+    return values
+        .iter()
+        .copied()
+        .fold(A::default(), |mut a, b| {
+            a.accumulate(b);
+            a
+        })
+        .finish();
+}
+
+#[inline(never)]
+fn aggregate_nonnull_lanes<T: ArrowNativeTypeOp, A: NumericAccumulator<T>, 
const LANES: usize>(
+    values: &[T],
+) -> T {
+    let mut acc = [A::default(); LANES];
+    let mut chunks = values.chunks_exact(LANES);
+    chunks.borrow_mut().for_each(|chunk| {
+        aggregate_nonnull_chunk(&mut acc, chunk[..LANES].try_into().unwrap());
+    });
+
+    let remainder = chunks.remainder();
+    if remainder.len() > 0 {
+        if remainder.len() > 0 {
+            for i in 0..remainder.len() {
+                acc[i].accumulate(remainder[i]);
+            }
+        }
+    }
+
+    reduce_accumulators(acc).finish()
+}
+
+#[inline(never)]
+fn aggregate_nullable_lanes<T: ArrowNativeTypeOp, A: NumericAccumulator<T>, 
const LANES: usize>(
+    values: &[T],
+    validity: &NullBuffer,
+) -> T {
+    assert!(LANES > 0 && 64 % LANES == 0);
+    assert_eq!(values.len(), validity.len());
+
+    let mut acc = [A::default(); LANES];
+    let mut values_chunks = values.chunks_exact(64);
+    let validity_chunks = validity.inner().bit_chunks();
+    let mut validity_chunks_iter = validity_chunks.iter();
+
+    values_chunks.borrow_mut().for_each(|chunk| {
+        // Safety: we asserted that values and validity have the same length 
and trust the iterator impl
+        let mut validity = unsafe { 
validity_chunks_iter.next().unwrap_unchecked() };
+        chunk.chunks_exact(LANES).for_each(|chunk| {
+            aggregate_nullable_chunk(&mut acc, 
chunk[..LANES].try_into().unwrap(), validity);
+            validity >>= LANES;
+        });
+    });
+
+    let remainder = values_chunks.remainder();
+    if remainder.len() > 0 {
+        let mut validity = validity_chunks.remainder_bits();
+
+        let mut remainder_chunks = remainder.chunks_exact(LANES);
+        remainder_chunks.borrow_mut().for_each(|chunk| {
+            aggregate_nullable_chunk(&mut acc, 
chunk[..LANES].try_into().unwrap(), validity);
+            validity >>= LANES;
+        });
+
+        let remainder = remainder_chunks.remainder();
+        if remainder.len() > 0 {
+            let mut bit = 1;
+            for i in 0..remainder.len() {
+                acc[i].accumulate_nullable(remainder[i], (validity & bit) != 
0);
+                bit <<= 1;
+            }
+        }
+    }
+
+    reduce_accumulators(acc).finish()
+}
+
+// The preferred vector size in bytes for the target platform.
+// Note that the avx512 target feature is still unstable and this also means 
it is not detected on stable rust.
+const PREFERRED_VECTOR_SIZE: usize =
+    if cfg!(all(target_arch = "x86_64", target_feature = "avx512f")) {
+        64
+    } else if cfg!(all(target_arch = "x86_64", target_feature = "avx")) {
+        32
+    } else {
+        16
+    };
+
+// non-nullable aggregation requires fewer temporary registers so we can use 
more of them for accumulators
+const PREFERRED_VECTOR_SIZE_NON_NULL: usize = PREFERRED_VECTOR_SIZE * 2;
+
+fn aggregate<T: ArrowNativeTypeOp, P: ArrowPrimitiveType<Native = T>, A: 
NumericAccumulator<T>>(
+    array: &PrimitiveArray<P>,
+) -> Option<T> {
+    let null_count = array.null_count();
+    if null_count == array.len() {
+        return None;
+    }
+    let values = array.values().as_ref();
+    match array.nulls() {
+        Some(nulls) if null_count > 0 => match PREFERRED_VECTOR_SIZE / 
std::mem::size_of::<T>() {

Review Comment:
   I think using the outer type parameter is still not possible in const 
context. I'll try it again and add a comment or link to a tracking issue.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to