UBarney commented on code in PR #15876: URL: https://github.com/apache/datafusion/pull/15876#discussion_r2083447146
########## datafusion/expr/src/logical_plan/builder.rs: ########## @@ -797,26 +807,146 @@ impl LogicalPlanBuilder { } // remove pushed down sort columns - let new_expr = schema.columns().into_iter().map(Expr::Column).collect(); + let sort_output = schema.columns().into_iter().map(Expr::Column).collect(); + + let plan = Arc::unwrap_or_clone(self.plan); + + let (plan, agg_func_to_col, sorts) = if missing_agg_funcs.is_empty() { + (plan, HashMap::new(), sorts) + } else { + { + let (plan, agg_func_to_col) = + Self::add_missing_agg_funcs_to_logical_agg(plan, &missing_agg_funcs)?; + + let sorts = sorts + .iter() + .map(|x| { + Self::replace_subexpr_to_col(&x.expr, &agg_func_to_col) + .map(|expr| x.with_expr(expr)) + }) + .collect::<Result<Vec<_>, _>>()?; + + (plan, agg_func_to_col, sorts) + } + }; + + // we need downstream filter/project return missing col(agg_funcs) + missing_cols.extend(agg_func_to_col.into_values()); let is_distinct = false; - let plan = Self::add_missing_columns( - Arc::unwrap_or_clone(self.plan), - &missing_cols, - is_distinct, - )?; + let plan = Self::add_missing_columns(plan, &missing_cols, is_distinct)?; let sort_plan = LogicalPlan::Sort(Sort { expr: normalize_sorts(sorts, &plan)?, input: Arc::new(plan), fetch, }); - Projection::try_new(new_expr, Arc::new(sort_plan)) + Projection::try_new(sort_output, Arc::new(sort_plan)) .map(LogicalPlan::Projection) .map(Self::new) } + fn replace_subexpr_to_col( + expr: &Expr, + func_to_col: &HashMap<Expr, Column>, + ) -> Result<Expr> { + Ok(expr + .clone() + .transform_down(|nested_expr| { + if let Some(col) = func_to_col.get(&nested_expr) { + Ok(Transformed::yes(Expr::Column(col.clone()))) + } else { + Ok(Transformed::no(nested_expr)) + } + })? 
+ .data) + } + + fn add_missing_agg_funcs_to_logical_agg( + plan: LogicalPlan, + missing_agg_funcs: &IndexSet<Expr>, + ) -> Result<(LogicalPlan, HashMap<Expr, Column>)> { + let mut agg_func_to_output_col: HashMap<Expr, Column> = + HashMap::with_capacity(missing_agg_funcs.len()); + + if missing_agg_funcs.is_empty() { + return Ok((plan, agg_func_to_output_col)); + } + + let plan = plan + .transform_down(|plan| {
Review Comment:
I fixed this problem by adding the `missing_agg_funcs` to the Aggregate only when every operator between the Sort and the Aggregate is a Filter or a Project; otherwise, an error is reported.
```
// OK
Sort: min(employee_csv.salary)
Filter
Project
Aggregate: groupBy=[[employee_csv.state]], aggr=[[max(employee_csv.salary)]]

// Error
Sort: min(employee_csv.salary)
Join
Aggregate
TableScan
```
-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org For additional commands, e-mail: github-h...@datafusion.apache.org