wesm commented on a change in pull request #7607:
URL: https://github.com/apache/arrow/pull/7607#discussion_r451834885



##########
File path: cpp/src/arrow/compute/kernels/aggregate_basic_internal.h
##########
@@ -0,0 +1,303 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include "arrow/compute/kernels/aggregate_internal.h"
+#include "arrow/compute/kernels/common.h"
+#include "arrow/util/align_util.h"
+#include "arrow/util/bit_block_counter.h"
+
+namespace arrow {
+namespace compute {
+namespace aggregate {
+
+struct ScalarAggregator : public KernelState {
+  virtual void Consume(KernelContext* ctx, const ExecBatch& batch) = 0;
+  virtual void MergeFrom(KernelContext* ctx, const KernelState& src) = 0;
+  virtual void Finalize(KernelContext* ctx, Datum* out) = 0;
+};
+
+void AddBasicAggKernels(KernelInit init,
+                        const std::vector<std::shared_ptr<DataType>>& types,
+                        std::shared_ptr<DataType> out_ty, ScalarAggregateFunction* func);
+
+// ----------------------------------------------------------------------
+// Sum implementation
+
+template <int64_t kRoundSize, typename ArrowType>
+struct SumState {
+  using SumType = typename FindAccumulatorType<ArrowType>::Type;
+  using ThisType = SumState<kRoundSize, ArrowType>;
+  using T = typename TypeTraits<ArrowType>::CType;
+  using ArrayType = typename TypeTraits<ArrowType>::ArrayType;
+
+  ThisType operator+(const ThisType& rhs) const {
+    return ThisType(this->count + rhs.count, this->sum + rhs.sum);
+  }
+
+  ThisType& operator+=(const ThisType& rhs) {
+    this->count += rhs.count;
+    this->sum += rhs.sum;
+
+    return *this;
+  }
+
+ public:
+  void Consume(const Array& input) {
+    const ArrayType& array = static_cast<const ArrayType&>(input);
+    if (input.null_count() == 0) {
+      (*this) += ConsumeDense(array);
+    } else {
+      (*this) += ConsumeSparse(array);
+    }
+  }
+
+  size_t count = 0;
+  typename SumType::c_type sum = 0;
+
+ private:
+  template <int64_t kDenseRoundSize>
+  ThisType ConsumeDense(const T* values, const int64_t length) const {
+    ThisType local;
+    const int64_t length_rounded = BitUtil::RoundDown(length, kDenseRoundSize);
+    typename SumType::c_type sum_rounded[kDenseRoundSize] = {0};
+
+    // Unrolled the loop to add the results in parallel

Review comment:
       "Unroll"

##########
File path: cpp/src/arrow/compute/kernels/aggregate_basic_internal.h
##########
@@ -0,0 +1,303 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include "arrow/compute/kernels/aggregate_internal.h"
+#include "arrow/compute/kernels/common.h"
+#include "arrow/util/align_util.h"
+#include "arrow/util/bit_block_counter.h"
+
+namespace arrow {
+namespace compute {
+namespace aggregate {
+
+struct ScalarAggregator : public KernelState {
+  virtual void Consume(KernelContext* ctx, const ExecBatch& batch) = 0;
+  virtual void MergeFrom(KernelContext* ctx, const KernelState& src) = 0;
+  virtual void Finalize(KernelContext* ctx, Datum* out) = 0;
+};
+
+void AddBasicAggKernels(KernelInit init,
+                        const std::vector<std::shared_ptr<DataType>>& types,
+                        std::shared_ptr<DataType> out_ty, ScalarAggregateFunction* func);
+
+// ----------------------------------------------------------------------
+// Sum implementation
+
+template <int64_t kRoundSize, typename ArrowType>
+struct SumState {
+  using SumType = typename FindAccumulatorType<ArrowType>::Type;
+  using ThisType = SumState<kRoundSize, ArrowType>;
+  using T = typename TypeTraits<ArrowType>::CType;
+  using ArrayType = typename TypeTraits<ArrowType>::ArrayType;
+
+  ThisType operator+(const ThisType& rhs) const {
+    return ThisType(this->count + rhs.count, this->sum + rhs.sum);
+  }
+
+  ThisType& operator+=(const ThisType& rhs) {
+    this->count += rhs.count;
+    this->sum += rhs.sum;
+
+    return *this;
+  }
+
+ public:
+  void Consume(const Array& input) {
+    const ArrayType& array = static_cast<const ArrayType&>(input);
+    if (input.null_count() == 0) {
+      (*this) += ConsumeDense(array);
+    } else {
+      (*this) += ConsumeSparse(array);
+    }
+  }
+
+  size_t count = 0;
+  typename SumType::c_type sum = 0;
+
+ private:
+  template <int64_t kDenseRoundSize>
+  ThisType ConsumeDense(const T* values, const int64_t length) const {
+    ThisType local;
+    const int64_t length_rounded = BitUtil::RoundDown(length, kDenseRoundSize);
+    typename SumType::c_type sum_rounded[kDenseRoundSize] = {0};
+
+    // Unrolled the loop to add the results in parallel
+    for (int64_t i = 0; i < length_rounded; i += kDenseRoundSize) {
+      for (int64_t k = 0; k < kDenseRoundSize; k++) {
+        sum_rounded[k] += values[i + k];
+      }
+    }
+    for (int64_t k = 0; k < kDenseRoundSize; k++) {
+      local.sum += sum_rounded[k];
+    }
+
+    // The trailing part
+    for (int64_t i = length_rounded; i < length; ++i) {
+      local.sum += values[i];
+    }
+
+    local.count = length;
+    return local;
+  }
+
+  ThisType ConsumeDense(const ArrayType& array) const {
+    const auto values = array.raw_values();
+    const int64_t length = array.length();
+
+    return ConsumeDense<kRoundSize>(values, length);
+  }
+
+  // While this is not branchless, gcc needs this to be in a different function
+  // for it to generate cmov which ends to be slightly faster than

Review comment:
       "tends"

##########
File path: cpp/cmake_modules/SetupCxxFlags.cmake
##########
@@ -52,7 +52,22 @@ if(ARROW_CPU_FLAG STREQUAL "x86")
     check_cxx_compiler_flag(${ARROW_SSE4_2_FLAG} CXX_SUPPORTS_SSE4_2)
   endif()
   check_cxx_compiler_flag(${ARROW_AVX2_FLAG} CXX_SUPPORTS_AVX2)
-  check_cxx_compiler_flag(${ARROW_AVX512_FLAG} CXX_SUPPORTS_AVX512)
+  if(MINGW)
+    # https://gcc.gnu.org/bugzilla/show_bug.cgi?id=65782
+    message(STATUS "Disbale AVX512 support on MINGW for now")

Review comment:
       typo: "Disbale" should be "Disable"

##########
File path: cpp/src/arrow/compute/kernels/aggregate_basic_internal.h
##########
@@ -0,0 +1,303 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include "arrow/compute/kernels/aggregate_internal.h"
+#include "arrow/compute/kernels/common.h"
+#include "arrow/util/align_util.h"
+#include "arrow/util/bit_block_counter.h"
+
+namespace arrow {
+namespace compute {
+namespace aggregate {
+
+struct ScalarAggregator : public KernelState {
+  virtual void Consume(KernelContext* ctx, const ExecBatch& batch) = 0;
+  virtual void MergeFrom(KernelContext* ctx, const KernelState& src) = 0;
+  virtual void Finalize(KernelContext* ctx, Datum* out) = 0;
+};
+
+void AddBasicAggKernels(KernelInit init,
+                        const std::vector<std::shared_ptr<DataType>>& types,
+                        std::shared_ptr<DataType> out_ty, ScalarAggregateFunction* func);
+
+// ----------------------------------------------------------------------
+// Sum implementation
+
+template <int64_t kRoundSize, typename ArrowType>
+struct SumState {
+  using SumType = typename FindAccumulatorType<ArrowType>::Type;
+  using ThisType = SumState<kRoundSize, ArrowType>;
+  using T = typename TypeTraits<ArrowType>::CType;
+  using ArrayType = typename TypeTraits<ArrowType>::ArrayType;
+
+  ThisType operator+(const ThisType& rhs) const {
+    return ThisType(this->count + rhs.count, this->sum + rhs.sum);
+  }
+
+  ThisType& operator+=(const ThisType& rhs) {
+    this->count += rhs.count;
+    this->sum += rhs.sum;
+
+    return *this;
+  }
+
+ public:
+  void Consume(const Array& input) {
+    const ArrayType& array = static_cast<const ArrayType&>(input);
+    if (input.null_count() == 0) {
+      (*this) += ConsumeDense(array);
+    } else {
+      (*this) += ConsumeSparse(array);
+    }
+  }
+
+  size_t count = 0;
+  typename SumType::c_type sum = 0;
+
+ private:
+  template <int64_t kDenseRoundSize>
+  ThisType ConsumeDense(const T* values, const int64_t length) const {

Review comment:
       I think it was already called `Dense` before this patch, but `NoNulls` would be clearer.

##########
File path: cpp/src/arrow/compute/kernels/aggregate_basic_internal.h
##########
@@ -0,0 +1,303 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include "arrow/compute/kernels/aggregate_internal.h"
+#include "arrow/compute/kernels/common.h"
+#include "arrow/util/align_util.h"
+#include "arrow/util/bit_block_counter.h"
+
+namespace arrow {
+namespace compute {
+namespace aggregate {
+
+struct ScalarAggregator : public KernelState {
+  virtual void Consume(KernelContext* ctx, const ExecBatch& batch) = 0;
+  virtual void MergeFrom(KernelContext* ctx, const KernelState& src) = 0;
+  virtual void Finalize(KernelContext* ctx, Datum* out) = 0;
+};
+
+void AddBasicAggKernels(KernelInit init,
+                        const std::vector<std::shared_ptr<DataType>>& types,
+                        std::shared_ptr<DataType> out_ty, ScalarAggregateFunction* func);
+
+// ----------------------------------------------------------------------
+// Sum implementation
+
+template <int64_t kRoundSize, typename ArrowType>
+struct SumState {
+  using SumType = typename FindAccumulatorType<ArrowType>::Type;
+  using ThisType = SumState<kRoundSize, ArrowType>;
+  using T = typename TypeTraits<ArrowType>::CType;
+  using ArrayType = typename TypeTraits<ArrowType>::ArrayType;
+
+  ThisType operator+(const ThisType& rhs) const {
+    return ThisType(this->count + rhs.count, this->sum + rhs.sum);
+  }
+
+  ThisType& operator+=(const ThisType& rhs) {
+    this->count += rhs.count;
+    this->sum += rhs.sum;
+
+    return *this;
+  }
+
+ public:
+  void Consume(const Array& input) {
+    const ArrayType& array = static_cast<const ArrayType&>(input);
+    if (input.null_count() == 0) {
+      (*this) += ConsumeDense(array);
+    } else {
+      (*this) += ConsumeSparse(array);
+    }
+  }
+
+  size_t count = 0;
+  typename SumType::c_type sum = 0;
+
+ private:
+  template <int64_t kDenseRoundSize>
+  ThisType ConsumeDense(const T* values, const int64_t length) const {
+    ThisType local;
+    const int64_t length_rounded = BitUtil::RoundDown(length, kDenseRoundSize);
+    typename SumType::c_type sum_rounded[kDenseRoundSize] = {0};
+
+    // Unrolled the loop to add the results in parallel
+    for (int64_t i = 0; i < length_rounded; i += kDenseRoundSize) {
+      for (int64_t k = 0; k < kDenseRoundSize; k++) {
+        sum_rounded[k] += values[i + k];
+      }
+    }
+    for (int64_t k = 0; k < kDenseRoundSize; k++) {
+      local.sum += sum_rounded[k];
+    }
+
+    // The trailing part
+    for (int64_t i = length_rounded; i < length; ++i) {
+      local.sum += values[i];
+    }
+
+    local.count = length;
+    return local;
+  }
+
+  ThisType ConsumeDense(const ArrayType& array) const {
+    const auto values = array.raw_values();
+    const int64_t length = array.length();
+
+    return ConsumeDense<kRoundSize>(values, length);
+  }
+
+  // While this is not branchless, gcc needs this to be in a different function
+  // for it to generate cmov which ends to be slightly faster than
+  // multiplication but safe for handling NaN with doubles.
+  inline T MaskedValue(bool valid, T value) const { return valid ? value : 0; }
+
+  inline ThisType UnrolledSum(uint8_t bits, const T* values) const {
+    ThisType local;
+
+    if (bits < 0xFF) {
+      // Some nulls
+      for (size_t i = 0; i < 8; i++) {
+        local.sum += MaskedValue(bits & (1U << i), values[i]);
+      }
+      local.count += BitUtil::kBytePopcount[bits];
+    } else {
+      // No nulls
+      for (size_t i = 0; i < 8; i++) {
+        local.sum += values[i];
+      }
+      local.count += 8;
+    }
+
+    return local;
+  }
+
+  ThisType ConsumeSparse(const ArrayType& array) const {
+    ThisType local;
+    const T* values = array.raw_values();
+    const int64_t length = array.length();
+    int64_t offset = array.offset();
+    const uint8_t* bitmap = array.null_bitmap_data();
+    int64_t idx = 0;
+
+    const auto p = arrow::internal::BitmapWordAlign<1>(bitmap, offset, length);
+    // First handle the leading bits
+    const int64_t leading_bits = p.leading_bits;
+    while (idx < leading_bits) {
+      if (BitUtil::GetBit(bitmap, offset)) {
+        local.sum += values[idx];
+        local.count++;
+      }
+      idx++;
+      offset++;
+    }
+
+    // The aligned parts scanned with BitBlockCounter
+    int64_t kBatchSize = arrow::internal::BitBlockCounterWordSize();

Review comment:
       `BitBlockCounter::kWordSize`?

##########
File path: cpp/src/arrow/util/bit_block_counter.h
##########
@@ -76,6 +76,9 @@ struct BitBlockCount {
   bool AllSet() const { return this->length == this->popcount; }
 };
 
+/// \brief Return the word step size of each bit block counter run
+int64_t BitBlockCounterWordSize();

Review comment:
       Agreed, I think a `static constexpr` on the `BitBlockCounter` would be better.

##########
File path: cpp/src/arrow/compute/kernels/aggregate_sum_avx2.cc
##########
@@ -0,0 +1,97 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/compute/kernels/aggregate_basic_internal.h"
+
+namespace arrow {
+namespace compute {
+namespace aggregate {
+
+// ----------------------------------------------------------------------
+// Sum implementation
+
+// Round size optimized based on data type and compiler
+template <typename T>
+struct RoundSizeAvx2 {
+  static constexpr int64_t size = 32;
+};
+
+// Round size set to 64 for float/int32_t/uint32_t
+template <>
+struct RoundSizeAvx2<float> {
+  static constexpr int64_t size = 64;
+};
+
+template <>
+struct RoundSizeAvx2<int32_t> {
+  static constexpr int64_t size = 64;
+};
+
+template <>
+struct RoundSizeAvx2<uint32_t> {
+  static constexpr int64_t size = 64;
+};
+
+template <typename ArrowType>
+struct SumImplAvx2
+    : public SumImpl<RoundSizeAvx2<typename TypeTraits<ArrowType>::CType>::size,
+                     ArrowType> {};
+
+template <typename ArrowType>
+struct MeanImplAvx2
+    : public MeanImpl<RoundSizeAvx2<typename TypeTraits<ArrowType>::CType>::size,
+                      ArrowType> {};
+
+std::unique_ptr<KernelState> SumInitAvx2(KernelContext* ctx, const KernelInitArgs& args) {
+  SumLikeInit<SumImplAvx2> visitor(ctx, *args.inputs[0].type);
+  return visitor.Create();
+}
+
+std::unique_ptr<KernelState> MeanInitAvx2(KernelContext* ctx,
+                                          const KernelInitArgs& args) {
+  SumLikeInit<MeanImplAvx2> visitor(ctx, *args.inputs[0].type);
+  return visitor.Create();
+}
+
+}  // namespace aggregate
+
+namespace internal {
+
+void RegisterScalarAggregateSumAvx2(FunctionRegistry* registry) {
+  auto func = std::make_shared<ScalarAggregateFunction>("sum", Arity::Unary());
+  aggregate::AddBasicAggKernels(aggregate::SumInitAvx2, {boolean()}, int64(), func.get());
+  aggregate::AddBasicAggKernels(aggregate::SumInitAvx2, SignedIntTypes(), int64(),
+                                func.get());
+  aggregate::AddBasicAggKernels(aggregate::SumInitAvx2, UnsignedIntTypes(), uint64(),
+                                func.get());
+  aggregate::AddBasicAggKernels(aggregate::SumInitAvx2, FloatingPointTypes(), float64(),
+                                func.get());
+  // Register the override AVX2 version
+  DCHECK_OK(registry->AddFunction(std::move(func), true));

Review comment:
       use `/*allow_overwrite=*/true` here for clarity

##########
File path: cpp/src/arrow/compute/kernels/aggregate_sum_avx512.cc
##########
@@ -0,0 +1,99 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/compute/kernels/aggregate_basic_internal.h"
+
+namespace arrow {
+namespace compute {
+namespace aggregate {
+
+// ----------------------------------------------------------------------
+// Sum implementation
+
+// Round size optimized based on data type and compiler
+template <typename T>
+struct RoundSizeAvx512 {
+  static constexpr int64_t size = 32;
+};
+
+// Round size set to 64 for float/int32_t/uint32_t
+template <>
+struct RoundSizeAvx512<float> {
+  static constexpr int64_t size = 64;
+};
+
+template <>
+struct RoundSizeAvx512<int32_t> {
+  static constexpr int64_t size = 64;
+};
+
+template <>
+struct RoundSizeAvx512<uint32_t> {
+  static constexpr int64_t size = 64;
+};

Review comment:
       These are the same as the AVX2 versions; not sure whether it makes sense to consolidate their declarations in one place.

##########
File path: cpp/src/arrow/compute/registry.cc
##########
@@ -115,6 +116,19 @@ static std::unique_ptr<FunctionRegistry> CreateBuiltInRegistry() {
   RegisterVectorNested(registry.get());
   RegisterVectorSort(registry.get());
 
+  // SIMD functions
+  auto cpu_info = arrow::internal::CpuInfo::GetInstance();
+#if defined(ARROW_HAVE_RUNTIME_AVX2)
+  if (cpu_info->IsSupported(arrow::internal::CpuInfo::AVX2)) {
+    RegisterScalarAggregateSumAvx2(registry.get());

Review comment:
       @jianxind is using the `allow_overwrite` option in `AddFunction`. 
   
   It's a little bit coarse to do things this way -- I do not see this as the preferred long-term solution. Instead, we should add all the kernel variants to the same `Function` instance, and the `Dispatch*` methods should then select the kernel with the maximum SIMD level as set on the `Kernel` object. That was the idea behind the `simd_level` parameter:
   
   
https://github.com/apache/arrow/blob/master/cpp/src/arrow/compute/kernel.h#L562
   
   If you don't do this, applying SIMD optimizations to even one variant of a kernel becomes a bit of a pain.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to