bkietz commented on a change in pull request #10876:
URL: https://github.com/apache/arrow/pull/10876#discussion_r691569047



##########
File path: cpp/src/arrow/compute/kernels/hash_aggregate_test.cc
##########
@@ -1062,6 +1062,184 @@ TEST(GroupBy, AnyAndAll) {
   }
 }
 
+TEST(GroupBy, CountDistinct) {
+  for (bool use_threads : {true, false}) {
+    SCOPED_TRACE(use_threads ? "parallel/merged" : "serial");
+
+    auto table =
+        TableFromJSON(schema({field("argument", float64()), field("key", 
int64())}), {R"([
+    [1,    1],
+    [1,    1]
+])",
+                                                                               
       R"([
+    [0,    2],
+    [null, 3]
+])",
+                                                                               
       R"([
+    [4,    null],
+    [1,    3]
+])",
+                                                                               
       R"([
+    [0,    2],
+    [-1,   2]
+])",
+                                                                               
       R"([
+    [1,    null],
+    [NaN,  3]
+  ])",
+                                                                               
       R"([
+    [2,    null],
+    [3,    null]
+  ])"});
+
+    ASSERT_OK_AND_ASSIGN(Datum aggregated_and_grouped,
+                         internal::GroupBy(
+                             {
+                                 table->GetColumnByName("argument"),
+                             },
+                             {
+                                 table->GetColumnByName("key"),
+                             },
+                             {
+                                 {"hash_count_distinct", nullptr},
+                             },
+                             use_threads));
+    SortBy({"key_0"}, &aggregated_and_grouped);
+    ValidateOutput(aggregated_and_grouped);
+
+    AssertDatumsEqual(ArrayFromJSON(struct_({
+                                        field("hash_count_distinct", int64()),
+                                        field("key_0", int64()),
+                                    }),
+                                    R"([
+    [1, 1],
+    [2, 2],
+    [3, 3],
+    [4, null]
+  ])"),
+                      aggregated_and_grouped,
+                      /*verbose=*/true);
+
+    table =
+        TableFromJSON(schema({field("argument", utf8()), field("key", 
int64())}), {R"([
+    ["foo",  1],
+    ["foo",  1]
+])",
+                                                                               
    R"([
+    ["bar",  2],
+    [null,   3]
+])",
+                                                                               
    R"([
+    ["baz",  null],
+    ["foo",  3]
+])",
+                                                                               
    R"([
+    ["bar",  2],
+    ["spam", 2]
+])",
+                                                                               
    R"([
+    ["eggs", null],
+    ["ham",  3]
+  ])",
+                                                                               
    R"([
+    ["a",    null],
+    ["b",    null]
+  ])"});
+
+    ASSERT_OK_AND_ASSIGN(aggregated_and_grouped,
+                         internal::GroupBy(
+                             {
+                                 table->GetColumnByName("argument"),
+                             },
+                             {
+                                 table->GetColumnByName("key"),
+                             },
+                             {
+                                 {"hash_count_distinct", nullptr},
+                             },
+                             use_threads));
+    ValidateOutput(aggregated_and_grouped);
+    SortBy({"key_0"}, &aggregated_and_grouped);
+
+    AssertDatumsEqual(ArrayFromJSON(struct_({
+                                        field("hash_count_distinct", int64()),
+                                        field("key_0", int64()),
+                                    }),
+                                    R"([
+    [1, 1],
+    [2, 2],
+    [3, 3],
+    [4, null]
+  ])"),
+                      aggregated_and_grouped,
+                      /*verbose=*/true);
+  }
+}
+
+TEST(GroupBy, Distinct) {
+  for (bool use_threads : {true, false}) {
+    SCOPED_TRACE(use_threads ? "parallel/merged" : "serial");
+
+    auto table =
+        TableFromJSON(schema({field("argument", utf8()), field("key", 
int64())}), {R"([
+    ["foo",  1],
+    ["foo",  1]
+])",
+                                                                               
    R"([
+    ["bar",  2],
+    [null,   3]
+])",
+                                                                               
    R"([
+    ["baz",  null],
+    ["foo",  3]
+])",
+                                                                               
    R"([
+    ["bar",  2],
+    ["spam", 2]
+])",
+                                                                               
    R"([
+    ["eggs", null],
+    ["ham",  3]
+  ])",
+                                                                               
    R"([
+    ["a",    null],
+    ["b",    null]
+  ])"});
+
+    ASSERT_OK_AND_ASSIGN(auto aggregated_and_grouped,
+                         internal::GroupBy(
+                             {
+                                 table->GetColumnByName("argument"),
+                             },
+                             {
+                                 table->GetColumnByName("key"),
+                             },
+                             {
+                                 {"hash_distinct", nullptr},
+                             },
+                             use_threads));
+    ValidateOutput(aggregated_and_grouped);
+    SortBy({"key_0"}, &aggregated_and_grouped);
+
+    // Order of sub-arrays is not stable

Review comment:
       That's annoying.

##########
File path: cpp/src/arrow/compute/kernels/hash_aggregate.cc
##########
@@ -1828,6 +1828,120 @@ struct GroupedAllImpl : public GroupedAggregator {
   TypedBufferBuilder<bool> seen_;
 };
 
