peter-toth commented on code in PR #11683:
URL: https://github.com/apache/datafusion/pull/11683#discussion_r1693953502


##########
datafusion/optimizer/src/common_subexpr_eliminate.rs:
##########
@@ -353,96 +349,81 @@ impl CommonSubexprEliminate {
         window: Window,
         config: &dyn OptimizerConfig,
     ) -> Result<Transformed<LogicalPlan>> {
-        // collect all window expressions from any number of LogicalPlanWindow
-        let (mut window_exprs, mut window_schemas, mut plan) =
+        // Collects window expressions from consecutive `LogicalPlan::Window` 
nodes into
+        // a list.
+        let (window_expr_list, window_schemas, input) =
             get_consecutive_window_exprs(window);
 
-        let mut found_common = false;
-        let mut expr_stats = ExprStats::new();
-        let arrays_per_window = window_exprs
-            .iter()
-            .map(|window_expr| {
-                self.to_arrays(window_expr, &mut expr_stats, ExprMask::Normal)
-                    .map(|(fc, id_arrays)| {
-                        found_common |= fc;
-
-                        id_arrays
+        // Extract common sub-expressions from the list.
+        self.find_common_exprs(window_expr_list, config, ExprMask::Normal)?
+            .map_data(|(new_window_expr_list, common)| match common {
+                // If there are common sub-expressions, then the insert a 
projection node
+                // with the common expressions between the new window nodes 
and the
+                // original input.
+                Some((common_exprs, window_expr_list)) => {
+                    build_common_expr_project_plan(input, 
common_exprs).map(|new_input| {
+                        (new_window_expr_list, new_input, 
Some(window_expr_list))
                     })
-            })
-            .collect::<Result<Vec<_>>>()?;
+                }
 
-        if found_common {
-            // save the original names
-            let name_preserver = NamePreserver::new(&plan);
-            let mut saved_names = window_exprs
-                .iter()
-                .map(|exprs| {
-                    exprs
-                        .iter()
-                        .map(|expr| name_preserver.save(expr))
-                        .collect::<Result<Vec<_>>>()
+                None => Ok((new_window_expr_list, input, None)),
+            })?
+            // Recurse into the new input. this is similar to top-down 
optimizer rule's
+            // logic.
+            .transform_data(|(new_window_expr_list, new_input, 
window_expr_list)| {
+                self.rewrite(new_input, config)?.map_data(|new_input| {
+                    Ok((new_window_expr_list, new_input, window_expr_list))
                 })
-                .collect::<Result<Vec<_>>>()?;
-
-            assert_eq!(window_exprs.len(), arrays_per_window.len());
-            let num_window_exprs = window_exprs.len();
-            let rewritten_window_exprs = self.rewrite_expr(
-                // Must clone as Identifiers use references to original 
expressions so we
-                // have to keep the original expressions intact.
-                window_exprs.clone(),
-                arrays_per_window,
-                plan,
-                &expr_stats,
-                config,
-            )?;
-            let transformed = rewritten_window_exprs.transformed;
-            assert!(transformed);
-
-            let (mut new_expr, new_input) = rewritten_window_exprs.data;
-
-            let mut plan = new_input;
-
-            // Construct consecutive window operator, with their corresponding 
new
-            // window expressions.
-            //
-            // Note this iterates over, `new_expr` and `saved_names` which are 
the
-            // same length, in reverse order
-            assert_eq!(num_window_exprs, new_expr.len());
-            assert_eq!(num_window_exprs, saved_names.len());
-            while let (Some(new_window_expr), Some(saved_names)) =
-                (new_expr.pop(), saved_names.pop())
-            {
-                assert_eq!(new_window_expr.len(), saved_names.len());
-
-                // Rename re-written window expressions with original name, to
-                // preserve the output schema
-                let new_window_expr = new_window_expr
-                    .into_iter()
-                    .zip(saved_names.into_iter())
-                    .map(|(new_window_expr, saved_name)| {
-                        saved_name.restore(new_window_expr)
-                    })
-                    .collect::<Result<Vec<_>>>()?;
-                plan = LogicalPlan::Window(Window::try_new(
-                    new_window_expr,
-                    Arc::new(plan),
-                )?);
-            }
-
-            Ok(Transformed::new_transformed(plan, transformed))
-        } else {
-            while let (Some(window_expr), Some(schema)) =
-                (window_exprs.pop(), window_schemas.pop())
-            {
-                plan = LogicalPlan::Window(Window {
-                    input: Arc::new(plan),
-                    window_expr,
-                    schema,
-                });
-            }
-
-            Ok(Transformed::no(plan))
-        }
+            })?
+            // Rebuild the consecutive window nodes.
+            .map_data(|(new_window_expr_list, new_input, window_expr_list)| {
+                // If there were common expressions extracted, then we need to 
make sure
+                // we restore the original column names.
+                // TODO: Although `find_common_exprs()` inserts aliases around 
extracted

Review Comment:
   I wanted to fix the previous `TODO` (https://github.com/apache/datafusion/pull/11683/files#diff-351499880963d6a383c92e156e75019cd9ce33107724a9635853d7d4cd1898d0L563), but realized that preserving names is still required, so I added two `TODO`s where that name-preserving logic lives.
   I will try to get rid of them in a follow-up PR.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org
For additional commands, e-mail: github-h...@datafusion.apache.org

Reply via email to