jayzhan211 commented on code in PR #10066:
URL: 
https://github.com/apache/arrow-datafusion/pull/10066#discussion_r1564359740


##########
datafusion/optimizer/src/analyzer/count_wildcard_rule.rs:
##########
@@ -47,181 +38,43 @@ impl CountWildcardRule {
 
 impl AnalyzerRule for CountWildcardRule {
     fn analyze(&self, plan: LogicalPlan, _: &ConfigOptions) -> 
Result<LogicalPlan> {
-        plan.transform_down(&analyze_internal).data()
+        plan.transform_down_with_subqueries(&analyze_internal)
+            .data()
     }
 
     fn name(&self) -> &str {
         "count_wildcard_rule"
     }
 }
 
-fn analyze_internal(plan: LogicalPlan) -> Result<Transformed<LogicalPlan>> {
-    let mut rewriter = CountWildcardRewriter {};
-    match plan {
-        LogicalPlan::Window(window) => {
-            let window_expr = window
-                .window_expr
-                .iter()
-                .map(|expr| rewrite_preserving_name(expr.clone(), &mut 
rewriter))
-                .collect::<Result<Vec<_>>>()?;
-
-            Ok(Transformed::yes(
-                LogicalPlanBuilder::from((*window.input).clone())
-                    .window(window_expr)?
-                    .build()?,
-            ))
-        }
-        LogicalPlan::Aggregate(agg) => {
-            let aggr_expr = agg
-                .aggr_expr
-                .iter()
-                .map(|expr| rewrite_preserving_name(expr.clone(), &mut 
rewriter))
-                .collect::<Result<Vec<_>>>()?;
-
-            Ok(Transformed::yes(LogicalPlan::Aggregate(
-                Aggregate::try_new(agg.input.clone(), agg.group_expr, 
aggr_expr)?,
-            )))
-        }
-        LogicalPlan::Sort(Sort { expr, input, fetch }) => {
-            let sort_expr = expr
-                .iter()
-                .map(|expr| rewrite_preserving_name(expr.clone(), &mut 
rewriter))
-                .collect::<Result<Vec<_>>>()?;
-            Ok(Transformed::yes(LogicalPlan::Sort(Sort {
-                expr: sort_expr,
-                input,
-                fetch,
-            })))
-        }
-        LogicalPlan::Projection(projection) => {
-            let projection_expr = projection
-                .expr
-                .iter()
-                .map(|expr| rewrite_preserving_name(expr.clone(), &mut 
rewriter))
-                .collect::<Result<Vec<_>>>()?;
-            Ok(Transformed::yes(LogicalPlan::Projection(
-                Projection::try_new(projection_expr, projection.input)?,
-            )))
-        }
-        LogicalPlan::Filter(Filter {
-            predicate, input, ..
-        }) => {
-            let predicate = rewrite_preserving_name(predicate, &mut rewriter)?;
-            Ok(Transformed::yes(LogicalPlan::Filter(Filter::try_new(
-                predicate, input,
-            )?)))
-        }
-
-        _ => Ok(Transformed::no(plan)),
-    }
+fn is_wildcard(expr: &Expr) -> bool {
+    matches!(expr, Expr::Wildcard { qualifier: None })
 }
-
-struct CountWildcardRewriter {}
-
-impl TreeNodeRewriter for CountWildcardRewriter {
-    type Node = Expr;
-
-    fn f_up(&mut self, old_expr: Expr) -> Result<Transformed<Expr>> {
-        Ok(match old_expr.clone() {
-            Expr::WindowFunction(expr::WindowFunction {
-                fun:
-                    expr::WindowFunctionDefinition::AggregateFunction(
-                        aggregate_function::AggregateFunction::Count,
-                    ),
-                args,
-                partition_by,
-                order_by,
-                window_frame,
-                null_treatment,
-            }) if args.len() == 1 => match args[0] {
-                Expr::Wildcard { qualifier: None } => {
-                    Transformed::yes(Expr::WindowFunction(expr::WindowFunction 
{
-                        fun: expr::WindowFunctionDefinition::AggregateFunction(
-                            aggregate_function::AggregateFunction::Count,
-                        ),
-                        args: vec![lit(COUNT_STAR_EXPANSION)],
-                        partition_by,
-                        order_by,
-                        window_frame,
-                        null_treatment,
-                    }))
-                }
-
-                _ => Transformed::no(old_expr),
-            },
-            Expr::AggregateFunction(AggregateFunction {
-                func_def:
-                    AggregateFunctionDefinition::BuiltIn(
-                        aggregate_function::AggregateFunction::Count,
-                    ),
-                args,
-                distinct,
-                filter,
-                order_by,
-                null_treatment,
-            }) if args.len() == 1 => match args[0] {
-                Expr::Wildcard { qualifier: None } => {
-                    
Transformed::yes(Expr::AggregateFunction(AggregateFunction::new(
-                        aggregate_function::AggregateFunction::Count,
-                        vec![lit(COUNT_STAR_EXPANSION)],
-                        distinct,
-                        filter,
-                        order_by,
-                        null_treatment,
-                    )))
-                }
-                _ => Transformed::no(old_expr),
-            },
-
-            ScalarSubquery(Subquery {
-                subquery,
-                outer_ref_columns,
-            }) => subquery
-                .as_ref()
-                .clone()
-                .transform_down(&analyze_internal)?
-                .update_data(|new_plan| {
-                    ScalarSubquery(Subquery {
-                        subquery: Arc::new(new_plan),
-                        outer_ref_columns,
-                    })
-                }),
-            Expr::InSubquery(InSubquery {
-                expr,
-                subquery,
-                negated,
-            }) => subquery
-                .subquery
-                .as_ref()
-                .clone()
-                .transform_down(&analyze_internal)?
-                .update_data(|new_plan| {
-                    Expr::InSubquery(InSubquery::new(
-                        expr,
-                        Subquery {
-                            subquery: Arc::new(new_plan),
-                            outer_ref_columns: subquery.outer_ref_columns,
-                        },
-                        negated,
-                    ))
-                }),
-            Expr::Exists(expr::Exists { subquery, negated }) => subquery
-                .subquery
-                .as_ref()
-                .clone()
-                .transform_down(&analyze_internal)?
-                .update_data(|new_plan| {
-                    Expr::Exists(expr::Exists {
-                        subquery: Subquery {
-                            subquery: Arc::new(new_plan),
-                            outer_ref_columns: subquery.outer_ref_columns,
-                        },
-                        negated,
-                    })
-                }),
-            _ => Transformed::no(old_expr),
-        })
-    }
+fn analyze_internal(plan: LogicalPlan) -> Result<Transformed<LogicalPlan>> {
+    let name_preserver = NamePreserver::new(&plan);
+    plan.map_expressions(|expr| {
+        let original_name = name_preserver.save(&expr)?;
+        let transformed_expr = expr.transform_up(&|expr| match expr {
+            Expr::WindowFunction(mut window_function)
+                if window_function.args.len() == 1
+                    && is_wildcard(&window_function.args[0]) =>

Review Comment:
   do we need to match function Count as well?



##########
datafusion/optimizer/src/analyzer/count_wildcard_rule.rs:
##########
@@ -47,181 +38,43 @@ impl CountWildcardRule {
 
 impl AnalyzerRule for CountWildcardRule {
     fn analyze(&self, plan: LogicalPlan, _: &ConfigOptions) -> 
Result<LogicalPlan> {
-        plan.transform_down(&analyze_internal).data()
+        plan.transform_down_with_subqueries(&analyze_internal)
+            .data()
     }
 
     fn name(&self) -> &str {
         "count_wildcard_rule"
     }
 }
 
-fn analyze_internal(plan: LogicalPlan) -> Result<Transformed<LogicalPlan>> {
-    let mut rewriter = CountWildcardRewriter {};
-    match plan {
-        LogicalPlan::Window(window) => {
-            let window_expr = window
-                .window_expr
-                .iter()
-                .map(|expr| rewrite_preserving_name(expr.clone(), &mut 
rewriter))
-                .collect::<Result<Vec<_>>>()?;
-
-            Ok(Transformed::yes(
-                LogicalPlanBuilder::from((*window.input).clone())
-                    .window(window_expr)?
-                    .build()?,
-            ))
-        }
-        LogicalPlan::Aggregate(agg) => {
-            let aggr_expr = agg
-                .aggr_expr
-                .iter()
-                .map(|expr| rewrite_preserving_name(expr.clone(), &mut 
rewriter))
-                .collect::<Result<Vec<_>>>()?;
-
-            Ok(Transformed::yes(LogicalPlan::Aggregate(
-                Aggregate::try_new(agg.input.clone(), agg.group_expr, 
aggr_expr)?,
-            )))
-        }
-        LogicalPlan::Sort(Sort { expr, input, fetch }) => {
-            let sort_expr = expr
-                .iter()
-                .map(|expr| rewrite_preserving_name(expr.clone(), &mut 
rewriter))
-                .collect::<Result<Vec<_>>>()?;
-            Ok(Transformed::yes(LogicalPlan::Sort(Sort {
-                expr: sort_expr,
-                input,
-                fetch,
-            })))
-        }
-        LogicalPlan::Projection(projection) => {
-            let projection_expr = projection
-                .expr
-                .iter()
-                .map(|expr| rewrite_preserving_name(expr.clone(), &mut 
rewriter))
-                .collect::<Result<Vec<_>>>()?;
-            Ok(Transformed::yes(LogicalPlan::Projection(
-                Projection::try_new(projection_expr, projection.input)?,
-            )))
-        }
-        LogicalPlan::Filter(Filter {
-            predicate, input, ..
-        }) => {
-            let predicate = rewrite_preserving_name(predicate, &mut rewriter)?;
-            Ok(Transformed::yes(LogicalPlan::Filter(Filter::try_new(
-                predicate, input,
-            )?)))
-        }
-
-        _ => Ok(Transformed::no(plan)),
-    }
+fn is_wildcard(expr: &Expr) -> bool {
+    matches!(expr, Expr::Wildcard { qualifier: None })
 }
-
-struct CountWildcardRewriter {}
-
-impl TreeNodeRewriter for CountWildcardRewriter {
-    type Node = Expr;
-
-    fn f_up(&mut self, old_expr: Expr) -> Result<Transformed<Expr>> {
-        Ok(match old_expr.clone() {
-            Expr::WindowFunction(expr::WindowFunction {
-                fun:
-                    expr::WindowFunctionDefinition::AggregateFunction(
-                        aggregate_function::AggregateFunction::Count,
-                    ),
-                args,
-                partition_by,
-                order_by,
-                window_frame,
-                null_treatment,
-            }) if args.len() == 1 => match args[0] {
-                Expr::Wildcard { qualifier: None } => {
-                    Transformed::yes(Expr::WindowFunction(expr::WindowFunction 
{
-                        fun: expr::WindowFunctionDefinition::AggregateFunction(
-                            aggregate_function::AggregateFunction::Count,
-                        ),
-                        args: vec![lit(COUNT_STAR_EXPANSION)],
-                        partition_by,
-                        order_by,
-                        window_frame,
-                        null_treatment,
-                    }))
-                }
-
-                _ => Transformed::no(old_expr),
-            },
-            Expr::AggregateFunction(AggregateFunction {
-                func_def:
-                    AggregateFunctionDefinition::BuiltIn(
-                        aggregate_function::AggregateFunction::Count,
-                    ),
-                args,
-                distinct,
-                filter,
-                order_by,
-                null_treatment,
-            }) if args.len() == 1 => match args[0] {
-                Expr::Wildcard { qualifier: None } => {
-                    
Transformed::yes(Expr::AggregateFunction(AggregateFunction::new(
-                        aggregate_function::AggregateFunction::Count,
-                        vec![lit(COUNT_STAR_EXPANSION)],
-                        distinct,
-                        filter,
-                        order_by,
-                        null_treatment,
-                    )))
-                }
-                _ => Transformed::no(old_expr),
-            },
-
-            ScalarSubquery(Subquery {
-                subquery,
-                outer_ref_columns,
-            }) => subquery
-                .as_ref()
-                .clone()
-                .transform_down(&analyze_internal)?
-                .update_data(|new_plan| {
-                    ScalarSubquery(Subquery {
-                        subquery: Arc::new(new_plan),
-                        outer_ref_columns,
-                    })
-                }),
-            Expr::InSubquery(InSubquery {
-                expr,
-                subquery,
-                negated,
-            }) => subquery
-                .subquery
-                .as_ref()
-                .clone()
-                .transform_down(&analyze_internal)?
-                .update_data(|new_plan| {
-                    Expr::InSubquery(InSubquery::new(
-                        expr,
-                        Subquery {
-                            subquery: Arc::new(new_plan),
-                            outer_ref_columns: subquery.outer_ref_columns,
-                        },
-                        negated,
-                    ))
-                }),
-            Expr::Exists(expr::Exists { subquery, negated }) => subquery
-                .subquery
-                .as_ref()
-                .clone()
-                .transform_down(&analyze_internal)?
-                .update_data(|new_plan| {
-                    Expr::Exists(expr::Exists {
-                        subquery: Subquery {
-                            subquery: Arc::new(new_plan),
-                            outer_ref_columns: subquery.outer_ref_columns,
-                        },
-                        negated,
-                    })
-                }),
-            _ => Transformed::no(old_expr),
-        })
-    }
+fn analyze_internal(plan: LogicalPlan) -> Result<Transformed<LogicalPlan>> {
+    let name_preserver = NamePreserver::new(&plan);
+    plan.map_expressions(|expr| {
+        let original_name = name_preserver.save(&expr)?;
+        let transformed_expr = expr.transform_up(&|expr| match expr {
+            Expr::WindowFunction(mut window_function)
+                if window_function.args.len() == 1
+                    && is_wildcard(&window_function.args[0]) =>

Review Comment:
   do we need to match the function Count as well?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to