vertexclique commented on a change in pull request #8685:
URL: https://github.com/apache/arrow/pull/8685#discussion_r525009848
##########
File path: rust/arrow/src/datatypes.rs
##########
@@ -587,7 +600,7 @@ where
pub trait ArrowNumericType: ArrowPrimitiveType {}
macro_rules! make_numeric_type {
- ($impl_ty:ty, $native_ty:ty, $simd_ty:ident, $simd_mask_ty:ident) => {
+ ($impl_ty:ty, $native_ty:ty, $simd_ty:ident, $simd_mask_ty:ident,
$min_value:expr, $max_value:expr) => {
Review comment:
Macro doesn't need to take min and max value. It can stay as it is
except `mask_and`.
##########
File path: rust/arrow/src/datatypes.rs
##########
@@ -771,6 +784,21 @@ macro_rules! make_numeric_type {
fn write(simd_result: Self::Simd, slice: &mut [Self::Native]) {
unsafe { simd_result.write_to_slice_unaligned_unchecked(slice)
};
}
+
+ #[inline]
+ fn mask_and(left: Self::SimdMask, right: Self::SimdMask) ->
Self::SimdMask {
+ left & right
+ }
+
+ #[inline]
+ fn identity_for_min_op() -> Self::Native {
+ $max_value
+ }
+
+ #[inline]
+ fn identity_for_max_op() -> Self::Native {
+ $min_value
+ }
Review comment:
These can be removed.
##########
File path: rust/arrow/src/compute/kernels/aggregate.rs
##########
@@ -179,87 +181,258 @@ where
}
}
-/// Returns the sum of values in the array.
-///
-/// Returns `None` if the array is empty or only contains null values.
#[cfg(all(any(target_arch = "x86", target_arch = "x86_64"), feature = "simd"))]
-pub fn sum<T: ArrowNumericType>(array: &PrimitiveArray<T>) -> Option<T::Native>
-where
- T::Native: Add<Output = T::Native>,
-{
- let null_count = array.null_count();
+mod simd {
+ use crate::array::{Array, PrimitiveArray};
+ use crate::datatypes::ArrowNumericType;
+ use std::marker::PhantomData;
+ use std::ops::Add;
+
+ pub(super) trait SimdAggregate<T: ArrowNumericType> {
+ /// Returns the identity value for this aggregation function
+ fn init_accumulator_scalar() -> T::Native;
+
+ /// Returns a vector filled with the identity value for this
aggregation function
+ #[inline]
+ fn init_accumulator_chunk() -> T::Simd {
+ T::init(Self::init_accumulator_scalar())
+ }
- if null_count == array.len() {
- return None;
+ /// Updates the accumulator with the values of one chunk
+ fn accumulate_chunk_non_null(accumulator: &mut T::Simd, chunk:
T::Simd);
+
+ /// Updates the accumulator with the values of one chunk according to
the given vector mask
+ fn accumulate_chunk_nullable(
+ accumulator: &mut T::Simd,
+ chunk: T::Simd,
+ mask: T::SimdMask,
+ );
+
+ /// Updates the accumulator with one value
+ fn accumulate_scalar(accumulator: &mut T::Native, value: T::Native);
+
+ /// Reduces the vector lanes of the accumulator to a single value
+ #[inline]
+ fn reduce(accumulator: T::Simd) -> T::Native {
+ // reduce by first writing to a temporary and then use scalar
operations
+ // this should be about the same performance as extracting
individual lanes
+ // but allows us to reuse the scalar reduction logic
+ let tmp = &mut [T::default_value(); 64];
+ T::write(accumulator, &mut tmp[0..T::lanes()]);
+
+ let mut reduced = Self::init_accumulator_scalar();
+ tmp[0..T::lanes()]
+ .iter()
+ .for_each(|value| Self::accumulate_scalar(&mut reduced,
*value));
+
+ reduced
+ }
}
- let data: &[T::Native] = array.value_slice(0, array.len());
+ pub(super) struct SumAggregate<T: ArrowNumericType> {
+ phantom: PhantomData<T>,
+ }
- let mut vector_sum = T::init(T::default_value());
- let mut remainder_sum = T::default_value();
+ impl<T: ArrowNumericType> SimdAggregate<T> for SumAggregate<T>
+ where
+ T::Native: Add<Output = T::Native>,
+ {
+ fn init_accumulator_scalar() -> T::Native {
+ T::default_value()
+ }
- match array.data().null_buffer() {
- None => {
- let data_chunks = data.chunks_exact(64);
- let remainder = data_chunks.remainder();
+ fn accumulate_chunk_non_null(accumulator: &mut T::Simd, chunk:
T::Simd) {
+ *accumulator = *accumulator + chunk;
+ }
- data_chunks.for_each(|chunk| {
- chunk.chunks_exact(T::lanes()).for_each(|chunk| {
- let chunk = T::load(&chunk);
- vector_sum = vector_sum + chunk;
- });
- });
+ fn accumulate_chunk_nullable(
+ accumulator: &mut T::Simd,
+ chunk: T::Simd,
+ vecmask: T::SimdMask,
+ ) {
+ let zero = T::init(T::default_value());
+ let blended = T::mask_select(vecmask, chunk, zero);
- remainder.iter().for_each(|value| {
- remainder_sum = remainder_sum + *value;
- });
+ *accumulator = *accumulator + blended;
}
- Some(buffer) => {
- // process data in chunks of 64 elements since we also get 64 bits
of validity information at a time
- let data_chunks = data.chunks_exact(64);
- let remainder = data_chunks.remainder();
- let bit_chunks = buffer.bit_chunks(array.offset(), array.len());
- let remainder_bits = bit_chunks.remainder_bits();
+ fn accumulate_scalar(accumulator: &mut T::Native, value: T::Native) {
+ *accumulator = *accumulator + value
+ }
+ }
+
+ pub(super) struct MinAggregate<T: ArrowNumericType> {
+ phantom: PhantomData<T>,
+ }
+
+ impl<T: ArrowNumericType> SimdAggregate<T> for MinAggregate<T>
+ where
+ T::Native: PartialOrd,
+ {
+ fn init_accumulator_scalar() -> T::Native {
+ T::identity_for_min_op()
+ }
+
+ fn accumulate_chunk_non_null(accumulator: &mut T::Simd, chunk:
T::Simd) {
+ let cmp_mask = T::lt(chunk, *accumulator);
+
+ *accumulator = T::mask_select(cmp_mask, chunk, *accumulator);
+ }
+
+ fn accumulate_chunk_nullable(
+ accumulator: &mut T::Simd,
+ chunk: T::Simd,
+ vecmask: T::SimdMask,
+ ) {
+ let cmp_mask = T::lt(chunk, *accumulator);
+ let blend_mask = T::mask_and(vecmask, cmp_mask);
+
+ *accumulator = T::mask_select(blend_mask, chunk, *accumulator);
+ }
+
+ fn accumulate_scalar(accumulator: &mut T::Native, value: T::Native) {
+ if value < *accumulator {
+ *accumulator = value
+ }
+ }
+ }
+
+ pub(super) struct MaxAggregate<T: ArrowNumericType> {
+ phantom: PhantomData<T>,
+ }
+
+ impl<T: ArrowNumericType> SimdAggregate<T> for MaxAggregate<T>
+ where
+ T::Native: PartialOrd,
+ {
+ fn init_accumulator_scalar() -> T::Native {
+ T::identity_for_max_op()
+ }
+
+ fn accumulate_chunk_non_null(accumulator: &mut T::Simd, chunk:
T::Simd) {
+ let cmp_mask = T::gt(chunk, *accumulator);
+
+ *accumulator = T::mask_select(cmp_mask, chunk, *accumulator);
+ }
+
+ fn accumulate_chunk_nullable(
+ accumulator: &mut T::Simd,
+ chunk: T::Simd,
+ vecmask: T::SimdMask,
+ ) {
+ let cmp_mask = T::gt(chunk, *accumulator);
+ let blend_mask = T::mask_and(vecmask, cmp_mask);
+
+ *accumulator = T::mask_select(blend_mask, chunk, *accumulator);
+ }
+
+ fn accumulate_scalar(accumulator: &mut T::Native, value: T::Native) {
+ if value > *accumulator {
+ *accumulator = value
+ }
+ }
+ }
+
+ pub(super) fn simd_aggregation<T: ArrowNumericType, A: SimdAggregate<T>>(
+ array: &PrimitiveArray<T>,
+ ) -> Option<T::Native> {
+ let null_count = array.null_count();
+
+ if null_count == array.len() {
+ return None;
+ }
+
+ let data: &[T::Native] = array.value_slice(0, array.len());
+
+ let mut chunk_acc = A::init_accumulator_chunk();
+ let mut rem_acc = A::init_accumulator_scalar();
- data_chunks.zip(bit_chunks).for_each(|(chunk, mut mask)| {
- // split chunks further into slices corresponding to the
vector length
- // the compiler is able to unroll this inner loop and remove
bounds checks
- // since the outer chunk size (64) is always a multiple of the
number of lanes
- chunk.chunks_exact(T::lanes()).for_each(|chunk| {
- let zero = T::init(T::default_value());
- let vecmask = T::mask_from_u64(mask);
- let chunk = T::load(&chunk);
- let blended = T::mask_select(vecmask, chunk, zero);
+ match array.data().null_buffer() {
+ None => {
+ let data_chunks = data.chunks_exact(64);
+ let remainder = data_chunks.remainder();
- vector_sum = vector_sum + blended;
+ data_chunks.for_each(|chunk| {
+ chunk.chunks_exact(T::lanes()).for_each(|chunk| {
+ let chunk = T::load(&chunk);
+ A::accumulate_chunk_non_null(&mut chunk_acc, chunk);
+ });
+ });
- mask = mask >> T::lanes();
+ remainder.iter().for_each(|value| {
+ A::accumulate_scalar(&mut rem_acc, *value);
});
- });
+ }
+ Some(buffer) => {
+ // process data in chunks of 64 elements since we also get 64
bits of validity information at a time
+ let data_chunks = data.chunks_exact(64);
+ let remainder = data_chunks.remainder();
- remainder.iter().enumerate().for_each(|(i, value)| {
- if remainder_bits & (1 << i) != 0 {
- remainder_sum = remainder_sum + *value;
- }
- });
+ let bit_chunks = buffer.bit_chunks(array.offset(),
array.len());
+ let remainder_bits = bit_chunks.remainder_bits();
+
+ data_chunks.zip(bit_chunks).for_each(|(chunk, mut mask)| {
+ // split chunks further into slices corresponding to the
vector length
+ // the compiler is able to unroll this inner loop and
remove bounds checks
+ // since the outer chunk size (64) is always a multiple of
the number of lanes
+ chunk.chunks_exact(T::lanes()).for_each(|chunk| {
+ let vecmask = T::mask_from_u64(mask);
+ let chunk = T::load(&chunk);
+
+ A::accumulate_chunk_nullable(&mut chunk_acc, chunk,
vecmask);
+
+ mask = mask >> T::lanes();
+ });
+ });
+
+ remainder.iter().enumerate().for_each(|(i, value)| {
+ if remainder_bits & (1 << i) != 0 {
+ A::accumulate_scalar(&mut rem_acc, *value)
+ }
+ });
+ }
}
+
+ let mut total = A::reduce(chunk_acc);
+ A::accumulate_scalar(&mut total, rem_acc);
+
+ Some(total)
}
+}
+
+/// Returns the sum of values in the array.
+///
+/// Returns `None` if the array is empty or only contains null values.
+#[cfg(all(any(target_arch = "x86", target_arch = "x86_64"), feature = "simd"))]
+pub fn sum<T: ArrowNumericType>(array: &PrimitiveArray<T>) -> Option<T::Native>
+where
+ T::Native: Add<Output = T::Native>,
+{
+ use simd::*;
- // calculate horizontal sum of accumulator by writing to a temporary
- // this is probably faster than extracting individual lanes
- // the compiler is free to optimize this to something faster
- let tmp = &mut [T::default_value(); 64];
- T::write(vector_sum, &mut tmp[0..T::lanes()]);
+ simd::simd_aggregation::<T, SumAggregate<T>>(&array)
+}
- let mut total_sum = T::default_value();
- tmp[0..T::lanes()]
- .iter()
- .for_each(|lane| total_sum = total_sum + *lane);
+#[cfg(all(any(target_arch = "x86", target_arch = "x86_64"), feature = "simd"))]
+/// Returns the minimum value in the array, according to the natural order.
+pub fn min<T: ArrowNumericType>(array: &PrimitiveArray<T>) -> Option<T::Native>
+where
+ T::Native: PartialOrd,
+{
+ use simd::*;
- total_sum = total_sum + remainder_sum;
+ simd::simd_aggregation::<T, MinAggregate<T>>(&array)
+}
+
+#[cfg(all(any(target_arch = "x86", target_arch = "x86_64"), feature = "simd"))]
+/// Returns the maximum value in the array, according to the natural order.
+pub fn max<T: ArrowNumericType>(array: &PrimitiveArray<T>) -> Option<T::Native>
+where
+ T::Native: PartialOrd,
Review comment:
```suggestion
T::Native: num::Num + PartialOrd,
```
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]