pitrou commented on a change in pull request #10942:
URL: https://github.com/apache/arrow/pull/10942#discussion_r691949321



##########
File path: cpp/src/arrow/compute/kernels/aggregate_test.cc
##########
@@ -174,19 +172,23 @@ TEST(TestBooleanAggregation, Sum) {
                           ScalarAggregateOptions(/*skip_nulls=*/true, 
/*min_count=*/2));
   ValidateBooleanAgg<Sum>(json, std::make_shared<UInt64Scalar>(),
                           ScalarAggregateOptions(/*skip_nulls=*/true, 
/*min_count=*/3));
-  ValidateBooleanAgg<Sum>(json, std::make_shared<UInt64Scalar>(1),
-                          ScalarAggregateOptions(/*skip_nulls=*/false, 
/*min_count=*/1));
-  ValidateBooleanAgg<Sum>(json, std::make_shared<UInt64Scalar>(1),
-                          ScalarAggregateOptions(/*skip_nulls=*/false, 
/*min_count=*/2));
+  ValidateBooleanAgg<Sum>("[]", std::make_shared<UInt64Scalar>(0),
+                          ScalarAggregateOptions(/*skip_nulls=*/false, 
/*min_count=*/0));
   ValidateBooleanAgg<Sum>(json, std::make_shared<UInt64Scalar>(),
-                          ScalarAggregateOptions(/*skip_nulls=*/false, 
/*min_count=*/3));
+                          ScalarAggregateOptions(/*skip_nulls=*/false, 
/*min_count=*/0));

Review comment:
       Why remove the non-zero min_count tests here?

##########
File path: cpp/src/arrow/compute/kernels/hash_aggregate.cc
##########
@@ -1104,6 +1130,17 @@ struct GroupedProductImpl final : public 
GroupedAggregator {
       BitUtil::SetBitTo(null_bitmap->mutable_data(), i, false);
     }
 
+    if (!options_.skip_nulls) {
+      null_count = kUnknownNullCount;
+      if (null_bitmap) {
+        arrow::internal::BitmapAnd(null_bitmap->data(), /*left_offset=*/0,
+                                   no_nulls_.data(), /*right_offset=*/0, 
num_groups_,
+                                   /*out_offset=*/0, 
null_bitmap->mutable_data());
+      } else {
+        ARROW_ASSIGN_OR_RAISE(null_bitmap, no_nulls_.Finish());
+      }
+    }
+

Review comment:
       Sidenote: the Sum, Product and Mean aggregators probably have a lot of 
code in common. Do you think it can be factored out into some kind of mixin or 
base class?
   Or, conversely, that the operation-specific code can be moved into a 
separate class on which the main aggregator implementation would be templated?

##########
File path: cpp/src/arrow/compute/kernels/aggregate_test.cc
##########
@@ -219,21 +221,24 @@ TEST(TestBooleanAggregation, Product) {
       json, std::make_shared<UInt64Scalar>(),
       ScalarAggregateOptions(/*skip_nulls=*/true, /*min_count=*/3));
   ValidateBooleanAgg<Product>(
-      json, std::make_shared<UInt64Scalar>(1),
-      ScalarAggregateOptions(/*skip_nulls=*/false, /*min_count=*/1));
-  ValidateBooleanAgg<Product>(
-      json, std::make_shared<UInt64Scalar>(1),
-      ScalarAggregateOptions(/*skip_nulls=*/false, /*min_count=*/2));
+      "[]", std::make_shared<UInt64Scalar>(1),
+      ScalarAggregateOptions(/*skip_nulls=*/false, /*min_count=*/0));
   ValidateBooleanAgg<Product>(
       json, std::make_shared<UInt64Scalar>(),
-      ScalarAggregateOptions(/*skip_nulls=*/false, /*min_count=*/3));
+      ScalarAggregateOptions(/*skip_nulls=*/false, /*min_count=*/0));

Review comment:
       Same question here.