+// ----------------------------------------------------------------------
+// CountDistinct/Distinct implementation
+
+struct GroupedCountDistinctImpl : public GroupedAggregator {
+  Status Init(ExecContext* ctx, const FunctionOptions* options) override {
+    ctx_ = ctx;
+    pool_ = ctx->memory_pool();
+    return Status::OK();
+  }
+
+  Status Resize(int64_t new_num_groups) override {
+    num_groups_ = new_num_groups;
+    return Status::OK();
+  }
+
+  Status Consume(const ExecBatch& batch) override {
+    if (!grouper_) {
+      ARROW_ASSIGN_OR_RAISE(grouper_, Grouper::Make(batch.GetDescriptors(), 
ctx_));
+    }
+    return grouper_->Consume(batch).status();
+  }
+
+  Status Merge(GroupedAggregator&& raw_other,
+               const ArrayData& group_id_mapping) override {
+    auto other = checked_cast<GroupedCountDistinctImpl*>(&raw_other);
+
+    // Get (group_id, value) pairs, then translate the group IDs and consume 
them
+    // ourselves
+    ARROW_ASSIGN_OR_RAISE(auto uniques, other->grouper_->GetUniques());
+
+    const auto* g_mapping = group_id_mapping.GetValues<uint32_t>(1);
+    auto* other_g = uniques[1].array()->GetMutableValues<uint32_t>(1);
+    for (int64_t i = 0; i < uniques.length; i++) {
+      other_g[i] = g_mapping[other_g[i]];
+    }
+
+    return Consume(std::move(uniques));
+  }
+
+  Result<Datum> Finalize() override {
+    ARROW_ASSIGN_OR_RAISE(std::shared_ptr<Buffer> values,
+                          AllocateBuffer(num_groups_ * sizeof(int64_t), 
pool_));
+    int64_t* counts = reinterpret_cast<int64_t*>(values->mutable_data());
+    std::fill(counts, counts + num_groups_, 0);
+
+    ARROW_ASSIGN_OR_RAISE(auto uniques, grouper_->GetUniques());
+    auto* g = uniques[1].array()->GetValues<uint32_t>(1);
+    for (int64_t i = 0; i < uniques.length; i++) {
+      counts[g[i]]++;
+    }
+
+    return ArrayData::Make(int64(), num_groups_, {nullptr, std::move(values)},
+                           /*null_count=*/0);
+  }
+
+  std::shared_ptr<DataType> out_type() const override { return int64(); }
+
+  ExecContext* ctx_;
+  MemoryPool* pool_;
+  int64_t num_groups_;
+  std::unique_ptr<Grouper> grouper_;
+};
+
+struct GroupedDistinctImpl : public GroupedCountDistinctImpl {
+  Result<Datum> Finalize() override {
+    ARROW_ASSIGN_OR_RAISE(auto uniques, grouper_->GetUniques());
+
+    // Assemble the final list via Take
+
+    ARROW_ASSIGN_OR_RAISE(std::shared_ptr<Buffer> offsets_buffer,
+                          AllocateBuffer((num_groups_ + 1) * sizeof(int32_t), 
pool_));
+    int32_t* offsets = 
reinterpret_cast<int32_t*>(offsets_buffer->mutable_data());
+
+    ARROW_ASSIGN_OR_RAISE(std::shared_ptr<Buffer> take_indices,
+                          AllocateBuffer(uniques.length * sizeof(int64_t), 
pool_));
+    int32_t* indices = 
reinterpret_cast<int32_t*>(take_indices->mutable_data());
+
+    std::vector<std::vector<int32_t>> grouped_slices(num_groups_);
+
+    auto* g = uniques[1].array()->GetValues<uint32_t>(1);
+    for (int32_t i = 0; i < uniques.length; i++) {
+      grouped_slices[g[i]].push_back(i);
+    }
+
+    offsets[0] = 0;
+    for (size_t i = 0; i < grouped_slices.size(); i++) {
+      indices = std::copy(grouped_slices[i].begin(), grouped_slices[i].end(), 
indices);
+      offsets[i + 1] = offsets[i] + 
static_cast<int32_t>(grouped_slices[i].size());
+    }
+
+    ARROW_ASSIGN_OR_RAISE(

Review comment:
       This looks very similar to Grouper::MakeGroupings and 
Grouper::ApplyGroupings, any chance of reuse?

##########
File path: cpp/src/arrow/compute/kernels/hash_aggregate.cc
##########
@@ -2257,6 +2381,21 @@ void RegisterHashAggregateBasic(FunctionRegistry* 
registry) {
     DCHECK_OK(func->AddKernel(MakeKernel(boolean(), 
HashAggregateInit<GroupedAllImpl>)));
     DCHECK_OK(registry->AddFunction(std::move(func)));
   }
+
+  {
+    auto func = std::make_shared<HashAggregateFunction>(
+        "hash_count_distinct", Arity::Binary(), &hash_count_distinct_doc);
+    DCHECK_OK(func->AddKernel(
+        MakeKernel(ValueDescr::ARRAY, 
HashAggregateInit<GroupedCountDistinctImpl>)));

Review comment:
       Nit: I'd like to avoid lazy initialization of the Grouper — initialization belongs in 
`Kernel::init`, since that is the designated init method. This should be doable by 
providing a custom `init` for both of these kernels.

##########
File path: cpp/src/arrow/compute/kernels/hash_aggregate.cc
##########
@@ -1828,6 +1828,120 @@ struct GroupedAllImpl : public GroupedAggregator {
   TypedBufferBuilder<bool> seen_;
 };
 
+// ----------------------------------------------------------------------
+// CountDistinct/Distinct implementation
+
+struct GroupedCountDistinctImpl : public GroupedAggregator {
+  Status Init(ExecContext* ctx, const FunctionOptions* options) override {
+    ctx_ = ctx;
+    pool_ = ctx->memory_pool();
+    return Status::OK();
+  }
+
+  Status Resize(int64_t new_num_groups) override {
+    num_groups_ = new_num_groups;

Review comment:
       @michalursa what do you think of adding 
`Grouper::Reserve(additional_capacity_hint)`?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to