gruuya commented on code in PR #7981:
URL: https://github.com/apache/arrow-datafusion/pull/7981#discussion_r1382641037


##########
datafusion/optimizer/src/replace_distinct_aggregate.rs:
##########
@@ -60,6 +80,62 @@ impl OptimizerRule for ReplaceDistinctWithAggregate {
                 )?);
                 Ok(Some(aggregate))
             }
+            LogicalPlan::Distinct(Distinct::On(DistinctOn {
+                select_expr,
+                on_expr,
+                sort_expr,
+                input,
+                ..
+            })) => {
+                // Construct the aggregation expression to be used to fetch 
the selected expressions.
+                let aggr_expr = select_expr
+                    .iter()
+                    .map(|e| {
+                        Expr::AggregateFunction(AggregateFunction::new(
+                            AggregateFunctionFunc::FirstValue,
+                            vec![e.clone()],
+                            false,
+                            None,
+                            sort_expr.clone(),
+                        ))
+                    })
+                    .collect::<Vec<Expr>>();
+
+                // Build the aggregation plan
+                let plan = LogicalPlanBuilder::from(input.as_ref().clone())
+                    .aggregate(on_expr.clone(), aggr_expr.to_vec())?
+                    .build()?;
+
+                let plan = if let Some(sort_expr) = sort_expr {
+                    // While sort expressions were used in the `FIRST_VALUE` 
aggregation itself above,
+                    // this on it's own isn't enough to guarantee the proper 
output order of the grouping
+                    // (`ON`) expression, so we need to sort those as well.
+                    LogicalPlanBuilder::from(plan)
+                        .sort(sort_expr[..on_expr.len()].to_vec())?

Review Comment:
   I see now that my assumptions were wrong, and that indeed sorting gets 
pulled down beneath aggregation during physical planning anyway. I do think 
that in the case of `FIRST_VALUE` with `ORDER BY` this is sub-optimal, since 
that problem reduces to a partitioned top-1 problem, which doesn't need to work 
on pre-sorted input I think.
   
   Either way, even when I pull sorting beneath the aggregation, I still find I 
end up needing another post-aggregation sort to get the correct order for the 
`ON` clause expressions, since presumably `AggregateExec` doesn't preserve 
order: 
   ```sql
   ❯ explain select distinct on (c1) c1, c2 from test order by c1, c3;
   
+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
   | plan_type     | plan                                                       
                                                                                
                                                                     |
   
+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
   | logical_plan  | Projection: FIRST_VALUE(test.c1) ORDER BY [test.c1 ASC 
NULLS LAST, test.c3 ASC NULLS LAST] AS test.c1, FIRST_VALUE(test.c2) ORDER BY 
[test.c1 ASC NULLS LAST, test.c3 ASC NULLS LAST] AS test.c2                |
   |               |   Aggregate: groupBy=[[test.c1]], 
aggr=[[FIRST_VALUE(test.c1) ORDER BY [test.c1 ASC NULLS LAST, test.c3 ASC NULLS 
LAST], FIRST_VALUE(test.c2) ORDER BY [test.c1 ASC NULLS LAST, test.c3 ASC NULLS 
LAST]]]       |
   |               |     Sort: test.c1 ASC NULLS LAST, test.c3 ASC NULLS LAST   
                                                                                
                                                                     |
   |               |       TableScan: test projection=[c1, c2, c3]              
                                                                                
                                                                     |
   | physical_plan | ProjectionExec: expr=[FIRST_VALUE(test.c1) ORDER BY 
[test.c1 ASC NULLS LAST, test.c3 ASC NULLS LAST]@1 as test.c1, 
FIRST_VALUE(test.c2) ORDER BY [test.c1 ASC NULLS LAST, test.c3 ASC NULLS 
LAST]@2 as test.c2] |
   |               |   AggregateExec: mode=FinalPartitioned, gby=[c1@0 as c1], 
aggr=[FIRST_VALUE(test.c1), FIRST_VALUE(test.c2)], ordering_mode=Sorted         
                                                                      |
   |               |     SortExec: expr=[c1@0 ASC NULLS LAST]                   
                                                                                
                                                                     |
   |               |       CoalesceBatchesExec: target_batch_size=8192          
                                                                                
                                                                     |
   |               |         RepartitionExec: partitioning=Hash([c1@0], 10), 
input_partitions=10                                                             
                                                                        |
   |               |           AggregateExec: mode=Partial, gby=[c1@0 as c1], 
aggr=[FIRST_VALUE(test.c1), FIRST_VALUE(test.c2)], ordering_mode=Sorted         
                                                                       |
   |               |             RepartitionExec: 
partitioning=RoundRobinBatch(10), input_partitions=1                            
                                                                                
                   |
   |               |               SortExec: expr=[c1@0 ASC NULLS LAST,c3@2 ASC 
NULLS LAST]                                                                     
                                                                     |
   |               |                 CsvExec: file_groups={1 group: 
[[Users/markogrujic/Splitgraph/arrow-datafusion/testing/data/csv/aggregate_test_100.csv]]},
 projection=[c1, c2, c3], has_header=true                             |
   |               |                                                            
                                                                                
                                                                     |
   
+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
   ```
   compare that with the currently generated plans which do get the order 
correct:
   ```sql
   ❯ explain select distinct on (c1) c1, c2 from test order by c1, c3;
   
+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
   | plan_type     | plan                                                       
                                                                                
                                                                     |
   
+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
   | logical_plan  | Projection: FIRST_VALUE(test.c1) ORDER BY [test.c1 ASC 
NULLS LAST, test.c3 ASC NULLS LAST] AS test.c1, FIRST_VALUE(test.c2) ORDER BY 
[test.c1 ASC NULLS LAST, test.c3 ASC NULLS LAST] AS test.c2                |
   |               |   Sort: test.c1 ASC NULLS LAST                             
                                                                                
                                                                     |
   |               |     Aggregate: groupBy=[[test.c1]], 
aggr=[[FIRST_VALUE(test.c1) ORDER BY [test.c1 ASC NULLS LAST, test.c3 ASC NULLS 
LAST], FIRST_VALUE(test.c2) ORDER BY [test.c1 ASC NULLS LAST, test.c3 ASC NULLS 
LAST]]]     |
   |               |       TableScan: test projection=[c1, c2, c3]              
                                                                                
                                                                     |
   | physical_plan | ProjectionExec: expr=[FIRST_VALUE(test.c1) ORDER BY 
[test.c1 ASC NULLS LAST, test.c3 ASC NULLS LAST]@1 as test.c1, 
FIRST_VALUE(test.c2) ORDER BY [test.c1 ASC NULLS LAST, test.c3 ASC NULLS 
LAST]@2 as test.c2] |
   |               |   SortPreservingMergeExec: [c1@0 ASC NULLS LAST]           
                                                                                
                                                                     |
   |               |     SortExec: expr=[c1@0 ASC NULLS LAST]                   
                                                                                
                                                                     |
   |               |       AggregateExec: mode=FinalPartitioned, gby=[c1@0 as 
c1], aggr=[FIRST_VALUE(test.c1), FIRST_VALUE(test.c2)]                          
                                                                       |
   |               |         CoalesceBatchesExec: target_batch_size=8192        
                                                                                
                                                                     |
   |               |           RepartitionExec: partitioning=Hash([c1@0], 10), 
input_partitions=10                                                             
                                                                      |
   |               |             AggregateExec: mode=Partial, gby=[c1@0 as c1], 
aggr=[FIRST_VALUE(test.c1), FIRST_VALUE(test.c2)], ordering_mode=Sorted         
                                                                     |
   |               |               SortExec: expr=[c1@0 ASC NULLS LAST,c3@2 ASC 
NULLS LAST]                                                                     
                                                                     |
   |               |                 RepartitionExec: 
partitioning=RoundRobinBatch(10), input_partitions=1                            
                                                                                
               |
   |               |                   CsvExec: file_groups={1 group: 
[[Users/markogrujic/Splitgraph/arrow-datafusion/testing/data/csv/aggregate_test_100.csv]]},
 projection=[c1, c2, c3], has_header=true                           |
   |               |                                                            
                                                                                
                                                                     |
   
+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
   ```



##########
datafusion/optimizer/src/replace_distinct_aggregate.rs:
##########
@@ -60,6 +80,62 @@ impl OptimizerRule for ReplaceDistinctWithAggregate {
                 )?);
                 Ok(Some(aggregate))
             }
