edponce commented on a change in pull request #11452: URL: https://github.com/apache/arrow/pull/11452#discussion_r741473297
########## File path: cpp/src/arrow/compute/kernels/hash_aggregate.cc ########## @@ -603,8 +603,9 @@ struct GroupedValueTraits<BooleanType> { }; template <typename Type, typename ConsumeValue, typename ConsumeNull> -void VisitGroupedValues(const ExecBatch& batch, ConsumeValue&& valid_func, - ConsumeNull&& null_func) { +typename arrow::internal::call_traits::enable_if_return<ConsumeValue, void>::type Review comment: The `enable_if_return` only checks for `ConsumeValue` so `ConsumeNull` can be non-conformant, but ok. Obviously this is done deliberately to allow compile-time dispatching for the visitors. I think this falls under the "TODO: revise/generalize visitor patterns". ########## File path: cpp/src/arrow/compute/kernels/hash_aggregate.cc ########## @@ -1677,6 +1703,177 @@ struct GroupedMinMaxImpl final : public GroupedAggregator { ScalarAggregateOptions options_; }; +// For binary-like types +// In principle, FixedSizeBinary could use base implementation +template <typename Type> +struct GroupedMinMaxImpl<Type, + enable_if_t<is_base_binary_type<Type>::value || + std::is_same<Type, FixedSizeBinaryType>::value>> + final : public GroupedAggregator { + Status Init(ExecContext* ctx, const FunctionOptions* options) override { + ctx_ = ctx; + options_ = *checked_cast<const ScalarAggregateOptions*>(options); + // type_ initialized by MinMaxInit + has_values_ = TypedBufferBuilder<bool>(ctx->memory_pool()); + has_nulls_ = TypedBufferBuilder<bool>(ctx->memory_pool()); + return Status::OK(); + } + + Status Resize(int64_t new_num_groups) override { + auto added_groups = new_num_groups - num_groups_; + num_groups_ = new_num_groups; + mins_.resize(new_num_groups); + maxes_.resize(new_num_groups); + RETURN_NOT_OK(has_values_.Append(added_groups, false)); + RETURN_NOT_OK(has_nulls_.Append(added_groups, false)); + return Status::OK(); + } + + Status Consume(const ExecBatch& batch) override { + return VisitGroupedValues<Type>( + batch, + [&](uint32_t g, util::string_view val) { + if (!mins_[g] || val < util::string_view(*mins_[g])) { + if (!mins_[g]) { + ARROW_ASSIGN_OR_RAISE( + mins_[g], AllocateResizableBuffer(val.size(), ctx_->memory_pool())); + } + RETURN_NOT_OK(mins_[g]->Resize(val.size(), /*shrink_to_fit=*/false)); + std::memcpy(mins_[g]->mutable_data(), val.data(), val.size()); + } + if (!maxes_[g] || val > util::string_view(*maxes_[g])) { + if (!maxes_[g]) { + ARROW_ASSIGN_OR_RAISE( + maxes_[g], AllocateResizableBuffer(val.size(), ctx_->memory_pool())); + } + RETURN_NOT_OK(maxes_[g]->Resize(val.size(), /*shrink_to_fit=*/false)); + std::memcpy(maxes_[g]->mutable_data(), val.data(), val.size()); + } + BitUtil::SetBit(has_values_.mutable_data(), g); + return Status::OK(); + }, + [&](uint32_t g) { + BitUtil::SetBit(has_nulls_.mutable_data(), g); + return Status::OK(); + }); + } + + Status Merge(GroupedAggregator&& raw_other, + const ArrayData& group_id_mapping) override { + auto other = checked_cast<GroupedMinMaxImpl*>(&raw_other); + auto g = group_id_mapping.GetValues<uint32_t>(1); + for (uint32_t other_g = 0; static_cast<int64_t>(other_g) < group_id_mapping.length; + ++other_g, ++g) { + if (!mins_[*g] || + (mins_[*g] && other->mins_[other_g] && + util::string_view(*mins_[*g]) > util::string_view(*other->mins_[other_g]))) { + mins_[*g] = std::move(other->mins_[other_g]); + } + if (!maxes_[*g] || + (maxes_[*g] && other->maxes_[other_g] && + util::string_view(*maxes_[*g]) < util::string_view(*other->maxes_[other_g]))) { + maxes_[*g] = std::move(other->maxes_[other_g]); + } + + if (BitUtil::GetBit(other->has_values_.data(), other_g)) { + BitUtil::SetBit(has_values_.mutable_data(), *g); + } + if (BitUtil::GetBit(other->has_nulls_.data(), other_g)) { + BitUtil::SetBit(has_nulls_.mutable_data(), *g); + } + } + return Status::OK(); + } + + Result<Datum> Finalize() override { + // aggregation for group is valid if there was at least one value in that group + ARROW_ASSIGN_OR_RAISE(auto null_bitmap, has_values_.Finish()); + + if (!options_.skip_nulls) { + // ... and there were no nulls in that group + ARROW_ASSIGN_OR_RAISE(auto has_nulls, has_nulls_.Finish()); + arrow::internal::BitmapAndNot(null_bitmap->data(), 0, has_nulls->data(), 0, + num_groups_, 0, null_bitmap->mutable_data()); + } + + auto mins = ArrayData::Make(type_, num_groups_, {null_bitmap, nullptr}); + auto maxes = ArrayData::Make(type_, num_groups_, {std::move(null_bitmap), nullptr}); + RETURN_NOT_OK(MakeOffsetsValues(mins.get(), mins_)); + RETURN_NOT_OK(MakeOffsetsValues(maxes.get(), maxes_)); + return ArrayData::Make(out_type(), num_groups_, {nullptr}, + {std::move(mins), std::move(maxes)}); + } + + template <typename T = Type> + enable_if_base_binary<T, Status> MakeOffsetsValues( + ArrayData* array, const std::vector<std::shared_ptr<ResizableBuffer>>& values) { + using offset_type = typename T::offset_type; + ARROW_ASSIGN_OR_RAISE( + auto raw_offsets, + AllocateBuffer((1 + values.size()) * sizeof(offset_type), ctx_->memory_pool())); + offset_type* offsets = reinterpret_cast<offset_type*>(raw_offsets->mutable_data()); + offsets[0] = 0; + offsets++; + const uint8_t* null_bitmap = array->buffers[0]->data(); + int64_t total_length = 0; + for (size_t i = 0; i < values.size(); i++) { + if (BitUtil::GetBit(null_bitmap, i)) { + total_length += values[i]->size(); + } + if (total_length > std::numeric_limits<offset_type>::max()) { + return Status::Invalid("Result is too large to fit in ", *array->type, + " cast to large_ variant of type"); Review comment: If `offset_type` is of type `int64_t` then this limit check will never trigger and will allow overflow: ```c++ if (BitUtil::GetBit(null_bitmap, i)) { auto value_size = values[i]->size(); if ((total_length > 0) && (value_size > (std::numeric_limits<offset_type>::max() - total_length)) { return Status::Invalid(...); } total_length += value_size; } ``` Or use [safe-math.h utils](https://github.com/apache/arrow/blob/master/cpp/src/arrow/vendored/portable-snippets/safe-math.h). ########## File path: cpp/src/arrow/compute/kernels/hash_aggregate.cc ########## @@ -1677,6 +1703,177 @@ struct GroupedMinMaxImpl final : public GroupedAggregator { ScalarAggregateOptions options_; }; +// For binary-like types +// In principle, FixedSizeBinary could use base implementation +template <typename Type> +struct GroupedMinMaxImpl<Type, + enable_if_t<is_base_binary_type<Type>::value || + std::is_same<Type, FixedSizeBinaryType>::value>> + final : public GroupedAggregator { + Status Init(ExecContext* ctx, const FunctionOptions* options) override { + ctx_ = ctx; + options_ = *checked_cast<const ScalarAggregateOptions*>(options); + // type_ initialized by MinMaxInit + has_values_ = TypedBufferBuilder<bool>(ctx->memory_pool()); + has_nulls_ = TypedBufferBuilder<bool>(ctx->memory_pool()); + return Status::OK(); + } + + Status Resize(int64_t new_num_groups) override { + auto added_groups = new_num_groups - num_groups_; Review comment: If the `Resize()` operation reduces the number of groups, `std::vector<>.resize()` does not triggers an exception. [In `Notes` docs](https://en.cppreference.com/w/cpp/container/vector/resize) state that capacity is never reduced even if requested, so maybe add a `DCHECK_GT(added_groups, 0);`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@arrow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org