jackwener commented on code in PR #5067:
URL: https://github.com/apache/arrow-datafusion/pull/5067#discussion_r1094487990
##########
datafusion/expr/src/expr_rewriter/order_by.rs:
##########
@@ -54,56 +52,84 @@ pub fn rewrite_sort_cols_by_aggs(
}
fn rewrite_sort_col_by_aggs(expr: Expr, plan: &LogicalPlan) -> Result<Expr> {
- match plan {
- LogicalPlan::Aggregate(Aggregate {
- input,
- aggr_expr,
- group_expr,
- ..
- }) => {
- struct Rewriter<'a> {
- plan: &'a LogicalPlan,
- input: &'a LogicalPlan,
- aggr_expr: &'a Vec<Expr>,
- distinct_group_exprs: &'a Vec<Expr>,
- }
+ let plan_inputs = plan.inputs();
- impl<'a> ExprRewriter for Rewriter<'a> {
- fn mutate(&mut self, expr: Expr) -> Result<Expr> {
- let normalized_expr = normalize_col(expr.clone(),
self.plan);
- if normalized_expr.is_err() {
- // The expr is not based on Aggregate plan output.
Skip it.
- return Ok(expr);
- }
- let normalized_expr = normalized_expr?;
- if let Some(found_agg) = self
- .aggr_expr
- .iter()
- .chain(self.distinct_group_exprs)
- .find(|a| (**a) == normalized_expr)
- {
- let agg = normalize_col(found_agg.clone(), self.plan)?;
- let col = Expr::Column(
- agg.to_field(self.input.schema())
- .map(|f| f.qualified_column())?,
- );
- Ok(col)
- } else {
- Ok(expr)
- }
- }
- }
+ // Joins, and Unions are not yet handled (should have a projection
+ // on top of them)
+ if plan_inputs.len() == 1 {
+ let proj_exprs = plan.expressions();
+ rewrite_in_terms_of_projection(expr, proj_exprs, plan_inputs[0])
+ } else {
+ Ok(expr)
+ }
+}
- let distinct_group_exprs =
grouping_set_to_exprlist(group_expr.as_slice())?;
- expr.rewrite(&mut Rewriter {
- plan,
- input,
- aggr_expr,
- distinct_group_exprs: &distinct_group_exprs,
- })
+/// Rewrites a sort expression in terms of the output of the previous
[`LogicalPlan`]
+///
+/// Example:
+///
+/// Given an input expression such as `col(a) + col(b) + col(c)`
+///
+/// into `col(a) + col("b + c")`
+///
+/// Remember that:
+/// 1. given a projection with exprs: [a, b + c]
+/// 2. t produces an output schema with two columns "a", "b + c"
+fn rewrite_in_terms_of_projection(
+ expr: Expr,
+ proj_exprs: Vec<Expr>,
+ input: &LogicalPlan,
+) -> Result<Expr> {
+ // assumption is that each item in exprs, such as "b + c" is
+ // available as an output column named "b + c"
+ rewrite_expr(expr, |expr| {
+ // search for unnormalized names first such as "c1" (such as aliases)
+ if let Some(found) = proj_exprs.iter().find(|a| (**a) == expr) {
+ let col = Expr::Column(
+ found
+ .to_field(input.schema())
+ .map(|f| f.qualified_column())?,
+ );
+ return Ok(col);
}
- LogicalPlan::Projection(_) => rewrite_sort_col_by_aggs(expr,
plan.inputs()[0]),
- _ => Ok(expr),
+
+ // if that doesn't work, try to match the expression as an
+ // output column -- however first it must be "normalized"
+ // (e.g. "c1" --> "t.c1") because that normalization is done
+ // at the input of the aggregate.
Review Comment:
Maybe (just to offer a possibility) it's related to the `qualifier`.
In the comment:
```
/// 1. given a projection with exprs: [a, b + c]
/// 2. t produces an output schema with two columns "a", "b + c"
```
But in fact, in the plan, `[a, b + c]` can be `[table1.a, "table2.b+c"]`.
The output schema may also carry qualifiers, as shown above.
Because of the qualifier, some equality checks do not hold; for example,
`'a' == 't1.a'` will be false.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]