##########
File path: cpp/src/arrow/compute/kernels/hash_aggregate_test.cc
##########
@@ -1269,6 +1297,64 @@ TEST(GroupBy, Product) {
                           /*verbose=*/true);
 }
 
+TEST(GroupBy, SumMeanProduct) {

Review comment:
       Call this `SumMeanProductKeepNulls`?

##########
File path: cpp/src/arrow/compute/kernels/hash_aggregate.cc
##########
@@ -1740,117 +1790,239 @@ struct GroupedMinMaxFactory {
 // Any/All implementation
 
 struct GroupedAnyImpl : public GroupedAggregator {
-  Status Init(ExecContext* ctx, const FunctionOptions*) override {
-    seen_ = TypedBufferBuilder<bool>(ctx->memory_pool());
+  Status Init(ExecContext* ctx, const FunctionOptions* options) override {
+    options_ = checked_cast<const ScalarAggregateOptions&>(*options);
+    pool_ = ctx->memory_pool();
+    seen_ = TypedBufferBuilder<bool>(pool_);
+    no_nulls_ = TypedBufferBuilder<bool>(pool_);
+    counts_ = TypedBufferBuilder<int64_t>(pool_);
     return Status::OK();
   }
 
   Status Resize(int64_t new_num_groups) override {
     auto added_groups = new_num_groups - num_groups_;
     num_groups_ = new_num_groups;
-    return seen_.Append(added_groups, false);
+    RETURN_NOT_OK(seen_.Append(added_groups, false));
+    RETURN_NOT_OK(no_nulls_.Append(added_groups, true));
+    return counts_.Append(added_groups, 0);
+  }
+
+  Status Consume(const ExecBatch& batch) override {
+    uint8_t* seen = seen_.mutable_data();
+    uint8_t* no_nulls = no_nulls_.mutable_data();
+    int64_t* counts = counts_.mutable_data();
+    const auto& input = *batch[0].array();
+    auto g = batch[1].array()->GetValues<uint32_t>(1);
+
+    if (input.MayHaveNulls()) {
+      const uint8_t* bitmap = input.buffers[1]->data();
+      arrow::internal::VisitBitBlocksVoid(
+          input.buffers[0], input.offset, input.length,
+          [&](int64_t position) {
+            counts[*g]++;
+            BitUtil::SetBitTo(

Review comment:
       Can probably shortcut here:
   ```c++
   if (!BitUtil::GetBit(seen, *g) && BitUtil::GetBit(bitmap, position)) {
     BitUtil::SetBit(seen, *g);
   }
   ```
   

##########
File path: cpp/src/arrow/compute/kernels/hash_aggregate.cc
##########
@@ -1740,117 +1790,239 @@ struct GroupedMinMaxFactory {
 // Any/All implementation
 
 struct GroupedAnyImpl : public GroupedAggregator {
-  Status Init(ExecContext* ctx, const FunctionOptions*) override {
-    seen_ = TypedBufferBuilder<bool>(ctx->memory_pool());
+  Status Init(ExecContext* ctx, const FunctionOptions* options) override {
+    options_ = checked_cast<const ScalarAggregateOptions&>(*options);
+    pool_ = ctx->memory_pool();
+    seen_ = TypedBufferBuilder<bool>(pool_);
+    no_nulls_ = TypedBufferBuilder<bool>(pool_);
+    counts_ = TypedBufferBuilder<int64_t>(pool_);
     return Status::OK();
   }
 
   Status Resize(int64_t new_num_groups) override {
     auto added_groups = new_num_groups - num_groups_;
     num_groups_ = new_num_groups;
-    return seen_.Append(added_groups, false);
+    RETURN_NOT_OK(seen_.Append(added_groups, false));
+    RETURN_NOT_OK(no_nulls_.Append(added_groups, true));
+    return counts_.Append(added_groups, 0);
+  }
+
+  Status Consume(const ExecBatch& batch) override {
+    uint8_t* seen = seen_.mutable_data();
+    uint8_t* no_nulls = no_nulls_.mutable_data();
+    int64_t* counts = counts_.mutable_data();
+    const auto& input = *batch[0].array();
+    auto g = batch[1].array()->GetValues<uint32_t>(1);
+
+    if (input.MayHaveNulls()) {
+      const uint8_t* bitmap = input.buffers[1]->data();
+      arrow::internal::VisitBitBlocksVoid(
+          input.buffers[0], input.offset, input.length,
+          [&](int64_t position) {
+            counts[*g]++;
+            BitUtil::SetBitTo(
+                seen, *g, BitUtil::GetBit(seen, *g) || BitUtil::GetBit(bitmap, 
position));
+            g++;
+          },
+          [&] { BitUtil::SetBitTo(no_nulls, *g++, false); });
+    } else {
+      arrow::internal::VisitBitBlocksVoid(
+          input.buffers[1], input.offset, input.length,
+          [&](int64_t) {
+            counts[*g++]++;
+            BitUtil::SetBitTo(seen, *g++, true);
+          },
+          [&]() { counts[*g]++; });
+    }
+    return Status::OK();
   }
 
   Status Merge(GroupedAggregator&& raw_other,
                const ArrayData& group_id_mapping) override {
     auto other = checked_cast<GroupedAnyImpl*>(&raw_other);
 
-    auto seen = seen_.mutable_data();
-    auto other_seen = other->seen_.data();
+    uint8_t* seen = seen_.mutable_data();
+    uint8_t* no_nulls = no_nulls_.mutable_data();
+    int64_t* counts = counts_.mutable_data();
+
+    const uint8_t* other_seen = other->seen_.mutable_data();
+    const uint8_t* other_no_nulls = other->no_nulls_.mutable_data();
+    const int64_t* other_counts = other->counts_.mutable_data();
 
     auto g = group_id_mapping.GetValues<uint32_t>(1);
     for (int64_t other_g = 0; other_g < group_id_mapping.length; ++other_g, 
++g) {
-      if (BitUtil::GetBit(other_seen, other_g)) BitUtil::SetBitTo(seen, *g, 
true);
+      counts[*g] += other_counts[other_g];
+      BitUtil::SetBitTo(
+          seen, *g, BitUtil::GetBit(seen, *g) || BitUtil::GetBit(other_seen, 
other_g));
+      BitUtil::SetBitTo(
+          no_nulls, *g,
+          BitUtil::GetBit(no_nulls, *g) && BitUtil::GetBit(other_no_nulls, 
other_g));
     }
     return Status::OK();
   }
 
-  Status Consume(const ExecBatch& batch) override {
-    auto seen = seen_.mutable_data();
+  Result<Datum> Finalize() override {
+    std::shared_ptr<Buffer> null_bitmap;
+    const int64_t* counts = counts_.data();
+    int64_t null_count = 0;
 
-    const auto& input = *batch[0].array();
+    for (int64_t i = 0; i < num_groups_; ++i) {
+      if (counts[i] >= options_.min_count) continue;
 
-    auto g = batch[1].array()->GetValues<uint32_t>(1);
-    arrow::internal::VisitTwoBitBlocksVoid(
-        input.buffers[0], input.offset, input.buffers[1], input.offset, 
input.length,
-        [&](int64_t) { BitUtil::SetBitTo(seen, *g++, true); }, [&]() { g++; });
-    return Status::OK();
-  }
+      if (null_bitmap == nullptr) {
+        ARROW_ASSIGN_OR_RAISE(null_bitmap, AllocateBitmap(num_groups_, pool_));
+        BitUtil::SetBitsTo(null_bitmap->mutable_data(), 0, num_groups_, true);
+      }
+
+      null_count += 1;
+      BitUtil::SetBitTo(null_bitmap->mutable_data(), i, false);
+    }
 
-  Result<Datum> Finalize() override {
     ARROW_ASSIGN_OR_RAISE(auto seen, seen_.Finish());
-    return std::make_shared<BooleanArray>(num_groups_, std::move(seen));
+    if (!options_.skip_nulls) {
+      null_count = kUnknownNullCount;
+      ARROW_ASSIGN_OR_RAISE(auto no_nulls, no_nulls_.Finish());
+      arrow::internal::BitmapOr(no_nulls->data(), /*left_offset=*/0, 
seen->data(),
+                                /*right_offset=*/0, num_groups_,
+                                /*out_offset=*/0, no_nulls->mutable_data());
+      if (null_bitmap) {
+        arrow::internal::BitmapAnd(null_bitmap->data(), /*left_offset=*/0,
+                                   no_nulls->data(), /*right_offset=*/0, 
num_groups_,
+                                   /*out_offset=*/0, 
null_bitmap->mutable_data());
+      } else {
+        null_bitmap = std::move(no_nulls);
+      }
+    }
+
+    return ArrayData::Make(out_type(), num_groups_,
+                           {std::move(null_bitmap), std::move(seen)}, 
null_count);
   }
 
   std::shared_ptr<DataType> out_type() const override { return boolean(); }
 
   int64_t num_groups_ = 0;
   ScalarAggregateOptions options_;
-  TypedBufferBuilder<bool> seen_;
+  TypedBufferBuilder<bool> seen_, no_nulls_;
+  TypedBufferBuilder<int64_t> counts_;
+  MemoryPool* pool_;
 };
 
 struct GroupedAllImpl : public GroupedAggregator {
-  Status Init(ExecContext* ctx, const FunctionOptions*) override {
-    seen_ = TypedBufferBuilder<bool>(ctx->memory_pool());
+  Status Init(ExecContext* ctx, const FunctionOptions* options) override {
+    options_ = checked_cast<const ScalarAggregateOptions&>(*options);
+    pool_ = ctx->memory_pool();
+    seen_ = TypedBufferBuilder<bool>(pool_);
+    no_nulls_ = TypedBufferBuilder<bool>(pool_);
+    counts_ = TypedBufferBuilder<int64_t>(pool_);
     return Status::OK();
   }
 
   Status Resize(int64_t new_num_groups) override {
     auto added_groups = new_num_groups - num_groups_;
     num_groups_ = new_num_groups;
-    return seen_.Append(added_groups, true);
-  }
-
-  Status Merge(GroupedAggregator&& raw_other,
-               const ArrayData& group_id_mapping) override {
-    auto other = checked_cast<GroupedAllImpl*>(&raw_other);
-
-    auto seen = seen_.mutable_data();
-    auto other_seen = other->seen_.data();
-
-    auto g = group_id_mapping.GetValues<uint32_t>(1);
-    for (int64_t other_g = 0; other_g < group_id_mapping.length; ++other_g, 
++g) {
-      BitUtil::SetBitTo(
-          seen, *g, BitUtil::GetBit(seen, *g) && BitUtil::GetBit(other_seen, 
other_g));
-    }
-    return Status::OK();
+    RETURN_NOT_OK(seen_.Append(added_groups, true));
+    RETURN_NOT_OK(no_nulls_.Append(added_groups, true));
+    return counts_.Append(added_groups, 0);
   }
 
   Status Consume(const ExecBatch& batch) override {
-    auto seen = seen_.mutable_data();
-
+    uint8_t* seen = seen_.mutable_data();
+    uint8_t* no_nulls = no_nulls_.mutable_data();
+    int64_t* counts = counts_.mutable_data();
     const auto& input = *batch[0].array();
-
     auto g = batch[1].array()->GetValues<uint32_t>(1);
+
     if (input.MayHaveNulls()) {
       const uint8_t* bitmap = input.buffers[1]->data();
       arrow::internal::VisitBitBlocksVoid(
           input.buffers[0], input.offset, input.length,
           [&](int64_t position) {
+            counts[*g]++;
             BitUtil::SetBitTo(seen, *g,
                               BitUtil::GetBit(seen, *g) &&
                                   BitUtil::GetBit(bitmap, input.offset + 
position));
             g++;
           },
-          [&]() { g++; });
+          [&]() { BitUtil::SetBitTo(no_nulls, *g++, false); });
     } else {
       arrow::internal::VisitBitBlocksVoid(
-          input.buffers[1], input.offset, input.length, [&](int64_t) { g++; },
-          [&]() { BitUtil::SetBitTo(seen, *g++, false); });
+          input.buffers[1], input.offset, input.length, [&](int64_t) { 
counts[*g++]++; },
+          [&]() {
+            counts[*g]++;
+            BitUtil::SetBitTo(seen, *g++, false);
+          });
+    }
+    return Status::OK();
+  }
+
+  Status Merge(GroupedAggregator&& raw_other,
+               const ArrayData& group_id_mapping) override {
+    auto other = checked_cast<GroupedAllImpl*>(&raw_other);
+
+    uint8_t* seen = seen_.mutable_data();
+    uint8_t* no_nulls = no_nulls_.mutable_data();
+    int64_t* counts = counts_.mutable_data();
+
+    const uint8_t* other_seen = other->seen_.mutable_data();
+    const uint8_t* other_no_nulls = other->no_nulls_.mutable_data();
+    const int64_t* other_counts = other->counts_.mutable_data();
+
+    auto g = group_id_mapping.GetValues<uint32_t>(1);
+    for (int64_t other_g = 0; other_g < group_id_mapping.length; ++other_g, 
++g) {
+      counts[*g] += other_counts[other_g];
+      BitUtil::SetBitTo(
+          seen, *g, BitUtil::GetBit(seen, *g) && BitUtil::GetBit(other_seen, 
other_g));
+      BitUtil::SetBitTo(
+          no_nulls, *g,
+          BitUtil::GetBit(no_nulls, *g) && BitUtil::GetBit(other_no_nulls, 
other_g));
     }
     return Status::OK();
   }
 
   Result<Datum> Finalize() override {
+    std::shared_ptr<Buffer> null_bitmap;
+    const int64_t* counts = counts_.data();
+    int64_t null_count = 0;
+
+    for (int64_t i = 0; i < num_groups_; ++i) {
+      if (counts[i] >= options_.min_count) continue;
+
+      if (null_bitmap == nullptr) {
+        ARROW_ASSIGN_OR_RAISE(null_bitmap, AllocateBitmap(num_groups_, pool_));
+        BitUtil::SetBitsTo(null_bitmap->mutable_data(), 0, num_groups_, true);
+      }
+
+      null_count += 1;
+      BitUtil::SetBitTo(null_bitmap->mutable_data(), i, false);
+    }
+
     ARROW_ASSIGN_OR_RAISE(auto seen, seen_.Finish());
-    return std::make_shared<BooleanArray>(num_groups_, std::move(seen));
+    if (!options_.skip_nulls) {
+      null_count = kUnknownNullCount;
+      ARROW_ASSIGN_OR_RAISE(auto no_nulls, no_nulls_.Finish());
+      arrow::internal::BitmapOrNot(no_nulls->data(), /*left_offset=*/0, 
seen->data(),
+                                   /*right_offset=*/0, num_groups_,
+                                   /*out_offset=*/0, no_nulls->mutable_data());
+      if (null_bitmap) {
+        arrow::internal::BitmapAnd(null_bitmap->data(), /*left_offset=*/0,
+                                   no_nulls->data(), /*right_offset=*/0, 
num_groups_,
+                                   /*out_offset=*/0, 
null_bitmap->mutable_data());
+      } else {
+        null_bitmap = std::move(no_nulls);
+      }
+    }
+
+    return ArrayData::Make(out_type(), num_groups_,
+                           {std::move(null_bitmap), std::move(seen)}, 
null_count);
   }
 
   std::shared_ptr<DataType> out_type() const override { return boolean(); }
 
   int64_t num_groups_ = 0;
   ScalarAggregateOptions options_;
-  TypedBufferBuilder<bool> seen_;
+  TypedBufferBuilder<bool> seen_, no_nulls_;
+  TypedBufferBuilder<int64_t> counts_;
+  MemoryPool* pool_;

Review comment:
       Similarly to the remark above about Sum / Mean / Product, I also wonder 
if Any and All can be reconciled into a single shared implementation.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to