lidavidm commented on a change in pull request #11257:
URL: https://github.com/apache/arrow/pull/11257#discussion_r718585843
##########
File path: cpp/src/arrow/compute/kernels/aggregate_basic.cc
##########
@@ -754,6 +839,30 @@ void RegisterScalarAggregateBasic(FunctionRegistry*
registry) {
aggregate::CountInit, func.get());
DCHECK_OK(registry->AddFunction(std::move(func)));
+ func = std::make_shared<ScalarAggregateFunction>(
+ "count_distinct", Arity::Unary(), &count_distinct_doc,
&default_count_options);
+
+ // Takes any input, outputs int64 scalar
+ aggregate::AddCountDistinctKernel<Int8Type>(int8(), func.get());
+ aggregate::AddCountDistinctKernel<Int16Type>(int16(), func.get());
+ aggregate::AddCountDistinctKernel<Int32Type>(int32(), func.get());
+ aggregate::AddCountDistinctKernel<Date32Type>(date32(), func.get());
+ aggregate::AddCountDistinctKernel<Int64Type>(int64(), func.get());
Review comment:
You could iterate through IntTypes or PrimitiveTypes or something, and
just have a switch-case (essentially a hardcoded generator) on the type ID. Or
use VisitTypeInline with some enable_if trickery.
##########
File path: cpp/src/arrow/compute/kernels/aggregate_basic.cc
##########
@@ -754,6 +839,30 @@ void RegisterScalarAggregateBasic(FunctionRegistry*
registry) {
aggregate::CountInit, func.get());
DCHECK_OK(registry->AddFunction(std::move(func)));
+ func = std::make_shared<ScalarAggregateFunction>(
+ "count_distinct", Arity::Unary(), &count_distinct_doc,
&default_count_options);
+
+ // Takes any input, outputs int64 scalar
+ aggregate::AddCountDistinctKernel<Int8Type>(int8(), func.get());
+ aggregate::AddCountDistinctKernel<Int16Type>(int16(), func.get());
+ aggregate::AddCountDistinctKernel<Int32Type>(int32(), func.get());
+ aggregate::AddCountDistinctKernel<Date32Type>(date32(), func.get());
+ aggregate::AddCountDistinctKernel<Int64Type>(int64(), func.get());
Review comment:
I agree we probably don't need implicit casting. I think Antoine's
suggestion is best. That would look like this (untested):
```cpp
template <typename Type>
void AddCountDistinctKernel(std::shared_ptr<DataType> type,
ScalarAggregateFunction* func) {
using PhysicalType = typename Type::PhysicalType;
AddAggKernel(KernelSignature::Make({type}, ValueDescr::Scalar(int64())),
aggregate::CountDistinctInit<PhysicalType>, func);
}
```
##########
File path: cpp/src/arrow/compute/kernels/aggregate_basic.cc
##########
@@ -754,6 +839,30 @@ void RegisterScalarAggregateBasic(FunctionRegistry*
registry) {
aggregate::CountInit, func.get());
DCHECK_OK(registry->AddFunction(std::move(func)));
+ func = std::make_shared<ScalarAggregateFunction>(
+ "count_distinct", Arity::Unary(), &count_distinct_doc,
&default_count_options);
+
+ // Takes any input, outputs int64 scalar
+ aggregate::AddCountDistinctKernel<Int8Type>(int8(), func.get());
+ aggregate::AddCountDistinctKernel<Int16Type>(int16(), func.get());
+ aggregate::AddCountDistinctKernel<Int32Type>(int32(), func.get());
+ aggregate::AddCountDistinctKernel<Date32Type>(date32(), func.get());
+ aggregate::AddCountDistinctKernel<Int64Type>(int64(), func.get());
Review comment:
That said elsewhere Antoine points out SumLikeInit - something like that
is probably ideal and I don't think it is appreciably harder to read (at least
given that other kernels use similar infrastructure).
##########
File path: cpp/src/arrow/compute/kernels/aggregate_basic.cc
##########
@@ -754,6 +839,30 @@ void RegisterScalarAggregateBasic(FunctionRegistry*
registry) {
aggregate::CountInit, func.get());
DCHECK_OK(registry->AddFunction(std::move(func)));
+ func = std::make_shared<ScalarAggregateFunction>(
+ "count_distinct", Arity::Unary(), &count_distinct_doc,
&default_count_options);
+
+ // Takes any input, outputs int64 scalar
+ aggregate::AddCountDistinctKernel<Int8Type>(int8(), func.get());
+ aggregate::AddCountDistinctKernel<Int16Type>(int16(), func.get());
+ aggregate::AddCountDistinctKernel<Int32Type>(int32(), func.get());
+ aggregate::AddCountDistinctKernel<Date32Type>(date32(), func.get());
+ aggregate::AddCountDistinctKernel<Int64Type>(int64(), func.get());
+ aggregate::AddCountDistinctKernel<UInt8Type>(uint8(), func.get());
+ aggregate::AddCountDistinctKernel<UInt16Type>(uint16(), func.get());
+ aggregate::AddCountDistinctKernel<UInt32Type>(uint32(), func.get());
+ aggregate::AddCountDistinctKernel<UInt64Type>(uint64(), func.get());
+ aggregate::AddCountDistinctKernel<FloatType>(float32(), func.get());
+ aggregate::AddCountDistinctKernel<DoubleType>(float64(), func.get());
+ aggregate::AddCountDistinctKernel<Time32Type>(time32(TimeUnit::SECOND),
func.get());
+ aggregate::AddCountDistinctKernel<Time32Type>(time32(TimeUnit::MILLI),
func.get());
+ aggregate::AddCountDistinctKernel<Time64Type>(time64(TimeUnit::MICRO),
func.get());
+ aggregate::AddCountDistinctKernel<Time64Type>(time64(TimeUnit::NANO),
func.get());
+ for (auto u : TimeUnit::values()) {
+ aggregate::AddCountDistinctKernel<TimestampType>(timestamp(u), func.get());
Review comment:
This should probably use
[TimestampTypeUnit](https://github.com/apache/arrow/blob/8d2cda61427af350a1ff7e6abdd07515a5ed094d/cpp/src/arrow/compute/kernel.h#L123)
or else zoned timestamps will get inadvertently rejected.
##########
File path: cpp/src/arrow/compute/kernels/aggregate_basic.cc
##########
@@ -754,6 +839,30 @@ void RegisterScalarAggregateBasic(FunctionRegistry*
registry) {
aggregate::CountInit, func.get());
DCHECK_OK(registry->AddFunction(std::move(func)));
+ func = std::make_shared<ScalarAggregateFunction>(
+ "count_distinct", Arity::Unary(), &count_distinct_doc,
&default_count_options);
+
+ // Takes any input, outputs int64 scalar
+ aggregate::AddCountDistinctKernel<Int8Type>(int8(), func.get());
+ aggregate::AddCountDistinctKernel<Int16Type>(int16(), func.get());
+ aggregate::AddCountDistinctKernel<Int32Type>(int32(), func.get());
+ aggregate::AddCountDistinctKernel<Date32Type>(date32(), func.get());
+ aggregate::AddCountDistinctKernel<Int64Type>(int64(), func.get());
Review comment:
Sure, that's reasonable. I think the key points here are just to not
generate distinct kernels where it's not needed (e.g. Timestamp should use the
same underlying impl as Int64, and frankly, Int64/UInt64 can/should be
consolidated, right?) and to make sure things like zoned timestamps are
properly supported. That said, consistency with the rest of the kernels is
nice, even if the codebase is not perfect about being consistent throughout
(and Eduardo has been identifying/filing cases like this) - at the very least,
it may be worth putting up a follow-up JIRA. (Also because I think there's
other things we may want to do, e.g. we could perhaps support dictionaries.)
##########
File path: cpp/src/arrow/compute/kernels/aggregate_basic.cc
##########
@@ -754,6 +839,30 @@ void RegisterScalarAggregateBasic(FunctionRegistry*
registry) {
aggregate::CountInit, func.get());
DCHECK_OK(registry->AddFunction(std::move(func)));
+ func = std::make_shared<ScalarAggregateFunction>(
+ "count_distinct", Arity::Unary(), &count_distinct_doc,
&default_count_options);
+
+ // Takes any input, outputs int64 scalar
+ aggregate::AddCountDistinctKernel<Int8Type>(int8(), func.get());
+ aggregate::AddCountDistinctKernel<Int16Type>(int16(), func.get());
+ aggregate::AddCountDistinctKernel<Int32Type>(int32(), func.get());
+ aggregate::AddCountDistinctKernel<Date32Type>(date32(), func.get());
+ aggregate::AddCountDistinctKernel<Int64Type>(int64(), func.get());
Review comment:
We'd still register the same types, however, we'd be instructing the
compiler to instantiate the kernel less (since e.g. TimestampType and Int64Type
would both delegate to CountDistinctImpl<Int64Type>). You could also do
something like `AddCountDistinctKernel<Int64Type>(uint64())` which would
collapse the signed and unsigned implementations for each bit width.
Also now that I look at it, we should be able to support interval types here
too - e.g. months interval is the same as int32, day-time interval is the same
as int64.
##########
File path: cpp/src/arrow/compute/kernels/aggregate_basic.cc
##########
@@ -754,6 +839,30 @@ void RegisterScalarAggregateBasic(FunctionRegistry*
registry) {
aggregate::CountInit, func.get());
DCHECK_OK(registry->AddFunction(std::move(func)));
+ func = std::make_shared<ScalarAggregateFunction>(
+ "count_distinct", Arity::Unary(), &count_distinct_doc,
&default_count_options);
+
+ // Takes any input, outputs int64 scalar
+ aggregate::AddCountDistinctKernel<Int8Type>(int8(), func.get());
+ aggregate::AddCountDistinctKernel<Int16Type>(int16(), func.get());
+ aggregate::AddCountDistinctKernel<Int32Type>(int32(), func.get());
+ aggregate::AddCountDistinctKernel<Date32Type>(date32(), func.get());
+ aggregate::AddCountDistinctKernel<Int64Type>(int64(), func.get());
Review comment:
(I think this is why something like SumLikeInit is suggested, since you
could work out some templates/helpers to collapse a lot of these cases, but
yes, that would be more complicated.)
##########
File path: cpp/src/arrow/compute/kernels/aggregate_basic.cc
##########
@@ -754,6 +839,30 @@ void RegisterScalarAggregateBasic(FunctionRegistry*
registry) {
aggregate::CountInit, func.get());
DCHECK_OK(registry->AddFunction(std::move(func)));
+ func = std::make_shared<ScalarAggregateFunction>(
+ "count_distinct", Arity::Unary(), &count_distinct_doc,
&default_count_options);
+
+ // Takes any input, outputs int64 scalar
+ aggregate::AddCountDistinctKernel<Int8Type>(int8(), func.get());
+ aggregate::AddCountDistinctKernel<Int16Type>(int16(), func.get());
+ aggregate::AddCountDistinctKernel<Int32Type>(int32(), func.get());
+ aggregate::AddCountDistinctKernel<Date32Type>(date32(), func.get());
+ aggregate::AddCountDistinctKernel<Int64Type>(int64(), func.get());
Review comment:
> > We'd still register the same types, however, we'd be instructing the
compiler to instantiate the kernel less (since e.g. TimestampType and Int64Type
would both delegate to CountDistinctImpl)
>
> I see thanks.
>
> > make sure things like zoned timestamps are properly supported
>
> How does the timezone gets encoded into the timestamp? is it part of its
physical data layout? I think is a good idea to add a test for this case and
for intervals as well ;)
It's a parameter of the type itself and is not in the data. So it's only
relevant during things like type matching. Yes, adding tests would be a good
idea.
>
> > You could also do something like AddCountDistinctKernel(uint64()) which
would collapse the signed and unsigned implementations for each bit width.
>
> Does this will use implicit casting for kernel resolution?
No, so what we're talking about is registering the same kernel
implementation for multiple types to save on generated code. Since for example,
int64 and uint64 are basically the same from the perspective of this function
and you can just treat one as the other.
>
> btw I can use an approach like SumLike no problem (is not that
complicated), my point is that we will have more lines of code/abstractions at
the end and we will make a potential refactor later to support more types.
I think the SumLike approach should handle potential future cases well. Yes,
it does use more abstractions.
>
> > at the very least, it may be worth putting up a follow-up JIRA. (Also
because I think there's other things we may want to do, e.g. we could perhaps
support dictionaries.)
>
> Yes that is part of my point too. I like to have consistency as well, but
I think most of this refactor should take place once we identify we need to add
other types and not before, is better to have a generic code but not to create
internal API when we don't know yet the requirements for each case: i.e. when
in doubt leave it out ;)
Fair enough. Again, the approach here is likely OK, just needs some tweaking.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]