aucahuasi commented on a change in pull request #11257:
URL: https://github.com/apache/arrow/pull/11257#discussion_r718511019



##########
File path: cpp/src/arrow/compute/kernels/aggregate_test.cc
##########
@@ -873,6 +873,65 @@ TYPED_TEST(TestRandomNumericCountKernel, RandomArrayCount) 
{
   }
 }
 
+//
+// Count Distinct
+//
+
+class TestCountDistinctKernel : public ::testing::Test {
+ protected:
+  void SetUp() override {
+    only_valid = CountOptions(CountOptions::ONLY_VALID);
+    only_null = CountOptions(CountOptions::ONLY_NULL);
+    all = CountOptions(CountOptions::ALL);
+  }
+
+  const Datum& expected(int64_t value) {
+    expected_values[value] = Datum(static_cast<int64_t>(value));
+    return expected_values.at(value);
+  }
+
+  CountOptions only_valid;
+  CountOptions only_null;
+  CountOptions all;
+
+ private:
+  std::map<int64_t, Datum> expected_values;
+};
+
+TEST_F(TestCountDistinctKernel, NumericArrowTypesWithNulls) {
+  auto sample = "[1, 1, 2, 2, 5, 8, 9, 9, 9, 10, 6, 6]";

Review comment:
       This PR has a test for non numeric types already, check the Python test.

##########
File path: python/pyarrow/tests/test_compute.py
##########
@@ -2217,3 +2217,12 @@ def test_list_element():
     result = pa.compute.list_element(lists, index)
     expected = pa.array([{'a': 5.6, 'b': 6}, {'a': .6, 'b': 8}], element_type)
     assert result.equals(expected)
+
+
+def test_count_distinct():
+    seed = datetime.now()
+    samples = [seed.replace(year=y) for y in range(1992, 2092)]
+    arr = pa.array(samples, pa.timestamp("ns"))
+    result = pa.compute.count_distinct(arr)
+    expected = pa.scalar(len(samples), type=pa.int64())

Review comment:
       Tests for count options are in the c++ side.

##########
File path: cpp/src/arrow/compute/kernels/aggregate_test.cc
##########
@@ -873,6 +873,65 @@ TYPED_TEST(TestRandomNumericCountKernel, RandomArrayCount) 
{
   }
 }
 
