msirek opened a new pull request, #8038:
URL: https://github.com/apache/arrow-datafusion/pull/8038

   ## Which issue does this PR close?
   
   Closes #7781.
   
   ## Rationale for this change
   
   Evaluation of queries of the form
     SELECT DISTINCT _column_list_ FROM _table_ LIMIT _n_;
   may read more rows than necessary when performing a grouped hash aggregation.
   
   If a batch of input rows contains more group values than the LIMIT value,
   switching the aggregation to output mode early allows the limit to be
   reached more quickly and minimizes the number of rows that must be
   processed by the aggregation or read from the input stream.
   
   ## What changes are included in this PR?
   
   #### Push limit into AggregateExec for DISTINCT with GROUP BY
   
   This commit adds the physical plan rewrite rule `LimitedDistinctAggregation`,
   but does not wire it up for use by the optimizer. The rule matches a
   `LocalLimitExec` or `GlobalLimitExec` operation as the parent of an
   `AggregateExec` which has a group-by, but no aggregate expressions, order-by
   or ordering requirements, or filtering, and pushes the limit into the
   `AggregateExec` as a limit hint.
   
   As the aggregation may be applied in a series of `AggregateExec`
   operations, the limit is also pushed down a chain of direct
   `AggregateExec` descendants having identical grouping columns.
   
   The rule must be applied before distribution requirements are enforced,
   because the enforcement rule may inject other operations between the
   `AggregateExec`s. Applying the rule early means only directly-connected
   `AggregateExec`s need to be examined.
   
   The key point of this rule is that it is only legal for cases where not
   all rows in the group need to be processed to ensure correctness.
   
   Unit tests for LimitedDistinctAggregation are included.
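
   The matching and push-down described above can be sketched on a toy plan
   type. This is only an illustration: `Plan`, `agg`, `push_limit` and
   `limited_distinct_aggregation` are hypothetical names, not DataFusion's
   API, and the sketch assumes the limit operator stays in the plan while the
   aggregates receive the limit as a hint. Order-by and filter checks are
   omitted for brevity.

```rust
// Toy model only: these types are hypothetical and are NOT DataFusion's API.
#[derive(Debug, Clone, PartialEq)]
enum Plan {
    Scan,
    Limit { fetch: usize, input: Box<Plan> },
    Aggregate {
        group_by: Vec<String>,
        aggr_exprs: Vec<String>,
        limit: Option<usize>, // the limit hint
        input: Box<Plan>,
    },
}

// Convenience constructor for an aggregate node.
fn agg(group_by: &[&str], aggr_exprs: &[&str], limit: Option<usize>, input: Plan) -> Plan {
    Plan::Aggregate {
        group_by: group_by.iter().map(|s| s.to_string()).collect(),
        aggr_exprs: aggr_exprs.iter().map(|s| s.to_string()).collect(),
        limit,
        input: Box::new(input),
    }
}

// Set the limit hint on an aggregate that has a group-by and no aggregate
// expressions, then continue into a directly connected child aggregate as
// long as its grouping columns are identical to the parent's.
fn push_limit(plan: Plan, fetch: usize, parent_groups: Option<&[String]>) -> Plan {
    match plan {
        Plan::Aggregate { group_by, aggr_exprs, input, .. }
            if !group_by.is_empty()
                && aggr_exprs.is_empty()
                && parent_groups.map_or(true, |g| g == group_by.as_slice()) =>
        {
            let input = push_limit(*input, fetch, Some(group_by.as_slice()));
            Plan::Aggregate {
                group_by,
                aggr_exprs,
                limit: Some(fetch),
                input: Box::new(input),
            }
        }
        // Anything else ends the chain and is left untouched.
        other => other,
    }
}

// Entry point: only a limit sitting directly on top of an aggregate matches.
fn limited_distinct_aggregation(plan: Plan) -> Plan {
    match plan {
        Plan::Limit { fetch, input } => Plan::Limit {
            fetch,
            input: Box::new(push_limit(*input, fetch, None)),
        },
        other => other,
    }
}
```

   In this sketch, a `Limit` over two stacked aggregates grouped on the same
   column ends up with the hint on both, while an aggregate that carries an
   aggregate expression, or a child grouped on different columns, is left
   unchanged.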
   
   #### Soft limit for GroupedHashAggregateStream with no aggregate expressions
   
   This commit wires up the LimitedDistinctAggregation rule in the physical
   plan optimizer and updates the GroupedHashAggregateStream with an
   optional soft limit on the number of `group_values` in a batch. If the
   number of `group_values` in a single batch exceeds the limit, the operation
   immediately signals the input is done, switches to output mode and emits
   all groups.
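
   The soft-limit behavior can be sketched with a toy model that works on
   plain integer batches. It is not the `GroupedHashAggregateStream`
   implementation: `distinct_with_soft_limit` is a hypothetical function, and
   checking the limit once per batch with `>=` is an assumption made for
   illustration.

```rust
use std::collections::HashSet;

// Toy model of a grouped aggregation with no aggregate expressions (that is,
// DISTINCT) and an optional soft limit on the number of groups.
// Returns the emitted groups and the number of input batches consumed.
fn distinct_with_soft_limit(
    batches: &[Vec<i64>],
    soft_limit: Option<usize>,
) -> (Vec<i64>, usize) {
    let mut seen = HashSet::new();
    let mut groups = Vec::new(); // distinct values in first-seen order
    let mut batches_read = 0;

    for batch in batches {
        batches_read += 1;
        for &value in batch {
            if seen.insert(value) {
                groups.push(value);
            }
        }
        // The limit is "soft": it is checked only after a whole batch has
        // been absorbed, so more groups than the limit may be emitted.
        if soft_limit.map_or(false, |limit| groups.len() >= limit) {
            break; // stop reading input and switch to output mode
        }
    }
    (groups, batches_read)
}
```

   With the batches `[1, 1, 2]`, `[3, 4, 5]`, `[6, 7]` and a soft limit of 3,
   this sketch stops after the second batch and emits five groups; the parent
   limit operator is still needed to trim the result to exactly three rows.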
   
   This commit includes sqllogictests for DISTINCT queries with a LIMIT.
       
   The CombinePartialFinalAggregate rule is also updated to convey the
   limit on the final aggregation to the combined aggregation.
   
   #### Add datafusion.optimizer.enable_distinct_aggregation_soft_limit setting
   
   This commit adds the
   datafusion.optimizer.enable_distinct_aggregation_soft_limit
   configuration setting, which defaults to true. When true, the
   LimitedDistinctAggregation physical plan rewrite rule is enabled, which
   pushes a LIMIT into a grouped aggregation with no aggregate expressions,
   as a soft limit, to emit all grouped values seen so far once the limit is
   reached.
   
   #### Fix result checking in topk_aggregate benchmark
   
   This commit fixes the logic which validates the rows returned by the
   benchmark query. The test was expecting hexadecimal digits in lowercase,
   but results are uppercase.
   
   #### Make the topk_aggregate benchmark's make_data function public
   
   This commit moves the make_data function, which generates either random or
   ascending time series data, to the data_utils module, so it can be shared
   by other benchmarks.
   
   #### Add benchmark for DISTINCT queries
   
   This commit adds a benchmark for queries using DISTINCT or GROUP BY with a
   LIMIT clause and no aggregate expressions. It is intended to test the
   performance of the `LimitedDistinctAggregation` rewrite rule and the new
   limit hint in `GroupedHashAggregateStream`.
   
   ## Are these changes tested?
   
   - [x] unit tests for LimitedDistinctAggregation
   - [x] unit test for propagating the AggregateExec limit to the result of `CombinePartialFinalAggregate`
   - [x] unit test for datafusion.optimizer.enable_distinct_aggregation_soft_limit setting
   - [x] sqllogictests to check correct results of AggregateExec with a limit
   - [x] criterion benchmarks for DISTINCT queries
   
   ## Are there any user-facing changes?
   
   No
   
   ## Notes
   
   This is opened as a draft PR.
   PRs for the individual commits can be opened separately if this is too
   large to review in one PR.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
