alamb commented on code in PR #5067:
URL: https://github.com/apache/arrow-datafusion/pull/5067#discussion_r1091194540


##########
datafusion/expr/src/expr_rewriter/order_by.rs:
##########
@@ -54,56 +52,84 @@ pub fn rewrite_sort_cols_by_aggs(
 }
 
 fn rewrite_sort_col_by_aggs(expr: Expr, plan: &LogicalPlan) -> Result<Expr> {
-    match plan {
-        LogicalPlan::Aggregate(Aggregate {
-            input,
-            aggr_expr,
-            group_expr,
-            ..
-        }) => {
-            struct Rewriter<'a> {
-                plan: &'a LogicalPlan,
-                input: &'a LogicalPlan,
-                aggr_expr: &'a Vec<Expr>,
-                distinct_group_exprs: &'a Vec<Expr>,
-            }
+    let plan_inputs = plan.inputs();
 
-            impl<'a> ExprRewriter for Rewriter<'a> {
-                fn mutate(&mut self, expr: Expr) -> Result<Expr> {
-                    let normalized_expr = normalize_col(expr.clone(), 
self.plan);
-                    if normalized_expr.is_err() {
-                        // The expr is not based on Aggregate plan output. 
Skip it.
-                        return Ok(expr);
-                    }
-                    let normalized_expr = normalized_expr?;
-                    if let Some(found_agg) = self
-                        .aggr_expr
-                        .iter()
-                        .chain(self.distinct_group_exprs)
-                        .find(|a| (**a) == normalized_expr)
-                    {
-                        let agg = normalize_col(found_agg.clone(), self.plan)?;
-                        let col = Expr::Column(
-                            agg.to_field(self.input.schema())
-                                .map(|f| f.qualified_column())?,
-                        );
-                        Ok(col)
-                    } else {
-                        Ok(expr)
-                    }
-                }
-            }
+    // Joins, and Unions are not yet handled (should have a projection
+    // on top of them)
+    if plan_inputs.len() == 1 {
+        let proj_exprs = plan.expressions();
+        rewrite_in_terms_of_projection(expr, proj_exprs, plan_inputs[0])
+    } else {
+        Ok(expr)
+    }
+}
 
-            let distinct_group_exprs = 
grouping_set_to_exprlist(group_expr.as_slice())?;
-            expr.rewrite(&mut Rewriter {
-                plan,
-                input,
-                aggr_expr,
-                distinct_group_exprs: &distinct_group_exprs,
-            })
+/// Rewrites a sort expression in terms of the output of the previous 
[`LogicalPlan`]
+///
+/// Example:
+///
+/// Given an input expression such as `col(a) + col(b) + col(c)`
+///
+/// into `col(a) + col("b + c")`
+///
+/// Remember that:
+/// 1. given a projection with exprs: [a, b + c]
+/// 2. t produces an output schema with two columns "a", "b + c"
+fn rewrite_in_terms_of_projection(
+    expr: Expr,
+    proj_exprs: Vec<Expr>,
+    input: &LogicalPlan,
+) -> Result<Expr> {
+    // assumption is that each item in exprs, such as "b + c" is
+    // available as an output column named "b + c"
+    rewrite_expr(expr, |expr| {
+        // search for unnormalized names first such as "c1" (such as aliases)
+        if let Some(found) = proj_exprs.iter().find(|a| (**a) == expr) {
+            let col = Expr::Column(
+                found
+                    .to_field(input.schema())
+                    .map(|f| f.qualified_column())?,
+            );
+            return Ok(col);
         }
-        LogicalPlan::Projection(_) => rewrite_sort_col_by_aggs(expr, 
plan.inputs()[0]),
-        _ => Ok(expr),
+
+        // if that doesn't work, try to match the expression as an
+        // output column -- however first it must be "normalized"
+        // (e.g. "c1" --> "t.c1") because that normalization is done
+        // at the input of the aggregate.

Review Comment:
   I don't fully understand why this is needed, but several tests fail if this 
logic is removed, and the previous implementation performed the same 
normalization.



##########
datafusion/expr/src/expr_rewriter/order_by.rs:
##########
@@ -54,56 +52,84 @@ pub fn rewrite_sort_cols_by_aggs(
 }
 
 fn rewrite_sort_col_by_aggs(expr: Expr, plan: &LogicalPlan) -> Result<Expr> {
-    match plan {
-        LogicalPlan::Aggregate(Aggregate {
-            input,
-            aggr_expr,
-            group_expr,
-            ..
-        }) => {
-            struct Rewriter<'a> {
-                plan: &'a LogicalPlan,
-                input: &'a LogicalPlan,
-                aggr_expr: &'a Vec<Expr>,
-                distinct_group_exprs: &'a Vec<Expr>,
-            }
+    let plan_inputs = plan.inputs();
 
-            impl<'a> ExprRewriter for Rewriter<'a> {
-                fn mutate(&mut self, expr: Expr) -> Result<Expr> {
-                    let normalized_expr = normalize_col(expr.clone(), 
self.plan);
-                    if normalized_expr.is_err() {
-                        // The expr is not based on Aggregate plan output. 
Skip it.
-                        return Ok(expr);
-                    }
-                    let normalized_expr = normalized_expr?;
-                    if let Some(found_agg) = self
-                        .aggr_expr
-                        .iter()
-                        .chain(self.distinct_group_exprs)
-                        .find(|a| (**a) == normalized_expr)
-                    {
-                        let agg = normalize_col(found_agg.clone(), self.plan)?;
-                        let col = Expr::Column(
-                            agg.to_field(self.input.schema())
-                                .map(|f| f.qualified_column())?,
-                        );
-                        Ok(col)
-                    } else {
-                        Ok(expr)
-                    }
-                }
-            }
+    // Joins, and Unions are not yet handled (should have a projection
+    // on top of them)
+    if plan_inputs.len() == 1 {
+        let proj_exprs = plan.expressions();
+        rewrite_in_terms_of_projection(expr, proj_exprs, plan_inputs[0])
+    } else {
+        Ok(expr)
+    }
+}
 
-            let distinct_group_exprs = 
grouping_set_to_exprlist(group_expr.as_slice())?;
-            expr.rewrite(&mut Rewriter {
-                plan,
-                input,
-                aggr_expr,
-                distinct_group_exprs: &distinct_group_exprs,
-            })
+/// Rewrites a sort expression in terms of the output of the previous 
[`LogicalPlan`]

Review Comment:
   This is the new, more general algorithm that works for both 
`LogicalPlan::Projection` and `LogicalPlan::Aggregate`.



##########
datafusion/sql/tests/integration_test.rs:
##########
@@ -2308,6 +2314,22 @@ fn select_multibyte_column() {
     quick_test(sql, expected);
 }
 
+#[test]
+fn select_groupby_orderby() {
+    let sql = r#"SELECT
+  avg(age) AS "value",
+  date_trunc('month', birth_date) AS "birth_date"
+  FROM person GROUP BY birth_date ORDER BY birth_date;
+"#;
+    // expect that this is not an ambiguous reference
+    let expected =
+        "Sort: birth_date ASC NULLS LAST\
+         \n  Projection: AVG(person.age) AS value, datetrunc(Utf8(\"month\"), 
person.birth_date) AS birth_date\

Review Comment:
   This shows that the plan uses the `datetrunc` expression rather than the 
original `birth_date` grouping column.



##########
datafusion/expr/src/expr_rewriter/order_by.rs:
##########
@@ -136,24 +162,24 @@ mod test {
 
         let cases = vec![
             TestCase {
-                desc: "c1 --> t.c1",

Review Comment:
   I am not sure why direct aggregate creation doesn't need this rewrite 
anymore. All the other tests in DataFusion CI pass and I wrote the unit tests 
last week to document the existing behavior in 
https://github.com/apache/arrow-datafusion/pull/5088
   
   If reviewers prefer to keep the old behavior here, that is easy to do: the 
first commit in this PR 
(https://github.com/apache/arrow-datafusion/pull/5067/commits/0c16ef1d0965e4f59a3bb6880f225d7ddc56b47d)
 actually keeps all these tests passing with the existing "aggregate" code. 
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to