lidavidm commented on a change in pull request #10890:
URL: https://github.com/apache/arrow/pull/10890#discussion_r685378556



##########
File path: cpp/src/arrow/compute/kernels/aggregate_test.cc
##########
@@ -189,6 +189,53 @@ TEST(TestBooleanAggregation, Sum) {
               ResultWith(Datum(MakeNullScalar(uint64()))));
 }
 
+TEST(TestBooleanAggregation, Product) {
+  const ScalarAggregateOptions& options = ScalarAggregateOptions::Defaults();
+  ValidateBooleanAgg<Product>("[]", std::make_shared<UInt64Scalar>(), options);

Review comment:
       FindAccumulatorType has a specialization for Boolean so that it's 
UInt64Scalar: 
https://github.com/apache/arrow/blob/2aa94c4712ce406d7c87d361b5c655a6ea585701/cpp/src/arrow/compute/kernels/aggregate_internal.h#L32-L35

##########
File path: cpp/src/arrow/compute/kernels/hash_aggregate.cc
##########
@@ -1011,6 +1012,118 @@ struct GroupedSumFactory {
   InputType argument_type;
 };
 
+// ----------------------------------------------------------------------
+// Product implementation
+
+using arrow::internal::MultiplyWithOverflow;
+
+template <typename Type>
+struct GroupedProductImpl final : public GroupedAggregator {
+  using AccType = typename FindAccumulatorType<Type>::Type;
+  using ProductType = typename TypeTraits<AccType>::CType;
+
+  Status Init(ExecContext* ctx, const FunctionOptions*) override {
+    pool_ = ctx->memory_pool();
+    products_ = TypedBufferBuilder<ProductType>(pool_);
+    valid_ = TypedBufferBuilder<bool>(pool_);
+    out_type_ = TypeTraits<AccType>::type_singleton();
+    return Status::OK();
+  }

Review comment:
       Ah, whoops, let me fix that.

##########
File path: cpp/src/arrow/compute/kernels/aggregate_basic.cc
##########
@@ -133,6 +134,116 @@ Result<std::unique_ptr<KernelState>> 
MeanInit(KernelContext* ctx,
   return visitor.Create();
 }
 
+// ----------------------------------------------------------------------
+// Product implementation
+
+using arrow::internal::MultiplyWithOverflow;
+
+template <typename ArrowType>
+struct ProductImpl : public ScalarAggregator {
+  using ThisType = ProductImpl<ArrowType>;
+  using CType = typename ArrowType::c_type;
+  using ProductType = typename FindAccumulatorType<ArrowType>::Type;
+  using OutputType = typename TypeTraits<ProductType>::ScalarType;
+
+  explicit ProductImpl(const ScalarAggregateOptions& options) { this->options 
= options; }
+
+  Status Consume(KernelContext*, const ExecBatch& batch) override {
+    if (batch[0].is_array()) {
+      const auto& data = batch[0].array();
+      this->count += data->length - data->GetNullCount();
+      return VisitArrayDataInline<ArrowType>(
+          *data,
+          [&](typename TypeTraits<ArrowType>::CType value) {
+            if (ARROW_PREDICT_FALSE(
+                    MultiplyWithOverflow(this->product, value, 
&this->product))) {
+              return Status::Invalid("Overflow in product");
+            }
+            return Status::OK();
+          },
+          [] { return Status::OK(); });
+    } else {
+      const auto& data = *batch[0].scalar();
+      this->count += data.is_valid * batch.length;

Review comment:
       If we aggregate over a partition column, we'll have a scalar but with 
batch.length > 1. See 
https://github.com/apache/arrow/pull/10725#discussion_r670689127




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@arrow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to