NinaPeng commented on issue #36878:
URL: https://github.com/apache/arrow/issues/36878#issuecomment-1650908967
I experimented with code that lets an aggregate kernel expose and consume its
intermediate state. Here is an example that extends `GroupedReducingAggregator`:
```cpp
Result<std::shared_ptr<Schema>> IntermediateSchema() override {
  return arrow::schema({arrow::field("reduced({0})", out_type_),
                        arrow::field("counts({0})", arrow::int64())});
}
Status ConsumeIntermediate(const arrow::compute::ExecSpan& batch) override {
  // needs resize before consume
  CType* reduced = reduced_.mutable_data();
  int64_t* counts = counts_.mutable_data();
  uint8_t* no_nulls = no_nulls_.mutable_data();
  ExecSpan reduced_batch = batch;
  if (batch.values.size() == 3) {
    // reduced and counts intermediate values
    // counts can be ignored for GroupedSumImpl
    reduced_batch = ExecSpan({batch[0], batch[2]}, batch.length);
  }
  VisitGroupedValues<Type>(
      reduced_batch,
      [&](uint32_t g, InputCType value) {
        reduced[g] = Impl::Reduce(*out_type_, reduced[g], value);
      },
      [&](uint32_t g) { bit_util::SetBitTo(no_nulls, g, false); });
  if (batch.values.size() == 3) {
    ExecSpan counts_batch({batch[1], batch[2]}, batch.length);
    VisitGroupedValues<Int64Type>(
        counts_batch,
        [&](uint32_t g, int64_t value) { counts[g] += value; },
        [&](uint32_t g) { bit_util::SetBitTo(no_nulls, g, false); });
  }
  return Status::OK();
}
Result<std::vector<Datum>> FinalizeIntermediate() override {
  std::shared_ptr<Buffer> null_bitmap = nullptr;
  int64_t null_count = 0;
  if (!options_.skip_nulls) {
    null_count = kUnknownNullCount;
    if (null_bitmap) {
      arrow::internal::BitmapAnd(null_bitmap->data(), /*left_offset=*/0,
                                 no_nulls_.data(), /*right_offset=*/0,
                                 num_groups_,
                                 /*out_offset=*/0,
                                 null_bitmap->mutable_data());
    } else {
      ARROW_ASSIGN_OR_RAISE(null_bitmap, no_nulls_.Finish());
    }
  }
  ARROW_ASSIGN_OR_RAISE(auto reduced_values, reduced_.Finish());
  ARROW_ASSIGN_OR_RAISE(auto counts_values, counts_.Finish());
  auto reduced_array =
      ArrayData::Make(out_type(), num_groups_,
                      {std::move(null_bitmap), std::move(reduced_values)},
                      null_count);
  auto counts_array = ArrayData::Make(arrow::int64(), num_groups_,
                                      {nullptr, std::move(counts_values)},
                                      0);
  return std::vector<Datum>{reduced_array, counts_array};
}
```
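
To illustrate the two-phase idea outside of Arrow, here is a minimal, self-contained sketch of a grouped sum that keeps a (sum, count) pair per group. `PartialSums`, `ConsumeRaw`, and `ConsumePartial` are hypothetical names made up for this example and are not Arrow APIs; they only mimic what `Consume` and `ConsumeIntermediate` would do for a sum kernel, and they ignore nulls.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

// Hypothetical stand-in for the per-group state of a grouped sum kernel.
struct PartialSums {
  std::vector<int64_t> reduced;  // per-group partial sum
  std::vector<int64_t> counts;   // per-group number of consumed values
};

// Phase 1: consume raw (group id, value) pairs, as Consume would.
PartialSums ConsumeRaw(const std::vector<uint32_t>& group_ids,
                       const std::vector<int64_t>& values,
                       std::size_t num_groups) {
  assert(group_ids.size() == values.size());
  PartialSums state{std::vector<int64_t>(num_groups, 0),
                    std::vector<int64_t>(num_groups, 0)};
  for (std::size_t i = 0; i < values.size(); ++i) {
    state.reduced[group_ids[i]] += values[i];
    state.counts[group_ids[i]] += 1;
  }
  return state;
}

// Phase 2: fold an already-reduced state into `total`, as
// ConsumeIntermediate would. group_map[i] is the group id in `total`
// that row i of `partial` belongs to.
void ConsumePartial(PartialSums& total, const PartialSums& partial,
                    const std::vector<uint32_t>& group_map) {
  assert(partial.reduced.size() == group_map.size());
  assert(partial.counts.size() == group_map.size());
  for (std::size_t i = 0; i < group_map.size(); ++i) {
    total.reduced[group_map[i]] += partial.reduced[i];
    total.counts[group_map[i]] += partial.counts[i];
  }
}
```

The point of the sketch is that the second phase adds partial counts rather than incrementing by one per row, which is why the intermediate output has to carry the counts column alongside the reduced values.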
With this in place, `GroupByNode` would take an extra argument at construction,
`enum AggregateMode {PRE, POST, ALL}`, indicating whether the node should emit
the intermediate aggregation state or consume it.
When the aggregate mode is `AggregateMode::PRE`:
- The `GroupByNode` output schema is built by combining the kernel's
`IntermediateSchema` with the aggregate targets, for example
`fmt::format("reduced({0})", agg.target[0])` and
`fmt::format("counts({0})", agg.target[0])`
- `GroupByNode` calls `FinalizeIntermediate` instead of `Finalize`
When the aggregate mode is `AggregateMode::POST`:
- The input schema is derived from the kernel's `IntermediateSchema` and the
aggregate targets (perhaps `IntermediateSchema` should carry only field names,
without data types, when it is used to derive the input schema)
- `GroupByNode` calls `ConsumeIntermediate` instead of `Consume`
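
The mode handling described above could be sketched roughly as follows. This is only an illustration: `KernelCalls`, `SelectCalls`, and `IntermediateFieldNames` are hypothetical helpers, not existing Arrow code, and the treatment of `ALL` as ordinary single-phase aggregation is my assumption. Plain string concatenation stands in for `fmt::format` to keep the sketch free of dependencies.

```cpp
#include <cassert>
#include <string>
#include <vector>

enum class AggregateMode { PRE, POST, ALL };

// Names of the kernel entry points a GroupByNode would call in a given mode.
struct KernelCalls {
  std::string consume;
  std::string finalize;
};

KernelCalls SelectCalls(AggregateMode mode) {
  switch (mode) {
    case AggregateMode::PRE:
      // Raw input in, intermediate state out.
      return {"Consume", "FinalizeIntermediate"};
    case AggregateMode::POST:
      // Intermediate state in, final result out.
      return {"ConsumeIntermediate", "Finalize"};
    case AggregateMode::ALL:
      // Assumed to mean ordinary single-phase aggregation.
      return {"Consume", "Finalize"};
  }
  return {"Consume", "Finalize"};
}

// Field names a PRE-mode node would emit (and a POST-mode node would
// expect as input) for one aggregate target.
std::vector<std::string> IntermediateFieldNames(const std::string& target) {
  assert(!target.empty());
  return {"reduced(" + target + ")", "counts(" + target + ")"};
}
```

For a target column named `x`, a PRE-mode node would then emit fields `reduced(x)` and `counts(x)`, and a downstream POST-mode node would look them up under the same names.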
Would this approach be acceptable?