msirek opened a new pull request, #8038:
URL: https://github.com/apache/arrow-datafusion/pull/8038
## Which issue does this PR close?
Closes #7781.
## Rationale for this change
Evaluation of queries of the form
SELECT DISTINCT _column_list_ FROM _table_ LIMIT _n_;
may read more rows than necessary when performing a grouped hash aggregation.
If a batch of input rows contains more group values than the LIMIT value,
switching the aggregation to output mode early allows the limit to be
reached more quickly and minimizes the number of rows that need to be
processed by the aggregation or read from the input stream.
## What changes are included in this PR?
#### Push limit into AggregateExec for DISTINCT with GROUP BY
This commit adds the physical plan rewrite rule `LimitedDistinctAggregation`,
but does not wire it up for use by the optimizer. The rule matches a
`LocalLimitExec` or `GlobalLimitExec` operation as the parent of an
`AggregateExec` which has a group-by but no aggregate expressions, order-by
or ordering requirements, or filtering, and pushes the limit into the
`AggregateExec` as a limit hint.
As the aggregation may be applied in a series of `AggregateExec`
operations, the limit is also pushed down a chain of direct
`AggregateExec` descendants having identical grouping columns.
The rule must be applied before distribution requirements are enforced
as that rule may inject other operations in between the different
`AggregateExec`s. Applying the rule early means only directly-connected
`AggregateExec`s need to be examined.
The key point of this rule is that it is only legal for cases where not
all rows in the group need to be processed to ensure correctness.
Unit tests for LimitedDistinctAggregation are included.
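The matching and push-down described above can be sketched roughly as follows. This is a minimal illustration only, not the code in this PR: the `Plan` enum, its fields, and the functions `limited_distinct_aggregation` and `push_into_aggregates` are hypothetical stand-ins for DataFusion's physical plan types, and the checks on ordering requirements and filters are omitted.

```rust
/// Hypothetical, heavily simplified physical plan nodes (not DataFusion types).
#[derive(Debug, Clone, PartialEq)]
pub enum Plan {
    Scan,
    Limit { fetch: usize, input: Box<Plan> },
    Aggregate {
        group_by: Vec<String>,
        has_aggr_exprs: bool,
        limit: Option<usize>,
        input: Box<Plan>,
    },
}

/// Push `fetch` down a chain of directly connected aggregates that have a
/// non-empty group-by, no aggregate expressions, and identical grouping columns.
fn push_into_aggregates(plan: Plan, fetch: usize, expected: Option<&[String]>) -> Plan {
    match plan {
        Plan::Aggregate { group_by, has_aggr_exprs: false, input, .. }
            if !group_by.is_empty()
                && expected.map_or(true, |cols| cols == group_by.as_slice()) =>
        {
            // Continue down the chain, requiring the same grouping columns.
            let input = push_into_aggregates(*input, fetch, Some(group_by.as_slice()));
            Plan::Aggregate {
                group_by,
                has_aggr_exprs: false,
                limit: Some(fetch),
                input: Box::new(input),
            }
        }
        // Anything else ends the chain and is left untouched.
        other => other,
    }
}

/// If the root is a limit, push its fetch value into the aggregates below it
/// as a hint. The limit node itself stays in place, because the hint is soft.
pub fn limited_distinct_aggregation(plan: Plan) -> Plan {
    match plan {
        Plan::Limit { fetch, input } => Plan::Limit {
            fetch,
            input: Box::new(push_into_aggregates(*input, fetch, None)),
        },
        other => other,
    }
}
```

In this sketch the limit node is kept above the aggregates, since an aggregate carrying only a hint may still emit more groups than the limit.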
#### Soft limit for GroupedHashAggregateStream with no aggregate expressions
This commit wires up the LimitedDistinctAggregation rule in the physical
plan optimizer and updates the GroupedHashAggregateStream with an
optional soft limit on the number of `group_values` in a batch. If the
number of `group_values` in a single batch exceeds the limit, the operation
immediately signals the input is done, switches to output mode and emits all
groups.
This commit includes sqllogictests for DISTINCT queries with a LIMIT.
The CombinePartialFinalAggregate rule is also updated to convey the
limit on the final aggregation to the combined aggregation.
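As a rough illustration of the soft limit, the sketch below collects distinct values batch by batch and stops reading input once enough groups have been seen. It is a simplified model under stated assumptions, not the `GroupedHashAggregateStream` implementation: the function `distinct_with_soft_limit` and its batch representation are hypothetical, and treating "reached" (rather than strictly "exceeded") as the trigger is an assumption of this sketch.

```rust
use std::collections::HashSet;

/// Hypothetical stand-in for a grouped hash aggregation with no aggregate
/// expressions: it only collects distinct group values.
/// Returns the groups emitted (in first-seen order) and the number of
/// input batches that were consumed.
pub fn distinct_with_soft_limit(
    batches: &[Vec<i64>],
    soft_limit: Option<usize>,
) -> (Vec<i64>, usize) {
    let mut seen: HashSet<i64> = HashSet::new();
    let mut groups: Vec<i64> = Vec::new();
    let mut batches_read = 0;

    for batch in batches {
        batches_read += 1;
        for &value in batch {
            // `insert` returns true only the first time a value is seen.
            if seen.insert(value) {
                groups.push(value);
            }
        }
        // The check runs once per batch, so the limit is soft: every group
        // from the current batch is kept, even if that overshoots the limit.
        if let Some(limit) = soft_limit {
            if groups.len() >= limit {
                break; // treat the input as done and switch to output
            }
        }
    }
    (groups, batches_read)
}
```

Because all groups from the triggering batch are emitted, the parent limit operator is still needed to trim the output to exactly _n_ rows.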
#### Add datafusion.optimizer.enable_distinct_aggregation_soft_limit setting
This commit adds the
datafusion.optimizer.enable_distinct_aggregation_soft_limit
configuration setting, which defaults to true. When true, the
LimitedDistinctAggregation physical plan rewrite rule is enabled, which
pushes a LIMIT into a grouped aggregation with no aggregate expressions,
as a soft limit, to emit all grouped values seen so far once the limit is
reached.
#### Fix result checking in topk_aggregate benchmark
This commit fixes the logic which validates the rows returned by the
benchmark query.
The test was expecting hexadecimal digits in lowercase, but results are
uppercase.
#### Make the topk_aggregate benchmark's make_data function public
This commit moves the make_data function, which generates either random or
ascending time series data, to the data_utils module so it can be shared by
other benchmarks.
#### Add benchmark for DISTINCT queries
This commit adds a benchmark for queries using DISTINCT or GROUP BY with a
LIMIT clause and no aggregate expressions. It is intended to test the
performance of the `LimitedDistinctAggregation` rewrite rule and the new
limit hint in `GroupedHashAggregateStream`.
## Are these changes tested?
- [x] unit tests for LimitedDistinctAggregation
- [x] unit test for propagating the AggregateExec limit to the result of
`CombinePartialFinalAggregate`
- [x] unit test for
datafusion.optimizer.enable_distinct_aggregation_soft_limit setting
- [x] sqllogictests to check correct results of AggregateExec with a limit
- [x] criterion benchmarks for DISTINCT queries
## Are there any user-facing changes?
No
## Notes
This is opened as a draft PR.
PRs for the individual commits can be opened separately if this is too large
to review in one PR.