+            LogicalPlan::Distinct(Distinct::On(DistinctOn {
+                select_expr,
+                on_expr,
+                sort_expr,
+                input,
+                ..
+            })) => {
+                // Construct the aggregation expression to be used to fetch 
the selected expressions.
+                let aggr_expr = select_expr
+                    .iter()
+                    .map(|e| {
+                        Expr::AggregateFunction(AggregateFunction::new(
+                            AggregateFunctionFunc::FirstValue,
+                            vec![e.clone()],
+                            false,
+                            None,
+                            sort_expr.clone(),
+                        ))
+                    })
+                    .collect::<Vec<Expr>>();
+
+                // Build the aggregation plan
+                let plan = LogicalPlanBuilder::from(input.as_ref().clone())
+                    .aggregate(on_expr.clone(), aggr_expr.to_vec())?
+                    .build()?;
+
+                let plan = if let Some(sort_expr) = sort_expr {
+                    // While sort expressions were used in the `FIRST_VALUE` 
aggregation itself above,
+                    // this on it's own isn't enough to guarantee the proper 
output order of the grouping
+                    // (`ON`) expression, so we need to sort those as well.
+                    LogicalPlanBuilder::from(plan)
+                        .sort(sort_expr[..on_expr.len()].to_vec())?

Review Comment:
   I see now that my assumptions were wrong, and that indeed sorting gets 
pulled down beneath aggregation during physical planning anyway. I do think 
that in the case of `FIRST_VALUE` with `ORDER BY` this is sub-optimal, since 
that problem reduces to a partitioned top-1 problem, which doesn't need to work 
on pre-sorted input I think.
   
   Either way, even when I pull sorting beneath the aggregation, I still find I 
end up needing another post-aggregation sort to get the correct order for the 
`ON` clause expressions, since presumably `AggregateExec` doesn't preserve 
order: 
   ```sql
   ❯ explain select distinct on (c1) c1, c2 from test order by c1, c3;
   
+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
   | plan_type     | plan                                                       
                                                                                
                                                                     |
   
+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
   | logical_plan  | Projection: FIRST_VALUE(test.c1) ORDER BY [test.c1 ASC 
NULLS LAST, test.c3 ASC NULLS LAST] AS test.c1, FIRST_VALUE(test.c2) ORDER BY 
[test.c1 ASC NULLS LAST, test.c3 ASC NULLS LAST] AS test.c2                |
   |               |   Aggregate: groupBy=[[test.c1]], 
aggr=[[FIRST_VALUE(test.c1) ORDER BY [test.c1 ASC NULLS LAST, test.c3 ASC NULLS 
LAST], FIRST_VALUE(test.c2) ORDER BY [test.c1 ASC NULLS LAST, test.c3 ASC NULLS 
LAST]]]       |
   |               |     Sort: test.c1 ASC NULLS LAST, test.c3 ASC NULLS LAST   
                                                                                
                                                                     |
   |               |       TableScan: test projection=[c1, c2, c3]              
                                                                                
                                                                     |
   | physical_plan | ProjectionExec: expr=[FIRST_VALUE(test.c1) ORDER BY 
[test.c1 ASC NULLS LAST, test.c3 ASC NULLS LAST]@1 as test.c1, 
FIRST_VALUE(test.c2) ORDER BY [test.c1 ASC NULLS LAST, test.c3 ASC NULLS 
LAST]@2 as test.c2] |
   |               |   AggregateExec: mode=FinalPartitioned, gby=[c1@0 as c1], 
aggr=[FIRST_VALUE(test.c1), FIRST_VALUE(test.c2)], ordering_mode=Sorted         
                                                                      |
   |               |     SortExec: expr=[c1@0 ASC NULLS LAST]                   
                                                                                
                                                                     |
   |               |       CoalesceBatchesExec: target_batch_size=8192          
                                                                                
                                                                     |
   |               |         RepartitionExec: partitioning=Hash([c1@0], 10), 
input_partitions=10                                                             
                                                                        |
   |               |           AggregateExec: mode=Partial, gby=[c1@0 as c1], 
aggr=[FIRST_VALUE(test.c1), FIRST_VALUE(test.c2)], ordering_mode=Sorted         
                                                                       |
   |               |             RepartitionExec: 
partitioning=RoundRobinBatch(10), input_partitions=1                            
                                                                                
                   |
   |               |               SortExec: expr=[c1@0 ASC NULLS LAST,c3@2 ASC 
NULLS LAST]                                                                     
                                                                     |
   |               |                 CsvExec: file_groups={1 group: 
[[Users/markogrujic/Splitgraph/arrow-datafusion/testing/data/csv/aggregate_test_100.csv]]},
 projection=[c1, c2, c3], has_header=true                             |
   |               |                                                            
                                                                                
                                                                     |
   
+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
   ```
   compare that with the currently generated plans which do get the order 
correct:
   ```sql
   ❯ explain select distinct on (c1) c1, c2 from test order by c1, c3;
   
+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
   | plan_type     | plan                                                       
                                                                                
                                                                     |
   
+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
   | logical_plan  | Projection: FIRST_VALUE(test.c1) ORDER BY [test.c1 ASC 
NULLS LAST, test.c3 ASC NULLS LAST] AS test.c1, FIRST_VALUE(test.c2) ORDER BY 
[test.c1 ASC NULLS LAST, test.c3 ASC NULLS LAST] AS test.c2                |
   |               |   Sort: test.c1 ASC NULLS LAST                             
                                                                                
                                                                     |
   |               |     Aggregate: groupBy=[[test.c1]], 
aggr=[[FIRST_VALUE(test.c1) ORDER BY [test.c1 ASC NULLS LAST, test.c3 ASC NULLS 
LAST], FIRST_VALUE(test.c2) ORDER BY [test.c1 ASC NULLS LAST, test.c3 ASC NULLS 
LAST]]]     |
   |               |       TableScan: test projection=[c1, c2, c3]              
                                                                                
                                                                     |
   | physical_plan | ProjectionExec: expr=[FIRST_VALUE(test.c1) ORDER BY 
[test.c1 ASC NULLS LAST, test.c3 ASC NULLS LAST]@1 as test.c1, 
FIRST_VALUE(test.c2) ORDER BY [test.c1 ASC NULLS LAST, test.c3 ASC NULLS 
LAST]@2 as test.c2] |
   |               |   SortPreservingMergeExec: [c1@0 ASC NULLS LAST]           
                                                                                
                                                                     |
   |               |     SortExec: expr=[c1@0 ASC NULLS LAST]                   
                                                                                
                                                                     |
   |               |       AggregateExec: mode=FinalPartitioned, gby=[c1@0 as 
c1], aggr=[FIRST_VALUE(test.c1), FIRST_VALUE(test.c2)]                          
                                                                       |
   |               |         CoalesceBatchesExec: target_batch_size=8192        
                                                                                
                                                                     |
   |               |           RepartitionExec: partitioning=Hash([c1@0], 10), 
input_partitions=10                                                             
                                                                      |
   |               |             AggregateExec: mode=Partial, gby=[c1@0 as c1], 
aggr=[FIRST_VALUE(test.c1), FIRST_VALUE(test.c2)], ordering_mode=Sorted         
                                                                     |
   |               |               SortExec: expr=[c1@0 ASC NULLS LAST,c3@2 ASC 
NULLS LAST]                                                                     
                                                                     |
   |               |                 RepartitionExec: 
partitioning=RoundRobinBatch(10), input_partitions=1                            
                                                                                
               |
   |               |                   CsvExec: file_groups={1 group: 
[[Users/markogrujic/Splitgraph/arrow-datafusion/testing/data/csv/aggregate_test_100.csv]]},
 projection=[c1, c2, c3], has_header=true                           |
   |               |                                                            
                                                                                
                                                                     |
   
+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to