alamb commented on code in PR #7981:
URL: https://github.com/apache/arrow-datafusion/pull/7981#discussion_r1382374720
##########
datafusion/optimizer/src/replace_distinct_aggregate.rs:
##########
@@ -60,6 +80,62 @@ impl OptimizerRule for ReplaceDistinctWithAggregate {
)?);
Ok(Some(aggregate))
}
+ LogicalPlan::Distinct(Distinct::On(DistinctOn {
+ select_expr,
+ on_expr,
+ sort_expr,
+ input,
+ ..
+ })) => {
+ // Construct the aggregation expression to be used to fetch
the selected expressions.
+ let aggr_expr = select_expr
+ .iter()
+ .map(|e| {
+ Expr::AggregateFunction(AggregateFunction::new(
+ AggregateFunctionFunc::FirstValue,
+ vec![e.clone()],
+ false,
+ None,
+ sort_expr.clone(),
+ ))
+ })
+ .collect::<Vec<Expr>>();
+
+ // Build the aggregation plan
+ let plan = LogicalPlanBuilder::from(input.as_ref().clone())
+ .aggregate(on_expr.clone(), aggr_expr.to_vec())?
+ .build()?;
+
+ let plan = if let Some(sort_expr) = sort_expr {
+ // While sort expressions were used in the `FIRST_VALUE`
aggregation itself above,
+ // this on its own isn't enough to guarantee the proper
output order of the grouping
+ // (`ON`) expression, so we need to sort those as well.
+ LogicalPlanBuilder::from(plan)
+ .sort(sort_expr[..on_expr.len()].to_vec())?
Review Comment:
Would it make sense to sort the input prior to aggregating it? It seems like
sorting to do the aggregate and then sorting again is less efficient than
sorting once and reusing the output.
In theory the optimizer should handle this rewrite, but it might be better
to just start with the sort under the aggregate.
##########
datafusion/expr/src/logical_plan/plan.rs:
##########
@@ -2132,9 +2181,100 @@ pub struct Limit {
/// Removes duplicate rows from the input
#[derive(Clone, PartialEq, Eq, Hash)]
-pub struct Distinct {
+pub enum Distinct {
+ /// Plain `DISTINCT` referencing all selection expressions
+ All(Arc<LogicalPlan>),
+ /// The `Postgres` addition, allowing separate control over DISTINCT'd and
selected columns
+ On(DistinctOn),
+}
+
+/// Removes duplicate rows from the input
+#[derive(Clone, PartialEq, Eq, Hash)]
+pub struct DistinctOn {
+ /// The `DISTINCT ON` clause expression list
+ pub on_expr: Vec<Expr>,
+ /// The selected projection expression list
+ pub select_expr: Vec<Expr>,
+ /// The `ORDER BY` clause, whose initial expressions must match those of
the `ON` clause
Review Comment:
I think your explanation makes sense. Maybe we can take some of this
rationale (e.g. that `sort_exprs` are not a superset as they are wrapped by
`Expr::Sort`) and put it into the doc comments. We can do this as a follow-on
PR.
##########
datafusion/sqllogictest/test_files/distinct_on.slt:
##########
@@ -0,0 +1,125 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+
+# http://www.apache.org/licenses/LICENSE-2.0
+
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+statement ok
+CREATE EXTERNAL TABLE aggregate_test_100 (
+ c1 VARCHAR NOT NULL,
+ c2 TINYINT NOT NULL,
+ c3 SMALLINT NOT NULL,
+ c4 SMALLINT,
+ c5 INT,
+ c6 BIGINT NOT NULL,
+ c7 SMALLINT NOT NULL,
+ c8 INT NOT NULL,
+ c9 BIGINT UNSIGNED NOT NULL,
+ c10 VARCHAR NOT NULL,
+ c11 FLOAT NOT NULL,
+ c12 DOUBLE NOT NULL,
+ c13 VARCHAR NOT NULL
+)
+STORED AS CSV
+WITH HEADER ROW
+LOCATION '../../testing/data/csv/aggregate_test_100.csv'
+
+# Basic example: distinct on the first column, project the second one, and
+# order by the third
+query TI
+SELECT DISTINCT ON (c1) c1, c2 FROM aggregate_test_100 ORDER BY c1, c3;
+----
+a 5
+b 4
+c 2
+d 1
+e 3
+
+# Basic example + reverse order of the selected column
+query TI
+SELECT DISTINCT ON (c1) c1, c2 FROM aggregate_test_100 ORDER BY c1, c3 DESC;
+----
+a 1
+b 5
+c 4
+d 1
+e 1
+
+# Basic example + reverse order of the ON column
+query TI
+SELECT DISTINCT ON (c1) c1, c2 FROM aggregate_test_100 ORDER BY c1 DESC, c3;
+----
+e 3
+d 1
+c 2
+b 4
+a 4
+
+# Basic example + reverse order of both columns + limit
+query TI
+SELECT DISTINCT ON (c1) c1, c2 FROM aggregate_test_100 ORDER BY c1 DESC, c3
DESC LIMIT 3;
+----
+e 1
+d 1
+c 4
+
+# Basic example + omit ON column from selection
+query I
+SELECT DISTINCT ON (c1) c2 FROM aggregate_test_100 ORDER BY c1, c3;
Review Comment:
Can you maybe also include an explain plan here so we can validate the plan?
Something like
```
EXPLAIN SELECT DISTINCT ON (c1) c3, c2 FROM aggregate_test_100 ORDER BY c1,
c3;
```
##########
datafusion/optimizer/src/replace_distinct_aggregate.rs:
##########
@@ -32,6 +36,22 @@ use datafusion_expr::{Aggregate, Distinct, LogicalPlan};
/// ```text
/// SELECT a, b FROM tab GROUP BY a, b
/// ```
+///
+/// On the other hand, for a `DISTINCT ON` query the replacement is
+/// a bit more involved and effectively converts
+/// ```text
+/// SELECT DISTINCT ON (a) b FROM tab ORDER BY a DESC, c
+/// ```
+///
+/// into
+/// ```text
+/// SELECT b FROM (
+/// SELECT a, FIRST_VALUE(b ORDER BY a DESC, c) AS b
Review Comment:
Sorry -- thank you for the clarification
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]