peter-toth commented on code in PR #11683: URL: https://github.com/apache/datafusion/pull/11683#discussion_r1693953502
########## datafusion/optimizer/src/common_subexpr_eliminate.rs: ########## @@ -353,96 +349,81 @@ impl CommonSubexprEliminate { window: Window, config: &dyn OptimizerConfig, ) -> Result<Transformed<LogicalPlan>> { - // collect all window expressions from any number of LogicalPlanWindow - let (mut window_exprs, mut window_schemas, mut plan) = + // Collects window expressions from consecutive `LogicalPlan::Window` nodes into + // a list. + let (window_expr_list, window_schemas, input) = get_consecutive_window_exprs(window); - let mut found_common = false; - let mut expr_stats = ExprStats::new(); - let arrays_per_window = window_exprs - .iter() - .map(|window_expr| { - self.to_arrays(window_expr, &mut expr_stats, ExprMask::Normal) - .map(|(fc, id_arrays)| { - found_common |= fc; - - id_arrays + // Extract common sub-expressions from the list. + self.find_common_exprs(window_expr_list, config, ExprMask::Normal)? + .map_data(|(new_window_expr_list, common)| match common { + // If there are common sub-expressions, then the insert a projection node + // with the common expressions between the new window nodes and the + // original input. + Some((common_exprs, window_expr_list)) => { + build_common_expr_project_plan(input, common_exprs).map(|new_input| { + (new_window_expr_list, new_input, Some(window_expr_list)) }) - }) - .collect::<Result<Vec<_>>>()?; + } - if found_common { - // save the original names - let name_preserver = NamePreserver::new(&plan); - let mut saved_names = window_exprs - .iter() - .map(|exprs| { - exprs - .iter() - .map(|expr| name_preserver.save(expr)) - .collect::<Result<Vec<_>>>() + None => Ok((new_window_expr_list, input, None)), + })? + // Recurse into the new input. this is similar to top-down optimizer rule's + // logic. 
+ .transform_data(|(new_window_expr_list, new_input, window_expr_list)| { + self.rewrite(new_input, config)?.map_data(|new_input| { + Ok((new_window_expr_list, new_input, window_expr_list)) }) - .collect::<Result<Vec<_>>>()?; - - assert_eq!(window_exprs.len(), arrays_per_window.len()); - let num_window_exprs = window_exprs.len(); - let rewritten_window_exprs = self.rewrite_expr( - // Must clone as Identifiers use references to original expressions so we - // have to keep the original expressions intact. - window_exprs.clone(), - arrays_per_window, - plan, - &expr_stats, - config, - )?; - let transformed = rewritten_window_exprs.transformed; - assert!(transformed); - - let (mut new_expr, new_input) = rewritten_window_exprs.data; - - let mut plan = new_input; - - // Construct consecutive window operator, with their corresponding new - // window expressions. - // - // Note this iterates over, `new_expr` and `saved_names` which are the - // same length, in reverse order - assert_eq!(num_window_exprs, new_expr.len()); - assert_eq!(num_window_exprs, saved_names.len()); - while let (Some(new_window_expr), Some(saved_names)) = - (new_expr.pop(), saved_names.pop()) - { - assert_eq!(new_window_expr.len(), saved_names.len()); - - // Rename re-written window expressions with original name, to - // preserve the output schema - let new_window_expr = new_window_expr - .into_iter() - .zip(saved_names.into_iter()) - .map(|(new_window_expr, saved_name)| { - saved_name.restore(new_window_expr) - }) - .collect::<Result<Vec<_>>>()?; - plan = LogicalPlan::Window(Window::try_new( - new_window_expr, - Arc::new(plan), - )?); - } - - Ok(Transformed::new_transformed(plan, transformed)) - } else { - while let (Some(window_expr), Some(schema)) = - (window_exprs.pop(), window_schemas.pop()) - { - plan = LogicalPlan::Window(Window { - input: Arc::new(plan), - window_expr, - schema, - }); - } - - Ok(Transformed::no(plan)) - } + })? + // Rebuild the consecutive window nodes. 
+ .map_data(|(new_window_expr_list, new_input, window_expr_list)| { + // If there were common expressions extracted, then we need to make sure + // we restore the original column names. + // TODO: Although `find_common_exprs()` inserts aliases around extracted Review Comment: I wanted to fix the previous `TODO` (https://github.com/apache/datafusion/pull/11683/files#diff-351499880963d6a383c92e156e75019cd9ce33107724a9635853d7d4cd1898d0L563), but realized that preserving names is still required, so I added two `TODO`s where that logic lives. I will try to remove them in a follow-up PR. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org For additional commands, e-mail: github-h...@datafusion.apache.org