lidavidm commented on a change in pull request #10876:
URL: https://github.com/apache/arrow/pull/10876#discussion_r692480479



##########
File path: cpp/src/arrow/compute/kernels/hash_aggregate.cc
##########
@@ -1828,6 +1828,120 @@ struct GroupedAllImpl : public GroupedAggregator {
   TypedBufferBuilder<bool> seen_;
 };
 
+// ----------------------------------------------------------------------
+// CountDistinct/Distinct implementation
+
+struct GroupedCountDistinctImpl : public GroupedAggregator {
+  Status Init(ExecContext* ctx, const FunctionOptions* options) override {
+    ctx_ = ctx;
+    pool_ = ctx->memory_pool();
+    return Status::OK();
+  }
+
+  Status Resize(int64_t new_num_groups) override {
+    num_groups_ = new_num_groups;
+    return Status::OK();
+  }
+
+  Status Consume(const ExecBatch& batch) override {
+    if (!grouper_) {
+      ARROW_ASSIGN_OR_RAISE(grouper_, Grouper::Make(batch.GetDescriptors(), ctx_));
+    }
+    return grouper_->Consume(batch).status();
+  }
+
+  Status Merge(GroupedAggregator&& raw_other,
+               const ArrayData& group_id_mapping) override {
+    auto other = checked_cast<GroupedCountDistinctImpl*>(&raw_other);
+
+    // Get (group_id, value) pairs, then translate the group IDs and consume them
+    // ourselves
+    ARROW_ASSIGN_OR_RAISE(auto uniques, other->grouper_->GetUniques());
+
+    const auto* g_mapping = group_id_mapping.GetValues<uint32_t>(1);
+    auto* other_g = uniques[1].array()->GetMutableValues<uint32_t>(1);
+    for (int64_t i = 0; i < uniques.length; i++) {
+      other_g[i] = g_mapping[other_g[i]];
+    }
+
+    return Consume(std::move(uniques));
+  }
+
+  Result<Datum> Finalize() override {
+    ARROW_ASSIGN_OR_RAISE(std::shared_ptr<Buffer> values,
+                          AllocateBuffer(num_groups_ * sizeof(int64_t), pool_));
+    int64_t* counts = reinterpret_cast<int64_t*>(values->mutable_data());
+    std::fill(counts, counts + num_groups_, 0);
+
+    ARROW_ASSIGN_OR_RAISE(auto uniques, grouper_->GetUniques());
+    auto* g = uniques[1].array()->GetValues<uint32_t>(1);
+    for (int64_t i = 0; i < uniques.length; i++) {
+      counts[g[i]]++;
+    }
+
+    return ArrayData::Make(int64(), num_groups_, {nullptr, std::move(values)},
+                           /*null_count=*/0);
+  }
+
+  std::shared_ptr<DataType> out_type() const override { return int64(); }
+
+  ExecContext* ctx_;
+  MemoryPool* pool_;
+  int64_t num_groups_;
+  std::unique_ptr<Grouper> grouper_;
+};
+
+struct GroupedDistinctImpl : public GroupedCountDistinctImpl {
+  Result<Datum> Finalize() override {
+    ARROW_ASSIGN_OR_RAISE(auto uniques, grouper_->GetUniques());
+
+    // Assemble the final list via Take
+
+    ARROW_ASSIGN_OR_RAISE(std::shared_ptr<Buffer> offsets_buffer,
+                          AllocateBuffer((num_groups_ + 1) * sizeof(int32_t), pool_));
+    int32_t* offsets = reinterpret_cast<int32_t*>(offsets_buffer->mutable_data());
+
+    ARROW_ASSIGN_OR_RAISE(std::shared_ptr<Buffer> take_indices,
+                          AllocateBuffer(uniques.length * sizeof(int64_t), pool_));
+    int32_t* indices = reinterpret_cast<int32_t*>(take_indices->mutable_data());
+
+    std::vector<std::vector<int32_t>> grouped_slices(num_groups_);
+
+    auto* g = uniques[1].array()->GetValues<uint32_t>(1);
+    for (int32_t i = 0; i < uniques.length; i++) {
+      grouped_slices[g[i]].push_back(i);
+    }
+
+    offsets[0] = 0;
+    for (size_t i = 0; i < grouped_slices.size(); i++) {
+      indices = std::copy(grouped_slices[i].begin(), grouped_slices[i].end(), indices);
+      offsets[i + 1] = offsets[i] + static_cast<int32_t>(grouped_slices[i].size());
+    }
+
+    ARROW_ASSIGN_OR_RAISE(

Review comment:
       It turns out it's exactly the composition of the two, thanks for pointing that out.
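
For context, the loop in Merge() above composes the other aggregator's local group ids with group_id_mapping, so the consumed uniques land in this aggregator's id space. A minimal standalone sketch of that remap with hypothetical ids (not code from the PR):

// Illustrates the in-place remap other_g[i] = g_mapping[other_g[i]].
#include <cstdint>
#include <iostream>
#include <vector>

int main() {
  // Hypothetical mapping produced for the merge: the other aggregator's
  // local groups 0, 1, 2 correspond to our groups 4, 0, 2.
  std::vector<uint32_t> g_mapping = {4, 0, 2};

  // Local group id of each unique (group_id, value) pair from GetUniques().
  std::vector<uint32_t> other_g = {0, 0, 1, 2, 2, 2};

  // Compose the two mappings in place, as the loop in Merge() does.
  for (auto& g : other_g) g = g_mapping[g];

  for (auto g : other_g) std::cout << g << ' ';  // prints: 4 4 0 2 2 2
  std::cout << '\n';
  return 0;
}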




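Similarly, Finalize() in GroupedCountDistinctImpl relies on the grouper having deduplicated (group_id, value) pairs: counting how many unique pairs carry each group id yields the per-group distinct count. A small sketch with hypothetical data, not the PR's code:

#include <cstdint>
#include <iostream>
#include <vector>

int main() {
  const int64_t num_groups = 3;

  // Group ids of the deduplicated (group_id, value) pairs, as GetUniques()
  // would report them; the values themselves are not needed for the count.
  std::vector<uint32_t> g = {0, 0, 1, 2, 2, 2};

  // One increment per unique pair, bucketed by group id.
  std::vector<int64_t> counts(num_groups, 0);
  for (uint32_t gid : g) counts[gid]++;

  for (int64_t c : counts) std::cout << c << ' ';  // prints: 2 1 3
  std::cout << '\n';
  return 0;
}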