lidavidm commented on a change in pull request #10860:
URL: https://github.com/apache/arrow/pull/10860#discussion_r682554824
##########
File path: cpp/src/arrow/compute/kernels/hash_aggregate.cc
##########
@@ -1311,6 +1312,126 @@ struct GroupedVarStdFactory {
InputType argument_type;
};
+// ----------------------------------------------------------------------
+// TDigest implementation
+
+using arrow::internal::TDigest;
+
+template <typename Type>
+struct GroupedTDigestImpl : public GroupedAggregator {
+ using CType = typename Type::c_type;
+
+ Status Init(ExecContext* ctx, const FunctionOptions* options) override {
+ options_ = *checked_cast<const TDigestOptions*>(options);
+ ctx_ = ctx;
+ pool_ = ctx->memory_pool();
+ return Status::OK();
+ }
+
+ Status Resize(int64_t new_num_groups) override {
+ const int64_t added_groups = new_num_groups - tdigests_.size();
+ tdigests_.reserve(new_num_groups);
+ for (int64_t i = 0; i < added_groups; i++) {
+ tdigests_.emplace_back(options_.delta, options_.buffer_size);
+ }
+ return Status::OK();
+ }
+
+ Status Consume(const ExecBatch& batch) override {
+ auto g = batch[1].array()->GetValues<uint32_t>(1);
+ VisitArrayDataInline<Type>(
+ *batch[0].array(),
+ [&](typename TypeTraits<Type>::CType value) {
+ this->tdigests_[*g].NanAdd(value);
+ ++g;
+ },
+ [&] { ++g; });
+ return Status::OK();
+ }
+
+ Status Merge(GroupedAggregator&& raw_other,
+ const ArrayData& group_id_mapping) override {
+ auto other = checked_cast<GroupedTDigestImpl*>(&raw_other);
+
+ auto g = group_id_mapping.GetValues<uint32_t>(1);
+ std::vector<TDigest> other_tdigest(1);
+ for (int64_t other_g = 0; other_g < group_id_mapping.length; ++other_g, ++g) {
+ other_tdigest[0] = std::move(other->tdigests_[other_g]);
+ tdigests_[*g].Merge(&other_tdigest);
Review comment:
It might be worth considering whether Merge should take an iterator pair
(and whether TDigest::Merge should also take an iterator pair or vector of
pointers instead of a vector of structs).
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]