+//
+// Count Distinct
+//
+
+class TestCountDistinctKernel : public ::testing::Test {
+ protected:
+  void SetUp() override {
+    only_valid = CountOptions(CountOptions::ONLY_VALID);
+    only_null = CountOptions(CountOptions::ONLY_NULL);
+    all = CountOptions(CountOptions::ALL);
+  }
+
+  const Datum& expected(int64_t value) {
+    expected_values[value] = Datum(static_cast<int64_t>(value));
+    return expected_values.at(value);

Review comment:
       Haha thanks for spotting this, my intention was to avoid create many 
Datum instances, but I forgot to check the count of the key. I'll fix this.

##########
File path: cpp/src/arrow/compute/kernels/aggregate_basic.cc
##########
@@ -754,6 +839,30 @@ void RegisterScalarAggregateBasic(FunctionRegistry* 
registry) {
                aggregate::CountInit, func.get());
   DCHECK_OK(registry->AddFunction(std::move(func)));
 
+  func = std::make_shared<ScalarAggregateFunction>(
+      "count_distinct", Arity::Unary(), &count_distinct_doc, 
&default_count_options);
+
+  // Takes any input, outputs int64 scalar
+  aggregate::AddCountDistinctKernel<Int8Type>(int8(), func.get());
+  aggregate::AddCountDistinctKernel<Int16Type>(int16(), func.get());
+  aggregate::AddCountDistinctKernel<Int32Type>(int32(), func.get());
+  aggregate::AddCountDistinctKernel<Date32Type>(date32(), func.get());
+  aggregate::AddCountDistinctKernel<Int64Type>(int64(), func.get());

Review comment:
       Let me try to see if I can reduce the number of calls. I need the type 
for the Memo encoder.
   I tried to use some of the generators in codegen_internal but it seems we 
only have generators for the Exec call of kernels, not for the initialization 
call.

##########
File path: cpp/src/arrow/compute/kernels/aggregate_basic.cc
##########
@@ -121,6 +122,84 @@ Result<std::unique_ptr<KernelState>> 
CountInit(KernelContext*,
       static_cast<const CountOptions&>(*args.options));
 }
 
+// ----------------------------------------------------------------------
+// Distinct Count implementation
+
+template <typename Type>
+struct CountDistinctImpl : public ScalarAggregator {
+  using MemoTable = typename arrow::internal::HashTraits<Type>::MemoTableType;

Review comment:
       I like the idea, but I think is better to have more count distinct 
implementations to define a better abstractions. I created a ticket for one 
based on HyperLogLog, perhaps we can do this for that PR.
   So I think for now the provided implementation in this PR is just fine.

##########
File path: cpp/src/arrow/compute/kernels/aggregate_basic.cc
##########
@@ -121,6 +122,84 @@ Result<std::unique_ptr<KernelState>> 
CountInit(KernelContext*,
       static_cast<const CountOptions&>(*args.options));
 }
 
+// ----------------------------------------------------------------------
+// Distinct Count implementation
+
+template <typename Type>
+struct CountDistinctImpl : public ScalarAggregator {
+  using MemoTable = typename arrow::internal::HashTraits<Type>::MemoTableType;
+
+  explicit CountDistinctImpl(MemoryPool* memory_pool, CountOptions options)
+      : options(std::move(options)), memo_table_(new MemoTable(memory_pool, 
0)) {}
+
+  Status Consume(KernelContext*, const ExecBatch& batch) override {
+    if (batch[0].is_array()) {
+      const ArrayData& arr = *batch[0].array();
+      auto visit_null = [&]() {
+        if (this->nulls > 0) return Status::OK();
+        ++this->nulls;
+        return Status::OK();
+      };

Review comment:
       Yeah I was thinking about this as well. Would be great to have a visit 
call only for non nulls and if the array has null I would just add 1 to the 
output.
   Thanks for the feedback, let me try to improve this part.

##########
File path: cpp/src/arrow/compute/kernels/aggregate_basic.cc
##########
@@ -121,6 +122,84 @@ Result<std::unique_ptr<KernelState>> 
CountInit(KernelContext*,
       static_cast<const CountOptions&>(*args.options));
 }
 
+// ----------------------------------------------------------------------
+// Distinct Count implementation
+
+template <typename Type>
+struct CountDistinctImpl : public ScalarAggregator {
+  using MemoTable = typename arrow::internal::HashTraits<Type>::MemoTableType;
+
+  explicit CountDistinctImpl(MemoryPool* memory_pool, CountOptions options)
+      : options(std::move(options)), memo_table_(new MemoTable(memory_pool, 
0)) {}
+
+  Status Consume(KernelContext*, const ExecBatch& batch) override {
+    if (batch[0].is_array()) {
+      const ArrayData& arr = *batch[0].array();
+      auto visit_null = [&]() {
+        if (this->nulls > 0) return Status::OK();
+        ++this->nulls;
+        return Status::OK();
+      };
+      auto visit_value = [&](typename Type::c_type arg) {
+        int y;
+        RETURN_NOT_OK(memo_table_->GetOrInsert(arg, &y));
+        return Status::OK();
+      };
+      RETURN_NOT_OK(VisitArrayDataInline<Type>(arr, visit_value, visit_null));
+      this->non_nulls += this->memo_table_->size();
+    } else {
+      const Scalar& input = *batch[0].scalar();
+      this->nulls += !input.is_valid * batch.length;
+      this->non_nulls += input.is_valid * batch.length;
+    }

Review comment:
       @edponce Can you tell me how to write a test for this case? I would like 
to fix this by using a C++ test, so I can understand better the error.
   btw thanks good catch!

##########
File path: cpp/src/arrow/compute/kernels/aggregate_basic.cc
##########
@@ -121,6 +122,84 @@ Result<std::unique_ptr<KernelState>> 
CountInit(KernelContext*,
       static_cast<const CountOptions&>(*args.options));
 }
 
+// ----------------------------------------------------------------------
+// Distinct Count implementation
+
+template <typename Type>
+struct CountDistinctImpl : public ScalarAggregator {
+  using MemoTable = typename arrow::internal::HashTraits<Type>::MemoTableType;
+
+  explicit CountDistinctImpl(MemoryPool* memory_pool, CountOptions options)
+      : options(std::move(options)), memo_table_(new MemoTable(memory_pool, 
0)) {}
+
+  Status Consume(KernelContext*, const ExecBatch& batch) override {
+    if (batch[0].is_array()) {
+      const ArrayData& arr = *batch[0].array();
+      auto visit_null = [&]() {
+        if (this->nulls > 0) return Status::OK();
+        ++this->nulls;
+        return Status::OK();
+      };
+      auto visit_value = [&](typename Type::c_type arg) {
+        int y;

Review comment:
       I have been using that approach (using ...) in my other PRs too, I can 
change this to maintain the consistency. Thanks!

##########
File path: cpp/src/arrow/compute/kernels/aggregate_basic.cc
##########
@@ -121,6 +122,84 @@ Result<std::unique_ptr<KernelState>> 
CountInit(KernelContext*,
       static_cast<const CountOptions&>(*args.options));
 }
 
+// ----------------------------------------------------------------------
+// Distinct Count implementation
+
+template <typename Type>
+struct CountDistinctImpl : public ScalarAggregator {
+  using MemoTable = typename arrow::internal::HashTraits<Type>::MemoTableType;
+
+  explicit CountDistinctImpl(MemoryPool* memory_pool, CountOptions options)
+      : options(std::move(options)), memo_table_(new MemoTable(memory_pool, 
0)) {}
+
+  Status Consume(KernelContext*, const ExecBatch& batch) override {
+    if (batch[0].is_array()) {
+      const ArrayData& arr = *batch[0].array();
+      auto visit_null = [&]() {
+        if (this->nulls > 0) return Status::OK();
+        ++this->nulls;
+        return Status::OK();
+      };
+      auto visit_value = [&](typename Type::c_type arg) {
+        int y;
+        RETURN_NOT_OK(memo_table_->GetOrInsert(arg, &y));
+        return Status::OK();
+      };
+      RETURN_NOT_OK(VisitArrayDataInline<Type>(arr, visit_value, visit_null));
+      this->non_nulls += this->memo_table_->size();
+    } else {
+      const Scalar& input = *batch[0].scalar();
+      this->nulls += !input.is_valid * batch.length;
+      this->non_nulls += input.is_valid * batch.length;
+    }
+    return Status::OK();
+  }
+
+  Status MergeFrom(KernelContext*, KernelState&& src) override {
+    const auto& other_state = checked_cast<const CountDistinctImpl&>(src);
+    this->non_nulls += other_state.non_nulls;
+    this->nulls += other_state.nulls;
+    return Status::OK();
+  }
+
+  Status Finalize(KernelContext* ctx, Datum* out) override {
+    const auto& state = checked_cast<const CountDistinctImpl&>(*ctx->state());
+    switch (state.options.mode) {
+      case CountOptions::ONLY_VALID:
+        *out = Datum(state.non_nulls);
+        break;
+      case CountOptions::ALL:
+        *out = Datum(state.non_nulls + state.nulls);
+        break;
+      case CountOptions::ONLY_NULL:
+        *out = Datum(state.nulls);
+        break;
+      default:
+        DCHECK(false) << "unreachable";
+    }
+    return Status::OK();
+  }
+
+  CountOptions options;
+  int64_t non_nulls = 0;

Review comment:
       Thanks, I did that for my first local version. I changed it to have more 
similar implementation to count, but I think it wasn't the best choice.
   Let me improve this part!

##########
File path: cpp/src/arrow/compute/kernels/aggregate_basic.cc
##########
@@ -754,6 +839,30 @@ void RegisterScalarAggregateBasic(FunctionRegistry* 
registry) {
                aggregate::CountInit, func.get());
   DCHECK_OK(registry->AddFunction(std::move(func)));
 
+  func = std::make_shared<ScalarAggregateFunction>(
+      "count_distinct", Arity::Unary(), &count_distinct_doc, 
&default_count_options);
+
+  // Takes any input, outputs int64 scalar
+  aggregate::AddCountDistinctKernel<Int8Type>(int8(), func.get());
+  aggregate::AddCountDistinctKernel<Int16Type>(int16(), func.get());
+  aggregate::AddCountDistinctKernel<Int32Type>(int32(), func.get());
+  aggregate::AddCountDistinctKernel<Date32Type>(date32(), func.get());
+  aggregate::AddCountDistinctKernel<Int64Type>(int64(), func.get());
+  aggregate::AddCountDistinctKernel<UInt8Type>(uint8(), func.get());
+  aggregate::AddCountDistinctKernel<UInt16Type>(uint16(), func.get());
+  aggregate::AddCountDistinctKernel<UInt32Type>(uint32(), func.get());
+  aggregate::AddCountDistinctKernel<UInt64Type>(uint64(), func.get());
+  aggregate::AddCountDistinctKernel<FloatType>(float32(), func.get());
+  aggregate::AddCountDistinctKernel<DoubleType>(float64(), func.get());
+  aggregate::AddCountDistinctKernel<Time32Type>(time32(TimeUnit::SECOND), 
func.get());
+  aggregate::AddCountDistinctKernel<Time32Type>(time32(TimeUnit::MILLI), 
func.get());
+  aggregate::AddCountDistinctKernel<Time64Type>(time64(TimeUnit::MICRO), 
func.get());
+  aggregate::AddCountDistinctKernel<Time64Type>(time64(TimeUnit::NANO), 
func.get());

Review comment:
       Thanks I will check that example and see if I can improve the code here.

##########
File path: cpp/src/arrow/compute/kernels/aggregate_test.cc
##########
@@ -873,6 +873,65 @@ TYPED_TEST(TestRandomNumericCountKernel, RandomArrayCount) 
{
   }
 }
 
+//
+// Count Distinct
+//
+
+class TestCountDistinctKernel : public ::testing::Test {
+ protected:
+  void SetUp() override {
+    only_valid = CountOptions(CountOptions::ONLY_VALID);
+    only_null = CountOptions(CountOptions::ONLY_NULL);
+    all = CountOptions(CountOptions::ALL);
+  }
+
+  const Datum& expected(int64_t value) {
+    expected_values[value] = Datum(static_cast<int64_t>(value));
+    return expected_values.at(value);
+  }
+
+  CountOptions only_valid;
+  CountOptions only_null;
+  CountOptions all;
+
+ private:
+  std::map<int64_t, Datum> expected_values;
+};
+
+TEST_F(TestCountDistinctKernel, NumericArrowTypesWithNulls) {
+  auto sample = "[1, 1, 2, 2, 5, 8, 9, 9, 9, 10, 6, 6]";
+  auto sample_nulls = "[null, 8, null, null, 6, null, 8]";
+  for (auto ty : NumericTypes()) {
+    auto input = ArrayFromJSON(ty, sample);
+    CheckScalar("count_distinct", {input}, expected(7), &only_valid);
+    CheckScalar("count_distinct", {input}, expected(0), &only_null);
+    CheckScalar("count_distinct", {input}, expected(7), &all);
+    auto input_nulls = ArrayFromJSON(ty, sample_nulls);
+    CheckScalar("count_distinct", {input_nulls}, expected(2), &only_valid);
+    CheckScalar("count_distinct", {input_nulls}, expected(1), &only_null);
+    CheckScalar("count_distinct", {input_nulls}, expected(3), &all);
+  }
+}
+
+TEST_F(TestCountDistinctKernel, RandomValidsStdMap) {
+  UInt32Builder builder;
+  std::map<uint32_t, int64_t> hashmap;

Review comment:
       Yes I was thinking about using a set too. That's better. Thanks!

##########
File path: cpp/src/arrow/compute/kernels/aggregate_basic.cc
##########
@@ -121,6 +122,84 @@ Result<std::unique_ptr<KernelState>> 
CountInit(KernelContext*,
       static_cast<const CountOptions&>(*args.options));
 }
 
+// ----------------------------------------------------------------------
+// Distinct Count implementation
+
+template <typename Type>
+struct CountDistinctImpl : public ScalarAggregator {
+  using MemoTable = typename arrow::internal::HashTraits<Type>::MemoTableType;
+
+  explicit CountDistinctImpl(MemoryPool* memory_pool, CountOptions options)
+      : options(std::move(options)), memo_table_(new MemoTable(memory_pool, 
0)) {}
+
+  Status Consume(KernelContext*, const ExecBatch& batch) override {
+    if (batch[0].is_array()) {
+      const ArrayData& arr = *batch[0].array();
+      auto visit_null = [&]() {
+        if (this->nulls > 0) return Status::OK();
+        ++this->nulls;
+        return Status::OK();
+      };

Review comment:
       Ok now I understand better, but still I was thinking about these lines:
   
   1. The amount of code to make this "prettier" (using visitor, enable_if, 
hard-coded generators, etc) is going to result in more LoC and abstractions. 
Are there any reasons why we want to make this here? Perhaps I'm missing 
something.
   2. Also, if we rely in implicit cast: Doesn't that make it worse? Because we 
will need to apply a casting every time the user wants to use a kernel. 
Explicit registration should result in faster kernel resolutions right?
   
   Let me know what you think here please!
   cc @edponce @pitrou @lidavidm 

##########
File path: cpp/src/arrow/compute/kernels/aggregate_basic.cc
##########
@@ -121,6 +122,84 @@ Result<std::unique_ptr<KernelState>> 
CountInit(KernelContext*,
       static_cast<const CountOptions&>(*args.options));
 }
 
+// ----------------------------------------------------------------------
+// Distinct Count implementation
+
+template <typename Type>
+struct CountDistinctImpl : public ScalarAggregator {
+  using MemoTable = typename arrow::internal::HashTraits<Type>::MemoTableType;
+
+  explicit CountDistinctImpl(MemoryPool* memory_pool, CountOptions options)
+      : options(std::move(options)), memo_table_(new MemoTable(memory_pool, 
0)) {}
+
+  Status Consume(KernelContext*, const ExecBatch& batch) override {
+    if (batch[0].is_array()) {
+      const ArrayData& arr = *batch[0].array();
+      auto visit_null = [&]() {
+        if (this->nulls > 0) return Status::OK();
+        ++this->nulls;
+        return Status::OK();
+      };

Review comment:
       Ok now I understand better, but still I was thinking about these lines:
   
   1. The amount of code to make this "prettier" (using visitor, enable_if, 
hard-coded generators, etc) is going to result in more LoC and abstractions. 
Are there any reasons why do we want to make this here? Perhaps I'm missing 
something.
   2. Also, if we rely in implicit cast: Doesn't that make it worse? Because we 
will need to apply a casting every time the user wants to use a kernel. 
Explicit registration should result in faster kernel resolutions right?
   
   Let me know what you think here please!
   cc @edponce @pitrou @lidavidm 

##########
File path: cpp/src/arrow/compute/kernels/aggregate_basic.cc
##########
@@ -754,6 +839,30 @@ void RegisterScalarAggregateBasic(FunctionRegistry* 
registry) {
                aggregate::CountInit, func.get());
   DCHECK_OK(registry->AddFunction(std::move(func)));
 
+  func = std::make_shared<ScalarAggregateFunction>(
+      "count_distinct", Arity::Unary(), &count_distinct_doc, 
&default_count_options);
+
+  // Takes any input, outputs int64 scalar
+  aggregate::AddCountDistinctKernel<Int8Type>(int8(), func.get());
+  aggregate::AddCountDistinctKernel<Int16Type>(int16(), func.get());
+  aggregate::AddCountDistinctKernel<Int32Type>(int32(), func.get());
+  aggregate::AddCountDistinctKernel<Date32Type>(date32(), func.get());
+  aggregate::AddCountDistinctKernel<Int64Type>(int64(), func.get());

Review comment:
       Ok now I understand better, but still I was thinking about these lines:
   
   The amount of code to make this "prettier" (using visitor, enable_if, 
hard-coded generators, etc) is going to result in more LoC and abstractions. 
Are there any reasons why do we want to make this here? Perhaps I'm missing 
something.
   Also, if we rely in implicit cast: Doesn't that make it worse? Because we 
will need to apply a casting every time the user wants to use a kernel. 
Explicit registration should result in faster kernel resolutions right?
   
   Let me know what you think here please!
   cc @edponce @pitrou @lidavidm

##########
File path: cpp/src/arrow/compute/kernels/aggregate_basic.cc
##########
@@ -754,6 +839,30 @@ void RegisterScalarAggregateBasic(FunctionRegistry* 
registry) {
                aggregate::CountInit, func.get());
   DCHECK_OK(registry->AddFunction(std::move(func)));
 
+  func = std::make_shared<ScalarAggregateFunction>(
+      "count_distinct", Arity::Unary(), &count_distinct_doc, 
&default_count_options);
+
+  // Takes any input, outputs int64 scalar
+  aggregate::AddCountDistinctKernel<Int8Type>(int8(), func.get());
+  aggregate::AddCountDistinctKernel<Int16Type>(int16(), func.get());
+  aggregate::AddCountDistinctKernel<Int32Type>(int32(), func.get());
+  aggregate::AddCountDistinctKernel<Date32Type>(date32(), func.get());
+  aggregate::AddCountDistinctKernel<Int64Type>(int64(), func.get());

Review comment:
       Ok now I understand better, but still I was thinking about these lines:
   
   1. The amount of code to make this "prettier" (using visitor, enable_if, 
hard-coded generators, etc) is going to result in more LoC and abstractions. 
Are there any reasons why do we want to make this here? Perhaps I'm missing 
something.
   2. Also, if we rely in implicit cast: Doesn't that make it worse? Because we 
will need to apply a casting every time the user wants to use a kernel. 
Explicit registration should result in faster kernel resolutions right?
   
   Let me know what you think here please!
   cc @edponce @pitrou @lidavidm

##########
File path: cpp/src/arrow/compute/kernels/aggregate_basic.cc
##########
@@ -754,6 +839,30 @@ void RegisterScalarAggregateBasic(FunctionRegistry* 
registry) {
                aggregate::CountInit, func.get());
   DCHECK_OK(registry->AddFunction(std::move(func)));
 
+  func = std::make_shared<ScalarAggregateFunction>(
+      "count_distinct", Arity::Unary(), &count_distinct_doc, 
&default_count_options);
+
+  // Takes any input, outputs int64 scalar
+  aggregate::AddCountDistinctKernel<Int8Type>(int8(), func.get());
+  aggregate::AddCountDistinctKernel<Int16Type>(int16(), func.get());
+  aggregate::AddCountDistinctKernel<Int32Type>(int32(), func.get());
+  aggregate::AddCountDistinctKernel<Date32Type>(date32(), func.get());
+  aggregate::AddCountDistinctKernel<Int64Type>(int64(), func.get());

Review comment:
       Ok now I understand better, but still I was thinking about these points:
   
   1. The amount of code to make this "prettier" (using visitor, enable_if, 
hard-coded generators, etc) is going to result in more LoC and abstractions. 
Are there any reasons why do we want to make this here? Perhaps I'm missing 
something.
   2. Also, if we rely in implicit cast: Doesn't that make it worse? Because we 
will need to apply a casting every time the user wants to use a kernel. 
Explicit registration should result in faster kernel resolutions right?
   
   Let me know what do you think here please!
   cc @edponce @pitrou @lidavidm

##########
File path: cpp/src/arrow/compute/kernels/aggregate_basic.cc
##########
@@ -754,6 +839,30 @@ void RegisterScalarAggregateBasic(FunctionRegistry* 
registry) {
                aggregate::CountInit, func.get());
   DCHECK_OK(registry->AddFunction(std::move(func)));
 
+  func = std::make_shared<ScalarAggregateFunction>(
+      "count_distinct", Arity::Unary(), &count_distinct_doc, 
&default_count_options);
+
+  // Takes any input, outputs int64 scalar
+  aggregate::AddCountDistinctKernel<Int8Type>(int8(), func.get());
+  aggregate::AddCountDistinctKernel<Int16Type>(int16(), func.get());
+  aggregate::AddCountDistinctKernel<Int32Type>(int32(), func.get());
+  aggregate::AddCountDistinctKernel<Date32Type>(date32(), func.get());
+  aggregate::AddCountDistinctKernel<Int64Type>(int64(), func.get());

Review comment:
       Sure, but given that some of these kernels don't support lists, structs, 
i.e. more complex types; I think a refactor should be take place in the future 
and having explicit code is better/clear instead of use more (temporary) 
abstractions/solutions.

##########
File path: cpp/src/arrow/compute/kernels/aggregate_basic.cc
##########
@@ -754,6 +839,30 @@ void RegisterScalarAggregateBasic(FunctionRegistry* 
registry) {
                aggregate::CountInit, func.get());
   DCHECK_OK(registry->AddFunction(std::move(func)));
 
+  func = std::make_shared<ScalarAggregateFunction>(
+      "count_distinct", Arity::Unary(), &count_distinct_doc, 
&default_count_options);
+
+  // Takes any input, outputs int64 scalar
+  aggregate::AddCountDistinctKernel<Int8Type>(int8(), func.get());
+  aggregate::AddCountDistinctKernel<Int16Type>(int16(), func.get());
+  aggregate::AddCountDistinctKernel<Int32Type>(int32(), func.get());
+  aggregate::AddCountDistinctKernel<Date32Type>(date32(), func.get());
+  aggregate::AddCountDistinctKernel<Int64Type>(int64(), func.get());

Review comment:
       btw @pitrou I resolved the other conversation because this thread have 
more ideas.

##########
File path: cpp/src/arrow/compute/kernels/aggregate_basic.cc
##########
@@ -754,6 +839,30 @@ void RegisterScalarAggregateBasic(FunctionRegistry* 
registry) {
                aggregate::CountInit, func.get());
   DCHECK_OK(registry->AddFunction(std::move(func)));
 
+  func = std::make_shared<ScalarAggregateFunction>(
+      "count_distinct", Arity::Unary(), &count_distinct_doc, 
&default_count_options);
+
+  // Takes any input, outputs int64 scalar
+  aggregate::AddCountDistinctKernel<Int8Type>(int8(), func.get());
+  aggregate::AddCountDistinctKernel<Int16Type>(int16(), func.get());
+  aggregate::AddCountDistinctKernel<Int32Type>(int32(), func.get());
+  aggregate::AddCountDistinctKernel<Date32Type>(date32(), func.get());
+  aggregate::AddCountDistinctKernel<Int64Type>(int64(), func.get());

Review comment:
       Sure, but given that some of these kernels don't support lists, structs, 
i.e. more complex types; I think a refactor should be take place in the future 
and having explicit code is better/clearer instead of use more (temporary) 
abstractions/solutions.

##########
File path: cpp/src/arrow/compute/kernels/aggregate_basic.cc
##########
@@ -754,6 +839,30 @@ void RegisterScalarAggregateBasic(FunctionRegistry* 
registry) {
                aggregate::CountInit, func.get());
   DCHECK_OK(registry->AddFunction(std::move(func)));
 
+  func = std::make_shared<ScalarAggregateFunction>(
+      "count_distinct", Arity::Unary(), &count_distinct_doc, 
&default_count_options);
+
+  // Takes any input, outputs int64 scalar
+  aggregate::AddCountDistinctKernel<Int8Type>(int8(), func.get());
+  aggregate::AddCountDistinctKernel<Int16Type>(int16(), func.get());
+  aggregate::AddCountDistinctKernel<Int32Type>(int32(), func.get());
+  aggregate::AddCountDistinctKernel<Date32Type>(date32(), func.get());
+  aggregate::AddCountDistinctKernel<Int64Type>(int64(), func.get());

Review comment:
       @lidavidm What types would I need to register if I want to use the 
`PhysicalType` snippet? I don't see how many lines of code I can cut off by 
using the `PhysicalType`. Thanks!

##########
File path: cpp/src/arrow/compute/kernels/aggregate_basic.cc
##########
@@ -754,6 +839,30 @@ void RegisterScalarAggregateBasic(FunctionRegistry* 
registry) {
                aggregate::CountInit, func.get());
   DCHECK_OK(registry->AddFunction(std::move(func)));
 
+  func = std::make_shared<ScalarAggregateFunction>(
+      "count_distinct", Arity::Unary(), &count_distinct_doc, 
&default_count_options);
+
+  // Takes any input, outputs int64 scalar
+  aggregate::AddCountDistinctKernel<Int8Type>(int8(), func.get());
+  aggregate::AddCountDistinctKernel<Int16Type>(int16(), func.get());
+  aggregate::AddCountDistinctKernel<Int32Type>(int32(), func.get());
+  aggregate::AddCountDistinctKernel<Date32Type>(date32(), func.get());
+  aggregate::AddCountDistinctKernel<Int64Type>(int64(), func.get());

Review comment:
       > We'd still register the same types, however, we'd be instructing the 
compiler to instantiate the kernel less (since e.g. TimestampType and Int64Type 
would both delegate to CountDistinctImpl)
   
   I see thanks.
   
   > make sure things like zoned timestamps are properly supported
   
   How does the timezone gets encoded into the timestamp? is it part of its 
physical data layout? I think is a good idea to add a test for this case and 
for intervals as well ;)
   
   > You could also do something like 
AddCountDistinctKernel<Int64Type>(uint64()) which would collapse the signed and 
unsigned implementations for each bit width.
   
   Does this will use implicit casting for kernel resolution?
   
   btw I can use an approach like SumLike no problem (is not that complicated), 
my point is that we will have more lines of code/abstractions at the end and we 
will make a potential refactor later to support more types.
   
   > at the very least, it may be worth putting up a follow-up JIRA. (Also 
because I think there's other things we may want to do, e.g. we could perhaps 
support dictionaries.)
   
   Yes that is part of my point too. I like to have consistency as well, but I 
think most of this refactor should take place once we identify we need to add 
other types and not before, is better to have a generic code but not to create 
internal API when we don't know yet the requirements for each case: i.e. when 
in doubt leave it out ;)

##########
File path: cpp/src/arrow/compute/kernels/aggregate_basic.cc
##########
@@ -754,6 +839,30 @@ void RegisterScalarAggregateBasic(FunctionRegistry* 
registry) {
                aggregate::CountInit, func.get());
   DCHECK_OK(registry->AddFunction(std::move(func)));
 
+  func = std::make_shared<ScalarAggregateFunction>(
+      "count_distinct", Arity::Unary(), &count_distinct_doc, 
&default_count_options);
+
+  // Takes any input, outputs int64 scalar
+  aggregate::AddCountDistinctKernel<Int8Type>(int8(), func.get());
+  aggregate::AddCountDistinctKernel<Int16Type>(int16(), func.get());
+  aggregate::AddCountDistinctKernel<Int32Type>(int32(), func.get());
+  aggregate::AddCountDistinctKernel<Date32Type>(date32(), func.get());
+  aggregate::AddCountDistinctKernel<Int64Type>(int64(), func.get());
+  aggregate::AddCountDistinctKernel<UInt8Type>(uint8(), func.get());
+  aggregate::AddCountDistinctKernel<UInt16Type>(uint16(), func.get());
+  aggregate::AddCountDistinctKernel<UInt32Type>(uint32(), func.get());
+  aggregate::AddCountDistinctKernel<UInt64Type>(uint64(), func.get());
+  aggregate::AddCountDistinctKernel<FloatType>(float32(), func.get());
+  aggregate::AddCountDistinctKernel<DoubleType>(float64(), func.get());
+  aggregate::AddCountDistinctKernel<Time32Type>(time32(TimeUnit::SECOND), 
func.get());
+  aggregate::AddCountDistinctKernel<Time32Type>(time32(TimeUnit::MILLI), 
func.get());
+  aggregate::AddCountDistinctKernel<Time64Type>(time64(TimeUnit::MICRO), 
func.get());
+  aggregate::AddCountDistinctKernel<Time64Type>(time64(TimeUnit::NANO), 
func.get());
+  for (auto u : TimeUnit::values()) {
+    aggregate::AddCountDistinctKernel<TimestampType>(timestamp(u), func.get());

Review comment:
       Thanks, good catch!

##########
File path: cpp/src/arrow/compute/kernels/aggregate_test.cc
##########
@@ -873,6 +873,65 @@ TYPED_TEST(TestRandomNumericCountKernel, RandomArrayCount) 
{
   }
 }
 
+//
+// Count Distinct
+//
+
+class TestCountDistinctKernel : public ::testing::Test {
+ protected:
+  void SetUp() override {
+    only_valid = CountOptions(CountOptions::ONLY_VALID);
+    only_null = CountOptions(CountOptions::ONLY_NULL);
+    all = CountOptions(CountOptions::ALL);
+  }
+
+  const Datum& expected(int64_t value) {
+    expected_values[value] = Datum(static_cast<int64_t>(value));
+    return expected_values.at(value);
+  }
+
+  CountOptions only_valid;
+  CountOptions only_null;
+  CountOptions all;
+
+ private:
+  std::map<int64_t, Datum> expected_values;
+};
+
+TEST_F(TestCountDistinctKernel, NumericArrowTypesWithNulls) {
+  auto sample = "[1, 1, 2, 2, 5, 8, 9, 9, 9, 10, 6, 6]";

Review comment:
       Got it, thanks! I will

##########
File path: python/pyarrow/tests/test_compute.py
##########
@@ -2217,3 +2217,12 @@ def test_list_element():
     result = pa.compute.list_element(lists, index)
     expected = pa.array([{'a': 5.6, 'b': 6}, {'a': .6, 'b': 8}], element_type)
     assert result.equals(expected)
+
+
+def test_count_distinct():
+    seed = datetime.now()
+    samples = [seed.replace(year=y) for y in range(1992, 2092)]
+    arr = pa.array(samples, pa.timestamp("ns"))
+    result = pa.compute.count_distinct(arr)
+    expected = pa.scalar(len(samples), type=pa.int64())

Review comment:
       Got it again! Thanks for the explanation @edponce ! I'll add a test for 
options here!




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to