gruuya commented on code in PR #7981:
URL: https://github.com/apache/arrow-datafusion/pull/7981#discussion_r1382641037
##########
datafusion/optimizer/src/replace_distinct_aggregate.rs:
##########
@@ -60,6 +80,62 @@ impl OptimizerRule for ReplaceDistinctWithAggregate {
)?);
Ok(Some(aggregate))
}
+ LogicalPlan::Distinct(Distinct::On(DistinctOn {
+ select_expr,
+ on_expr,
+ sort_expr,
+ input,
+ ..
+ })) => {
+ // Construct the aggregation expression to be used to fetch
the selected expressions.
+ let aggr_expr = select_expr
+ .iter()
+ .map(|e| {
+ Expr::AggregateFunction(AggregateFunction::new(
+ AggregateFunctionFunc::FirstValue,
+ vec![e.clone()],
+ false,
+ None,
+ sort_expr.clone(),
+ ))
+ })
+ .collect::<Vec<Expr>>();
+
+ // Build the aggregation plan
+ let plan = LogicalPlanBuilder::from(input.as_ref().clone())
+ .aggregate(on_expr.clone(), aggr_expr.to_vec())?
+ .build()?;
+
+ let plan = if let Some(sort_expr) = sort_expr {
+ // While sort expressions were used in the `FIRST_VALUE`
aggregation itself above,
+ // this on its own isn't enough to guarantee the proper
output order of the grouping
+ // (`ON`) expression, so we need to sort those as well.
+ LogicalPlanBuilder::from(plan)
+ .sort(sort_expr[..on_expr.len()].to_vec())?
Review Comment:
I see now that my assumptions were wrong, and that sorting does indeed get
pulled down beneath the aggregation during physical planning anyway. I still
think this is sub-optimal in the case of `FIRST_VALUE` with `ORDER BY`, since
that case reduces to a partitioned top-1 problem, which, as far as I can tell,
doesn't need to work on pre-sorted input.
Either way, even when I pull the sort beneath the aggregation, I still end up
needing another post-aggregation sort to get the correct order for the
`ON` clause expressions, presumably because `AggregateExec` doesn't preserve
order:
```sql
❯ explain select distinct on (c1) c1, c2 from test order by c1, c3;
+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| plan_type | plan
|
+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| logical_plan | Projection: FIRST_VALUE(test.c1) ORDER BY [test.c1 ASC
NULLS LAST, test.c3 ASC NULLS LAST] AS test.c1, FIRST_VALUE(test.c2) ORDER BY
[test.c1 ASC NULLS LAST, test.c3 ASC NULLS LAST] AS test.c2 |
| | Aggregate: groupBy=[[test.c1]],
aggr=[[FIRST_VALUE(test.c1) ORDER BY [test.c1 ASC NULLS LAST, test.c3 ASC NULLS
LAST], FIRST_VALUE(test.c2) ORDER BY [test.c1 ASC NULLS LAST, test.c3 ASC NULLS
LAST]]] |
| | Sort: test.c1 ASC NULLS LAST, test.c3 ASC NULLS LAST
|
| | TableScan: test projection=[c1, c2, c3]
|
| physical_plan | ProjectionExec: expr=[FIRST_VALUE(test.c1) ORDER BY
[test.c1 ASC NULLS LAST, test.c3 ASC NULLS LAST]@1 as test.c1,
FIRST_VALUE(test.c2) ORDER BY [test.c1 ASC NULLS LAST, test.c3 ASC NULLS
LAST]@2 as test.c2] |
| | AggregateExec: mode=FinalPartitioned, gby=[c1@0 as c1],
aggr=[FIRST_VALUE(test.c1), FIRST_VALUE(test.c2)], ordering_mode=Sorted
|
| | SortExec: expr=[c1@0 ASC NULLS LAST]
|
| | CoalesceBatchesExec: target_batch_size=8192
|
| | RepartitionExec: partitioning=Hash([c1@0], 10),
input_partitions=10
|
| | AggregateExec: mode=Partial, gby=[c1@0 as c1],
aggr=[FIRST_VALUE(test.c1), FIRST_VALUE(test.c2)], ordering_mode=Sorted
|
| | RepartitionExec:
partitioning=RoundRobinBatch(10), input_partitions=1
|
| | SortExec: expr=[c1@0 ASC NULLS LAST,c3@2 ASC
NULLS LAST]
|
| | CsvExec: file_groups={1 group:
[[Users/markogrujic/Splitgraph/arrow-datafusion/testing/data/csv/aggregate_test_100.csv]]},
projection=[c1, c2, c3], has_header=true |
| |
|
+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
```
Compare that with the currently generated plans, which do get the order
correct:
```sql
❯ explain select distinct on (c1) c1, c2 from test order by c1, c3;
+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| plan_type | plan
|
+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| logical_plan | Projection: FIRST_VALUE(test.c1) ORDER BY [test.c1 ASC
NULLS LAST, test.c3 ASC NULLS LAST] AS test.c1, FIRST_VALUE(test.c2) ORDER BY
[test.c1 ASC NULLS LAST, test.c3 ASC NULLS LAST] AS test.c2 |
| | Sort: test.c1 ASC NULLS LAST
|
| | Aggregate: groupBy=[[test.c1]],
aggr=[[FIRST_VALUE(test.c1) ORDER BY [test.c1 ASC NULLS LAST, test.c3 ASC NULLS
LAST], FIRST_VALUE(test.c2) ORDER BY [test.c1 ASC NULLS LAST, test.c3 ASC NULLS
LAST]]] |
| | TableScan: test projection=[c1, c2, c3]
|
| physical_plan | ProjectionExec: expr=[FIRST_VALUE(test.c1) ORDER BY
[test.c1 ASC NULLS LAST, test.c3 ASC NULLS LAST]@1 as test.c1,
FIRST_VALUE(test.c2) ORDER BY [test.c1 ASC NULLS LAST, test.c3 ASC NULLS
LAST]@2 as test.c2] |
| | SortPreservingMergeExec: [c1@0 ASC NULLS LAST]
|
| | SortExec: expr=[c1@0 ASC NULLS LAST]
|
| | AggregateExec: mode=FinalPartitioned, gby=[c1@0 as
c1], aggr=[FIRST_VALUE(test.c1), FIRST_VALUE(test.c2)]
|
| | CoalesceBatchesExec: target_batch_size=8192
|
| | RepartitionExec: partitioning=Hash([c1@0], 10),
input_partitions=10
|
| | AggregateExec: mode=Partial, gby=[c1@0 as c1],
aggr=[FIRST_VALUE(test.c1), FIRST_VALUE(test.c2)], ordering_mode=Sorted
|
| | SortExec: expr=[c1@0 ASC NULLS LAST,c3@2 ASC
NULLS LAST]
|
| | RepartitionExec:
partitioning=RoundRobinBatch(10), input_partitions=1
|
| | CsvExec: file_groups={1 group:
[[Users/markogrujic/Splitgraph/arrow-datafusion/testing/data/csv/aggregate_test_100.csv]]},
projection=[c1, c2, c3], has_header=true |
| |
|
+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
```
##########
datafusion/optimizer/src/replace_distinct_aggregate.rs:
##########
@@ -60,6 +80,62 @@ impl OptimizerRule for ReplaceDistinctWithAggregate {
)?);
Ok(Some(aggregate))
}
+ LogicalPlan::Distinct(Distinct::On(DistinctOn {
+ select_expr,
+ on_expr,
+ sort_expr,
+ input,
+ ..
+ })) => {
+ // Construct the aggregation expression to be used to fetch
the selected expressions.
+ let aggr_expr = select_expr
+ .iter()
+ .map(|e| {
+ Expr::AggregateFunction(AggregateFunction::new(
+ AggregateFunctionFunc::FirstValue,
+ vec![e.clone()],
+ false,
+ None,
+ sort_expr.clone(),
+ ))
+ })
+ .collect::<Vec<Expr>>();
+
+ // Build the aggregation plan
+ let plan = LogicalPlanBuilder::from(input.as_ref().clone())
+ .aggregate(on_expr.clone(), aggr_expr.to_vec())?
+ .build()?;
+
+ let plan = if let Some(sort_expr) = sort_expr {
+ // While sort expressions were used in the `FIRST_VALUE`
aggregation itself above,
+ // this on its own isn't enough to guarantee the proper
output order of the grouping
+ // (`ON`) expression, so we need to sort those as well.
+ LogicalPlanBuilder::from(plan)
+ .sort(sort_expr[..on_expr.len()].to_vec())?
Review Comment:
I see now that my assumptions were wrong, and that sorting does indeed get
pulled down beneath the aggregation during physical planning anyway. I still
think this is sub-optimal in the case of `FIRST_VALUE` with `ORDER BY`, since
that case reduces to a partitioned top-1 problem, which, as far as I can tell,
doesn't need to work on pre-sorted input.
Either way, even when I pull the sort beneath the aggregation, I still end up
needing another post-aggregation sort to get the correct order for the
`ON` clause expressions, presumably because `AggregateExec` doesn't preserve
order:
```sql
❯ explain select distinct on (c1) c1, c2 from test order by c1, c3;
+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| plan_type | plan
|
+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| logical_plan | Projection: FIRST_VALUE(test.c1) ORDER BY [test.c1 ASC
NULLS LAST, test.c3 ASC NULLS LAST] AS test.c1, FIRST_VALUE(test.c2) ORDER BY
[test.c1 ASC NULLS LAST, test.c3 ASC NULLS LAST] AS test.c2 |
| | Aggregate: groupBy=[[test.c1]],
aggr=[[FIRST_VALUE(test.c1) ORDER BY [test.c1 ASC NULLS LAST, test.c3 ASC NULLS
LAST], FIRST_VALUE(test.c2) ORDER BY [test.c1 ASC NULLS LAST, test.c3 ASC NULLS
LAST]]] |
| | Sort: test.c1 ASC NULLS LAST, test.c3 ASC NULLS LAST
|
| | TableScan: test projection=[c1, c2, c3]
|
| physical_plan | ProjectionExec: expr=[FIRST_VALUE(test.c1) ORDER BY
[test.c1 ASC NULLS LAST, test.c3 ASC NULLS LAST]@1 as test.c1,
FIRST_VALUE(test.c2) ORDER BY [test.c1 ASC NULLS LAST, test.c3 ASC NULLS
LAST]@2 as test.c2] |
| | AggregateExec: mode=FinalPartitioned, gby=[c1@0 as c1],
aggr=[FIRST_VALUE(test.c1), FIRST_VALUE(test.c2)], ordering_mode=Sorted
|
| | SortExec: expr=[c1@0 ASC NULLS LAST]
|
| | CoalesceBatchesExec: target_batch_size=8192
|
| | RepartitionExec: partitioning=Hash([c1@0], 10),
input_partitions=10
|
| | AggregateExec: mode=Partial, gby=[c1@0 as c1],
aggr=[FIRST_VALUE(test.c1), FIRST_VALUE(test.c2)], ordering_mode=Sorted
|
| | RepartitionExec:
partitioning=RoundRobinBatch(10), input_partitions=1
|
| | SortExec: expr=[c1@0 ASC NULLS LAST,c3@2 ASC
NULLS LAST]
|
| | CsvExec: file_groups={1 group:
[[Users/markogrujic/Splitgraph/arrow-datafusion/testing/data/csv/aggregate_test_100.csv]]},
projection=[c1, c2, c3], has_header=true |
| |
|
+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
```
Compare that with the currently generated plans, which do get the order
correct:
```sql
❯ explain select distinct on (c1) c1, c2 from test order by c1, c3;
+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| plan_type | plan
|
+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| logical_plan | Projection: FIRST_VALUE(test.c1) ORDER BY [test.c1 ASC
NULLS LAST, test.c3 ASC NULLS LAST] AS test.c1, FIRST_VALUE(test.c2) ORDER BY
[test.c1 ASC NULLS LAST, test.c3 ASC NULLS LAST] AS test.c2 |
| | Sort: test.c1 ASC NULLS LAST
|
| | Aggregate: groupBy=[[test.c1]],
aggr=[[FIRST_VALUE(test.c1) ORDER BY [test.c1 ASC NULLS LAST, test.c3 ASC NULLS
LAST], FIRST_VALUE(test.c2) ORDER BY [test.c1 ASC NULLS LAST, test.c3 ASC NULLS
LAST]]] |
| | TableScan: test projection=[c1, c2, c3]
|
| physical_plan | ProjectionExec: expr=[FIRST_VALUE(test.c1) ORDER BY
[test.c1 ASC NULLS LAST, test.c3 ASC NULLS LAST]@1 as test.c1,
FIRST_VALUE(test.c2) ORDER BY [test.c1 ASC NULLS LAST, test.c3 ASC NULLS
LAST]@2 as test.c2] |
| | SortPreservingMergeExec: [c1@0 ASC NULLS LAST]
|
| | SortExec: expr=[c1@0 ASC NULLS LAST]
|
| | AggregateExec: mode=FinalPartitioned, gby=[c1@0 as
c1], aggr=[FIRST_VALUE(test.c1), FIRST_VALUE(test.c2)]
|
| | CoalesceBatchesExec: target_batch_size=8192
|
| | RepartitionExec: partitioning=Hash([c1@0], 10),
input_partitions=10
|
| | AggregateExec: mode=Partial, gby=[c1@0 as c1],
aggr=[FIRST_VALUE(test.c1), FIRST_VALUE(test.c2)], ordering_mode=Sorted
|
| | SortExec: expr=[c1@0 ASC NULLS LAST,c3@2 ASC
NULLS LAST]
|
| | RepartitionExec:
partitioning=RoundRobinBatch(10), input_partitions=1
|
| | CsvExec: file_groups={1 group:
[[Users/markogrujic/Splitgraph/arrow-datafusion/testing/data/csv/aggregate_test_100.csv]]},
projection=[c1, c2, c3], has_header=true |
| |
|
+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]