gruuya commented on code in PR #7981:
URL: https://github.com/apache/arrow-datafusion/pull/7981#discussion_r1382416566


##########
datafusion/optimizer/src/replace_distinct_aggregate.rs:
##########
@@ -60,6 +80,62 @@ impl OptimizerRule for ReplaceDistinctWithAggregate {
                 )?);
                 Ok(Some(aggregate))
             }
+            LogicalPlan::Distinct(Distinct::On(DistinctOn {
+                select_expr,
+                on_expr,
+                sort_expr,
+                input,
+                ..
+            })) => {
+                // Construct the aggregation expression to be used to fetch the selected expressions.
+                let aggr_expr = select_expr
+                    .iter()
+                    .map(|e| {
+                        Expr::AggregateFunction(AggregateFunction::new(
+                            AggregateFunctionFunc::FirstValue,
+                            vec![e.clone()],
+                            false,
+                            None,
+                            sort_expr.clone(),
+                        ))
+                    })
+                    .collect::<Vec<Expr>>();
+
+                // Build the aggregation plan
+                let plan = LogicalPlanBuilder::from(input.as_ref().clone())
+                    .aggregate(on_expr.clone(), aggr_expr.to_vec())?
+                    .build()?;
+
+                let plan = if let Some(sort_expr) = sort_expr {
+                    // While sort expressions were used in the `FIRST_VALUE` aggregation itself above,
+                    // this on its own isn't enough to guarantee the proper output order of the grouping
+                    // (`ON`) expression, so we need to sort those as well.
+                    LogicalPlanBuilder::from(plan)
+                        .sort(sort_expr[..on_expr.len()].to_vec())?

Review Comment:
   Yeah, my thinking was that the `first_value` aggregation doesn't do any real 
sorting and behaves more like a top-1 selection: it consumes the incoming 
stream and, for each group (the `ON` expressions), replaces the stored row only 
when the new row is a better match according to the sort expressions:
   
https://github.com/apache/arrow-datafusion/blob/2af326a0bdbb60e7d7420089c7aa9ff22c7319ee/datafusion/physical-expr/src/aggregate/first_last.rs#L236-L244
   The subsequent sort then runs on the grouped output, which should have a 
much lower cardinality than the input.
   
   If we instead sorted before the aggregation, I think we would have to 
materialize all of the input (whether in memory or by spilling) only to throw 
most of it away, but I could be wrong.
   
   Granted, I haven't done any perf testing of the two approaches yet; this is 
just from my reading of the code.
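   
   To make the comparison concrete, here is a rough standalone sketch of the 
two strategies. It is not DataFusion code: the `(key, order, payload)` row 
shape and both function names are made up for illustration, and it only models 
the behaviour described above (keep one candidate row per group versus sort 
the whole input first).

```rust
use std::collections::HashMap;

// Hypothetical row shape: (ON key, ORDER BY value, selected payload).
type Row<'a> = (u32, i64, &'a str);

/// Streaming "top-1 per group": keep only the best row seen so far per key,
/// then sort the (much smaller) grouped output by the key.
/// State held during the scan grows with the number of groups, not rows.
fn first_value_per_group<'a>(rows: &[Row<'a>]) -> Vec<Row<'a>> {
    let mut best: HashMap<u32, (i64, &'a str)> = HashMap::new();
    for &(key, order, payload) in rows {
        best.entry(key)
            .and_modify(|cur| {
                // Replace the stored row only if the new one sorts strictly earlier.
                if order < cur.0 {
                    *cur = (order, payload);
                }
            })
            .or_insert((order, payload));
    }
    let mut out: Vec<Row<'a>> = best.into_iter().map(|(k, (o, p))| (k, o, p)).collect();
    out.sort_by_key(|r| r.0); // final sort over one row per group
    out
}

/// Alternative: sort every input row first, then keep the first row per key.
/// This has to materialize and sort the full input before discarding rows.
fn sort_then_first<'a>(rows: &[Row<'a>]) -> Vec<Row<'a>> {
    let mut sorted = rows.to_vec();
    sorted.sort_by_key(|r| (r.0, r.1)); // stable sort over all rows
    sorted.dedup_by_key(|r| r.0); // keeps the first row of each run of equal keys
    sorted
}
```

   Both functions return the same rows; the difference is only in how much 
data has to be held and sorted along the way, which is the trade-off I was 
guessing at above.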


