lidavidm commented on a change in pull request #11452:
URL: https://github.com/apache/arrow/pull/11452#discussion_r741528772



##########
File path: cpp/src/arrow/compute/kernels/hash_aggregate.cc
##########
@@ -1677,6 +1703,177 @@ struct GroupedMinMaxImpl final : public GroupedAggregator {
   ScalarAggregateOptions options_;
 };
 
+// For binary-like types
+// In principle, FixedSizeBinary could use base implementation
+template <typename Type>
+struct GroupedMinMaxImpl<Type,
+                         enable_if_t<is_base_binary_type<Type>::value ||
+                                     std::is_same<Type, FixedSizeBinaryType>::value>>
+    final : public GroupedAggregator {
+  Status Init(ExecContext* ctx, const FunctionOptions* options) override {
+    ctx_ = ctx;
+    options_ = *checked_cast<const ScalarAggregateOptions*>(options);
+    // type_ initialized by MinMaxInit
+    has_values_ = TypedBufferBuilder<bool>(ctx->memory_pool());
+    has_nulls_ = TypedBufferBuilder<bool>(ctx->memory_pool());
+    return Status::OK();
+  }
+
+  Status Resize(int64_t new_num_groups) override {
+    auto added_groups = new_num_groups - num_groups_;

Review comment:
       We don't actually care about reducing capacity, so long as the vector is 
at least big enough. (Also I don't think it's possible to reduce capacity; the 
number of groups can only ever increase.)
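
To illustrate the point, here is a minimal standalone sketch (hypothetical, not
the PR's code): because group ids only ever grow, Resize can treat every call
as a pure append of default-initialized slots for the newly added groups, and
never has to handle shrinking.

    #include <cassert>
    #include <cstdint>
    #include <vector>

    // Hypothetical illustration of the monotonic-growth pattern described
    // above: resizing only appends slots; capacity is never reduced.
    struct MonotonicGroups {
      int64_t num_groups = 0;
      std::vector<int> values;

      void Resize(int64_t new_num_groups) {
        assert(new_num_groups >= num_groups);  // group count never decreases
        values.resize(new_num_groups);         // appends value-initialized slots
        num_groups = new_num_groups;
      }
    };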

##########
File path: cpp/src/arrow/compute/kernels/hash_aggregate.cc
##########
@@ -1677,6 +1703,177 @@ struct GroupedMinMaxImpl final : public GroupedAggregator {
   ScalarAggregateOptions options_;
 };
 
+// For binary-like types
+// In principle, FixedSizeBinary could use base implementation
+template <typename Type>
+struct GroupedMinMaxImpl<Type,
+                         enable_if_t<is_base_binary_type<Type>::value ||
+                                     std::is_same<Type, FixedSizeBinaryType>::value>>
+    final : public GroupedAggregator {
+  Status Init(ExecContext* ctx, const FunctionOptions* options) override {
+    ctx_ = ctx;
+    options_ = *checked_cast<const ScalarAggregateOptions*>(options);
+    // type_ initialized by MinMaxInit
+    has_values_ = TypedBufferBuilder<bool>(ctx->memory_pool());
+    has_nulls_ = TypedBufferBuilder<bool>(ctx->memory_pool());
+    return Status::OK();
+  }
+
+  Status Resize(int64_t new_num_groups) override {
+    auto added_groups = new_num_groups - num_groups_;
+    num_groups_ = new_num_groups;
+    mins_.resize(new_num_groups);
+    maxes_.resize(new_num_groups);
+    RETURN_NOT_OK(has_values_.Append(added_groups, false));
+    RETURN_NOT_OK(has_nulls_.Append(added_groups, false));
+    return Status::OK();
+  }
+
+  Status Consume(const ExecBatch& batch) override {
+    return VisitGroupedValues<Type>(
+        batch,
+        [&](uint32_t g, util::string_view val) {
+          if (!mins_[g] || val < util::string_view(*mins_[g])) {
+            if (!mins_[g]) {
+              ARROW_ASSIGN_OR_RAISE(
+                  mins_[g], AllocateResizableBuffer(val.size(), ctx_->memory_pool()));
+            }
+            RETURN_NOT_OK(mins_[g]->Resize(val.size(), /*shrink_to_fit=*/false));
+            std::memcpy(mins_[g]->mutable_data(), val.data(), val.size());
+          }
+          if (!maxes_[g] || val > util::string_view(*maxes_[g])) {
+            if (!maxes_[g]) {
+              ARROW_ASSIGN_OR_RAISE(
+                  maxes_[g], AllocateResizableBuffer(val.size(), ctx_->memory_pool()));
+            }
+            RETURN_NOT_OK(maxes_[g]->Resize(val.size(), /*shrink_to_fit=*/false));
+            std::memcpy(maxes_[g]->mutable_data(), val.data(), val.size());
+          }
+          BitUtil::SetBit(has_values_.mutable_data(), g);
+          return Status::OK();
+        },
+        [&](uint32_t g) {
+          BitUtil::SetBit(has_nulls_.mutable_data(), g);
+          return Status::OK();
+        });
+  }
+
+  Status Merge(GroupedAggregator&& raw_other,
+               const ArrayData& group_id_mapping) override {
+    auto other = checked_cast<GroupedMinMaxImpl*>(&raw_other);
+    auto g = group_id_mapping.GetValues<uint32_t>(1);
+    for (uint32_t other_g = 0; static_cast<int64_t>(other_g) < group_id_mapping.length;
+         ++other_g, ++g) {
+      if (!mins_[*g] ||
+          (mins_[*g] && other->mins_[other_g] &&
+           util::string_view(*mins_[*g]) > util::string_view(*other->mins_[other_g]))) {
+        mins_[*g] = std::move(other->mins_[other_g]);
+      }
+      if (!maxes_[*g] ||
+          (maxes_[*g] && other->maxes_[other_g] &&
+           util::string_view(*maxes_[*g]) < util::string_view(*other->maxes_[other_g]))) {
+        maxes_[*g] = std::move(other->maxes_[other_g]);
+      }
+
+      if (BitUtil::GetBit(other->has_values_.data(), other_g)) {
+        BitUtil::SetBit(has_values_.mutable_data(), *g);
+      }
+      if (BitUtil::GetBit(other->has_nulls_.data(), other_g)) {
+        BitUtil::SetBit(has_nulls_.mutable_data(), *g);
+      }
+    }
+    return Status::OK();
+  }
+
+  Result<Datum> Finalize() override {
+    // aggregation for group is valid if there was at least one value in that group
+    ARROW_ASSIGN_OR_RAISE(auto null_bitmap, has_values_.Finish());
+
+    if (!options_.skip_nulls) {
+      // ... and there were no nulls in that group
+      ARROW_ASSIGN_OR_RAISE(auto has_nulls, has_nulls_.Finish());
+      arrow::internal::BitmapAndNot(null_bitmap->data(), 0, has_nulls->data(), 0,
+                                    num_groups_, 0, null_bitmap->mutable_data());
+    }
+
+    auto mins = ArrayData::Make(type_, num_groups_, {null_bitmap, nullptr});
+    auto maxes = ArrayData::Make(type_, num_groups_, {std::move(null_bitmap), nullptr});
+    RETURN_NOT_OK(MakeOffsetsValues(mins.get(), mins_));
+    RETURN_NOT_OK(MakeOffsetsValues(maxes.get(), maxes_));
+    return ArrayData::Make(out_type(), num_groups_, {nullptr},
+                           {std::move(mins), std::move(maxes)});
+  }
+
+  template <typename T = Type>
+  enable_if_base_binary<T, Status> MakeOffsetsValues(
+      ArrayData* array, const std::vector<std::shared_ptr<ResizableBuffer>>& values) {
+    using offset_type = typename T::offset_type;
+    ARROW_ASSIGN_OR_RAISE(
+        auto raw_offsets,
+        AllocateBuffer((1 + values.size()) * sizeof(offset_type), ctx_->memory_pool()));
+    offset_type* offsets = reinterpret_cast<offset_type*>(raw_offsets->mutable_data());
+    offsets[0] = 0;
+    offsets++;
+    const uint8_t* null_bitmap = array->buffers[0]->data();
+    int64_t total_length = 0;
+    for (size_t i = 0; i < values.size(); i++) {
+      if (BitUtil::GetBit(null_bitmap, i)) {
+        total_length += values[i]->size();
+      }
+      if (total_length > std::numeric_limits<offset_type>::max()) {
+        return Status::Invalid("Result is too large to fit in ", *array->type,
+                               " cast to large_ variant of type");

Review comment:
       Frankly, we're awful about checking for overflow in general; most kernel
code is like this and only checks for overflow of int32_t. That said, I'll add
the check here.
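
As a hypothetical sketch of the kind of check being added (the helper name and
signature are illustrative, not the PR's actual code): accumulate the lengths
in a type wider than offset_type and fail as soon as the running total exceeds
what the offsets can represent.

    #include <cstdint>
    #include <limits>
    #include <string>
    #include <vector>

    // Illustrative only: returns false if concatenating the values would
    // overflow offset_type (e.g. int32_t offsets for non-large binary types).
    template <typename offset_type>
    bool FitsInOffsets(const std::vector<std::string>& values) {
      int64_t total_length = 0;  // wider accumulator than int32_t offsets
      for (const auto& v : values) {
        total_length += static_cast<int64_t>(v.size());
        if (total_length > std::numeric_limits<offset_type>::max()) {
          return false;  // caller can error out or use the large_ variant
        }
      }
      return true;
    }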

##########
File path: cpp/src/arrow/compute/kernels/hash_aggregate.cc
##########
@@ -1677,6 +1703,177 @@ struct GroupedMinMaxImpl final : public GroupedAggregator {
   ScalarAggregateOptions options_;
 };
 
+// For binary-like types
+// In principle, FixedSizeBinary could use base implementation
+template <typename Type>
+struct GroupedMinMaxImpl<Type,
+                         enable_if_t<is_base_binary_type<Type>::value ||
+                                     std::is_same<Type, FixedSizeBinaryType>::value>>
+    final : public GroupedAggregator {
+  Status Init(ExecContext* ctx, const FunctionOptions* options) override {
+    ctx_ = ctx;
+    options_ = *checked_cast<const ScalarAggregateOptions*>(options);
+    // type_ initialized by MinMaxInit
+    has_values_ = TypedBufferBuilder<bool>(ctx->memory_pool());
+    has_nulls_ = TypedBufferBuilder<bool>(ctx->memory_pool());
+    return Status::OK();
+  }
+
+  Status Resize(int64_t new_num_groups) override {
+    auto added_groups = new_num_groups - num_groups_;
+    num_groups_ = new_num_groups;
+    mins_.resize(new_num_groups);
+    maxes_.resize(new_num_groups);
+    RETURN_NOT_OK(has_values_.Append(added_groups, false));
+    RETURN_NOT_OK(has_nulls_.Append(added_groups, false));
+    return Status::OK();
+  }
+
+  Status Consume(const ExecBatch& batch) override {
+    return VisitGroupedValues<Type>(
+        batch,
+        [&](uint32_t g, util::string_view val) {
+          if (!mins_[g] || val < util::string_view(*mins_[g])) {
+            if (!mins_[g]) {
+              ARROW_ASSIGN_OR_RAISE(
+                  mins_[g], AllocateResizableBuffer(val.size(), ctx_->memory_pool()));
+            }
+            RETURN_NOT_OK(mins_[g]->Resize(val.size(), /*shrink_to_fit=*/false));
+            std::memcpy(mins_[g]->mutable_data(), val.data(), val.size());
+          }
+          if (!maxes_[g] || val > util::string_view(*maxes_[g])) {
+            if (!maxes_[g]) {
+              ARROW_ASSIGN_OR_RAISE(
+                  maxes_[g], AllocateResizableBuffer(val.size(), ctx_->memory_pool()));
+            }
+            RETURN_NOT_OK(maxes_[g]->Resize(val.size(), /*shrink_to_fit=*/false));
+            std::memcpy(maxes_[g]->mutable_data(), val.data(), val.size());
+          }
+          BitUtil::SetBit(has_values_.mutable_data(), g);
+          return Status::OK();
+        },
+        [&](uint32_t g) {
+          BitUtil::SetBit(has_nulls_.mutable_data(), g);
+          return Status::OK();
+        });
+  }
+
+  Status Merge(GroupedAggregator&& raw_other,
+               const ArrayData& group_id_mapping) override {
+    auto other = checked_cast<GroupedMinMaxImpl*>(&raw_other);
+    auto g = group_id_mapping.GetValues<uint32_t>(1);
+    for (uint32_t other_g = 0; static_cast<int64_t>(other_g) < group_id_mapping.length;
+         ++other_g, ++g) {
+      if (!mins_[*g] ||
+          (mins_[*g] && other->mins_[other_g] &&
+           util::string_view(*mins_[*g]) > util::string_view(*other->mins_[other_g]))) {
+        mins_[*g] = std::move(other->mins_[other_g]);
+      }
+      if (!maxes_[*g] ||
+          (maxes_[*g] && other->maxes_[other_g] &&
+           util::string_view(*maxes_[*g]) < util::string_view(*other->maxes_[other_g]))) {
+        maxes_[*g] = std::move(other->maxes_[other_g]);
+      }
+
+      if (BitUtil::GetBit(other->has_values_.data(), other_g)) {
+        BitUtil::SetBit(has_values_.mutable_data(), *g);
+      }
+      if (BitUtil::GetBit(other->has_nulls_.data(), other_g)) {
+        BitUtil::SetBit(has_nulls_.mutable_data(), *g);
+      }
+    }
+    return Status::OK();
+  }
+
+  Result<Datum> Finalize() override {
+    // aggregation for group is valid if there was at least one value in that group
+    ARROW_ASSIGN_OR_RAISE(auto null_bitmap, has_values_.Finish());
+
+    if (!options_.skip_nulls) {
+      // ... and there were no nulls in that group
+      ARROW_ASSIGN_OR_RAISE(auto has_nulls, has_nulls_.Finish());
+      arrow::internal::BitmapAndNot(null_bitmap->data(), 0, has_nulls->data(), 0,
+                                    num_groups_, 0, null_bitmap->mutable_data());
+    }
+
+    auto mins = ArrayData::Make(type_, num_groups_, {null_bitmap, nullptr});
+    auto maxes = ArrayData::Make(type_, num_groups_, {std::move(null_bitmap), nullptr});
+    RETURN_NOT_OK(MakeOffsetsValues(mins.get(), mins_));
+    RETURN_NOT_OK(MakeOffsetsValues(maxes.get(), maxes_));
+    return ArrayData::Make(out_type(), num_groups_, {nullptr},
+                           {std::move(mins), std::move(maxes)});
+  }
+
+  template <typename T = Type>
+  enable_if_base_binary<T, Status> MakeOffsetsValues(
+      ArrayData* array, const std::vector<std::shared_ptr<ResizableBuffer>>& values) {
+    using offset_type = typename T::offset_type;
+    ARROW_ASSIGN_OR_RAISE(
+        auto raw_offsets,
+        AllocateBuffer((1 + values.size()) * sizeof(offset_type), ctx_->memory_pool()));
+    offset_type* offsets = reinterpret_cast<offset_type*>(raw_offsets->mutable_data());
+    offsets[0] = 0;
+    offsets++;
+    const uint8_t* null_bitmap = array->buffers[0]->data();
+    int64_t total_length = 0;
+    for (size_t i = 0; i < values.size(); i++) {
+      if (BitUtil::GetBit(null_bitmap, i)) {
+        total_length += values[i]->size();
+      }
+      if (total_length > std::numeric_limits<offset_type>::max()) {
+        return Status::Invalid("Result is too large to fit in ", *array->type,
+                               " cast to large_ variant of type");

Review comment:
       Added an overflow check.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@arrow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
