alamb commented on code in PR #7981:
URL: https://github.com/apache/arrow-datafusion/pull/7981#discussion_r1380703847


##########
datafusion/expr/src/logical_plan/plan.rs:
##########
@@ -2132,9 +2181,100 @@ pub struct Limit {
 
 /// Removes duplicate rows from the input
 #[derive(Clone, PartialEq, Eq, Hash)]
-pub struct Distinct {
+pub enum Distinct {
+    /// Plain `DISTINCT` referencing all selection expressions
+    All(Arc<LogicalPlan>),
+    /// The `Postgres` addition, allowing separate control over DISTINCT'd and selected columns
+    On(DistinctOn),
+}
+
+/// Removes duplicate rows from the input
+#[derive(Clone, PartialEq, Eq, Hash)]
+pub struct DistinctOn {
+    /// The `DISTINCT ON` clause expression list
+    pub on_expr: Vec<Expr>,
+    /// The selected projection expression list
+    pub select_expr: Vec<Expr>,
+    /// The `ORDER BY` clause, whose initial expressions must match those of the `ON` clause

Review Comment:
   Since the same list of exprs is included in both the `select_list` and `sort_exprs` I wonder if it would be less error prone to have references rather than keep three parallel lists:
   
   ```
       /// The selected projection expression list
       pub select_expr: Vec<Expr>,
       /// The sort expressions
       pub sort_expr: Vec<Expr>,
       /// the number of prefix columns from `sort_expr` that form the `ON` clause used for deduplicating
   ```
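   
   For illustration, a minimal sketch of that alternative shape (the struct and field names below are hypothetical, not taken from the PR):
   
   ```rust
   use datafusion_expr::Expr;
   
   /// Hypothetical alternative to three parallel lists: keep the sort
   /// expressions once and record how many of their leading entries form
   /// the `ON` clause used for deduplicating.
   pub struct DistinctOnSketch {
       /// The selected projection expression list
       pub select_expr: Vec<Expr>,
       /// The sort expressions
       pub sort_expr: Vec<Expr>,
       /// Number of prefix entries of `sort_expr` that form the `ON` clause
       pub on_expr_len: usize,
   }
   ```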



##########
datafusion/optimizer/src/replace_distinct_aggregate.rs:
##########
@@ -32,6 +36,22 @@ use datafusion_expr::{Aggregate, Distinct, LogicalPlan};
 /// ```text
 /// SELECT a, b FROM tab GROUP BY a, b
 /// ```
+///
+/// On the other hand, for a `DISTINCT ON` query the replacement is
+/// a bit more involved and effectively converts
+/// ```text
+/// SELECT DISTINCT ON (a) b FROM tab ORDER BY a DESC, c
+/// ```
+///
+/// into
+/// ```text
+/// SELECT b FROM (
+///     SELECT a, FIRST_VALUE(b ORDER BY a DESC, c) AS b

Review Comment:
   ```suggestion
   ///     SELECT a, FIRST_VALUE(PARTITION BY b ORDER BY a DESC, c) AS b
   ```



##########
datafusion/optimizer/src/replace_distinct_aggregate.rs:
##########
@@ -32,6 +36,22 @@ use datafusion_expr::{Aggregate, Distinct, LogicalPlan};
 /// ```text
 /// SELECT a, b FROM tab GROUP BY a, b
 /// ```
+///
+/// On the other hand, for a `DISTINCT ON` query the replacement is

Review Comment:
   Thank you -- this is very helpful to understand what is going on



##########
datafusion/sqllogictest/test_files/distinct_on.slt:
##########
@@ -0,0 +1,129 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+
+#   http://www.apache.org/licenses/LICENSE-2.0
+
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+statement ok
+CREATE EXTERNAL TABLE aggregate_test_100 (
+  c1  VARCHAR NOT NULL,
+  c2  TINYINT NOT NULL,
+  c3  SMALLINT NOT NULL,
+  c4  SMALLINT,
+  c5  INT,
+  c6  BIGINT NOT NULL,
+  c7  SMALLINT NOT NULL,
+  c8  INT NOT NULL,
+  c9  BIGINT UNSIGNED NOT NULL,
+  c10 VARCHAR NOT NULL,
+  c11 FLOAT NOT NULL,
+  c12 DOUBLE NOT NULL,
+  c13 VARCHAR NOT NULL
+)
+STORED AS CSV
+WITH HEADER ROW
+LOCATION '../../testing/data/csv/aggregate_test_100.csv'
+
+# Basic example: distinct on the first column, project the second one, and
+# order by the third
+# TODO: Note that the value for 'a' is actually degenerate, and while Postgres
+# returns the actual first match (4), DataFusion returns the last one, presumably
+# because the `first_value` aggregation function doesn't have a dedicated `GroupsAccumulator`

Review Comment:
   this doesn't make sense to me -- we should get the correct values, regardless of the presence of a groups accumulator



##########
datafusion/optimizer/src/common_subexpr_eliminate.rs:
##########
@@ -458,16 +467,19 @@ fn build_common_expr_project_plan(
 /// the "intermediate" projection plan built in [build_common_expr_project_plan].
 ///
 /// This is for those plans who don't keep its own output schema like `Filter` or `Sort`.
-fn build_recover_project_plan(schema: &DFSchema, input: LogicalPlan) -> LogicalPlan {
+fn build_recover_project_plan(
+    schema: &DFSchema,
+    input: LogicalPlan,
+) -> Result<LogicalPlan> {
     let col_exprs = schema
         .fields()
         .iter()
         .map(|field| Expr::Column(field.qualified_column()))
         .collect();
-    LogicalPlan::Projection(
-        Projection::try_new(col_exprs, Arc::new(input))
-            .expect("Cannot build projection plan from an invalid schema"),
-    )
+    Ok(LogicalPlan::Projection(Projection::try_new(

Review Comment:
   👍 
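   
   For context, a minimal sketch of how a call site adapts to the now-fallible helper, assuming it sits in the same module as `build_recover_project_plan` (the surrounding function name is hypothetical):
   
   ```rust
   use datafusion_common::{DFSchema, Result};
   use datafusion_expr::LogicalPlan;
   
   // Hypothetical caller: an invalid schema now surfaces as an Err propagated
   // with `?` instead of a panic inside build_recover_project_plan.
   fn recover_original_schema(schema: &DFSchema, input: LogicalPlan) -> Result<LogicalPlan> {
       let projection = build_recover_project_plan(schema, input)?;
       Ok(projection)
   }
   ```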



##########
datafusion/optimizer/src/common_subexpr_eliminate.rs:
##########
@@ -238,8 +238,17 @@ impl CommonSubexprEliminate {
         let rewritten = pop_expr(&mut rewritten)?;
 
         if affected_id.is_empty() {
+            // Alias aggregation expressions if they have changed
+            // TODO: This should have really been identified above and handled in the `else` branch
+            let aggr_exprs = new_aggr_expr
+                .iter()
+                .zip(aggr_expr.iter())
+                .map(|(new_expr, old_expr)| {
+                    new_expr.clone().alias_if_changed(old_expr.display_name()?)
+                })
+                .collect::<Result<Vec<Expr>>>()?;
             // Since group_expr changes, schema changes also. Use try_new method.
-            Aggregate::try_new(Arc::new(new_input), new_group_expr, new_aggr_expr)
+            Aggregate::try_new(Arc::new(new_input), new_group_expr, aggr_exprs)
                 .map(LogicalPlan::Aggregate)

Review Comment:
   Maybe @waynexia has some thoughts
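   
   For readers following along, a minimal sketch of the re-aliasing idea in the hunk above, written against `Expr::display_name` and `Expr::alias` rather than the `alias_if_changed` helper used in the diff (the function name here is hypothetical):
   
   ```rust
   use datafusion_common::Result;
   use datafusion_expr::Expr;
   
   /// Re-alias each rewritten aggregate expression back to the display name of
   /// the expression it replaced, so the aggregate's output schema keeps the
   /// same column names after common-subexpression rewriting.
   fn realias_to_original(new_exprs: &[Expr], old_exprs: &[Expr]) -> Result<Vec<Expr>> {
       new_exprs
           .iter()
           .zip(old_exprs.iter())
           .map(|(new_expr, old_expr)| {
               let original_name = old_expr.display_name()?;
               if new_expr.display_name()? == original_name {
                   // Name is unchanged; no alias needed.
                   Ok(new_expr.clone())
               } else {
                   // Name changed during rewriting; restore the original name.
                   Ok(new_expr.clone().alias(original_name))
               }
           })
           .collect()
   }
   ```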



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
