js8544 commented on code in PR #37100:
URL: https://github.com/apache/arrow/pull/37100#discussion_r1357849884
##########
cpp/src/arrow/compute/kernels/aggregate_basic.cc:
##########
@@ -492,11 +492,32 @@ Result<std::unique_ptr<KernelState>>
MinMaxInit(KernelContext* ctx,
return visitor.Create();
}
+struct AnyExceptDictionaryMatcher : TypeMatcher {
Review Comment:
I think there's a more general approach for matching non-dictionary
types. We can add a `NotMatcher` in kernel.cc with the constructor
```explicit NotMatcher(std::shared_ptr<TypeMatcher> base_matcher)```
whose `Matches(type)` returns `!base_matcher->Matches(type)`. Then, when
registering kernels, we can use `Not(SameTypeId(Type::DICTIONARY))`. This will
be useful if some other kernels face the same issue.
##########
cpp/src/arrow/compute/kernels/aggregate_basic.cc:
##########
@@ -516,7 +537,10 @@ void AddMinOrMaxAggKernel(ScalarAggregateFunction* func,
// Note SIMD level is always NONE, but the convenience kernel will
// dispatch to an appropriate implementation
- AddAggKernel(std::move(sig), std::move(init), std::move(finalize), func);
+ AddAggKernel(sig, init, finalize, func);
+
+ sig = KernelSignature::Make({InputType(Type::DICTIONARY)},
DictionaryValueType);
Review Comment:
Since we use `min_max_func->DispatchExact` in `init`, do we need to separate
dict and non-dict types here?
##########
cpp/src/arrow/compute/kernels/aggregate_basic_internal.h:
##########
@@ -912,6 +912,119 @@ struct NullMinMaxImpl : public ScalarAggregator {
}
};
+template <SimdLevel::type SimdLevel>
+struct DictionaryMinMaxImpl : public ScalarAggregator {
+ using ThisType = DictionaryMinMaxImpl<SimdLevel>;
+
+ DictionaryMinMaxImpl(std::shared_ptr<DataType> out_type,
ScalarAggregateOptions options)
+ : options(std::move(options)),
+ out_type(std::move(out_type)),
+ has_nulls(false),
+ count(0),
+ min(nullptr),
+ max(nullptr) {
+ this->options.min_count = std::max<uint32_t>(1, this->options.min_count);
+ }
+
+ Status Consume(KernelContext* ctx, const ExecSpan& batch) override {
+ if (batch[0].is_scalar()) {
+ return Status::NotImplemented("No min/max implemented for
DictionaryScalar");
+ }
+
+ DictionaryArray dict_arr(batch[0].array.ToArrayData());
+ ARROW_ASSIGN_OR_RAISE(auto compacted_arr,
dict_arr.Compact(ctx->memory_pool()));
+ const DictionaryArray& compacted_dict_arr =
+ checked_cast<const DictionaryArray&>(*compacted_arr);
+ this->has_nulls = compacted_dict_arr.null_count() > 0;
+ this->count += compacted_dict_arr.length() -
compacted_dict_arr.null_count();
+
+ const std::shared_ptr<Array>& dict = compacted_dict_arr.dictionary();
+ if (dict->length() == 0) {
Review Comment:
Should this check happen before updating `has_nulls` and `count`?
##########
cpp/src/arrow/compute/kernels/aggregate_basic_internal.h:
##########
@@ -912,6 +912,119 @@ struct NullMinMaxImpl : public ScalarAggregator {
}
};
+template <SimdLevel::type SimdLevel>
+struct DictionaryMinMaxImpl : public ScalarAggregator {
+ using ThisType = DictionaryMinMaxImpl<SimdLevel>;
+
+ DictionaryMinMaxImpl(std::shared_ptr<DataType> out_type,
ScalarAggregateOptions options)
+ : options(std::move(options)),
+ out_type(std::move(out_type)),
+ has_nulls(false),
+ count(0),
+ min(nullptr),
+ max(nullptr) {
+ this->options.min_count = std::max<uint32_t>(1, this->options.min_count);
+ }
+
+ Status Consume(KernelContext* ctx, const ExecSpan& batch) override {
+ if (batch[0].is_scalar()) {
+ return Status::NotImplemented("No min/max implemented for
DictionaryScalar");
+ }
+
+ DictionaryArray dict_arr(batch[0].array.ToArrayData());
+ ARROW_ASSIGN_OR_RAISE(auto compacted_arr,
dict_arr.Compact(ctx->memory_pool()));
+ const DictionaryArray& compacted_dict_arr =
+ checked_cast<const DictionaryArray&>(*compacted_arr);
+ this->has_nulls = compacted_dict_arr.null_count() > 0;
+ this->count += compacted_dict_arr.length() -
compacted_dict_arr.null_count();
+
+ const std::shared_ptr<Array>& dict = compacted_dict_arr.dictionary();
+ if (dict->length() == 0) {
+ return Status::OK();
+ }
+
+ Datum dict_values(std::move(dict));
+ ARROW_ASSIGN_OR_RAISE(
+ Datum result, MinMax(std::move(dict_values),
ScalarAggregateOptions::Defaults(),
+ ctx->exec_context()));
+ const StructScalar& struct_result =
+ checked_cast<const StructScalar&>(*result.scalar());
+ ARROW_ASSIGN_OR_RAISE(auto dict_min, struct_result.field(FieldRef("min")));
+ ARROW_ASSIGN_OR_RAISE(auto dict_max, struct_result.field(FieldRef("max")));
+ ARROW_RETURN_NOT_OK(CompareMinMax(std::move(dict_min),
std::move(dict_max), ctx));
Review Comment:
nit: I would call this function UpdateMinMaxState to reflect that it changes
the state. CompareMinMax sounds like a pure function.
##########
cpp/src/arrow/compute/kernels/aggregate_basic_internal.h:
##########
@@ -912,6 +912,119 @@ struct NullMinMaxImpl : public ScalarAggregator {
}
};
+template <SimdLevel::type SimdLevel>
+struct DictionaryMinMaxImpl : public ScalarAggregator {
+ using ThisType = DictionaryMinMaxImpl<SimdLevel>;
+
+ DictionaryMinMaxImpl(std::shared_ptr<DataType> out_type,
ScalarAggregateOptions options)
+ : options(std::move(options)),
+ out_type(std::move(out_type)),
+ has_nulls(false),
+ count(0),
+ min(nullptr),
+ max(nullptr) {
+ this->options.min_count = std::max<uint32_t>(1, this->options.min_count);
+ }
+
+ Status Consume(KernelContext* ctx, const ExecSpan& batch) override {
+ if (batch[0].is_scalar()) {
+ return Status::NotImplemented("No min/max implemented for
DictionaryScalar");
+ }
+
+ DictionaryArray dict_arr(batch[0].array.ToArrayData());
+ ARROW_ASSIGN_OR_RAISE(auto compacted_arr,
dict_arr.Compact(ctx->memory_pool()));
+ const DictionaryArray& compacted_dict_arr =
+ checked_cast<const DictionaryArray&>(*compacted_arr);
+ this->has_nulls = compacted_dict_arr.null_count() > 0;
+ this->count += compacted_dict_arr.length() -
compacted_dict_arr.null_count();
+
+ const std::shared_ptr<Array>& dict = compacted_dict_arr.dictionary();
+ if (dict->length() == 0) {
+ return Status::OK();
+ }
+
+ Datum dict_values(std::move(dict));
Review Comment:
Calling `std::move` on a const reference has no effect; the value is still copied.
##########
cpp/src/arrow/compute/kernels/aggregate_test.cc:
##########
@@ -2047,6 +2047,117 @@ TEST(TestDecimalMinMaxKernel, Decimals) {
}
}
+TEST(TestDictionaryMinMaxKernel, DictionaryArray) {
+ ScalarAggregateOptions options;
+ std::shared_ptr<arrow::DataType> dict_ty;
+ std::shared_ptr<arrow::DataType> ty;
+ for (const auto& index_type : all_dictionary_index_types()) {
+ ARROW_SCOPED_TRACE("index_type = ", index_type->ToString());
+
+ for (const auto& item_ty_ : {decimal128(5, 2), decimal256(5, 2)}) {
Review Comment:
We need test cases for integer, float and maybe temporal types too. Many of
the test cases here can be reused though.
##########
cpp/src/arrow/compute/kernels/aggregate_basic_internal.h:
##########
@@ -912,6 +912,119 @@ struct NullMinMaxImpl : public ScalarAggregator {
}
};
+template <SimdLevel::type SimdLevel>
+struct DictionaryMinMaxImpl : public ScalarAggregator {
+ using ThisType = DictionaryMinMaxImpl<SimdLevel>;
+
+ DictionaryMinMaxImpl(std::shared_ptr<DataType> out_type,
ScalarAggregateOptions options)
+ : options(std::move(options)),
+ out_type(std::move(out_type)),
+ has_nulls(false),
+ count(0),
+ min(nullptr),
+ max(nullptr) {
+ this->options.min_count = std::max<uint32_t>(1, this->options.min_count);
+ }
+
+ Status Consume(KernelContext* ctx, const ExecSpan& batch) override {
+ if (batch[0].is_scalar()) {
+ return Status::NotImplemented("No min/max implemented for
DictionaryScalar");
+ }
+
+ DictionaryArray dict_arr(batch[0].array.ToArrayData());
+ ARROW_ASSIGN_OR_RAISE(auto compacted_arr,
dict_arr.Compact(ctx->memory_pool()));
+ const DictionaryArray& compacted_dict_arr =
+ checked_cast<const DictionaryArray&>(*compacted_arr);
+ this->has_nulls = compacted_dict_arr.null_count() > 0;
Review Comment:
```suggestion
this->has_nulls |= compacted_dict_arr.null_count() > 0;
```
##########
cpp/src/arrow/compute/kernels/aggregate_basic_internal.h:
##########
@@ -1002,6 +1115,11 @@ struct MinMaxInitState {
return Status::OK();
}
+ Status Visit(const DictionaryType&) {
+ state.reset(new DictionaryMinMaxImpl<SimdLevel>(out_type, options));
Review Comment:
In `DictionaryMinMaxImpl` there are many function calls on `input.value_type`,
such as `MinMax` in `Consume` and `greater`/`less` in `CompareMinMax`. Each
function call goes through a dispatching process and incurs other call
overhead. Since the value type is fixed, we can get its corresponding kernels
using `Function::DispatchExact` here and directly invoke the kernels in
`DictionaryMinMaxImpl`.
This is also helpful when the user inputs an unsupported type such as
`dict(list(int32))`: if we try dispatching here, we can fail early and return
a better error message. Currently it will first compact the dictionary before
failing, and the error message is something like `MinMax has no kernel matching
list(int32)` instead of showing the real input type `dict(list(int32))`.
But this can be left as an optimization for the future. You can create an
issue and leave this PR as it is.
##########
cpp/src/arrow/compute/kernels/aggregate_basic_internal.h:
##########
@@ -912,6 +912,119 @@ struct NullMinMaxImpl : public ScalarAggregator {
}
};
+template <SimdLevel::type SimdLevel>
+struct DictionaryMinMaxImpl : public ScalarAggregator {
+ using ThisType = DictionaryMinMaxImpl<SimdLevel>;
+
+ DictionaryMinMaxImpl(std::shared_ptr<DataType> out_type,
ScalarAggregateOptions options)
+ : options(std::move(options)),
+ out_type(std::move(out_type)),
+ has_nulls(false),
+ count(0),
+ min(nullptr),
+ max(nullptr) {
+ this->options.min_count = std::max<uint32_t>(1, this->options.min_count);
+ }
+
+ Status Consume(KernelContext* ctx, const ExecSpan& batch) override {
+ if (batch[0].is_scalar()) {
+ return Status::NotImplemented("No min/max implemented for
DictionaryScalar");
+ }
+
+ DictionaryArray dict_arr(batch[0].array.ToArrayData());
+ ARROW_ASSIGN_OR_RAISE(auto compacted_arr,
dict_arr.Compact(ctx->memory_pool()));
+ const DictionaryArray& compacted_dict_arr =
+ checked_cast<const DictionaryArray&>(*compacted_arr);
+ this->has_nulls = compacted_dict_arr.null_count() > 0;
+ this->count += compacted_dict_arr.length() -
compacted_dict_arr.null_count();
+
+ const std::shared_ptr<Array>& dict = compacted_dict_arr.dictionary();
+ if (dict->length() == 0) {
+ return Status::OK();
+ }
+
+ Datum dict_values(std::move(dict));
+ ARROW_ASSIGN_OR_RAISE(
+ Datum result, MinMax(std::move(dict_values),
ScalarAggregateOptions::Defaults(),
+ ctx->exec_context()));
+ const StructScalar& struct_result =
+ checked_cast<const StructScalar&>(*result.scalar());
+ ARROW_ASSIGN_OR_RAISE(auto dict_min, struct_result.field(FieldRef("min")));
+ ARROW_ASSIGN_OR_RAISE(auto dict_max, struct_result.field(FieldRef("max")));
+ ARROW_RETURN_NOT_OK(CompareMinMax(std::move(dict_min),
std::move(dict_max), ctx));
+ return Status::OK();
+ }
+
+ Status MergeFrom(KernelContext* ctx, KernelState&& src) override {
+ const auto& other = checked_cast<const ThisType&>(src);
+
+ ARROW_RETURN_NOT_OK(CompareMinMax(other.min, other.max, ctx));
+ this->has_nulls = this->has_nulls || other.has_nulls;
+ this->count += other.count;
+ return Status::OK();
+ }
+
+ Status Finalize(KernelContext*, Datum* out) override {
+ const auto& struct_type = checked_cast<const StructType&>(*out_type);
+ const auto& child_type = struct_type.field(0)->type();
+
+ std::vector<std::shared_ptr<Scalar>> values;
+ // Physical type != result type
Review Comment:
Do we need this check here? I think the primitive kernels guarantee that
this->min/max is of the same type as child_type.
##########
cpp/src/arrow/compute/kernels/aggregate_basic.cc:
##########
@@ -516,7 +537,10 @@ void AddMinOrMaxAggKernel(ScalarAggregateFunction* func,
// Note SIMD level is always NONE, but the convenience kernel will
// dispatch to an appropriate implementation
- AddAggKernel(std::move(sig), std::move(init), std::move(finalize), func);
+ AddAggKernel(sig, init, finalize, func);
+
+ sig = KernelSignature::Make({InputType(Type::DICTIONARY)},
DictionaryValueType);
Review Comment:
And therefore we probably don't need the `AnyExceptDictionaryMatcher` at all.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]