pitrou commented on code in PR #37100:
URL: https://github.com/apache/arrow/pull/37100#discussion_r1364086747
##########
cpp/src/arrow/compute/kernels/aggregate_basic_internal.h:
##########
@@ -912,6 +912,122 @@ struct NullMinMaxImpl : public ScalarAggregator {
}
};
+template <SimdLevel::type SimdLevel>
+struct DictionaryMinMaxImpl : public ScalarAggregator {
+  using ThisType = DictionaryMinMaxImpl<SimdLevel>;
+
+  DictionaryMinMaxImpl(std::shared_ptr<DataType> out_type, ScalarAggregateOptions options)
+      : options(std::move(options)),
+        out_type(std::move(out_type)),
+        has_nulls(false),
+        count(0),
+        min(nullptr),
+        max(nullptr) {
+    this->options.min_count = std::max<uint32_t>(1, this->options.min_count);
+  }
+
+  Status Consume(KernelContext* ctx, const ExecSpan& batch) override {
+    if (batch[0].is_scalar()) {
+      return Status::NotImplemented("No min/max implemented for DictionaryScalar");
+    }
+
+    DictionaryArray dict_arr(batch[0].array.ToArrayData());
+    ARROW_ASSIGN_OR_RAISE(auto compacted_arr, dict_arr.Compact(ctx->memory_pool()));
+    const DictionaryArray& compacted_dict_arr =
+        checked_cast<const DictionaryArray&>(*compacted_arr);
+    if (compacted_dict_arr.length() - compacted_dict_arr.null_count() == 0) {
+      return Status::OK();
+    }
+    this->has_nulls |= compacted_dict_arr.null_count() > 0;
+    this->count += compacted_dict_arr.length() - compacted_dict_arr.null_count();
+
+    std::shared_ptr<Scalar> dict_min;
+    std::shared_ptr<Scalar> dict_max;
+    if (compacted_dict_arr.length() - compacted_dict_arr.null_count() == 1) {
+      ARROW_ASSIGN_OR_RAISE(dict_min, compacted_dict_arr.dictionary()->GetScalar(0));
+      dict_max = dict_min;
+    } else {
+      Datum dict_values(compacted_dict_arr.dictionary());
+      ARROW_ASSIGN_OR_RAISE(
+          Datum result, MinMax(std::move(dict_values), ScalarAggregateOptions::Defaults(),
+                               ctx->exec_context()));
+      const StructScalar& struct_result =
+          checked_cast<const StructScalar&>(*result.scalar());
+      ARROW_ASSIGN_OR_RAISE(dict_min, struct_result.field(FieldRef("min")));
+      ARROW_ASSIGN_OR_RAISE(dict_max, struct_result.field(FieldRef("max")));
Review Comment:
This is unnecessarily expensive, isn't it?
```suggestion
      dict_min = struct_result.value[MinOrMax::Min];
      dict_max = struct_result.value[MinOrMax::Max];
```
##########
cpp/src/arrow/compute/kernels/aggregate_basic_internal.h:
##########
@@ -912,6 +912,122 @@ struct NullMinMaxImpl : public ScalarAggregator {
}
};
+template <SimdLevel::type SimdLevel>
+struct DictionaryMinMaxImpl : public ScalarAggregator {
+  using ThisType = DictionaryMinMaxImpl<SimdLevel>;
+
+  DictionaryMinMaxImpl(std::shared_ptr<DataType> out_type, ScalarAggregateOptions options)
+      : options(std::move(options)),
+        out_type(std::move(out_type)),
+        has_nulls(false),
+        count(0),
+        min(nullptr),
+        max(nullptr) {
Review Comment:
Instead of having to test for null pointers below, isn't it simpler to
instantiate null scalars upfront?
```suggestion
        min(MakeNullScalar(this->out_type)),
        max(this->min) {
```
##########
cpp/src/arrow/compute/kernels/aggregate_basic_internal.h:
##########
@@ -912,6 +912,122 @@ struct NullMinMaxImpl : public ScalarAggregator {
}
};
+template <SimdLevel::type SimdLevel>
+struct DictionaryMinMaxImpl : public ScalarAggregator {
+  using ThisType = DictionaryMinMaxImpl<SimdLevel>;
+
+  DictionaryMinMaxImpl(std::shared_ptr<DataType> out_type, ScalarAggregateOptions options)
+      : options(std::move(options)),
+        out_type(std::move(out_type)),
+        has_nulls(false),
+        count(0),
+        min(nullptr),
+        max(nullptr) {
+    this->options.min_count = std::max<uint32_t>(1, this->options.min_count);
+  }
+
+  Status Consume(KernelContext* ctx, const ExecSpan& batch) override {
+    if (batch[0].is_scalar()) {
+      return Status::NotImplemented("No min/max implemented for DictionaryScalar");
+    }
+
+    DictionaryArray dict_arr(batch[0].array.ToArrayData());
+    ARROW_ASSIGN_OR_RAISE(auto compacted_arr, dict_arr.Compact(ctx->memory_pool()));
+    const DictionaryArray& compacted_dict_arr =
+        checked_cast<const DictionaryArray&>(*compacted_arr);
+    if (compacted_dict_arr.length() - compacted_dict_arr.null_count() == 0) {
Review Comment:
Can you introduce an intermediate variable for clarity?
```suggestion
    const int64_t non_null_count =
        compacted_dict_arr.length() - compacted_dict_arr.null_count();
    if (non_null_count == 0) {
```
##########
cpp/src/arrow/compute/kernels/aggregate_basic_internal.h:
##########
@@ -912,6 +912,122 @@ struct NullMinMaxImpl : public ScalarAggregator {
}
};
+template <SimdLevel::type SimdLevel>
+struct DictionaryMinMaxImpl : public ScalarAggregator {
+  using ThisType = DictionaryMinMaxImpl<SimdLevel>;
+
+  DictionaryMinMaxImpl(std::shared_ptr<DataType> out_type, ScalarAggregateOptions options)
+      : options(std::move(options)),
+        out_type(std::move(out_type)),
+        has_nulls(false),
+        count(0),
+        min(nullptr),
+        max(nullptr) {
+    this->options.min_count = std::max<uint32_t>(1, this->options.min_count);
+  }
+
+  Status Consume(KernelContext* ctx, const ExecSpan& batch) override {
+    if (batch[0].is_scalar()) {
+      return Status::NotImplemented("No min/max implemented for DictionaryScalar");
+    }
+
+    DictionaryArray dict_arr(batch[0].array.ToArrayData());
+    ARROW_ASSIGN_OR_RAISE(auto compacted_arr, dict_arr.Compact(ctx->memory_pool()));
+    const DictionaryArray& compacted_dict_arr =
+        checked_cast<const DictionaryArray&>(*compacted_arr);
+    if (compacted_dict_arr.length() - compacted_dict_arr.null_count() == 0) {
+      return Status::OK();
+    }
+    this->has_nulls |= compacted_dict_arr.null_count() > 0;
+    this->count += compacted_dict_arr.length() - compacted_dict_arr.null_count();
+
+    std::shared_ptr<Scalar> dict_min;
+    std::shared_ptr<Scalar> dict_max;
+    if (compacted_dict_arr.length() - compacted_dict_arr.null_count() == 1) {
+      ARROW_ASSIGN_OR_RAISE(dict_min, compacted_dict_arr.dictionary()->GetScalar(0));
+      dict_max = dict_min;
+    } else {
+      Datum dict_values(compacted_dict_arr.dictionary());
+      ARROW_ASSIGN_OR_RAISE(
+          Datum result, MinMax(std::move(dict_values), ScalarAggregateOptions::Defaults(),
+                               ctx->exec_context()));
+      const StructScalar& struct_result =
+          checked_cast<const StructScalar&>(*result.scalar());
+      ARROW_ASSIGN_OR_RAISE(dict_min, struct_result.field(FieldRef("min")));
+      ARROW_ASSIGN_OR_RAISE(dict_max, struct_result.field(FieldRef("max")));
+    }
+    ARROW_RETURN_NOT_OK(UpdateMinMaxState(dict_min, dict_max, ctx));
+    return Status::OK();
+  }
+
+  Status MergeFrom(KernelContext* ctx, KernelState&& src) override {
+    const auto& other = checked_cast<const ThisType&>(src);
+
+    this->has_nulls |= other.has_nulls;
+    this->count += other.count;
+    ARROW_RETURN_NOT_OK(UpdateMinMaxState(other.min, other.max, ctx));
+    return Status::OK();
+  }
+
+  Status Finalize(KernelContext*, Datum* out) override {
+    const auto& struct_type = checked_cast<const StructType&>(*out_type);
+    const auto& child_type = struct_type.field(0)->type();
+
+    std::vector<std::shared_ptr<Scalar>> values;
+    if ((this->has_nulls && !options.skip_nulls) || (this->count < options.min_count)) {
+      // (null, null)
+      std::shared_ptr<Scalar> null_scalar = MakeNullScalar(child_type);
+      values = {null_scalar, null_scalar};
+    } else {
+      values = {std::move(this->min), std::move(this->max)};
+    }
+
+    out->value = std::make_shared<StructScalar>(std::move(values), this->out_type);
+    return Status::OK();
+  }
+
+  ScalarAggregateOptions options;
+  std::shared_ptr<DataType> out_type;
+  bool has_nulls;
+  int64_t count;
+  std::shared_ptr<Scalar> min;
+  std::shared_ptr<Scalar> max;
+
+ private:
+  Status UpdateMinMaxState(const std::shared_ptr<Scalar>& other_min,
+                           const std::shared_ptr<Scalar>& other_max, KernelContext* ctx) {
+    if (this->min == nullptr || this->min->type->id() == Type::NA) {
Review Comment:
Why check for `NA`?
##########
cpp/src/arrow/compute/kernels/aggregate_basic_internal.h:
##########
@@ -912,6 +912,122 @@ struct NullMinMaxImpl : public ScalarAggregator {
}
};
+template <SimdLevel::type SimdLevel>
+struct DictionaryMinMaxImpl : public ScalarAggregator {
+  using ThisType = DictionaryMinMaxImpl<SimdLevel>;
+
+  DictionaryMinMaxImpl(std::shared_ptr<DataType> out_type, ScalarAggregateOptions options)
+      : options(std::move(options)),
+        out_type(std::move(out_type)),
+        has_nulls(false),
+        count(0),
+        min(nullptr),
+        max(nullptr) {
+    this->options.min_count = std::max<uint32_t>(1, this->options.min_count);
+  }
+
+  Status Consume(KernelContext* ctx, const ExecSpan& batch) override {
+    if (batch[0].is_scalar()) {
+      return Status::NotImplemented("No min/max implemented for DictionaryScalar");
+    }
+
+    DictionaryArray dict_arr(batch[0].array.ToArrayData());
+    ARROW_ASSIGN_OR_RAISE(auto compacted_arr, dict_arr.Compact(ctx->memory_pool()));
+    const DictionaryArray& compacted_dict_arr =
+        checked_cast<const DictionaryArray&>(*compacted_arr);
+    if (compacted_dict_arr.length() - compacted_dict_arr.null_count() == 0) {
+      return Status::OK();
+    }
+    this->has_nulls |= compacted_dict_arr.null_count() > 0;
+    this->count += compacted_dict_arr.length() - compacted_dict_arr.null_count();
+
Review Comment:
Shouldn't we shortcut here?
```suggestion
    if (this->has_nulls && !options.skip_nulls) {
      return Status::OK();
    }
```
##########
cpp/src/arrow/compute/kernels/aggregate_basic_internal.h:
##########
@@ -912,6 +912,122 @@ struct NullMinMaxImpl : public ScalarAggregator {
}
};
+template <SimdLevel::type SimdLevel>
+struct DictionaryMinMaxImpl : public ScalarAggregator {
+  using ThisType = DictionaryMinMaxImpl<SimdLevel>;
+
+  DictionaryMinMaxImpl(std::shared_ptr<DataType> out_type, ScalarAggregateOptions options)
+      : options(std::move(options)),
+        out_type(std::move(out_type)),
+        has_nulls(false),
+        count(0),
+        min(nullptr),
+        max(nullptr) {
+    this->options.min_count = std::max<uint32_t>(1, this->options.min_count);
+  }
+
+  Status Consume(KernelContext* ctx, const ExecSpan& batch) override {
+    if (batch[0].is_scalar()) {
+      return Status::NotImplemented("No min/max implemented for DictionaryScalar");
+    }
+
+    DictionaryArray dict_arr(batch[0].array.ToArrayData());
+    ARROW_ASSIGN_OR_RAISE(auto compacted_arr, dict_arr.Compact(ctx->memory_pool()));
+    const DictionaryArray& compacted_dict_arr =
+        checked_cast<const DictionaryArray&>(*compacted_arr);
+    if (compacted_dict_arr.length() - compacted_dict_arr.null_count() == 0) {
+      return Status::OK();
+    }
+    this->has_nulls |= compacted_dict_arr.null_count() > 0;
+    this->count += compacted_dict_arr.length() - compacted_dict_arr.null_count();
+
+    std::shared_ptr<Scalar> dict_min;
+    std::shared_ptr<Scalar> dict_max;
+    if (compacted_dict_arr.length() - compacted_dict_arr.null_count() == 1) {
+      ARROW_ASSIGN_OR_RAISE(dict_min, compacted_dict_arr.dictionary()->GetScalar(0));
Review Comment:
Why would the only non-null item be at index 0? This doesn't look correct.
The dictionary values could be `[null, "foo"]` for example (for a string
dictionary).
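One way this can bite even after `Compact()` (a hypothetical sketch using the `DictArrayFromJSON` test helper): the single valid index can reference a null value sitting at slot 0, so `GetScalar(0)` yields a null scalar rather than a real minimum:
```cpp
// Hypothetical illustration, not from the PR: one valid index, but it points
// at a null dictionary value, so GetScalar(0) returns a null scalar here.
auto dict_ty = dictionary(int32(), utf8());
auto arr = DictArrayFromJSON(dict_ty, R"([0])", R"([null, "foo"])");
```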
##########
cpp/src/arrow/compute/kernels/aggregate_basic_internal.h:
##########
@@ -912,6 +912,122 @@ struct NullMinMaxImpl : public ScalarAggregator {
}
};
+template <SimdLevel::type SimdLevel>
+struct DictionaryMinMaxImpl : public ScalarAggregator {
+  using ThisType = DictionaryMinMaxImpl<SimdLevel>;
+
+  DictionaryMinMaxImpl(std::shared_ptr<DataType> out_type, ScalarAggregateOptions options)
+      : options(std::move(options)),
+        out_type(std::move(out_type)),
+        has_nulls(false),
+        count(0),
+        min(nullptr),
+        max(nullptr) {
+    this->options.min_count = std::max<uint32_t>(1, this->options.min_count);
+  }
+
+  Status Consume(KernelContext* ctx, const ExecSpan& batch) override {
+    if (batch[0].is_scalar()) {
+      return Status::NotImplemented("No min/max implemented for DictionaryScalar");
+    }
+
+    DictionaryArray dict_arr(batch[0].array.ToArrayData());
+    ARROW_ASSIGN_OR_RAISE(auto compacted_arr, dict_arr.Compact(ctx->memory_pool()));
+    const DictionaryArray& compacted_dict_arr =
+        checked_cast<const DictionaryArray&>(*compacted_arr);
+    if (compacted_dict_arr.length() - compacted_dict_arr.null_count() == 0) {
+      return Status::OK();
+    }
+    this->has_nulls |= compacted_dict_arr.null_count() > 0;
+    this->count += compacted_dict_arr.length() - compacted_dict_arr.null_count();
+
+    std::shared_ptr<Scalar> dict_min;
+    std::shared_ptr<Scalar> dict_max;
+    if (compacted_dict_arr.length() - compacted_dict_arr.null_count() == 1) {
+      ARROW_ASSIGN_OR_RAISE(dict_min, compacted_dict_arr.dictionary()->GetScalar(0));
+      dict_max = dict_min;
+    } else {
+      Datum dict_values(compacted_dict_arr.dictionary());
+      ARROW_ASSIGN_OR_RAISE(
+          Datum result, MinMax(std::move(dict_values), ScalarAggregateOptions::Defaults(),
+                               ctx->exec_context()));
+      const StructScalar& struct_result =
+          checked_cast<const StructScalar&>(*result.scalar());
+      ARROW_ASSIGN_OR_RAISE(dict_min, struct_result.field(FieldRef("min")));
+      ARROW_ASSIGN_OR_RAISE(dict_max, struct_result.field(FieldRef("max")));
+    }
+    ARROW_RETURN_NOT_OK(UpdateMinMaxState(dict_min, dict_max, ctx));
+    return Status::OK();
+  }
+
+  Status MergeFrom(KernelContext* ctx, KernelState&& src) override {
+    const auto& other = checked_cast<const ThisType&>(src);
+
+    this->has_nulls |= other.has_nulls;
+    this->count += other.count;
+    ARROW_RETURN_NOT_OK(UpdateMinMaxState(other.min, other.max, ctx));
+    return Status::OK();
+  }
+
+  Status Finalize(KernelContext*, Datum* out) override {
+    const auto& struct_type = checked_cast<const StructType&>(*out_type);
+    const auto& child_type = struct_type.field(0)->type();
+
+    std::vector<std::shared_ptr<Scalar>> values;
+    if ((this->has_nulls && !options.skip_nulls) || (this->count < options.min_count)) {
+      // (null, null)
+      std::shared_ptr<Scalar> null_scalar = MakeNullScalar(child_type);
+      values = {null_scalar, null_scalar};
+    } else {
+      values = {std::move(this->min), std::move(this->max)};
+    }
+
+    out->value = std::make_shared<StructScalar>(std::move(values), this->out_type);
+    return Status::OK();
+  }
+
+  ScalarAggregateOptions options;
+  std::shared_ptr<DataType> out_type;
+  bool has_nulls;
+  int64_t count;
+  std::shared_ptr<Scalar> min;
+  std::shared_ptr<Scalar> max;
+
+ private:
+  Status UpdateMinMaxState(const std::shared_ptr<Scalar>& other_min,
+                           const std::shared_ptr<Scalar>& other_max, KernelContext* ctx) {
+    if (this->min == nullptr || this->min->type->id() == Type::NA) {
+      this->min = other_min;
+    } else if (other_min != nullptr && other_min->type->id() != Type::NA) {
+      ARROW_ASSIGN_OR_RAISE(
+          Datum greater_result,
+          CallFunction("greater", {this->min, other_min}, ctx->exec_context()));
+      const BooleanScalar& greater_scalar =
+          checked_cast<const BooleanScalar&>(*greater_result.scalar());
+
+      if (greater_scalar.value) {
+        this->min = other_min;
+      }
+    }
+
+    if (this->max == nullptr || this->max->type->id() == Type::NA) {
+      this->max = other_max;
+    } else if (other_max != nullptr && other_max->type->id() != Type::NA) {
+      ARROW_ASSIGN_OR_RAISE(
+          Datum less_result,
+          CallFunction("less", {this->max, other_max}, ctx->exec_context()));
+      const BooleanScalar& less_scalar =
+          checked_cast<const BooleanScalar&>(*less_result.scalar());
+
+      if (less_scalar.value) {
+        this->max = other_max;
+      }
+    }
Review Comment:
This seems quite complicated and expensive.
I may be missing something, but I think it would be much simpler to create a
child `MinMaxInitState` instance for the dictionary's value type, and then call
that instance in `DictionaryMinMaxImpl::Consume` and
`DictionaryMinMaxImpl::MergeFrom`. What do you think?
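Roughly this shape, perhaps (a hedged sketch only, not the PR's code: it assumes a child aggregator created for the dictionary's value type, e.g. via `MinMaxInitState`, and it glosses over `min_count` and null bookkeeping):
```cpp
// Hedged sketch: keep a child min/max state for the value type and forward the
// compacted dictionary values to it, so all comparison logic stays in the
// existing flat-type kernels.
template <SimdLevel::type SimdLevel>
struct DictionaryMinMaxImpl : public ScalarAggregator {
  std::unique_ptr<ScalarAggregator> child_state_;  // min/max state for value type

  Status Consume(KernelContext* ctx, const ExecSpan& batch) override {
    DictionaryArray dict_arr(batch[0].array.ToArrayData());
    ARROW_ASSIGN_OR_RAISE(auto compacted, dict_arr.Compact(ctx->memory_pool()));
    const auto& values =
        *checked_cast<const DictionaryArray&>(*compacted).dictionary();
    // Wrap the dictionary values as a one-value span and let the child
    // aggregator do the actual work.
    ExecSpan values_span;
    values_span.length = values.length();
    values_span.values.resize(1);
    values_span.values[0].array.SetMembers(*values.data());
    return child_state_->Consume(ctx, values_span);
  }

  Status MergeFrom(KernelContext* ctx, KernelState&& src) override {
    auto& other = checked_cast<DictionaryMinMaxImpl&>(src);
    return child_state_->MergeFrom(ctx, std::move(*other.child_state_));
  }

  Status Finalize(KernelContext* ctx, Datum* out) override {
    return child_state_->Finalize(ctx, out);
  }
};
```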
##########
cpp/src/arrow/compute/kernels/aggregate_basic_internal.h:
##########
@@ -912,6 +912,122 @@ struct NullMinMaxImpl : public ScalarAggregator {
}
};
+template <SimdLevel::type SimdLevel>
+struct DictionaryMinMaxImpl : public ScalarAggregator {
+  using ThisType = DictionaryMinMaxImpl<SimdLevel>;
+
+  DictionaryMinMaxImpl(std::shared_ptr<DataType> out_type, ScalarAggregateOptions options)
+      : options(std::move(options)),
+        out_type(std::move(out_type)),
+        has_nulls(false),
+        count(0),
+        min(nullptr),
+        max(nullptr) {
+    this->options.min_count = std::max<uint32_t>(1, this->options.min_count);
+  }
+
+  Status Consume(KernelContext* ctx, const ExecSpan& batch) override {
+    if (batch[0].is_scalar()) {
+      return Status::NotImplemented("No min/max implemented for DictionaryScalar");
+    }
+
+    DictionaryArray dict_arr(batch[0].array.ToArrayData());
+    ARROW_ASSIGN_OR_RAISE(auto compacted_arr, dict_arr.Compact(ctx->memory_pool()));
+    const DictionaryArray& compacted_dict_arr =
+        checked_cast<const DictionaryArray&>(*compacted_arr);
+    if (compacted_dict_arr.length() - compacted_dict_arr.null_count() == 0) {
+      return Status::OK();
+    }
+    this->has_nulls |= compacted_dict_arr.null_count() > 0;
+    this->count += compacted_dict_arr.length() - compacted_dict_arr.null_count();
Review Comment:
I don't think this is correct. `compacted_dict_arr.null_count()` counts the
number of nulls among the dict indices, but ignores nulls among dict values.
We have a utility function `CountAndSetBits` which might help here.
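For reference, a naive loop expressing the intended semantics (illustration only; a bitmap-based approach like the one suggested above would be faster):
```cpp
// Illustration only: an element is non-null iff its index is valid AND the
// dictionary value it references is valid.
int64_t non_null_count = 0;
const Array& values = *compacted_dict_arr.dictionary();
for (int64_t i = 0; i < compacted_dict_arr.length(); ++i) {
  if (compacted_dict_arr.IsValid(i) &&
      values.IsValid(compacted_dict_arr.GetValueIndex(i))) {
    ++non_null_count;
  }
}
```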
##########
cpp/src/arrow/compute/kernels/aggregate_test.cc:
##########
@@ -2175,6 +2175,244 @@ TEST(TestFixedSizeBinaryMinMaxKernel, Basics) {
EXPECT_THAT(MinMax(ScalarFromJSON(ty, R"("aa")"), options), ResultWith(null));
}
+TEST(TestDictionaryMinMaxKernel, DecimalsValue) {
+  ScalarAggregateOptions options;
+  std::shared_ptr<arrow::DataType> dict_ty;
+  std::shared_ptr<arrow::DataType> ty;
+  for (const auto& index_type : all_dictionary_index_types()) {
+    ARROW_SCOPED_TRACE("index_type = ", index_type->ToString());
+
+    for (const auto& value_ty_ : {decimal128(5, 2), decimal256(5, 2)}) {
+      dict_ty = dictionary(index_type, value_ty_);
+      ty = struct_({field("min", value_ty_), field("max", value_ty_)});
+
+      auto chunk1 = DictArrayFromJSON(dict_ty, R"([null, 0])", R"(["5.10"])");
+      auto chunk2 = DictArrayFromJSON(dict_ty, R"([0, 1, 1])", R"(["3.10", "-1.23"])");
+      ASSERT_OK_AND_ASSIGN(auto chunked, ChunkedArray::Make({chunk1, chunk2}));
+
+      options = ScalarAggregateOptions(/*skip_nulls=*/true);
+      EXPECT_THAT(MinMax(chunked, options),
+                  ResultWith(ScalarFromJSON(ty, R"({"min": "-1.23", "max": "5.10"})")));
+      EXPECT_THAT(
+          MinMax(DictArrayFromJSON(dict_ty, R"([0, 1, 1, 0])", R"(["5.10", "-1.23"])"),
+                 options),
+          ResultWith(ScalarFromJSON(ty, R"({"min": "-1.23", "max": "5.10"})")));
+      EXPECT_THAT(
+          MinMax(DictArrayFromJSON(dict_ty, R"([3, 1, 1, 4, 0, 2, null])",
+                                   R"(["5.10", "-1.23", "2.00", "3.45", "4.56"])"),
+                 options),
+          ResultWith(ScalarFromJSON(ty, R"({"min": "-1.23", "max": "5.10"})")));
+      EXPECT_THAT(
+          MinMax(DictArrayFromJSON(dict_ty, R"([null, 3, null, 1, 4, 3, 0, 2, null])",
+                                   R"(["-5.10", "-1.23", "-2.00", "-3.45", "-4.56"])"),
+                 options),
+          ResultWith(ScalarFromJSON(ty, R"({"min": "-5.10", "max": "-1.23"})")));
+      EXPECT_THAT(MinMax(DictArrayFromJSON(dict_ty, R"([null, null])", R"([])"), options),
+                  ResultWith(ScalarFromJSON(ty, R"({"min": null, "max": null})")));
+      EXPECT_THAT(MinMax(DictArrayFromJSON(dict_ty, R"([])", R"([])"), options),
+                  ResultWith(ScalarFromJSON(ty, R"({"min": null, "max": null})")));
+      EXPECT_THAT(MinMax(DictArrayFromJSON(dict_ty, R"([0])", R"(["1.00"])"), options),
+                  ResultWith(ScalarFromJSON(ty, R"({"min": "1.00", "max": "1.00"})")));
+
+      options = ScalarAggregateOptions(/*skip_nulls=*/false);
+      EXPECT_THAT(MinMax(chunked, options),
+                  ResultWith(ScalarFromJSON(ty, R"({"min": null, "max": null})")));
+      EXPECT_THAT(
+          MinMax(DictArrayFromJSON(dict_ty, R"([null, 3, 1, 1, 4, 0, 2, null])",
+                                   R"(["5.10", "-1.23", "2.00", "3.45", "4.56"])"),
+                 options),
+          ResultWith(ScalarFromJSON(ty, R"({"min": null, "max": null})")));
+      EXPECT_THAT(
+          MinMax(DictArrayFromJSON(dict_ty, R"([3, 1, 1, 4, 0, 2])",
+                                   R"(["5.10", "-1.23", "2.00", "3.45", "4.56"])"),
+                 options),
+          ResultWith(ScalarFromJSON(ty, R"({"min": "-1.23", "max": "5.10"})")));
+
+      options = ScalarAggregateOptions(/*skip_nulls=*/true, /*min_count=*/0);
+      EXPECT_THAT(MinMax(chunked, options),
+                  ResultWith(ScalarFromJSON(ty, R"({"min": "-1.23", "max": "5.10"})")));
+      EXPECT_THAT(
+          MinMax(DictArrayFromJSON(dict_ty, R"([null, null, null])", R"([])"), options),
+          ResultWith(ScalarFromJSON(ty, R"({"min": null, "max": null})")));
+      EXPECT_THAT(MinMax(DictArrayFromJSON(dict_ty, R"([0])", R"(["1.00"])"), options),
+                  ResultWith(ScalarFromJSON(ty, R"({"min": "1.00", "max": "1.00"})")));
+
+      options = ScalarAggregateOptions(/*skip_nulls=*/true, /*min_count=*/5);
+      EXPECT_THAT(MinMax(chunked, options),
+                  ResultWith(ScalarFromJSON(ty, R"({"min": null, "max": null})")));
+      EXPECT_THAT(
+          MinMax(DictArrayFromJSON(dict_ty, R"([0, 1, 1, 0])", R"(["5.10", "-1.23"])"),
+                 options),
+          ResultWith(ScalarFromJSON(ty, R"({"min": null, "max": null})")));
+      EXPECT_THAT(
+          MinMax(DictArrayFromJSON(dict_ty, R"([null, 3, 1, 1, 4, 0, 2, null, null])",
+                                   R"(["5.10", "-1.23", "2.00", "3.45", "4.56"])"),
+                 options),
+          ResultWith(ScalarFromJSON(ty, R"({"min": "-1.23", "max": "5.10"})")));
+
+      options = ScalarAggregateOptions(/*skip_nulls=*/true, /*min_count=*/1);
+      EXPECT_THAT(MinMax(chunked, options),
+                  ResultWith(ScalarFromJSON(ty, R"({"min": "-1.23", "max": "5.10"})")));
+      EXPECT_THAT(
+          MinMax(DictArrayFromJSON(dict_ty, R"([null, null, null])", R"([])"), options),
+          ResultWith(ScalarFromJSON(ty, R"({"min": null, "max": null})")));
+      EXPECT_THAT(MinMax(DictArrayFromJSON(dict_ty, R"([0])", R"(["1.00"])"), options),
+                  ResultWith(ScalarFromJSON(ty, R"({"min": "1.00", "max": "1.00"})")));
+
+      // compact dictionary
+      EXPECT_THAT(
+          MinMax(
+              DictArrayFromJSON(
+                  dict_ty, R"([3, 1, 1, 4, 0, 2])",
+                  R"(["5.10", "-1.23", "2.00", "3.45", "4.56", "8.20", "9.20", "10.20"])"),
+              options),
+          ResultWith(ScalarFromJSON(ty, R"({"min": "-1.23", "max": "5.10"})")));
+      EXPECT_THAT(
+          MinMax(
+              DictArrayFromJSON(
+                  dict_ty, R"([5, 1, 1, 6, 0, 2])",
+                  R"(["5.10", "-1.23", "2.00", "3.45", "4.56", "8.20", "9.20", "10.20"])"),
+              options),
+          ResultWith(ScalarFromJSON(ty, R"({"min": "-1.23", "max": "9.20"})")));
+    }
Review Comment:
It would be nice to streamline these tests but also make them more
comprehensive. I would expect:
- tests with nulls in indices
- tests with nulls in values
- tests with nulls in both indices and values
- tests with unreferenced values and multiply referenced values
- variations on `min_count` and `skip_nulls` options
Don't hesitate to add comments to clearly describe which is which.
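For example, a single case exercising several of these at once might look like this (a hedged sketch reusing the fixtures above; `"9.99"` is unreferenced and `"2.00"` is referenced twice):
```cpp
// Hedged sketch: nulls in the indices AND in the values, plus an unreferenced
// value ("9.99") and a multiply referenced one ("2.00").
EXPECT_THAT(
    MinMax(DictArrayFromJSON(dict_ty, R"([null, 1, 0, 2, 2])",
                             R"(["5.10", null, "2.00", "9.99"])"),
           ScalarAggregateOptions(/*skip_nulls=*/true)),
    ResultWith(ScalarFromJSON(ty, R"({"min": "2.00", "max": "5.10"})")));
```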
##########
cpp/src/arrow/compute/kernels/aggregate_test.cc:
##########
@@ -2175,6 +2175,244 @@ TEST(TestFixedSizeBinaryMinMaxKernel, Basics) {
EXPECT_THAT(MinMax(ScalarFromJSON(ty, R"("aa")"), options), ResultWith(null));
}
+TEST(TestDictionaryMinMaxKernel, DecimalsValue) {
+  ScalarAggregateOptions options;
+  std::shared_ptr<arrow::DataType> dict_ty;
+  std::shared_ptr<arrow::DataType> ty;
+  for (const auto& index_type : all_dictionary_index_types()) {
+    ARROW_SCOPED_TRACE("index_type = ", index_type->ToString());
+
+    for (const auto& value_ty_ : {decimal128(5, 2), decimal256(5, 2)}) {
+      dict_ty = dictionary(index_type, value_ty_);
+      ty = struct_({field("min", value_ty_), field("max", value_ty_)});
+
+      auto chunk1 = DictArrayFromJSON(dict_ty, R"([null, 0])", R"(["5.10"])");
+      auto chunk2 = DictArrayFromJSON(dict_ty, R"([0, 1, 1])", R"(["3.10", "-1.23"])");
+      ASSERT_OK_AND_ASSIGN(auto chunked, ChunkedArray::Make({chunk1, chunk2}));
+
+      options = ScalarAggregateOptions(/*skip_nulls=*/true);
+      EXPECT_THAT(MinMax(chunked, options),
+                  ResultWith(ScalarFromJSON(ty, R"({"min": "-1.23", "max": "5.10"})")));
+      EXPECT_THAT(
+          MinMax(DictArrayFromJSON(dict_ty, R"([0, 1, 1, 0])", R"(["5.10", "-1.23"])"),
+                 options),
+          ResultWith(ScalarFromJSON(ty, R"({"min": "-1.23", "max": "5.10"})")));
Review Comment:
Can you extract a helper function to automate all the scaffolding here?
Basically it would be more readable as:
```cpp
CheckDictionaryMinMax(
    index_type, value_type,
    R"([0, 1, 1, 0])", R"(["5.10", "-1.23"])",
    /*expected_min=*/"-1.23", /*expected_max=*/"5.10");
```
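One possible shape for that helper (a sketch only, under the assumption that the expected min/max are passed as JSON fragments so that `null` works too):
```cpp
// Hypothetical helper, not in the PR: builds the dictionary input and the
// expected struct scalar, then checks MinMax in one call. Decimal expectations
// would be passed quoted, e.g. R"("-1.23")".
void CheckDictionaryMinMax(const std::shared_ptr<DataType>& index_type,
                           const std::shared_ptr<DataType>& value_type,
                           const std::string& indices_json, const std::string& values_json,
                           const std::string& expected_min_json,
                           const std::string& expected_max_json,
                           const ScalarAggregateOptions& options) {
  auto dict_ty = dictionary(index_type, value_type);
  auto ty = struct_({field("min", value_type), field("max", value_type)});
  auto expected = ScalarFromJSON(
      ty, "{\"min\": " + expected_min_json + ", \"max\": " + expected_max_json + "}");
  EXPECT_THAT(MinMax(DictArrayFromJSON(dict_ty, indices_json, values_json), options),
              ResultWith(expected));
}
```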
##########
cpp/src/arrow/compute/kernels/aggregate_basic_internal.h:
##########
@@ -912,6 +912,122 @@ struct NullMinMaxImpl : public ScalarAggregator {
}
};
+template <SimdLevel::type SimdLevel>
+struct DictionaryMinMaxImpl : public ScalarAggregator {
+  using ThisType = DictionaryMinMaxImpl<SimdLevel>;
+
+  DictionaryMinMaxImpl(std::shared_ptr<DataType> out_type, ScalarAggregateOptions options)
+      : options(std::move(options)),
+        out_type(std::move(out_type)),
+        has_nulls(false),
+        count(0),
+        min(nullptr),
+        max(nullptr) {
+    this->options.min_count = std::max<uint32_t>(1, this->options.min_count);
+  }
+
+  Status Consume(KernelContext* ctx, const ExecSpan& batch) override {
+    if (batch[0].is_scalar()) {
+      return Status::NotImplemented("No min/max implemented for DictionaryScalar");
+    }
+
+    DictionaryArray dict_arr(batch[0].array.ToArrayData());
+    ARROW_ASSIGN_OR_RAISE(auto compacted_arr, dict_arr.Compact(ctx->memory_pool()));
+    const DictionaryArray& compacted_dict_arr =
+        checked_cast<const DictionaryArray&>(*compacted_arr);
+    if (compacted_dict_arr.length() - compacted_dict_arr.null_count() == 0) {
+      return Status::OK();
+    }
+    this->has_nulls |= compacted_dict_arr.null_count() > 0;
+    this->count += compacted_dict_arr.length() - compacted_dict_arr.null_count();
Review Comment:
Actually, it could be useful to expose that as a
`DictionaryArray::CountNullValues` method.
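A naive version of that method could look like this (hedged sketch; a real implementation would presumably work on the validity bitmaps instead of a per-element loop):
```cpp
// Hypothetical method body: a logical element is null when its index is null
// or when it references a null dictionary value.
int64_t DictionaryArray::CountNullValues() const {
  int64_t null_values = 0;
  for (int64_t i = 0; i < length(); ++i) {
    if (!IsValid(i) || !dictionary()->IsValid(GetValueIndex(i))) {
      ++null_values;
    }
  }
  return null_values;
}
```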
##########
cpp/src/arrow/compute/kernels/aggregate_basic_internal.h:
##########
@@ -912,6 +912,122 @@ struct NullMinMaxImpl : public ScalarAggregator {
}
};
+template <SimdLevel::type SimdLevel>
+struct DictionaryMinMaxImpl : public ScalarAggregator {
+  using ThisType = DictionaryMinMaxImpl<SimdLevel>;
+
+  DictionaryMinMaxImpl(std::shared_ptr<DataType> out_type, ScalarAggregateOptions options)
+      : options(std::move(options)),
+        out_type(std::move(out_type)),
+        has_nulls(false),
+        count(0),
+        min(nullptr),
+        max(nullptr) {
+    this->options.min_count = std::max<uint32_t>(1, this->options.min_count);
+  }
+
+  Status Consume(KernelContext* ctx, const ExecSpan& batch) override {
+    if (batch[0].is_scalar()) {
+      return Status::NotImplemented("No min/max implemented for DictionaryScalar");
+    }
+
+    DictionaryArray dict_arr(batch[0].array.ToArrayData());
+    ARROW_ASSIGN_OR_RAISE(auto compacted_arr, dict_arr.Compact(ctx->memory_pool()));
+    const DictionaryArray& compacted_dict_arr =
+        checked_cast<const DictionaryArray&>(*compacted_arr);
+    if (compacted_dict_arr.length() - compacted_dict_arr.null_count() == 0) {
+      return Status::OK();
+    }
+    this->has_nulls |= compacted_dict_arr.null_count() > 0;
+    this->count += compacted_dict_arr.length() - compacted_dict_arr.null_count();
+
+    std::shared_ptr<Scalar> dict_min;
+    std::shared_ptr<Scalar> dict_max;
+    if (compacted_dict_arr.length() - compacted_dict_arr.null_count() == 1) {
+      ARROW_ASSIGN_OR_RAISE(dict_min, compacted_dict_arr.dictionary()->GetScalar(0));
Review Comment:
I think this is premature optimization in any case.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]