gruuya commented on code in PR #7981:
URL: https://github.com/apache/arrow-datafusion/pull/7981#discussion_r1382416566
##########
datafusion/optimizer/src/replace_distinct_aggregate.rs:
##########
@@ -60,6 +80,62 @@ impl OptimizerRule for ReplaceDistinctWithAggregate {
)?);
Ok(Some(aggregate))
}
+ LogicalPlan::Distinct(Distinct::On(DistinctOn {
+ select_expr,
+ on_expr,
+ sort_expr,
+ input,
+ ..
+ })) => {
+ // Construct the aggregation expression to be used to fetch the selected expressions.
+ let aggr_expr = select_expr
+ .iter()
+ .map(|e| {
+ Expr::AggregateFunction(AggregateFunction::new(
+ AggregateFunctionFunc::FirstValue,
+ vec![e.clone()],
+ false,
+ None,
+ sort_expr.clone(),
+ ))
+ })
+ .collect::<Vec<Expr>>();
+
+ // Build the aggregation plan
+ let plan = LogicalPlanBuilder::from(input.as_ref().clone())
+ .aggregate(on_expr.clone(), aggr_expr.to_vec())?
+ .build()?;
+
+ let plan = if let Some(sort_expr) = sort_expr {
+ // While sort expressions were used in the `FIRST_VALUE` aggregation itself above,
+ // this on its own isn't enough to guarantee the proper output order of the grouping
+ // (`ON`) expression, so we need to sort those as well.
+ LogicalPlanBuilder::from(plan)
+ .sort(sort_expr[..on_expr.len()].to_vec())?
Review Comment:
Yeah, my thinking was that the `first_value` aggregation doesn't do any real
sorting and behaves more like a top-1, i.e. it only works on the incoming
stream and keeps one row per group (`ON` clause), replacing it only when an
incoming row is a better match according to the sorting expressions:
https://github.com/apache/arrow-datafusion/blob/2af326a0bdbb60e7d7420089c7aa9ff22c7319ee/datafusion/physical-expr/src/aggregate/first_last.rs#L236-L244
The subsequent actual sort works on the grouped data, so its input should have
a much lower cardinality.
Whereas if we sorted prior to aggregation, I think we would materialize all
the data up front (whether in memory or by spilling) only to throw away most
of it, but I could be wrong.
Granted, I haven't done any perf testing comparing the two approaches yet;
this is just from my reading of the code.
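
To make the comparison concrete, here is a rough standalone sketch of the two
strategies in plain Rust. It is not DataFusion code: the `Row` type and both
function names are hypothetical, and it ignores spilling, partitioning and
multi-column sort keys. It only illustrates that the streaming top-1 approach
holds one row per group and sorts just the grouped output, while the
sort-first approach has to hold and sort every input row.

```rust
use std::collections::HashMap;

/// Hypothetical row: (group key from `ON`, sort key from `ORDER BY`, payload).
type Row = (u32, i64, &'static str);

/// Streaming "top-1 per group": keeps only the best row seen so far for each
/// key, so the state grows with the number of groups, not the number of rows.
fn first_value_per_group(rows: impl IntoIterator<Item = Row>) -> Vec<Row> {
    let mut best: HashMap<u32, Row> = HashMap::new();
    for row in rows {
        best.entry(row.0)
            .and_modify(|cur| {
                // Replace the retained row only if the new one sorts strictly earlier.
                if row.1 < cur.1 {
                    *cur = row;
                }
            })
            .or_insert(row);
    }
    // Hash map iteration order is unspecified, so the (much smaller) grouped
    // output still needs an explicit sort on the group key.
    let mut out: Vec<Row> = best.into_values().collect();
    out.sort_by_key(|r| r.0);
    out
}

/// Sort-first alternative: materializes and sorts every row, then keeps the
/// first row of each run of equal group keys.
fn sort_then_pick(mut rows: Vec<Row>) -> Vec<Row> {
    rows.sort_by_key(|r| (r.0, r.1));
    rows.dedup_by_key(|r| r.0);
    rows
}
```

Both functions should return the same rows for the same input; the difference
is only in how much data is held and sorted along the way.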