edponce commented on a change in pull request #11452:
URL: https://github.com/apache/arrow/pull/11452#discussion_r741473297



##########
File path: cpp/src/arrow/compute/kernels/hash_aggregate.cc
##########
@@ -603,8 +603,9 @@ struct GroupedValueTraits<BooleanType> {
 };
 
 template <typename Type, typename ConsumeValue, typename ConsumeNull>
-void VisitGroupedValues(const ExecBatch& batch, ConsumeValue&& valid_func,
-                        ConsumeNull&& null_func) {
+typename arrow::internal::call_traits::enable_if_return<ConsumeValue, 
void>::type

Review comment:
       The `enable_if_return` only checks `ConsumeValue`, so `ConsumeNull` 
can be non-conformant, but that is OK. This appears to be deliberate, to allow 
compile-time dispatching for the visitors. I think this falls under the "TODO: 
revise/generalize visitor patterns".

##########
File path: cpp/src/arrow/compute/kernels/hash_aggregate.cc
##########
@@ -1677,6 +1703,177 @@ struct GroupedMinMaxImpl final : public 
GroupedAggregator {
   ScalarAggregateOptions options_;
 };
 
+// For binary-like types
+// In principle, FixedSizeBinary could use base implementation
+template <typename Type>
+struct GroupedMinMaxImpl<Type,
+                         enable_if_t<is_base_binary_type<Type>::value ||
+                                     std::is_same<Type, 
FixedSizeBinaryType>::value>>
+    final : public GroupedAggregator {
+  Status Init(ExecContext* ctx, const FunctionOptions* options) override {
+    ctx_ = ctx;
+    options_ = *checked_cast<const ScalarAggregateOptions*>(options);
+    // type_ initialized by MinMaxInit
+    has_values_ = TypedBufferBuilder<bool>(ctx->memory_pool());
+    has_nulls_ = TypedBufferBuilder<bool>(ctx->memory_pool());
+    return Status::OK();
+  }
+
+  Status Resize(int64_t new_num_groups) override {
+    auto added_groups = new_num_groups - num_groups_;
+    num_groups_ = new_num_groups;
+    mins_.resize(new_num_groups);
+    maxes_.resize(new_num_groups);
+    RETURN_NOT_OK(has_values_.Append(added_groups, false));
+    RETURN_NOT_OK(has_nulls_.Append(added_groups, false));
+    return Status::OK();
+  }
+
+  Status Consume(const ExecBatch& batch) override {
+    return VisitGroupedValues<Type>(
+        batch,
+        [&](uint32_t g, util::string_view val) {
+          if (!mins_[g] || val < util::string_view(*mins_[g])) {
+            if (!mins_[g]) {
+              ARROW_ASSIGN_OR_RAISE(
+                  mins_[g], AllocateResizableBuffer(val.size(), 
ctx_->memory_pool()));
+            }
+            RETURN_NOT_OK(mins_[g]->Resize(val.size(), 
/*shrink_to_fit=*/false));
+            std::memcpy(mins_[g]->mutable_data(), val.data(), val.size());
+          }
+          if (!maxes_[g] || val > util::string_view(*maxes_[g])) {
+            if (!maxes_[g]) {
+              ARROW_ASSIGN_OR_RAISE(
+                  maxes_[g], AllocateResizableBuffer(val.size(), 
ctx_->memory_pool()));
+            }
+            RETURN_NOT_OK(maxes_[g]->Resize(val.size(), 
/*shrink_to_fit=*/false));
+            std::memcpy(maxes_[g]->mutable_data(), val.data(), val.size());
+          }
+          BitUtil::SetBit(has_values_.mutable_data(), g);
+          return Status::OK();
+        },
+        [&](uint32_t g) {
+          BitUtil::SetBit(has_nulls_.mutable_data(), g);
+          return Status::OK();
+        });
+  }
+
+  Status Merge(GroupedAggregator&& raw_other,
+               const ArrayData& group_id_mapping) override {
+    auto other = checked_cast<GroupedMinMaxImpl*>(&raw_other);
+    auto g = group_id_mapping.GetValues<uint32_t>(1);
+    for (uint32_t other_g = 0; static_cast<int64_t>(other_g) < 
group_id_mapping.length;
+         ++other_g, ++g) {
+      if (!mins_[*g] ||
+          (mins_[*g] && other->mins_[other_g] &&
+           util::string_view(*mins_[*g]) > 
util::string_view(*other->mins_[other_g]))) {
+        mins_[*g] = std::move(other->mins_[other_g]);
+      }
+      if (!maxes_[*g] ||
+          (maxes_[*g] && other->maxes_[other_g] &&
+           util::string_view(*maxes_[*g]) < 
util::string_view(*other->maxes_[other_g]))) {
+        maxes_[*g] = std::move(other->maxes_[other_g]);
+      }
+
+      if (BitUtil::GetBit(other->has_values_.data(), other_g)) {
+        BitUtil::SetBit(has_values_.mutable_data(), *g);
+      }
+      if (BitUtil::GetBit(other->has_nulls_.data(), other_g)) {
+        BitUtil::SetBit(has_nulls_.mutable_data(), *g);
+      }
+    }
+    return Status::OK();
+  }
+
+  Result<Datum> Finalize() override {
+    // aggregation for group is valid if there was at least one value in that 
group
+    ARROW_ASSIGN_OR_RAISE(auto null_bitmap, has_values_.Finish());
+
+    if (!options_.skip_nulls) {
+      // ... and there were no nulls in that group
+      ARROW_ASSIGN_OR_RAISE(auto has_nulls, has_nulls_.Finish());
+      arrow::internal::BitmapAndNot(null_bitmap->data(), 0, has_nulls->data(), 
0,
+                                    num_groups_, 0, 
null_bitmap->mutable_data());
+    }
+
+    auto mins = ArrayData::Make(type_, num_groups_, {null_bitmap, nullptr});
+    auto maxes = ArrayData::Make(type_, num_groups_, {std::move(null_bitmap), 
nullptr});
+    RETURN_NOT_OK(MakeOffsetsValues(mins.get(), mins_));
+    RETURN_NOT_OK(MakeOffsetsValues(maxes.get(), maxes_));
+    return ArrayData::Make(out_type(), num_groups_, {nullptr},
+                           {std::move(mins), std::move(maxes)});
+  }
+
+  template <typename T = Type>
+  enable_if_base_binary<T, Status> MakeOffsetsValues(
+      ArrayData* array, const std::vector<std::shared_ptr<ResizableBuffer>>& 
values) {
+    using offset_type = typename T::offset_type;
+    ARROW_ASSIGN_OR_RAISE(
+        auto raw_offsets,
+        AllocateBuffer((1 + values.size()) * sizeof(offset_type), 
ctx_->memory_pool()));
+    offset_type* offsets = 
reinterpret_cast<offset_type*>(raw_offsets->mutable_data());
+    offsets[0] = 0;
+    offsets++;
+    const uint8_t* null_bitmap = array->buffers[0]->data();
+    int64_t total_length = 0;
+    for (size_t i = 0; i < values.size(); i++) {
+      if (BitUtil::GetBit(null_bitmap, i)) {
+        total_length += values[i]->size();
+      }
+      if (total_length > std::numeric_limits<offset_type>::max()) {
+        return Status::Invalid("Result is too large to fit in ", *array->type,
+                               " cast to large_ variant of type");

Review comment:
       If `offset_type` is `int64_t`, then this limit check will never 
trigger and will allow overflow. Check before accumulating instead:
   ```c++
   if (BitUtil::GetBit(null_bitmap, i)) {
     auto value_size = values[i]->size();
     if ((total_length > 0) &&
         (value_size > (std::numeric_limits<offset_type>::max() - total_length))) {
       return Status::Invalid(...);
     }
     total_length += value_size;
   }
   ```
   
   Or use the [safe-math.h utils](https://github.com/apache/arrow/blob/master/cpp/src/arrow/vendored/portable-snippets/safe-math.h).

##########
File path: cpp/src/arrow/compute/kernels/hash_aggregate.cc
##########
@@ -1677,6 +1703,177 @@ struct GroupedMinMaxImpl final : public 
GroupedAggregator {
   ScalarAggregateOptions options_;
 };
 
+// For binary-like types
+// In principle, FixedSizeBinary could use base implementation
+template <typename Type>
+struct GroupedMinMaxImpl<Type,
+                         enable_if_t<is_base_binary_type<Type>::value ||
+                                     std::is_same<Type, 
FixedSizeBinaryType>::value>>
+    final : public GroupedAggregator {
+  Status Init(ExecContext* ctx, const FunctionOptions* options) override {
+    ctx_ = ctx;
+    options_ = *checked_cast<const ScalarAggregateOptions*>(options);
+    // type_ initialized by MinMaxInit
+    has_values_ = TypedBufferBuilder<bool>(ctx->memory_pool());
+    has_nulls_ = TypedBufferBuilder<bool>(ctx->memory_pool());
+    return Status::OK();
+  }
+
+  Status Resize(int64_t new_num_groups) override {
+    auto added_groups = new_num_groups - num_groups_;

Review comment:
       If the `Resize()` operation reduces the number of groups, 
`std::vector<>::resize()` does not trigger an exception. The [`Notes` section of 
the docs](https://en.cppreference.com/w/cpp/container/vector/resize) states that 
capacity is never reduced when resizing to a smaller size, so maybe add a 
`DCHECK_GT(added_groups, 0);`.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@arrow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to