pitrou commented on a change in pull request #10994:
URL: https://github.com/apache/arrow/pull/10994#discussion_r695764540
##########
File path: cpp/src/arrow/compute/kernels/hash_aggregate_test.cc
##########
@@ -206,6 +206,66 @@ Result<Datum> GroupByUsingExecPlan(const
std::vector<Datum>& arguments,
plan->sources()[0]->outputs()[0]->output_schema()->fields());
}
+Result<Datum> GroupByUsingExecPlan(const BatchesWithSchema& input,
+ const std::vector<std::string>& key_names,
+ const std::vector<std::string>& arg_names,
+ const std::vector<internal::Aggregate>&
aggregates,
+ bool use_threads, ExecContext* ctx) {
+ std::vector<FieldRef> keys(key_names.size());
+ std::vector<FieldRef> targets(aggregates.size());
+ std::vector<std::string> names(aggregates.size());
+ for (size_t i = 0; i < aggregates.size(); ++i) {
+ names[i] = aggregates[i].function;
+ targets[i] = FieldRef(arg_names[i]);
+ }
+ for (size_t i = 0; i < key_names.size(); ++i) {
+ keys[i] = FieldRef(key_names[i]);
+ }
+
+ ARROW_ASSIGN_OR_RAISE(auto plan, ExecPlan::Make(ctx));
Review comment:
It looks like most of this could be factored out to reuse in both
`GroupByUsingExecPlan` overloads.
##########
File path: cpp/src/arrow/compute/kernels/hash_aggregate.cc
##########
@@ -1547,13 +1641,21 @@ struct GroupedTDigestImpl : public GroupedAggregator {
Status Consume(const ExecBatch& batch) override {
auto g = batch[1].array()->GetValues<uint32_t>(1);
- VisitArrayDataInline<Type>(
- *batch[0].array(),
- [&](typename TypeTraits<Type>::CType value) {
- this->tdigests_[*g].NanAdd(value);
- ++g;
- },
- [&] { ++g; });
+ if (batch[0].is_array()) {
+ VisitArrayDataInline<Type>(
+ *batch[0].array(),
+ [&](typename TypeTraits<Type>::CType value) {
+ this->tdigests_[*g].NanAdd(value);
+ ++g;
+ },
+ [&] { ++g; });
+ } else if (batch[0].scalar()->is_valid) {
+ typename TypeTraits<Type>::CType value =
+ UnboxScalar<Type>::Unbox(*batch[0].scalar());
+ for (int64_t i = 0; i < batch.length; i++) {
+ this->tdigests_[*g++].NanAdd(value);
+ }
+ }
Review comment:
This may be nicer if written as:
```c++
auto consume_value = [&](uint32_t g, CType val) {
this->tdigests_[g].NanAdd(val);
};
VisitGroupedNonNullValues<Type>(batch, std::move(consume_value));
```
##########
File path: cpp/src/arrow/compute/kernels/hash_aggregate.cc
##########
@@ -1833,30 +1951,41 @@ struct GroupedBooleanAggregator : public
GroupedAggregator {
uint8_t* seen = seen_.mutable_data();
uint8_t* no_nulls = no_nulls_.mutable_data();
int64_t* counts = counts_.mutable_data();
- const auto& input = *batch[0].array();
auto g = batch[1].array()->GetValues<uint32_t>(1);
- if (input.MayHaveNulls()) {
- const uint8_t* bitmap = input.buffers[1]->data();
- arrow::internal::VisitBitBlocksVoid(
- input.buffers[0], input.offset, input.length,
- [&](int64_t position) {
- counts[*g]++;
- Impl::UpdateGroupWith(seen, *g, BitUtil::GetBit(bitmap, position));
- g++;
- },
- [&] { BitUtil::SetBitTo(no_nulls, *g++, false); });
+ if (batch[0].is_array()) {
+ const auto& input = *batch[0].array();
+ if (input.MayHaveNulls()) {
+ const uint8_t* bitmap = input.buffers[1]->data();
+ arrow::internal::VisitBitBlocksVoid(
+ input.buffers[0], input.offset, input.length,
+ [&](int64_t position) {
+ counts[*g]++;
+ Impl::UpdateGroupWith(seen, *g, BitUtil::GetBit(bitmap,
position));
+ g++;
+ },
+ [&] { BitUtil::SetBitTo(no_nulls, *g++, false); });
+ } else {
+ arrow::internal::VisitBitBlocksVoid(
+ input.buffers[1], input.offset, input.length,
+ [&](int64_t) {
+ Impl::UpdateGroupWith(seen, *g, true);
+ counts[*g++]++;
+ },
+ [&]() {
+ Impl::UpdateGroupWith(seen, *g, false);
+ counts[*g++]++;
+ });
+ }
} else {
- arrow::internal::VisitBitBlocksVoid(
- input.buffers[1], input.offset, input.length,
- [&](int64_t) {
- Impl::UpdateGroupWith(seen, *g, true);
- counts[*g++]++;
- },
- [&]() {
- Impl::UpdateGroupWith(seen, *g, false);
- counts[*g++]++;
- });
+ const auto& input = *batch[0].scalar();
+ if (input.is_valid) {
+ const bool value = UnboxScalar<BooleanType>::Unbox(input);
+ for (int64_t i = 0; i < batch.length; i++) {
+ Impl::UpdateGroupWith(seen, *g, value);
+ counts[*g++]++;
+ }
+ }
Review comment:
Shouldn't the non-valid case be handled as well? e.g. a loop doing
`BitUtil::SetBitTo(no_nulls, *g++, false)`.
##########
File path: cpp/src/arrow/compute/kernels/hash_aggregate.cc
##########
@@ -1700,14 +1802,30 @@ struct GroupedMinMaxImpl : public GroupedAggregator {
auto raw_mins = reinterpret_cast<CType*>(mins_.mutable_data());
auto raw_maxes = reinterpret_cast<CType*>(maxes_.mutable_data());
- VisitArrayValuesInline<Type>(
- *batch[0].array(),
- [&](CType val) {
+ if (batch[0].is_array()) {
+ VisitArrayValuesInline<Type>(
+ *batch[0].array(),
+ [&](CType val) {
+ raw_maxes[*g] = std::max(raw_maxes[*g], val);
+ raw_mins[*g] = std::min(raw_mins[*g], val);
+ BitUtil::SetBit(has_values_.mutable_data(), *g++);
+ },
+ [&] { BitUtil::SetBit(has_nulls_.mutable_data(), *g++); });
+ } else {
+ const auto& input = *batch[0].scalar();
+ if (input.is_valid) {
+ const auto val = UnboxScalar<Type>::Unbox(input);
+ for (int64_t i = 0; i < batch.length; i++) {
raw_maxes[*g] = std::max(raw_maxes[*g], val);
raw_mins[*g] = std::min(raw_mins[*g], val);
BitUtil::SetBit(has_values_.mutable_data(), *g++);
- },
- [&] { BitUtil::SetBit(has_nulls_.mutable_data(), *g++); });
+ }
+ } else {
+ for (int64_t i = 0; i < batch.length; i++) {
+ BitUtil::SetBit(has_nulls_.mutable_data(), *g++);
+ }
+ }
+ }
Review comment:
So it seems it would be nicer to write this as:
```c++
auto consume_value = [&](uint32_t g, CType val) {
raw_maxes[g] = std::max(raw_maxes[g], val);
raw_mins[g] = std::min(raw_mins[g], val);
BitUtil::SetBit(has_values_.mutable_data(), g);
};
auto consume_null = [&](uint32_t g) {
BitUtil::SetBit(has_nulls_.mutable_data(), g);
};
VisitGroupedValues<Type>(batch, std::move(consume_value),
std::move(consume_null));
```
##########
File path: cpp/src/arrow/compute/kernels/hash_aggregate.cc
##########
@@ -1211,6 +1249,22 @@ struct GroupedMeanImpl : public
GroupedReducingAggregator<Type, GroupedMeanImpl<
return Status::OK();
}
+ static Status Consume(const Scalar& value, const int64_t count, c_type*
reduced,
+ int64_t* counts, uint8_t* no_nulls, const uint32_t* g)
{
+ if (value.is_valid) {
+ const auto v =
to_unsigned(static_cast<c_type>(UnboxScalar<Type>::Unbox(value)));
+ for (int i = 0; i < count; i++) {
+ reduced[*g] = static_cast<c_type>(to_unsigned(reduced[*g]) + v);
Review comment:
Here as well, it would be nice not to have the
`static_cast`/`to_unsigned` dance pasted everywhere.
##########
File path: cpp/src/arrow/compute/kernels/hash_aggregate.cc
##########
@@ -1138,6 +1160,22 @@ struct GroupedProductImpl final
return Status::OK();
}
+ static Status Consume(const Scalar& value, const int64_t count, c_type*
reduced,
+ int64_t* counts, uint8_t* no_nulls, const uint32_t* g)
{
+ if (value.is_valid) {
+ const auto v =
to_unsigned(static_cast<c_type>(UnboxScalar<Type>::Unbox(value)));
+ for (int i = 0; i < count; i++) {
+ reduced[*g] = static_cast<c_type>(to_unsigned(reduced[*g]) * v);
Review comment:
Why doesn't this use `MultiplyTraits<AccType>::Multiply` instead? It
seems like copy/pasting is putting us at risk of latent bugs here. Perhaps you
can factor out the raw computation routines?
```c++
using InputCType = typename TypeTraits<Type>::CType;
AccType Reduce(const DataType& type, c_type u, InputCType v) {
return MultiplyTraits<AccType>::Multiply(type, u, static_cast<c_type>(v));
}
AccType Reduce(const DataType& type, c_type u, c_type v) {
return MultiplyTraits<AccType>::Multiply(type, u, v);
}
```
Same for summing:
```c++
using InputCType = typename TypeTraits<Type>::CType;
AccType Reduce(const DataType& type, c_type u, InputCType v) {
return static_cast<c_type>(to_unsigned(u) +
to_unsigned(static_cast<c_type>(v)));
}
AccType Reduce(const DataType& type, c_type u, c_type v) {
return static_cast<c_type>(to_unsigned(u) + to_unsigned(v));
}
```
Then you may even reconcile the Sum and Product implementations further.
##########
File path: cpp/src/arrow/compute/kernels/hash_aggregate_test.cc
##########
@@ -1032,6 +1169,55 @@ TEST(GroupBy, TDigest) {
/*verbose=*/true);
}
+TEST(GroupBy, StddevVarianceTDigestScalar) {
+ BatchesWithSchema input;
+ input.batches = {
+ ExecBatchFromJSON(
+ {ValueDescr::Scalar(int32()), ValueDescr::Scalar(float32()),
int64()},
+ "[[1, 1.0, 1], [1, 1.0, 1], [1, 1.0, 2], [1, 1.0, 3]]"),
+ ExecBatchFromJSON(
+ {ValueDescr::Scalar(int32()), ValueDescr::Scalar(float32()),
int64()},
+ "[[null, null, 1], [null, null, 1], [null, null, 2], [null, null,
3]]"),
+ ExecBatchFromJSON({int32(), float32(), int64()},
+ "[[2, 2.0, 1], [3, 3.0, 2], [4, 4.0, 3]]"),
+ };
+ input.schema = schema(
+ {field("argument", int32()), field("argument1", float32()), field("key",
int64())});
+
+ for (bool use_threads : {false}) {
+ SCOPED_TRACE(use_threads ? "parallel/merged" : "serial");
+ ASSERT_OK_AND_ASSIGN(Datum actual,
+ GroupByUsingExecPlan(input, {"key"},
+ {"argument", "argument",
"argument",
+ "argument1", "argument1",
"argument1"},
+ {
+ {"hash_stddev", nullptr},
+ {"hash_variance", nullptr},
+ {"hash_tdigest", nullptr},
+ {"hash_stddev", nullptr},
+ {"hash_variance", nullptr},
+ {"hash_tdigest", nullptr},
+ },
+ use_threads,
default_exec_context()));
+ Datum expected =
+ ArrayFromJSON(struct_({
+ field("hash_stddev", float64()),
+ field("hash_variance", float64()),
+ field("hash_tdigest", fixed_size_list(float64(), 1)),
+ field("hash_stddev", float64()),
+ field("hash_variance", float64()),
+ field("hash_tdigest", fixed_size_list(float64(), 1)),
+ field("key", int64()),
+ }),
+ R"([
+ [0.4714045, 0.222222, [1.0], 0.4714045, 0.222222, [1.0], 1],
+ [1.0, 1.0, [1.0], 1.0, 1.0, [1.0], 2],
+ [1.5, 2.25, [1.0], 1.5, 2.25, [1.0], 3]
Review comment:
Isn't tdigest supposed to compute the median? I would expect 1.0, 2.0 and
2.5 respectively.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]