lidavidm commented on a change in pull request #11452: URL: https://github.com/apache/arrow/pull/11452#discussion_r741528772
########## File path: cpp/src/arrow/compute/kernels/hash_aggregate.cc ########## @@ -1677,6 +1703,177 @@ struct GroupedMinMaxImpl final : public GroupedAggregator { ScalarAggregateOptions options_; }; +// For binary-like types +// In principle, FixedSizeBinary could use base implementation +template <typename Type> +struct GroupedMinMaxImpl<Type, + enable_if_t<is_base_binary_type<Type>::value || + std::is_same<Type, FixedSizeBinaryType>::value>> + final : public GroupedAggregator { + Status Init(ExecContext* ctx, const FunctionOptions* options) override { + ctx_ = ctx; + options_ = *checked_cast<const ScalarAggregateOptions*>(options); + // type_ initialized by MinMaxInit + has_values_ = TypedBufferBuilder<bool>(ctx->memory_pool()); + has_nulls_ = TypedBufferBuilder<bool>(ctx->memory_pool()); + return Status::OK(); + } + + Status Resize(int64_t new_num_groups) override { + auto added_groups = new_num_groups - num_groups_; Review comment: We don't actually care about reducing capacity, so long as the vector is at least big enough. (Also I don't think it's possible to reduce capacity; the number of groups can only ever increase.) ########## File path: cpp/src/arrow/compute/kernels/hash_aggregate.cc ########## @@ -1677,6 +1703,177 @@ struct GroupedMinMaxImpl final : public GroupedAggregator { ScalarAggregateOptions options_; }; +// For binary-like types +// In principle, FixedSizeBinary could use base implementation +template <typename Type> +struct GroupedMinMaxImpl<Type, + enable_if_t<is_base_binary_type<Type>::value || + std::is_same<Type, FixedSizeBinaryType>::value>> + final : public GroupedAggregator { + Status Init(ExecContext* ctx, const FunctionOptions* options) override { + ctx_ = ctx; + options_ = *checked_cast<const ScalarAggregateOptions*>(options); + // type_ initialized by MinMaxInit + has_values_ = TypedBufferBuilder<bool>(ctx->memory_pool()); + has_nulls_ = TypedBufferBuilder<bool>(ctx->memory_pool()); + return Status::OK(); + } + + Status Resize(int64_t new_num_groups) override { + auto added_groups = new_num_groups - num_groups_; + num_groups_ = new_num_groups; + mins_.resize(new_num_groups); + maxes_.resize(new_num_groups); + RETURN_NOT_OK(has_values_.Append(added_groups, false)); + RETURN_NOT_OK(has_nulls_.Append(added_groups, false)); + return Status::OK(); + } + + Status Consume(const ExecBatch& batch) override { + return VisitGroupedValues<Type>( + batch, + [&](uint32_t g, util::string_view val) { + if (!mins_[g] || val < util::string_view(*mins_[g])) { + if (!mins_[g]) { + ARROW_ASSIGN_OR_RAISE( + mins_[g], AllocateResizableBuffer(val.size(), ctx_->memory_pool())); + } + RETURN_NOT_OK(mins_[g]->Resize(val.size(), /*shrink_to_fit=*/false)); + std::memcpy(mins_[g]->mutable_data(), val.data(), val.size()); + } + if (!maxes_[g] || val > util::string_view(*maxes_[g])) { + if (!maxes_[g]) { + ARROW_ASSIGN_OR_RAISE( + maxes_[g], AllocateResizableBuffer(val.size(), ctx_->memory_pool())); + } + RETURN_NOT_OK(maxes_[g]->Resize(val.size(), /*shrink_to_fit=*/false)); + std::memcpy(maxes_[g]->mutable_data(), val.data(), val.size()); + } + BitUtil::SetBit(has_values_.mutable_data(), g); + return Status::OK(); + }, + [&](uint32_t g) { + BitUtil::SetBit(has_nulls_.mutable_data(), g); + return Status::OK(); + }); + } + + Status Merge(GroupedAggregator&& raw_other, + const ArrayData& group_id_mapping) override { + auto other = checked_cast<GroupedMinMaxImpl*>(&raw_other); + auto g = group_id_mapping.GetValues<uint32_t>(1); + for (uint32_t other_g = 0; static_cast<int64_t>(other_g) < group_id_mapping.length; + ++other_g, ++g) { + if (!mins_[*g] || + (mins_[*g] && other->mins_[other_g] && + util::string_view(*mins_[*g]) > util::string_view(*other->mins_[other_g]))) { + mins_[*g] = std::move(other->mins_[other_g]); + } + if (!maxes_[*g] || + (maxes_[*g] && other->maxes_[other_g] && + util::string_view(*maxes_[*g]) < util::string_view(*other->maxes_[other_g]))) { + maxes_[*g] = std::move(other->maxes_[other_g]); + } + + if (BitUtil::GetBit(other->has_values_.data(), other_g)) { + BitUtil::SetBit(has_values_.mutable_data(), *g); + } + if (BitUtil::GetBit(other->has_nulls_.data(), other_g)) { + BitUtil::SetBit(has_nulls_.mutable_data(), *g); + } + } + return Status::OK(); + } + + Result<Datum> Finalize() override { + // aggregation for group is valid if there was at least one value in that group + ARROW_ASSIGN_OR_RAISE(auto null_bitmap, has_values_.Finish()); + + if (!options_.skip_nulls) { + // ... and there were no nulls in that group + ARROW_ASSIGN_OR_RAISE(auto has_nulls, has_nulls_.Finish()); + arrow::internal::BitmapAndNot(null_bitmap->data(), 0, has_nulls->data(), 0, + num_groups_, 0, null_bitmap->mutable_data()); + } + + auto mins = ArrayData::Make(type_, num_groups_, {null_bitmap, nullptr}); + auto maxes = ArrayData::Make(type_, num_groups_, {std::move(null_bitmap), nullptr}); + RETURN_NOT_OK(MakeOffsetsValues(mins.get(), mins_)); + RETURN_NOT_OK(MakeOffsetsValues(maxes.get(), maxes_)); + return ArrayData::Make(out_type(), num_groups_, {nullptr}, + {std::move(mins), std::move(maxes)}); + } + + template <typename T = Type> + enable_if_base_binary<T, Status> MakeOffsetsValues( + ArrayData* array, const std::vector<std::shared_ptr<ResizableBuffer>>& values) { + using offset_type = typename T::offset_type; + ARROW_ASSIGN_OR_RAISE( + auto raw_offsets, + AllocateBuffer((1 + values.size()) * sizeof(offset_type), ctx_->memory_pool())); + offset_type* offsets = reinterpret_cast<offset_type*>(raw_offsets->mutable_data()); + offsets[0] = 0; + offsets++; + const uint8_t* null_bitmap = array->buffers[0]->data(); + int64_t total_length = 0; + for (size_t i = 0; i < values.size(); i++) { + if (BitUtil::GetBit(null_bitmap, i)) { + total_length += values[i]->size(); + } + if (total_length > std::numeric_limits<offset_type>::max()) { + return Status::Invalid("Result is too large to fit in ", *array->type, + " cast to large_ variant of type"); Review comment: Frankly we're awful about checking for overflow in general; most kernel code is like this and only checks for the case of overflowing int32_t. That said I'll add the check here. ########## File path: cpp/src/arrow/compute/kernels/hash_aggregate.cc ########## @@ -1677,6 +1703,177 @@ struct GroupedMinMaxImpl final : public GroupedAggregator { ScalarAggregateOptions options_; }; +// For binary-like types +// In principle, FixedSizeBinary could use base implementation +template <typename Type> +struct GroupedMinMaxImpl<Type, + enable_if_t<is_base_binary_type<Type>::value || + std::is_same<Type, FixedSizeBinaryType>::value>> + final : public GroupedAggregator { + Status Init(ExecContext* ctx, const FunctionOptions* options) override { + ctx_ = ctx; + options_ = *checked_cast<const ScalarAggregateOptions*>(options); + // type_ initialized by MinMaxInit + has_values_ = TypedBufferBuilder<bool>(ctx->memory_pool()); + has_nulls_ = TypedBufferBuilder<bool>(ctx->memory_pool()); + return Status::OK(); + } + + Status Resize(int64_t new_num_groups) override { + auto added_groups = new_num_groups - num_groups_; + num_groups_ = new_num_groups; + mins_.resize(new_num_groups); + maxes_.resize(new_num_groups); + RETURN_NOT_OK(has_values_.Append(added_groups, false)); + RETURN_NOT_OK(has_nulls_.Append(added_groups, false)); + return Status::OK(); + } + + Status Consume(const ExecBatch& batch) override { + return VisitGroupedValues<Type>( + batch, + [&](uint32_t g, util::string_view val) { + if (!mins_[g] || val < util::string_view(*mins_[g])) { + if (!mins_[g]) { + ARROW_ASSIGN_OR_RAISE( + mins_[g], AllocateResizableBuffer(val.size(), ctx_->memory_pool())); + } + RETURN_NOT_OK(mins_[g]->Resize(val.size(), /*shrink_to_fit=*/false)); + std::memcpy(mins_[g]->mutable_data(), val.data(), val.size()); + } + if (!maxes_[g] || val > util::string_view(*maxes_[g])) { + if (!maxes_[g]) { + ARROW_ASSIGN_OR_RAISE( + maxes_[g], AllocateResizableBuffer(val.size(), ctx_->memory_pool())); + } + RETURN_NOT_OK(maxes_[g]->Resize(val.size(), /*shrink_to_fit=*/false)); + std::memcpy(maxes_[g]->mutable_data(), val.data(), val.size()); + } + BitUtil::SetBit(has_values_.mutable_data(), g); + return Status::OK(); + }, + [&](uint32_t g) { + BitUtil::SetBit(has_nulls_.mutable_data(), g); + return Status::OK(); + }); + } + + Status Merge(GroupedAggregator&& raw_other, + const ArrayData& group_id_mapping) override { + auto other = checked_cast<GroupedMinMaxImpl*>(&raw_other); + auto g = group_id_mapping.GetValues<uint32_t>(1); + for (uint32_t other_g = 0; static_cast<int64_t>(other_g) < group_id_mapping.length; + ++other_g, ++g) { + if (!mins_[*g] || + (mins_[*g] && other->mins_[other_g] && + util::string_view(*mins_[*g]) > util::string_view(*other->mins_[other_g]))) { + mins_[*g] = std::move(other->mins_[other_g]); + } + if (!maxes_[*g] || + (maxes_[*g] && other->maxes_[other_g] && + util::string_view(*maxes_[*g]) < util::string_view(*other->maxes_[other_g]))) { + maxes_[*g] = std::move(other->maxes_[other_g]); + } + + if (BitUtil::GetBit(other->has_values_.data(), other_g)) { + BitUtil::SetBit(has_values_.mutable_data(), *g); + } + if (BitUtil::GetBit(other->has_nulls_.data(), other_g)) { + BitUtil::SetBit(has_nulls_.mutable_data(), *g); + } + } + return Status::OK(); + } + + Result<Datum> Finalize() override { + // aggregation for group is valid if there was at least one value in that group + ARROW_ASSIGN_OR_RAISE(auto null_bitmap, has_values_.Finish()); + + if (!options_.skip_nulls) { + // ... and there were no nulls in that group + ARROW_ASSIGN_OR_RAISE(auto has_nulls, has_nulls_.Finish()); + arrow::internal::BitmapAndNot(null_bitmap->data(), 0, has_nulls->data(), 0, + num_groups_, 0, null_bitmap->mutable_data()); + } + + auto mins = ArrayData::Make(type_, num_groups_, {null_bitmap, nullptr}); + auto maxes = ArrayData::Make(type_, num_groups_, {std::move(null_bitmap), nullptr}); + RETURN_NOT_OK(MakeOffsetsValues(mins.get(), mins_)); + RETURN_NOT_OK(MakeOffsetsValues(maxes.get(), maxes_)); + return ArrayData::Make(out_type(), num_groups_, {nullptr}, + {std::move(mins), std::move(maxes)}); + } + + template <typename T = Type> + enable_if_base_binary<T, Status> MakeOffsetsValues( + ArrayData* array, const std::vector<std::shared_ptr<ResizableBuffer>>& values) { + using offset_type = typename T::offset_type; + ARROW_ASSIGN_OR_RAISE( + auto raw_offsets, + AllocateBuffer((1 + values.size()) * sizeof(offset_type), ctx_->memory_pool())); + offset_type* offsets = reinterpret_cast<offset_type*>(raw_offsets->mutable_data()); + offsets[0] = 0; + offsets++; + const uint8_t* null_bitmap = array->buffers[0]->data(); + int64_t total_length = 0; + for (size_t i = 0; i < values.size(); i++) { + if (BitUtil::GetBit(null_bitmap, i)) { + total_length += values[i]->size(); + } + if (total_length > std::numeric_limits<offset_type>::max()) { + return Status::Invalid("Result is too large to fit in ", *array->type, + " cast to large_ variant of type"); Review comment: Added an overflow check. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@arrow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org