UBarney commented on code in PR #15876: URL: https://github.com/apache/datafusion/pull/15876#discussion_r2079325033
##########
datafusion/expr/src/logical_plan/builder.rs:
##########
@@ -797,26 +807,146 @@ impl LogicalPlanBuilder {
         }
         // remove pushed down sort columns
-        let new_expr = schema.columns().into_iter().map(Expr::Column).collect();
+        let sort_output = schema.columns().into_iter().map(Expr::Column).collect();
+
+        let plan = Arc::unwrap_or_clone(self.plan);
+
+        let (plan, agg_func_to_col, sorts) = if missing_agg_funcs.is_empty() {
+            (plan, HashMap::new(), sorts)
+        } else {
+            {
+                let (plan, agg_func_to_col) =
+                    Self::add_missing_agg_funcs_to_logical_agg(plan, &missing_agg_funcs)?;
+
+                let sorts = sorts
+                    .iter()
+                    .map(|x| {
+                        Self::replace_subexpr_to_col(&x.expr, &agg_func_to_col)
+                            .map(|expr| x.with_expr(expr))
+                    })
+                    .collect::<Result<Vec<_>, _>>()?;
+
+                (plan, agg_func_to_col, sorts)
+            }
+        };
+
+        // we need downstream filter/project return missing col(agg_funcs)
+        missing_cols.extend(agg_func_to_col.into_values());
         let is_distinct = false;
-        let plan = Self::add_missing_columns(
-            Arc::unwrap_or_clone(self.plan),
-            &missing_cols,
-            is_distinct,
-        )?;
+        let plan = Self::add_missing_columns(plan, &missing_cols, is_distinct)?;
         let sort_plan = LogicalPlan::Sort(Sort {
             expr: normalize_sorts(sorts, &plan)?,
             input: Arc::new(plan),
             fetch,
         });
-        Projection::try_new(new_expr, Arc::new(sort_plan))
+        Projection::try_new(sort_output, Arc::new(sort_plan))
             .map(LogicalPlan::Projection)
             .map(Self::new)
     }
+    fn replace_subexpr_to_col(
+        expr: &Expr,
+        func_to_col: &HashMap<Expr, Column>,
+    ) -> Result<Expr> {
+        Ok(expr
+            .clone()
+            .transform_down(|nested_expr| {
+                if let Some(col) = func_to_col.get(&nested_expr) {
+                    Ok(Transformed::yes(Expr::Column(col.clone())))
+                } else {
+                    Ok(Transformed::no(nested_expr))
+                }
+            })?
+            .data)
+    }
+
+    fn add_missing_agg_funcs_to_logical_agg(
+        plan: LogicalPlan,
+        missing_agg_funcs: &IndexSet<Expr>,
+    ) -> Result<(LogicalPlan, HashMap<Expr, Column>)> {
+        let mut agg_func_to_output_col: HashMap<Expr, Column> =
+            HashMap::with_capacity(missing_agg_funcs.len());
+
+        if missing_agg_funcs.is_empty() {
+            return Ok((plan, agg_func_to_output_col));
+        }
+
+        let plan = plan
+            .transform_down(|plan| {

Review Comment:
> I think a safer approach is to directly add the missing aggregate functions to the select list at an earlier stage. That is, transform select min(a) from t order by max(a) into select min(a), max(a) from t order by max(a), similar to what was done in #14180.

Why should this transformation be done "at an earlier stage"?

In this PR, a query plan like

```
// select max(employee_csv.salary) from employee_csv group by employee_csv.state order by min(employee_csv.salary)
Sort: min(employee_csv.salary)
  Aggregate: groupBy=[[employee_csv.state]], aggr=[[max(employee_csv.salary)]]
```

will be transformed into:

```
Projection: col(max(employee_csv.salary)) // the final result set only outputs `max(employee_csv.salary)`
  Sort: col(min(employee_csv.salary)) // a column produced by the aggregate
    Aggregate: groupBy=[[employee_csv.state]], aggr=[[max(employee_csv.salary), min(employee_csv.salary)]]
```

The proposed logic change to `sort_with_limit` makes sense, since that function already handles similar functionality:
- find the missing aggregate functions that need to be output
- add them to the aggregate operator
- add a projection on top

> transform select min(a) from t order by max(a) into select min(a), max(a) from t order by max(a)

I think it should instead be transformed into `select "min(a)" from (select min(a), max(a) from t order by max(a))`?

> We should only consider aggregates from the select list of the current query. Using traversal does not guarantee this, meaning that we might encounter aggregate functions not originating from the select list, leading to a bad logical plan.

Could you explain this in more detail? According to my understanding, if there are aggregate functions in the ORDER BY clause, they should be placed in the aggregation operator below (which is what this PR does).

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
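
As a rough illustration of the three steps listed in the review comment (find the missing aggregate functions, add them to the aggregate operator, add a projection), here is a minimal sketch. It uses toy `Expr` and `Plan` types, not DataFusion's `Expr`/`LogicalPlan` API, and `sort_with_missing_aggs` is a hypothetical name. It also assumes the ORDER BY keys are bare aggregate calls or columns; aggregates nested inside larger expressions, which the PR handles via `transform_down`, are not covered.

```rust
use std::collections::HashMap;

/// Toy expression type; a stand-in for DataFusion's `Expr`, not the real API.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
enum Expr {
    Column(String),
    Agg { func: String, arg: String },
}

impl Expr {
    /// Name of the output column this expression produces, e.g. `min(salary)`.
    fn output_name(&self) -> String {
        match self {
            Expr::Column(name) => name.clone(),
            Expr::Agg { func, arg } => format!("{func}({arg})"),
        }
    }
}

/// Toy plan shaped like Projection -> Sort -> Aggregate (hypothetical type).
#[derive(Debug)]
struct Plan {
    projection: Vec<Expr>,
    sort: Vec<Expr>,
    group_by: Vec<Expr>,
    aggr: Vec<Expr>,
}

/// Hypothetical helper mirroring the three steps from the comment.
fn sort_with_missing_aggs(
    group_by: Vec<Expr>,
    mut aggr: Vec<Expr>,
    order_by: Vec<Expr>,
) -> Plan {
    // The final projection exposes only what the aggregate produced originally,
    // so it is computed before any extra aggregates are appended.
    let projection: Vec<Expr> = aggr
        .iter()
        .map(|e| Expr::Column(e.output_name()))
        .collect();

    // Steps 1 and 2: find aggregates used in ORDER BY and append the ones
    // the aggregate operator does not compute yet.
    let mut agg_to_col: HashMap<Expr, Expr> = HashMap::new();
    for e in &order_by {
        if matches!(e, Expr::Agg { .. }) {
            if !aggr.contains(e) {
                aggr.push(e.clone());
            }
            agg_to_col.insert(e.clone(), Expr::Column(e.output_name()));
        }
    }

    // Rewrite the sort keys so they reference the aggregate's output columns.
    let sort = order_by
        .into_iter()
        .map(|e| {
            let replacement = agg_to_col.get(&e).cloned();
            replacement.unwrap_or(e)
        })
        .collect();

    // Step 3: the projection on top hides the extra aggregate column again.
    Plan { projection, sort, group_by, aggr }
}
```

For the `employee_csv` query from the comment, this toy version would give an aggregate computing both `max(salary)` and `min(salary)`, a sort on the column `min(salary)`, and a projection that outputs only `max(salary)`.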