aucahuasi commented on a change in pull request #11257:
URL: https://github.com/apache/arrow/pull/11257#discussion_r718511019
##########
File path: cpp/src/arrow/compute/kernels/aggregate_test.cc
##########
@@ -873,6 +873,65 @@ TYPED_TEST(TestRandomNumericCountKernel, RandomArrayCount) {
}
}
+//
+// Count Distinct
+//
+
+class TestCountDistinctKernel : public ::testing::Test {
+ protected:
+ void SetUp() override {
+ only_valid = CountOptions(CountOptions::ONLY_VALID);
+ only_null = CountOptions(CountOptions::ONLY_NULL);
+ all = CountOptions(CountOptions::ALL);
+ }
+
+ const Datum& expected(int64_t value) {
+ expected_values[value] = Datum(static_cast<int64_t>(value));
+ return expected_values.at(value);
+ }
+
+ CountOptions only_valid;
+ CountOptions only_null;
+ CountOptions all;
+
+ private:
+ std::map<int64_t, Datum> expected_values;
+};
+
+TEST_F(TestCountDistinctKernel, NumericArrowTypesWithNulls) {
+ auto sample = "[1, 1, 2, 2, 5, 8, 9, 9, 9, 10, 6, 6]";
Review comment:
This PR already has a test for non-numeric types; check the Python test.
##########
File path: python/pyarrow/tests/test_compute.py
##########
@@ -2217,3 +2217,12 @@ def test_list_element():
result = pa.compute.list_element(lists, index)
expected = pa.array([{'a': 5.6, 'b': 6}, {'a': .6, 'b': 8}], element_type)
assert result.equals(expected)
+
+
+def test_count_distinct():
+ seed = datetime.now()
+ samples = [seed.replace(year=y) for y in range(1992, 2092)]
+ arr = pa.array(samples, pa.timestamp("ns"))
+ result = pa.compute.count_distinct(arr)
+ expected = pa.scalar(len(samples), type=pa.int64())
Review comment:
Tests for count options are on the C++ side.
##########
File path: cpp/src/arrow/compute/kernels/aggregate_test.cc
##########
@@ -873,6 +873,65 @@ TYPED_TEST(TestRandomNumericCountKernel, RandomArrayCount) {
}
}
+//
+// Count Distinct
+//
+
+class TestCountDistinctKernel : public ::testing::Test {
+ protected:
+ void SetUp() override {
+ only_valid = CountOptions(CountOptions::ONLY_VALID);
+ only_null = CountOptions(CountOptions::ONLY_NULL);
+ all = CountOptions(CountOptions::ALL);
+ }
+
+ const Datum& expected(int64_t value) {
+ expected_values[value] = Datum(static_cast<int64_t>(value));
+ return expected_values.at(value);
Review comment:
Haha, thanks for spotting this. My intention was to avoid creating many
Datum instances, but I forgot to check whether the key was already present.
I'll fix this.
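Roughly what I plan (a minimal sketch, assuming the fixture above stays as-is):
```cpp
// Only construct a Datum the first time a given value is requested.
const Datum& expected(int64_t value) {
  auto it = expected_values.find(value);
  if (it == expected_values.end()) {
    it = expected_values.emplace(value, Datum(value)).first;
  }
  return it->second;
}
```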
##########
File path: cpp/src/arrow/compute/kernels/aggregate_basic.cc
##########
@@ -754,6 +839,30 @@ void RegisterScalarAggregateBasic(FunctionRegistry* registry) {
aggregate::CountInit, func.get());
DCHECK_OK(registry->AddFunction(std::move(func)));
+ func = std::make_shared<ScalarAggregateFunction>(
+ "count_distinct", Arity::Unary(), &count_distinct_doc,
&default_count_options);
+
+ // Takes any input, outputs int64 scalar
+ aggregate::AddCountDistinctKernel<Int8Type>(int8(), func.get());
+ aggregate::AddCountDistinctKernel<Int16Type>(int16(), func.get());
+ aggregate::AddCountDistinctKernel<Int32Type>(int32(), func.get());
+ aggregate::AddCountDistinctKernel<Date32Type>(date32(), func.get());
+ aggregate::AddCountDistinctKernel<Int64Type>(int64(), func.get());
Review comment:
Let me try to see if I can reduce the number of calls; I need the type for
the Memo encoder. I tried to use some of the generators in codegen_internal,
but it seems we only have generators for the Exec call of kernels, not for
the initialization call.
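For context, this is roughly the shape I need (a hedged sketch; the function
name is mine, not actual PR code): the init call must be templated on the
logical type so the memo table knows its key type.
```cpp
// Hypothetical init function; a generator would have to dispatch on
// Type::type to pick the right instantiation of this template.
template <typename Type>
Result<std::unique_ptr<KernelState>> CountDistinctInit(KernelContext* ctx,
                                                       const KernelInitArgs& args) {
  return ::arrow::internal::make_unique<CountDistinctImpl<Type>>(
      ctx->memory_pool(), static_cast<const CountOptions&>(*args.options));
}
```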
##########
File path: cpp/src/arrow/compute/kernels/aggregate_basic.cc
##########
@@ -121,6 +122,84 @@ Result<std::unique_ptr<KernelState>> CountInit(KernelContext*,
static_cast<const CountOptions&>(*args.options));
}
+// ----------------------------------------------------------------------
+// Distinct Count implementation
+
+template <typename Type>
+struct CountDistinctImpl : public ScalarAggregator {
+ using MemoTable = typename arrow::internal::HashTraits<Type>::MemoTableType;
Review comment:
I like the idea, but I think it is better to have more count-distinct
implementations before defining a better abstraction. I created a ticket for
one based on HyperLogLog; perhaps we can do this in that PR.
So I think the implementation provided in this PR is fine for now.
##########
File path: cpp/src/arrow/compute/kernels/aggregate_basic.cc
##########
@@ -121,6 +122,84 @@ Result<std::unique_ptr<KernelState>> CountInit(KernelContext*,
static_cast<const CountOptions&>(*args.options));
}
+// ----------------------------------------------------------------------
+// Distinct Count implementation
+
+template <typename Type>
+struct CountDistinctImpl : public ScalarAggregator {
+ using MemoTable = typename arrow::internal::HashTraits<Type>::MemoTableType;
+
+ explicit CountDistinctImpl(MemoryPool* memory_pool, CountOptions options)
+ : options(std::move(options)), memo_table_(new MemoTable(memory_pool, 0)) {}
+
+ Status Consume(KernelContext*, const ExecBatch& batch) override {
+ if (batch[0].is_array()) {
+ const ArrayData& arr = *batch[0].array();
+ auto visit_null = [&]() {
+ if (this->nulls > 0) return Status::OK();
+ ++this->nulls;
+ return Status::OK();
+ };
Review comment:
Yeah, I was thinking about this as well. It would be great to have a visit
call only for non-nulls; if the array has any nulls, I would just add 1 to
the output.
Thanks for the feedback; let me try to improve this part.
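A minimal sketch of what I mean (assuming ArrayData::GetNullCount() is
acceptable here):
```cpp
// Count at most one distinct null up front from the array metadata,
// so the per-element null visitor has nothing left to do.
const ArrayData& arr = *batch[0].array();
if (arr.GetNullCount() > 0 && this->nulls == 0) {
  ++this->nulls;
}
auto visit_null = []() { return Status::OK(); };  // no-op
```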
##########
File path: cpp/src/arrow/compute/kernels/aggregate_basic.cc
##########
@@ -121,6 +122,84 @@ Result<std::unique_ptr<KernelState>> CountInit(KernelContext*,
static_cast<const CountOptions&>(*args.options));
}
+// ----------------------------------------------------------------------
+// Distinct Count implementation
+
+template <typename Type>
+struct CountDistinctImpl : public ScalarAggregator {
+ using MemoTable = typename arrow::internal::HashTraits<Type>::MemoTableType;
+
+ explicit CountDistinctImpl(MemoryPool* memory_pool, CountOptions options)
+ : options(std::move(options)), memo_table_(new MemoTable(memory_pool, 0)) {}
+
+ Status Consume(KernelContext*, const ExecBatch& batch) override {
+ if (batch[0].is_array()) {
+ const ArrayData& arr = *batch[0].array();
+ auto visit_null = [&]() {
+ if (this->nulls > 0) return Status::OK();
+ ++this->nulls;
+ return Status::OK();
+ };
+ auto visit_value = [&](typename Type::c_type arg) {
+ int y;
+ RETURN_NOT_OK(memo_table_->GetOrInsert(arg, &y));
+ return Status::OK();
+ };
+ RETURN_NOT_OK(VisitArrayDataInline<Type>(arr, visit_value, visit_null));
+ this->non_nulls += this->memo_table_->size();
+ } else {
+ const Scalar& input = *batch[0].scalar();
+ this->nulls += !input.is_valid * batch.length;
+ this->non_nulls += input.is_valid * batch.length;
+ }
Review comment:
@edponce Can you tell me how to write a test for this case? I would like to
fix this with a C++ test so I can better understand the error.
By the way, thanks, good catch!
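Something along these lines is what I would try (a hedged sketch; it assumes
ScalarFromJSON from the testing utilities and reuses the fixture from the
test file):
```cpp
// Exercise the scalar branch of Consume directly.
TEST_F(TestCountDistinctKernel, ScalarInput) {
  auto value = ScalarFromJSON(int64(), "42");
  CheckScalar("count_distinct", {value}, expected(1), &only_valid);
  auto null_value = ScalarFromJSON(int64(), "null");
  CheckScalar("count_distinct", {null_value}, expected(1), &only_null);
}
```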
##########
File path: cpp/src/arrow/compute/kernels/aggregate_basic.cc
##########
@@ -121,6 +122,84 @@ Result<std::unique_ptr<KernelState>> CountInit(KernelContext*,
static_cast<const CountOptions&>(*args.options));
}
+// ----------------------------------------------------------------------
+// Distinct Count implementation
+
+template <typename Type>
+struct CountDistinctImpl : public ScalarAggregator {
+ using MemoTable = typename arrow::internal::HashTraits<Type>::MemoTableType;
+
+ explicit CountDistinctImpl(MemoryPool* memory_pool, CountOptions options)
+ : options(std::move(options)), memo_table_(new MemoTable(memory_pool, 0)) {}
+
+ Status Consume(KernelContext*, const ExecBatch& batch) override {
+ if (batch[0].is_array()) {
+ const ArrayData& arr = *batch[0].array();
+ auto visit_null = [&]() {
+ if (this->nulls > 0) return Status::OK();
+ ++this->nulls;
+ return Status::OK();
+ };
+ auto visit_value = [&](typename Type::c_type arg) {
+ int y;
Review comment:
I have been using that approach (`using ...`) in my other PRs too; I can
change this to maintain consistency. Thanks!
##########
File path: cpp/src/arrow/compute/kernels/aggregate_basic.cc
##########
@@ -121,6 +122,84 @@ Result<std::unique_ptr<KernelState>> CountInit(KernelContext*,
static_cast<const CountOptions&>(*args.options));
}
+// ----------------------------------------------------------------------
+// Distinct Count implementation
+
+template <typename Type>
+struct CountDistinctImpl : public ScalarAggregator {
+ using MemoTable = typename arrow::internal::HashTraits<Type>::MemoTableType;
+
+ explicit CountDistinctImpl(MemoryPool* memory_pool, CountOptions options)
+ : options(std::move(options)), memo_table_(new MemoTable(memory_pool, 0)) {}
+
+ Status Consume(KernelContext*, const ExecBatch& batch) override {
+ if (batch[0].is_array()) {
+ const ArrayData& arr = *batch[0].array();
+ auto visit_null = [&]() {
+ if (this->nulls > 0) return Status::OK();
+ ++this->nulls;
+ return Status::OK();
+ };
+ auto visit_value = [&](typename Type::c_type arg) {
+ int y;
+ RETURN_NOT_OK(memo_table_->GetOrInsert(arg, &y));
+ return Status::OK();
+ };
+ RETURN_NOT_OK(VisitArrayDataInline<Type>(arr, visit_value, visit_null));
+ this->non_nulls += this->memo_table_->size();
+ } else {
+ const Scalar& input = *batch[0].scalar();
+ this->nulls += !input.is_valid * batch.length;
+ this->non_nulls += input.is_valid * batch.length;
+ }
+ return Status::OK();
+ }
+
+ Status MergeFrom(KernelContext*, KernelState&& src) override {
+ const auto& other_state = checked_cast<const CountDistinctImpl&>(src);
+ this->non_nulls += other_state.non_nulls;
+ this->nulls += other_state.nulls;
+ return Status::OK();
+ }
+
+ Status Finalize(KernelContext* ctx, Datum* out) override {
+ const auto& state = checked_cast<const CountDistinctImpl&>(*ctx->state());
+ switch (state.options.mode) {
+ case CountOptions::ONLY_VALID:
+ *out = Datum(state.non_nulls);
+ break;
+ case CountOptions::ALL:
+ *out = Datum(state.non_nulls + state.nulls);
+ break;
+ case CountOptions::ONLY_NULL:
+ *out = Datum(state.nulls);
+ break;
+ default:
+ DCHECK(false) << "unreachable";
+ }
+ return Status::OK();
+ }
+
+ CountOptions options;
+ int64_t non_nulls = 0;
Review comment:
Thanks, I did that in my first local version. I changed it to make the
implementation more similar to count, but I think that wasn't the best choice.
Let me improve this part!
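A sketch of the direction (hedged; `has_nulls` is a hypothetical flag
replacing the counters, and this ignores the MergeFrom interaction for
brevity):
```cpp
// Derive the counts at Finalize time instead of accumulating them in Consume.
Status Finalize(KernelContext* ctx, Datum* out) override {
  const auto& state = checked_cast<const CountDistinctImpl&>(*ctx->state());
  const int64_t non_nulls = state.memo_table_->size();  // distinct non-null values
  const int64_t nulls = state.has_nulls ? 1 : 0;        // at most one distinct null
  switch (state.options.mode) {
    case CountOptions::ONLY_VALID:
      *out = Datum(non_nulls);
      break;
    case CountOptions::ALL:
      *out = Datum(non_nulls + nulls);
      break;
    case CountOptions::ONLY_NULL:
      *out = Datum(nulls);
      break;
    default:
      DCHECK(false) << "unreachable";
  }
  return Status::OK();
}
```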
##########
File path: cpp/src/arrow/compute/kernels/aggregate_basic.cc
##########
@@ -754,6 +839,30 @@ void RegisterScalarAggregateBasic(FunctionRegistry* registry) {
aggregate::CountInit, func.get());
DCHECK_OK(registry->AddFunction(std::move(func)));
+ func = std::make_shared<ScalarAggregateFunction>(
+ "count_distinct", Arity::Unary(), &count_distinct_doc,
&default_count_options);
+
+ // Takes any input, outputs int64 scalar
+ aggregate::AddCountDistinctKernel<Int8Type>(int8(), func.get());
+ aggregate::AddCountDistinctKernel<Int16Type>(int16(), func.get());
+ aggregate::AddCountDistinctKernel<Int32Type>(int32(), func.get());
+ aggregate::AddCountDistinctKernel<Date32Type>(date32(), func.get());
+ aggregate::AddCountDistinctKernel<Int64Type>(int64(), func.get());
+ aggregate::AddCountDistinctKernel<UInt8Type>(uint8(), func.get());
+ aggregate::AddCountDistinctKernel<UInt16Type>(uint16(), func.get());
+ aggregate::AddCountDistinctKernel<UInt32Type>(uint32(), func.get());
+ aggregate::AddCountDistinctKernel<UInt64Type>(uint64(), func.get());
+ aggregate::AddCountDistinctKernel<FloatType>(float32(), func.get());
+ aggregate::AddCountDistinctKernel<DoubleType>(float64(), func.get());
+ aggregate::AddCountDistinctKernel<Time32Type>(time32(TimeUnit::SECOND), func.get());
+ aggregate::AddCountDistinctKernel<Time32Type>(time32(TimeUnit::MILLI), func.get());
+ aggregate::AddCountDistinctKernel<Time64Type>(time64(TimeUnit::MICRO), func.get());
+ aggregate::AddCountDistinctKernel<Time64Type>(time64(TimeUnit::NANO), func.get());
Review comment:
Thanks, I will check that example and see if I can improve the code here.
##########
File path: cpp/src/arrow/compute/kernels/aggregate_test.cc
##########
@@ -873,6 +873,65 @@ TYPED_TEST(TestRandomNumericCountKernel, RandomArrayCount) {
}
}
+//
+// Count Distinct
+//
+
+class TestCountDistinctKernel : public ::testing::Test {
+ protected:
+ void SetUp() override {
+ only_valid = CountOptions(CountOptions::ONLY_VALID);
+ only_null = CountOptions(CountOptions::ONLY_NULL);
+ all = CountOptions(CountOptions::ALL);
+ }
+
+ const Datum& expected(int64_t value) {
+ expected_values[value] = Datum(static_cast<int64_t>(value));
+ return expected_values.at(value);
+ }
+
+ CountOptions only_valid;
+ CountOptions only_null;
+ CountOptions all;
+
+ private:
+ std::map<int64_t, Datum> expected_values;
+};
+
+TEST_F(TestCountDistinctKernel, NumericArrowTypesWithNulls) {
+ auto sample = "[1, 1, 2, 2, 5, 8, 9, 9, 9, 10, 6, 6]";
+ auto sample_nulls = "[null, 8, null, null, 6, null, 8]";
+ for (auto ty : NumericTypes()) {
+ auto input = ArrayFromJSON(ty, sample);
+ CheckScalar("count_distinct", {input}, expected(7), &only_valid);
+ CheckScalar("count_distinct", {input}, expected(0), &only_null);
+ CheckScalar("count_distinct", {input}, expected(7), &all);
+ auto input_nulls = ArrayFromJSON(ty, sample_nulls);
+ CheckScalar("count_distinct", {input_nulls}, expected(2), &only_valid);
+ CheckScalar("count_distinct", {input_nulls}, expected(1), &only_null);
+ CheckScalar("count_distinct", {input_nulls}, expected(3), &all);
+ }
+}
+
+TEST_F(TestCountDistinctKernel, RandomValidsStdMap) {
+ UInt32Builder builder;
+ std::map<uint32_t, int64_t> hashmap;
Review comment:
Yes, I was thinking about using a set too. That's better. Thanks!
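Roughly (a sketch, assuming the rest of the test stays the same):
```cpp
// A set captures exactly what the test needs: membership, not counts.
UInt32Builder builder;
std::unordered_set<uint32_t> distinct;
for (int i = 0; i < 1000; ++i) {
  const uint32_t value = static_cast<uint32_t>(rand() % 100);
  distinct.insert(value);
  ASSERT_OK(builder.Append(value));
}
ASSERT_OK_AND_ASSIGN(auto array, builder.Finish());
CheckScalar("count_distinct", {array}, expected(distinct.size()), &only_valid);
```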
##########
File path: cpp/src/arrow/compute/kernels/aggregate_basic.cc
##########
@@ -121,6 +122,84 @@ Result<std::unique_ptr<KernelState>> CountInit(KernelContext*,
static_cast<const CountOptions&>(*args.options));
}
+// ----------------------------------------------------------------------
+// Distinct Count implementation
+
+template <typename Type>
+struct CountDistinctImpl : public ScalarAggregator {
+ using MemoTable = typename arrow::internal::HashTraits<Type>::MemoTableType;
+
+ explicit CountDistinctImpl(MemoryPool* memory_pool, CountOptions options)
+ : options(std::move(options)), memo_table_(new MemoTable(memory_pool, 0)) {}
+
+ Status Consume(KernelContext*, const ExecBatch& batch) override {
+ if (batch[0].is_array()) {
+ const ArrayData& arr = *batch[0].array();
+ auto visit_null = [&]() {
+ if (this->nulls > 0) return Status::OK();
+ ++this->nulls;
+ return Status::OK();
+ };
Review comment:
OK, now I understand better, but I was still thinking about these points:
1. The amount of code needed to make this "prettier" (using a visitor,
enable_if, hard-coded generators, etc.) is going to result in more LoC and
more abstractions. Is there a reason why we want to do that here? Perhaps
I'm missing something.
2. Also, if we rely on implicit casts, doesn't that make it worse? We would
need to apply a cast every time the user wants to use a kernel. Explicit
registration should result in faster kernel resolution, right?
Let me know what you think, please!
cc @edponce @pitrou @lidavidm
##########
File path: cpp/src/arrow/compute/kernels/aggregate_basic.cc
##########
@@ -121,6 +122,84 @@ Result<std::unique_ptr<KernelState>> CountInit(KernelContext*,
static_cast<const CountOptions&>(*args.options));
}
+// ----------------------------------------------------------------------
+// Distinct Count implementation
+
+template <typename Type>
+struct CountDistinctImpl : public ScalarAggregator {
+ using MemoTable = typename arrow::internal::HashTraits<Type>::MemoTableType;
+
+ explicit CountDistinctImpl(MemoryPool* memory_pool, CountOptions options)
+ : options(std::move(options)), memo_table_(new MemoTable(memory_pool, 0)) {}
+
+ Status Consume(KernelContext*, const ExecBatch& batch) override {
+ if (batch[0].is_array()) {
+ const ArrayData& arr = *batch[0].array();
+ auto visit_null = [&]() {
+ if (this->nulls > 0) return Status::OK();
+ ++this->nulls;
+ return Status::OK();
+ };
Review comment:
OK, now I understand better, but I was still thinking about these points:
1. The amount of code needed to make this "prettier" (using a visitor,
enable_if, hard-coded generators, etc.) is going to result in more LoC and
more abstractions. Is there a reason why we want to do that here? Perhaps
I'm missing something.
2. Also, if we rely on implicit casts, doesn't that make it worse? We would
need to apply a cast every time the user wants to use a kernel. Explicit
registration should result in faster kernel resolution, right?
Let me know what you think, please!
cc @edponce @pitrou @lidavidm
##########
File path: cpp/src/arrow/compute/kernels/aggregate_basic.cc
##########
@@ -754,6 +839,30 @@ void RegisterScalarAggregateBasic(FunctionRegistry* registry) {
aggregate::CountInit, func.get());
DCHECK_OK(registry->AddFunction(std::move(func)));
+ func = std::make_shared<ScalarAggregateFunction>(
+ "count_distinct", Arity::Unary(), &count_distinct_doc,
&default_count_options);
+
+ // Takes any input, outputs int64 scalar
+ aggregate::AddCountDistinctKernel<Int8Type>(int8(), func.get());
+ aggregate::AddCountDistinctKernel<Int16Type>(int16(), func.get());
+ aggregate::AddCountDistinctKernel<Int32Type>(int32(), func.get());
+ aggregate::AddCountDistinctKernel<Date32Type>(date32(), func.get());
+ aggregate::AddCountDistinctKernel<Int64Type>(int64(), func.get());
Review comment:
OK, now I understand better, but I was still thinking about these points:
1. The amount of code needed to make this "prettier" (using a visitor,
enable_if, hard-coded generators, etc.) is going to result in more LoC and
more abstractions. Is there a reason why we want to do that here? Perhaps
I'm missing something.
2. Also, if we rely on implicit casts, doesn't that make it worse? We would
need to apply a cast every time the user wants to use a kernel. Explicit
registration should result in faster kernel resolution, right?
Let me know what you think, please!
cc @edponce @pitrou @lidavidm
##########
File path: cpp/src/arrow/compute/kernels/aggregate_basic.cc
##########
@@ -754,6 +839,30 @@ void RegisterScalarAggregateBasic(FunctionRegistry* registry) {
aggregate::CountInit, func.get());
DCHECK_OK(registry->AddFunction(std::move(func)));
+ func = std::make_shared<ScalarAggregateFunction>(
+ "count_distinct", Arity::Unary(), &count_distinct_doc,
&default_count_options);
+
+ // Takes any input, outputs int64 scalar
+ aggregate::AddCountDistinctKernel<Int8Type>(int8(), func.get());
+ aggregate::AddCountDistinctKernel<Int16Type>(int16(), func.get());
+ aggregate::AddCountDistinctKernel<Int32Type>(int32(), func.get());
+ aggregate::AddCountDistinctKernel<Date32Type>(date32(), func.get());
+ aggregate::AddCountDistinctKernel<Int64Type>(int64(), func.get());
Review comment:
OK, now I understand better, but I was still thinking about these points:
1. The amount of code needed to make this "prettier" (using a visitor,
enable_if, hard-coded generators, etc.) is going to result in more LoC and
more abstractions. Is there a reason why we want to do that here? Perhaps
I'm missing something.
2. Also, if we rely on implicit casts, doesn't that make it worse? We would
need to apply a cast every time the user wants to use a kernel. Explicit
registration should result in faster kernel resolution, right?
Let me know what you think, please!
cc @edponce @pitrou @lidavidm
##########
File path: cpp/src/arrow/compute/kernels/aggregate_basic.cc
##########
@@ -754,6 +839,30 @@ void RegisterScalarAggregateBasic(FunctionRegistry* registry) {
aggregate::CountInit, func.get());
DCHECK_OK(registry->AddFunction(std::move(func)));
+ func = std::make_shared<ScalarAggregateFunction>(
+ "count_distinct", Arity::Unary(), &count_distinct_doc,
&default_count_options);
+
+ // Takes any input, outputs int64 scalar
+ aggregate::AddCountDistinctKernel<Int8Type>(int8(), func.get());
+ aggregate::AddCountDistinctKernel<Int16Type>(int16(), func.get());
+ aggregate::AddCountDistinctKernel<Int32Type>(int32(), func.get());
+ aggregate::AddCountDistinctKernel<Date32Type>(date32(), func.get());
+ aggregate::AddCountDistinctKernel<Int64Type>(int64(), func.get());
Review comment:
OK, now I understand better, but I was still thinking about these points:
1. The amount of code needed to make this "prettier" (using a visitor,
enable_if, hard-coded generators, etc.) is going to result in more LoC and
more abstractions. Is there a reason why we want to do that here? Perhaps
I'm missing something.
2. Also, if we rely on implicit casts, doesn't that make it worse? We would
need to apply a cast every time the user wants to use a kernel. Explicit
registration should result in faster kernel resolution, right?
Let me know what you think, please!
cc @edponce @pitrou @lidavidm
##########
File path: cpp/src/arrow/compute/kernels/aggregate_basic.cc
##########
@@ -754,6 +839,30 @@ void RegisterScalarAggregateBasic(FunctionRegistry* registry) {
aggregate::CountInit, func.get());
DCHECK_OK(registry->AddFunction(std::move(func)));
+ func = std::make_shared<ScalarAggregateFunction>(
+ "count_distinct", Arity::Unary(), &count_distinct_doc,
&default_count_options);
+
+ // Takes any input, outputs int64 scalar
+ aggregate::AddCountDistinctKernel<Int8Type>(int8(), func.get());
+ aggregate::AddCountDistinctKernel<Int16Type>(int16(), func.get());
+ aggregate::AddCountDistinctKernel<Int32Type>(int32(), func.get());
+ aggregate::AddCountDistinctKernel<Date32Type>(date32(), func.get());
+ aggregate::AddCountDistinctKernel<Int64Type>(int64(), func.get());
Review comment:
Sure, but given that some of these kernels don't support lists, structs, and
other more complex types, I think a refactor should take place in the future;
having explicit code is better and clearer than adding more (temporary)
abstractions.
##########
File path: cpp/src/arrow/compute/kernels/aggregate_basic.cc
##########
@@ -754,6 +839,30 @@ void RegisterScalarAggregateBasic(FunctionRegistry* registry) {
aggregate::CountInit, func.get());
DCHECK_OK(registry->AddFunction(std::move(func)));
+ func = std::make_shared<ScalarAggregateFunction>(
+ "count_distinct", Arity::Unary(), &count_distinct_doc,
&default_count_options);
+
+ // Takes any input, outputs int64 scalar
+ aggregate::AddCountDistinctKernel<Int8Type>(int8(), func.get());
+ aggregate::AddCountDistinctKernel<Int16Type>(int16(), func.get());
+ aggregate::AddCountDistinctKernel<Int32Type>(int32(), func.get());
+ aggregate::AddCountDistinctKernel<Date32Type>(date32(), func.get());
+ aggregate::AddCountDistinctKernel<Int64Type>(int64(), func.get());
Review comment:
By the way, @pitrou, I resolved the other conversation because this thread
has more ideas.
##########
File path: cpp/src/arrow/compute/kernels/aggregate_basic.cc
##########
@@ -754,6 +839,30 @@ void RegisterScalarAggregateBasic(FunctionRegistry* registry) {
aggregate::CountInit, func.get());
DCHECK_OK(registry->AddFunction(std::move(func)));
+ func = std::make_shared<ScalarAggregateFunction>(
+ "count_distinct", Arity::Unary(), &count_distinct_doc,
&default_count_options);
+
+ // Takes any input, outputs int64 scalar
+ aggregate::AddCountDistinctKernel<Int8Type>(int8(), func.get());
+ aggregate::AddCountDistinctKernel<Int16Type>(int16(), func.get());
+ aggregate::AddCountDistinctKernel<Int32Type>(int32(), func.get());
+ aggregate::AddCountDistinctKernel<Date32Type>(date32(), func.get());
+ aggregate::AddCountDistinctKernel<Int64Type>(int64(), func.get());
Review comment:
Sure, but given that some of these kernels don't support lists, structs, and
other more complex types, I think a refactor should take place in the future;
having explicit code is better and clearer than adding more (temporary)
abstractions.
##########
File path: cpp/src/arrow/compute/kernels/aggregate_basic.cc
##########
@@ -754,6 +839,30 @@ void RegisterScalarAggregateBasic(FunctionRegistry* registry) {
aggregate::CountInit, func.get());
DCHECK_OK(registry->AddFunction(std::move(func)));
+ func = std::make_shared<ScalarAggregateFunction>(
+ "count_distinct", Arity::Unary(), &count_distinct_doc,
&default_count_options);
+
+ // Takes any input, outputs int64 scalar
+ aggregate::AddCountDistinctKernel<Int8Type>(int8(), func.get());
+ aggregate::AddCountDistinctKernel<Int16Type>(int16(), func.get());
+ aggregate::AddCountDistinctKernel<Int32Type>(int32(), func.get());
+ aggregate::AddCountDistinctKernel<Date32Type>(date32(), func.get());
+ aggregate::AddCountDistinctKernel<Int64Type>(int64(), func.get());
Review comment:
@lidavidm Which types would I need to register if I want to use the
`PhysicalType` snippet? I don't see how many lines of code I could cut by
using `PhysicalType`. Thanks!
##########
File path: cpp/src/arrow/compute/kernels/aggregate_basic.cc
##########
@@ -754,6 +839,30 @@ void RegisterScalarAggregateBasic(FunctionRegistry* registry) {
aggregate::CountInit, func.get());
DCHECK_OK(registry->AddFunction(std::move(func)));
+ func = std::make_shared<ScalarAggregateFunction>(
+ "count_distinct", Arity::Unary(), &count_distinct_doc,
&default_count_options);
+
+ // Takes any input, outputs int64 scalar
+ aggregate::AddCountDistinctKernel<Int8Type>(int8(), func.get());
+ aggregate::AddCountDistinctKernel<Int16Type>(int16(), func.get());
+ aggregate::AddCountDistinctKernel<Int32Type>(int32(), func.get());
+ aggregate::AddCountDistinctKernel<Date32Type>(date32(), func.get());
+ aggregate::AddCountDistinctKernel<Int64Type>(int64(), func.get());
Review comment:
> We'd still register the same types, however, we'd be instructing the
compiler to instantiate the kernel less (since e.g. TimestampType and Int64Type
would both delegate to CountDistinctImpl)
I see, thanks.
> make sure things like zoned timestamps are properly supported
How does the timezone get encoded into the timestamp? Is it part of its
physical data layout? I think it is a good idea to add a test for this case,
and for intervals as well ;)
> You could also do something like
AddCountDistinctKernel<Int64Type>(uint64()) which would collapse the signed and
unsigned implementations for each bit width.
Will this use implicit casting for kernel resolution?
By the way, I can use an approach like SumLike, no problem (it is not that
complicated); my point is that we will end up with more lines of code and
abstractions, and we will still do a potential refactor later to support more
types.
> at the very least, it may be worth putting up a follow-up JIRA. (Also
because I think there's other things we may want to do, e.g. we could perhaps
support dictionaries.)
Yes, that is part of my point too. I like consistency as well, but I think
most of this refactor should take place once we identify that we need to add
other types, and not before. It is better to have generic code, but not to
create an internal API when we don't know yet the requirements for each case;
i.e., when in doubt, leave it out ;)
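To make sure I follow, the collapsed registration for the 64-bit family would
look roughly like this (a hedged sketch; it assumes AddCountDistinctKernel
accepts any InputType alongside the template parameter):
```cpp
// One CountDistinctImpl<Int64Type> instantiation serves every logical type
// whose physical layout is a 64-bit integer.
aggregate::AddCountDistinctKernel<Int64Type>(int64(), func.get());
aggregate::AddCountDistinctKernel<Int64Type>(uint64(), func.get());
aggregate::AddCountDistinctKernel<Int64Type>(time64(TimeUnit::MICRO), func.get());
aggregate::AddCountDistinctKernel<Int64Type>(time64(TimeUnit::NANO), func.get());
for (auto u : TimeUnit::values()) {
  aggregate::AddCountDistinctKernel<Int64Type>(timestamp(u), func.get());
}
```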
##########
File path: cpp/src/arrow/compute/kernels/aggregate_basic.cc
##########
@@ -754,6 +839,30 @@ void RegisterScalarAggregateBasic(FunctionRegistry* registry) {
aggregate::CountInit, func.get());
DCHECK_OK(registry->AddFunction(std::move(func)));
+ func = std::make_shared<ScalarAggregateFunction>(
+ "count_distinct", Arity::Unary(), &count_distinct_doc,
&default_count_options);
+
+ // Takes any input, outputs int64 scalar
+ aggregate::AddCountDistinctKernel<Int8Type>(int8(), func.get());
+ aggregate::AddCountDistinctKernel<Int16Type>(int16(), func.get());
+ aggregate::AddCountDistinctKernel<Int32Type>(int32(), func.get());
+ aggregate::AddCountDistinctKernel<Date32Type>(date32(), func.get());
+ aggregate::AddCountDistinctKernel<Int64Type>(int64(), func.get());
+ aggregate::AddCountDistinctKernel<UInt8Type>(uint8(), func.get());
+ aggregate::AddCountDistinctKernel<UInt16Type>(uint16(), func.get());
+ aggregate::AddCountDistinctKernel<UInt32Type>(uint32(), func.get());
+ aggregate::AddCountDistinctKernel<UInt64Type>(uint64(), func.get());
+ aggregate::AddCountDistinctKernel<FloatType>(float32(), func.get());
+ aggregate::AddCountDistinctKernel<DoubleType>(float64(), func.get());
+ aggregate::AddCountDistinctKernel<Time32Type>(time32(TimeUnit::SECOND), func.get());
+ aggregate::AddCountDistinctKernel<Time32Type>(time32(TimeUnit::MILLI), func.get());
+ aggregate::AddCountDistinctKernel<Time64Type>(time64(TimeUnit::MICRO), func.get());
+ aggregate::AddCountDistinctKernel<Time64Type>(time64(TimeUnit::NANO), func.get());
+ for (auto u : TimeUnit::values()) {
+ aggregate::AddCountDistinctKernel<TimestampType>(timestamp(u), func.get());
Review comment:
Thanks, good catch!
##########
File path: cpp/src/arrow/compute/kernels/aggregate_test.cc
##########
@@ -873,6 +873,65 @@ TYPED_TEST(TestRandomNumericCountKernel, RandomArrayCount) {
}
}
+//
+// Count Distinct
+//
+
+class TestCountDistinctKernel : public ::testing::Test {
+ protected:
+ void SetUp() override {
+ only_valid = CountOptions(CountOptions::ONLY_VALID);
+ only_null = CountOptions(CountOptions::ONLY_NULL);
+ all = CountOptions(CountOptions::ALL);
+ }
+
+ const Datum& expected(int64_t value) {
+ expected_values[value] = Datum(static_cast<int64_t>(value));
+ return expected_values.at(value);
+ }
+
+ CountOptions only_valid;
+ CountOptions only_null;
+ CountOptions all;
+
+ private:
+ std::map<int64_t, Datum> expected_values;
+};
+
+TEST_F(TestCountDistinctKernel, NumericArrowTypesWithNulls) {
+ auto sample = "[1, 1, 2, 2, 5, 8, 9, 9, 9, 10, 6, 6]";
Review comment:
Got it, thanks! I will.
##########
File path: python/pyarrow/tests/test_compute.py
##########
@@ -2217,3 +2217,12 @@ def test_list_element():
result = pa.compute.list_element(lists, index)
expected = pa.array([{'a': 5.6, 'b': 6}, {'a': .6, 'b': 8}], element_type)
assert result.equals(expected)
+
+
+def test_count_distinct():
+ seed = datetime.now()
+ samples = [seed.replace(year=y) for y in range(1992, 2092)]
+ arr = pa.array(samples, pa.timestamp("ns"))
+ result = pa.compute.count_distinct(arr)
+ expected = pa.scalar(len(samples), type=pa.int64())
Review comment:
Got it again! Thanks for the explanation, @edponce! I'll add a test for
options here.
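Roughly what I have in mind (a hedged sketch; it assumes the Python binding
exposes the mode option the same way count() does):
```python
def test_count_distinct_options():
    arr = pa.array([1, 1, None, None, None], type=pa.int64())
    # one distinct valid value (1) and at most one distinct null
    assert pa.compute.count_distinct(arr, mode='only_valid').as_py() == 1
    assert pa.compute.count_distinct(arr, mode='only_null').as_py() == 1
    assert pa.compute.count_distinct(arr, mode='all').as_py() == 2
```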
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]