This is an automated email from the ASF dual-hosted git repository.

jayzhan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new e03f9f6767 Remove CountWildcardRule in Analyzer and move the 
functionality in ExprPlanner, add `plan_aggregate` and `plan_window` to planner 
(#14689)
e03f9f6767 is described below

commit e03f9f6767a69f16c5a96d5d3863cefc209497e2
Author: Jay Zhan <[email protected]>
AuthorDate: Sat Feb 22 07:45:46 2025 +0800

    Remove CountWildcardRule in Analyzer and move the functionality in 
ExprPlanner, add `plan_aggregate` and `plan_window` to planner (#14689)
    
    * count planner
    
    * window
    
    * update slt
    
    * remove rule
    
    * rm rule
    
    * doc
    
    * fix name
    
    * fix name
    
    * fix test
    
    * tpch test
    
    * fix avro
    
    * rename
    
    * switch to count(*)
    
    * use count(*)
    
    * rename
    
    * doc
    
    * rename window function
    
    * fmt
    
    * rm print
    
    * upd logic
    
    * count null
---
 .../core/src/execution/session_state_defaults.rs   |   2 +
 .../core/tests/dataframe/dataframe_functions.rs    |  34 ++-
 datafusion/core/tests/dataframe/mod.rs             |  30 +--
 datafusion/core/tests/sql/explain_analyze.rs       |   2 +-
 datafusion/expr/src/expr.rs                        |   1 -
 datafusion/expr/src/planner.rs                     |  62 ++++-
 datafusion/expr/src/udaf.rs                        |  21 +-
 datafusion/functions-aggregate/src/count.rs        | 201 ++++++++++++++-
 datafusion/functions-aggregate/src/lib.rs          |  19 +-
 datafusion/functions-aggregate/src/planner.rs      |  63 +++++
 datafusion/functions-window/src/lib.rs             |   3 +
 datafusion/functions-window/src/planner.rs         |  61 +++++
 .../optimizer/src/analyzer/count_wildcard_rule.rs  | 277 ---------------------
 datafusion/optimizer/src/analyzer/mod.rs           |   3 -
 .../optimizer/tests/optimizer_integration.rs       |  28 ++-
 datafusion/sql/src/expr/function.rs                |  70 +++++-
 datafusion/sql/tests/sql_integration.rs            |  12 +-
 datafusion/sqllogictest/test_files/aggregate.slt   |  20 ++
 datafusion/sqllogictest/test_files/avro.slt        |   2 +-
 datafusion/sqllogictest/test_files/coalesce.slt    |   2 +-
 datafusion/sqllogictest/test_files/copy.slt        |   1 -
 .../sqllogictest/test_files/count_star_rule.slt    |  32 +--
 datafusion/sqllogictest/test_files/ddl.slt         |   1 -
 datafusion/sqllogictest/test_files/errors.slt      |   2 +-
 datafusion/sqllogictest/test_files/explain.slt     |   3 +-
 datafusion/sqllogictest/test_files/insert.slt      |   6 +-
 .../sqllogictest/test_files/insert_to_external.slt |   4 +-
 datafusion/sqllogictest/test_files/joins.slt       |   4 +-
 datafusion/sqllogictest/test_files/json.slt        |   2 +-
 datafusion/sqllogictest/test_files/limit.slt       |   8 +-
 .../test_files/optimizer_group_by_constant.slt     |  16 +-
 datafusion/sqllogictest/test_files/select.slt      |   2 +-
 datafusion/sqllogictest/test_files/subquery.slt    |  42 ++--
 .../sqllogictest/test_files/tpch/plans/q1.slt.part |   2 +-
 .../test_files/tpch/plans/q13.slt.part             |   2 +-
 .../test_files/tpch/plans/q21.slt.part             |   2 +-
 .../test_files/tpch/plans/q22.slt.part             |   2 +-
 .../sqllogictest/test_files/tpch/plans/q4.slt.part |   2 +-
 datafusion/sqllogictest/test_files/union.slt       |   6 +-
 datafusion/sqllogictest/test_files/window.slt      |  12 +-
 .../substrait/tests/cases/consumer_integration.rs  |  26 +-
 .../tests/cases/roundtrip_logical_plan.rs          |   4 +-
 42 files changed, 652 insertions(+), 442 deletions(-)

diff --git a/datafusion/core/src/execution/session_state_defaults.rs 
b/datafusion/core/src/execution/session_state_defaults.rs
index 92f649781c..33bf01cf35 100644
--- a/datafusion/core/src/execution/session_state_defaults.rs
+++ b/datafusion/core/src/execution/session_state_defaults.rs
@@ -94,6 +94,8 @@ impl SessionStateDefaults {
                 feature = "unicode_expressions"
             ))]
             Arc::new(functions::planner::UserDefinedFunctionPlanner),
+            Arc::new(functions_aggregate::planner::AggregateFunctionPlanner),
+            Arc::new(functions_window::planner::WindowFunctionPlanner),
         ];
 
         expr_planners
diff --git a/datafusion/core/tests/dataframe/dataframe_functions.rs 
b/datafusion/core/tests/dataframe/dataframe_functions.rs
index 29c24948fb..33f32e8f0f 100644
--- a/datafusion/core/tests/dataframe/dataframe_functions.rs
+++ b/datafusion/core/tests/dataframe/dataframe_functions.rs
@@ -22,6 +22,7 @@ use arrow::{
     array::{Int32Array, StringArray},
     record_batch::RecordBatch,
 };
+use datafusion_functions_aggregate::count::count_all;
 use std::sync::Arc;
 
 use datafusion::error::Result;
@@ -31,7 +32,7 @@ use datafusion::prelude::*;
 use datafusion::assert_batches_eq;
 use datafusion_common::{DFSchema, ScalarValue};
 use datafusion_expr::expr::Alias;
-use datafusion_expr::ExprSchemable;
+use datafusion_expr::{table_scan, ExprSchemable, LogicalPlanBuilder};
 use datafusion_functions_aggregate::expr_fn::{approx_median, 
approx_percentile_cont};
 use datafusion_functions_nested::map::map;
 
@@ -1123,3 +1124,34 @@ async fn test_fn_map() -> Result<()> {
 
     Ok(())
 }
+
+/// Call count wildcard from dataframe API
+#[tokio::test]
+async fn test_count_wildcard() -> Result<()> {
+    let schema = Schema::new(vec![
+        Field::new("a", DataType::UInt32, false),
+        Field::new("b", DataType::UInt32, false),
+        Field::new("c", DataType::UInt32, false),
+    ]);
+
+    let table_scan = table_scan(Some("test"), &schema, None)?.build()?;
+    let plan = LogicalPlanBuilder::from(table_scan)
+        .aggregate(vec![col("b")], vec![count_all()])
+        .unwrap()
+        .project(vec![count_all()])
+        .unwrap()
+        .sort(vec![count_all().sort(true, false)])
+        .unwrap()
+        .build()
+        .unwrap();
+
+    let expected = "Sort: count(*) ASC NULLS LAST [count(*):Int64]\
+    \n  Projection: count(*) [count(*):Int64]\
+    \n    Aggregate: groupBy=[[test.b]], aggr=[[count(*)]] [b:UInt32, 
count(*):Int64]\
+    \n      TableScan: test [a:UInt32, b:UInt32, c:UInt32]";
+
+    let formatted_plan = plan.display_indent_schema().to_string();
+    assert_eq!(formatted_plan, expected);
+
+    Ok(())
+}
diff --git a/datafusion/core/tests/dataframe/mod.rs 
b/datafusion/core/tests/dataframe/mod.rs
index d545157607..b05029e8e3 100644
--- a/datafusion/core/tests/dataframe/mod.rs
+++ b/datafusion/core/tests/dataframe/mod.rs
@@ -32,7 +32,8 @@ use arrow::datatypes::{
 };
 use arrow::error::ArrowError;
 use arrow::util::pretty::pretty_format_batches;
-use datafusion_functions_aggregate::count::count_udaf;
+use datafusion_expr::utils::COUNT_STAR_EXPANSION;
+use datafusion_functions_aggregate::count::{count_all, count_udaf};
 use datafusion_functions_aggregate::expr_fn::{
     array_agg, avg, count, count_distinct, max, median, min, sum,
 };
@@ -72,7 +73,7 @@ use datafusion_expr::expr::{GroupingSet, Sort, 
WindowFunction};
 use datafusion_expr::var_provider::{VarProvider, VarType};
 use datafusion_expr::{
     cast, col, create_udf, exists, in_subquery, lit, out_ref_col, placeholder,
-    scalar_subquery, when, wildcard, Expr, ExprFunctionExt, ExprSchemable, 
LogicalPlan,
+    scalar_subquery, when, Expr, ExprFunctionExt, ExprSchemable, LogicalPlan,
     ScalarFunctionImplementation, WindowFrame, WindowFrameBound, 
WindowFrameUnits,
     WindowFunctionDefinition,
 };
@@ -2463,8 +2464,8 @@ async fn test_count_wildcard_on_sort() -> Result<()> {
     let df_results = ctx
         .table("t1")
         .await?
-        .aggregate(vec![col("b")], vec![count(wildcard())])?
-        .sort(vec![count(wildcard()).sort(true, false)])?
+        .aggregate(vec![col("b")], vec![count_all()])?
+        .sort(vec![count_all().sort(true, false)])?
         .explain(false, false)?
         .collect()
         .await?;
@@ -2498,8 +2499,8 @@ async fn test_count_wildcard_on_where_in() -> Result<()> {
             Arc::new(
                 ctx.table("t2")
                     .await?
-                    .aggregate(vec![], vec![count(wildcard())])?
-                    .select(vec![count(wildcard())])?
+                    .aggregate(vec![], vec![count_all()])?
+                    .select(vec![count_all()])?
                     .into_optimized_plan()?,
             ),
         ))?
@@ -2532,8 +2533,8 @@ async fn test_count_wildcard_on_where_exist() -> 
Result<()> {
         .filter(exists(Arc::new(
             ctx.table("t2")
                 .await?
-                .aggregate(vec![], vec![count(wildcard())])?
-                .select(vec![count(wildcard())])?
+                .aggregate(vec![], vec![count_all()])?
+                .select(vec![count_all()])?
                 .into_unoptimized_plan(),
             // Usually, into_optimized_plan() should be used here, but due to
             // https://github.com/apache/datafusion/issues/5771,
@@ -2568,7 +2569,7 @@ async fn test_count_wildcard_on_window() -> Result<()> {
         .await?
         .select(vec![Expr::WindowFunction(WindowFunction::new(
             WindowFunctionDefinition::AggregateUDF(count_udaf()),
-            vec![wildcard()],
+            vec![Expr::Literal(COUNT_STAR_EXPANSION)],
         ))
         .order_by(vec![Sort::new(col("a"), false, true)])
         .window_frame(WindowFrame::new_bounds(
@@ -2599,17 +2600,16 @@ async fn test_count_wildcard_on_aggregate() -> 
Result<()> {
     let sql_results = ctx
         .sql("select count(*) from t1")
         .await?
-        .select(vec![col("count(*)")])?
         .explain(false, false)?
         .collect()
         .await?;
 
-    // add `.select(vec![count(wildcard())])?` to make sure we can analyze all 
node instead of just top node.
+    // add `.select(vec![count_all()])?` to make sure we can analyze all 
nodes instead of just the top node.
     let df_results = ctx
         .table("t1")
         .await?
-        .aggregate(vec![], vec![count(wildcard())])?
-        .select(vec![count(wildcard())])?
+        .aggregate(vec![], vec![count_all()])?
+        .select(vec![count_all()])?
         .explain(false, false)?
         .collect()
         .await?;
@@ -2646,8 +2646,8 @@ async fn test_count_wildcard_on_where_scalar_subquery() 
-> Result<()> {
                 ctx.table("t2")
                     .await?
                     .filter(out_ref_col(DataType::UInt32, 
"t1.a").eq(col("t2.a")))?
-                    .aggregate(vec![], vec![count(wildcard())])?
-                    .select(vec![col(count(wildcard()).to_string())])?
+                    .aggregate(vec![], vec![count_all()])?
+                    .select(vec![col(count_all().to_string())])?
                     .into_unoptimized_plan(),
             ))
             .gt(lit(ScalarValue::UInt8(Some(0)))),
diff --git a/datafusion/core/tests/sql/explain_analyze.rs 
b/datafusion/core/tests/sql/explain_analyze.rs
index d4b5ae8b28..128d1d0aa4 100644
--- a/datafusion/core/tests/sql/explain_analyze.rs
+++ b/datafusion/core/tests/sql/explain_analyze.rs
@@ -780,7 +780,7 @@ async fn explain_logical_plan_only() {
     let expected = vec![
         vec![
             "logical_plan",
-            "Aggregate: groupBy=[[]], aggr=[[count(Int64(1)) AS count(*)]]\
+            "Aggregate: groupBy=[[]], aggr=[[count(*)]]\
             \n  SubqueryAlias: t\
             \n    Projection: \
             \n      Values: (Utf8(\"a\"), Int64(1), Int64(100)), (Utf8(\"a\"), 
Int64(2), Int64(150))"
diff --git a/datafusion/expr/src/expr.rs b/datafusion/expr/src/expr.rs
index df79b3568c..f8baf9c94b 100644
--- a/datafusion/expr/src/expr.rs
+++ b/datafusion/expr/src/expr.rs
@@ -2294,7 +2294,6 @@ impl Display for SchemaDisplay<'_> {
             | Expr::OuterReferenceColumn(..)
             | Expr::Placeholder(_)
             | Expr::Wildcard { .. } => write!(f, "{}", self.0),
-
             Expr::AggregateFunction(AggregateFunction { func, params }) => {
                 match func.schema_name(params) {
                     Ok(name) => {
diff --git a/datafusion/expr/src/planner.rs b/datafusion/expr/src/planner.rs
index 04cc26c910..a2ed0592ef 100644
--- a/datafusion/expr/src/planner.rs
+++ b/datafusion/expr/src/planner.rs
@@ -25,9 +25,12 @@ use datafusion_common::{
     config::ConfigOptions, file_options::file_type::FileType, not_impl_err, 
DFSchema,
     Result, TableReference,
 };
-use sqlparser::ast;
+use sqlparser::ast::{self, NullTreatment};
 
-use crate::{AggregateUDF, Expr, GetFieldAccess, ScalarUDF, TableSource, 
WindowUDF};
+use crate::{
+    AggregateUDF, Expr, GetFieldAccess, ScalarUDF, SortExpr, TableSource, 
WindowFrame,
+    WindowFunctionDefinition, WindowUDF,
+};
 
 /// Provides the `SQL` query planner meta-data about tables and
 /// functions referenced in SQL statements, without a direct dependency on the
@@ -138,7 +141,7 @@ pub trait ExprPlanner: Debug + Send + Sync {
 
     /// Plan an array literal, such as `[1, 2, 3]`
     ///
-    /// Returns origin expression arguments if not possible
+    /// Returns original expression arguments if not possible
     fn plan_array_literal(
         &self,
         exprs: Vec<Expr>,
@@ -149,14 +152,14 @@ pub trait ExprPlanner: Debug + Send + Sync {
 
     /// Plan a `POSITION` expression, such as `POSITION(<expr> in <expr>)`
     ///
-    /// returns origin expression arguments if not possible
+    /// Returns original expression arguments if not possible
     fn plan_position(&self, args: Vec<Expr>) -> 
Result<PlannerResult<Vec<Expr>>> {
         Ok(PlannerResult::Original(args))
     }
 
     /// Plan a dictionary literal, such as `{ key: value, ...}`
     ///
-    /// Returns origin expression arguments if not possible
+    /// Returns original expression arguments if not possible
     fn plan_dictionary_literal(
         &self,
         expr: RawDictionaryExpr,
@@ -167,14 +170,14 @@ pub trait ExprPlanner: Debug + Send + Sync {
 
     /// Plan an extract expression, such as`EXTRACT(month FROM foo)`
     ///
-    /// Returns origin expression arguments if not possible
+    /// Returns original expression arguments if not possible
     fn plan_extract(&self, args: Vec<Expr>) -> 
Result<PlannerResult<Vec<Expr>>> {
         Ok(PlannerResult::Original(args))
     }
 
     /// Plan an substring expression, such as `SUBSTRING(<expr> [FROM <expr>] 
[FOR <expr>])`
     ///
-    /// Returns origin expression arguments if not possible
+    /// Returns original expression arguments if not possible
     fn plan_substring(&self, args: Vec<Expr>) -> 
Result<PlannerResult<Vec<Expr>>> {
         Ok(PlannerResult::Original(args))
     }
@@ -195,14 +198,14 @@ pub trait ExprPlanner: Debug + Send + Sync {
 
     /// Plans an overlay expression, such as `overlay(str PLACING substr FROM 
pos [FOR count])`
     ///
-    /// Returns origin expression arguments if not possible
+    /// Returns original expression arguments if not possible
     fn plan_overlay(&self, args: Vec<Expr>) -> 
Result<PlannerResult<Vec<Expr>>> {
         Ok(PlannerResult::Original(args))
     }
 
     /// Plans a `make_map` expression, such as `make_map(key1, value1, key2, 
value2, ...)`
     ///
-    /// Returns origin expression arguments if not possible
+    /// Returns original expression arguments if not possible
     fn plan_make_map(&self, args: Vec<Expr>) -> 
Result<PlannerResult<Vec<Expr>>> {
         Ok(PlannerResult::Original(args))
     }
@@ -230,6 +233,23 @@ pub trait ExprPlanner: Debug + Send + Sync {
     fn plan_any(&self, expr: RawBinaryExpr) -> 
Result<PlannerResult<RawBinaryExpr>> {
         Ok(PlannerResult::Original(expr))
     }
+
+    /// Plans aggregate functions, such as `COUNT(<expr>)`
+    ///
+    /// Returns original expression arguments if not possible
+    fn plan_aggregate(
+        &self,
+        expr: RawAggregateExpr,
+    ) -> Result<PlannerResult<RawAggregateExpr>> {
+        Ok(PlannerResult::Original(expr))
+    }
+
+    /// Plans window functions, such as `COUNT(<expr>) OVER (...)`
+    ///
+    /// Returns original expression arguments if not possible
+    fn plan_window(&self, expr: RawWindowExpr) -> 
Result<PlannerResult<RawWindowExpr>> {
+        Ok(PlannerResult::Original(expr))
+    }
 }
 
 /// An operator with two arguments to plan
@@ -266,6 +286,30 @@ pub struct RawDictionaryExpr {
     pub values: Vec<Expr>,
 }
 
+/// This structure is used by `AggregateFunctionPlanner` to plan operators with
+/// custom expressions.
+#[derive(Debug, Clone)]
+pub struct RawAggregateExpr {
+    pub func: Arc<AggregateUDF>,
+    pub args: Vec<Expr>,
+    pub distinct: bool,
+    pub filter: Option<Box<Expr>>,
+    pub order_by: Option<Vec<SortExpr>>,
+    pub null_treatment: Option<NullTreatment>,
+}
+
+/// This structure is used by `WindowFunctionPlanner` to plan operators with
+/// custom expressions.
+#[derive(Debug, Clone)]
+pub struct RawWindowExpr {
+    pub func_def: WindowFunctionDefinition,
+    pub args: Vec<Expr>,
+    pub partition_by: Vec<Expr>,
+    pub order_by: Vec<SortExpr>,
+    pub window_frame: WindowFrame,
+    pub null_treatment: Option<NullTreatment>,
+}
+
 /// Result of planning a raw expr with [`ExprPlanner`]
 #[derive(Debug, Clone)]
 pub enum PlannerResult<T> {
diff --git a/datafusion/expr/src/udaf.rs b/datafusion/expr/src/udaf.rs
index 2b9e2bddd1..ae7196c9b1 100644
--- a/datafusion/expr/src/udaf.rs
+++ b/datafusion/expr/src/udaf.rs
@@ -515,9 +515,9 @@ pub trait AggregateUDFImpl: Debug + Send + Sync {
             null_treatment,
         } = params;
 
-        let mut schema_name = String::new();
+        let mut display_name = String::new();
 
-        schema_name.write_fmt(format_args!(
+        display_name.write_fmt(format_args!(
             "{}({}{})",
             self.name(),
             if *distinct { "DISTINCT " } else { "" },
@@ -525,17 +525,22 @@ pub trait AggregateUDFImpl: Debug + Send + Sync {
         ))?;
 
         if let Some(nt) = null_treatment {
-            schema_name.write_fmt(format_args!(" {}", nt))?;
+            display_name.write_fmt(format_args!(" {}", nt))?;
         }
         if let Some(fe) = filter {
-            schema_name.write_fmt(format_args!(" FILTER (WHERE {fe})"))?;
+            display_name.write_fmt(format_args!(" FILTER (WHERE {fe})"))?;
         }
-        if let Some(order_by) = order_by {
-            schema_name
-                .write_fmt(format_args!(" ORDER BY [{}]", 
expr_vec_fmt!(order_by)))?;
+        if let Some(ob) = order_by {
+            display_name.write_fmt(format_args!(
+                " ORDER BY [{}]",
+                ob.iter()
+                    .map(|o| format!("{o}"))
+                    .collect::<Vec<String>>()
+                    .join(", ")
+            ))?;
         }
 
-        Ok(schema_name)
+        Ok(display_name)
     }
 
     /// Returns the user-defined display name of function, given the arguments
diff --git a/datafusion/functions-aggregate/src/count.rs 
b/datafusion/functions-aggregate/src/count.rs
index cb59042ef4..c11329d7f5 100644
--- a/datafusion/functions-aggregate/src/count.rs
+++ b/datafusion/functions-aggregate/src/count.rs
@@ -17,11 +17,15 @@
 
 use ahash::RandomState;
 use datafusion_common::stats::Precision;
+use datafusion_expr::expr::{
+    schema_name_from_exprs, schema_name_from_sorts, AggregateFunctionParams,
+    WindowFunctionParams,
+};
 use 
datafusion_functions_aggregate_common::aggregate::count_distinct::BytesViewDistinctCountAccumulator;
 use datafusion_macros::user_doc;
 use datafusion_physical_expr::expressions;
 use std::collections::HashSet;
-use std::fmt::Debug;
+use std::fmt::{Debug, Write};
 use std::mem::{size_of, size_of_val};
 use std::ops::BitAnd;
 use std::sync::Arc;
@@ -47,11 +51,11 @@ use datafusion_common::{
     downcast_value, internal_err, not_impl_err, Result, ScalarValue,
 };
 use datafusion_expr::function::StateFieldsArgs;
+use datafusion_expr::{expr_vec_fmt, Expr, ReversedUDAF, StatisticsArgs, 
TypeSignature};
 use datafusion_expr::{
     function::AccumulatorArgs, utils::format_state_name, Accumulator, 
AggregateUDFImpl,
     Documentation, EmitTo, GroupsAccumulator, SetMonotonicity, Signature, 
Volatility,
 };
-use datafusion_expr::{Expr, ReversedUDAF, StatisticsArgs, TypeSignature};
 use datafusion_functions_aggregate_common::aggregate::count_distinct::{
     BytesDistinctCountAccumulator, FloatDistinctCountAccumulator,
     PrimitiveDistinctCountAccumulator,
@@ -79,6 +83,11 @@ pub fn count_distinct(expr: Expr) -> Expr {
     ))
 }
 
+/// Creates aggregation to count all rows, equivalent to `COUNT(*)`, 
`COUNT()`, `COUNT(1)`
+pub fn count_all() -> Expr {
+    count(Expr::Literal(COUNT_STAR_EXPANSION))
+}
+
 #[user_doc(
     doc_section(label = "General Functions"),
     description = "Returns the number of non-null values in the specified 
column. To include null values in the total count, use `count(*)`.",
@@ -139,6 +148,185 @@ impl AggregateUDFImpl for Count {
         "count"
     }
 
+    fn schema_name(&self, params: &AggregateFunctionParams) -> Result<String> {
+        let AggregateFunctionParams {
+            args,
+            distinct,
+            filter,
+            order_by,
+            null_treatment,
+        } = params;
+
+        let mut schema_name = String::new();
+
+        if is_count_wildcard(args) {
+            schema_name.write_str("count(*)")?;
+        } else {
+            schema_name.write_fmt(format_args!(
+                "{}({}{})",
+                self.name(),
+                if *distinct { "DISTINCT " } else { "" },
+                schema_name_from_exprs(args)?
+            ))?;
+        }
+
+        if let Some(null_treatment) = null_treatment {
+            schema_name.write_fmt(format_args!(" {}", null_treatment))?;
+        }
+
+        if let Some(filter) = filter {
+            schema_name.write_fmt(format_args!(" FILTER (WHERE {filter})"))?;
+        };
+
+        if let Some(order_by) = order_by {
+            schema_name.write_fmt(format_args!(
+                " ORDER BY [{}]",
+                schema_name_from_sorts(order_by)?
+            ))?;
+        };
+
+        Ok(schema_name)
+    }
+
+    fn window_function_schema_name(
+        &self,
+        params: &WindowFunctionParams,
+    ) -> Result<String> {
+        let WindowFunctionParams {
+            args,
+            partition_by,
+            order_by,
+            window_frame,
+            null_treatment,
+        } = params;
+
+        let mut schema_name = String::new();
+
+        if is_count_wildcard(args) {
+            schema_name.write_str("count(*)")?;
+        } else {
+            schema_name.write_fmt(format_args!(
+                "{}({})",
+                self.name(),
+                schema_name_from_exprs(args)?
+            ))?;
+        }
+
+        if let Some(null_treatment) = null_treatment {
+            schema_name.write_fmt(format_args!(" {}", null_treatment))?;
+        }
+
+        if !partition_by.is_empty() {
+            schema_name.write_fmt(format_args!(
+                " PARTITION BY [{}]",
+                schema_name_from_exprs(partition_by)?
+            ))?;
+        }
+
+        if !order_by.is_empty() {
+            schema_name.write_fmt(format_args!(
+                " ORDER BY [{}]",
+                schema_name_from_sorts(order_by)?
+            ))?;
+        };
+
+        schema_name.write_fmt(format_args!(" {window_frame}"))?;
+
+        Ok(schema_name)
+    }
+
+    fn display_name(&self, params: &AggregateFunctionParams) -> Result<String> 
{
+        let AggregateFunctionParams {
+            args,
+            distinct,
+            filter,
+            order_by,
+            null_treatment,
+        } = params;
+
+        let mut display_name = String::new();
+
+        if is_count_wildcard(args) {
+            display_name.write_str("count(*)")?;
+        } else {
+            display_name.write_fmt(format_args!(
+                "{}({}{})",
+                self.name(),
+                if *distinct { "DISTINCT " } else { "" },
+                args.iter()
+                    .map(|arg| format!("{arg}"))
+                    .collect::<Vec<String>>()
+                    .join(", ")
+            ))?;
+        }
+
+        if let Some(nt) = null_treatment {
+            display_name.write_fmt(format_args!(" {}", nt))?;
+        }
+        if let Some(fe) = filter {
+            display_name.write_fmt(format_args!(" FILTER (WHERE {fe})"))?;
+        }
+        if let Some(ob) = order_by {
+            display_name.write_fmt(format_args!(
+                " ORDER BY [{}]",
+                ob.iter()
+                    .map(|o| format!("{o}"))
+                    .collect::<Vec<String>>()
+                    .join(", ")
+            ))?;
+        }
+
+        Ok(display_name)
+    }
+
+    fn window_function_display_name(
+        &self,
+        params: &WindowFunctionParams,
+    ) -> Result<String> {
+        let WindowFunctionParams {
+            args,
+            partition_by,
+            order_by,
+            window_frame,
+            null_treatment,
+        } = params;
+
+        let mut display_name = String::new();
+
+        if is_count_wildcard(args) {
+            display_name.write_str("count(*)")?;
+        } else {
+            display_name.write_fmt(format_args!(
+                "{}({})",
+                self.name(),
+                expr_vec_fmt!(args)
+            ))?;
+        }
+
+        if let Some(null_treatment) = null_treatment {
+            display_name.write_fmt(format_args!(" {}", null_treatment))?;
+        }
+
+        if !partition_by.is_empty() {
+            display_name.write_fmt(format_args!(
+                " PARTITION BY [{}]",
+                expr_vec_fmt!(partition_by)
+            ))?;
+        }
+
+        if !order_by.is_empty() {
+            display_name
+                .write_fmt(format_args!(" ORDER BY [{}]", 
expr_vec_fmt!(order_by)))?;
+        };
+
+        display_name.write_fmt(format_args!(
+            " {} BETWEEN {} AND {}",
+            window_frame.units, window_frame.start_bound, 
window_frame.end_bound
+        ))?;
+
+        Ok(display_name)
+    }
+
     fn signature(&self) -> &Signature {
         &self.signature
     }
@@ -359,6 +547,15 @@ impl AggregateUDFImpl for Count {
     }
 }
 
+fn is_count_wildcard(args: &[Expr]) -> bool {
+    match args {
+        [] => true, // count()
+        // All const should be coerced to int64 or rejected by the signature
+        [Expr::Literal(ScalarValue::Int64(Some(_)))] => true, // count(1)
+        _ => false, // More than one argument or non-matching cases
+    }
+}
+
 #[derive(Debug)]
 struct CountAccumulator {
     count: i64,
diff --git a/datafusion/functions-aggregate/src/lib.rs 
b/datafusion/functions-aggregate/src/lib.rs
index f4bdb53efd..a5c84298e9 100644
--- a/datafusion/functions-aggregate/src/lib.rs
+++ b/datafusion/functions-aggregate/src/lib.rs
@@ -64,28 +64,29 @@
 pub mod macros;
 
 pub mod approx_distinct;
+pub mod approx_median;
+pub mod approx_percentile_cont;
+pub mod approx_percentile_cont_with_weight;
 pub mod array_agg;
+pub mod average;
+pub mod bit_and_or_xor;
+pub mod bool_and_or;
 pub mod correlation;
 pub mod count;
 pub mod covariance;
 pub mod first_last;
+pub mod grouping;
 pub mod hyperloglog;
 pub mod median;
 pub mod min_max;
+pub mod nth_value;
 pub mod regr;
 pub mod stddev;
+pub mod string_agg;
 pub mod sum;
 pub mod variance;
 
-pub mod approx_median;
-pub mod approx_percentile_cont;
-pub mod approx_percentile_cont_with_weight;
-pub mod average;
-pub mod bit_and_or_xor;
-pub mod bool_and_or;
-pub mod grouping;
-pub mod nth_value;
-pub mod string_agg;
+pub mod planner;
 
 use crate::approx_percentile_cont::approx_percentile_cont_udaf;
 use 
crate::approx_percentile_cont_with_weight::approx_percentile_cont_with_weight_udaf;
diff --git a/datafusion/functions-aggregate/src/planner.rs 
b/datafusion/functions-aggregate/src/planner.rs
new file mode 100644
index 0000000000..1f0a42c4c7
--- /dev/null
+++ b/datafusion/functions-aggregate/src/planner.rs
@@ -0,0 +1,63 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! SQL planning extensions like [`AggregateFunctionPlanner`]
+
+use datafusion_common::Result;
+use datafusion_expr::{
+    expr::AggregateFunction,
+    lit,
+    planner::{ExprPlanner, PlannerResult, RawAggregateExpr},
+    utils::COUNT_STAR_EXPANSION,
+    Expr,
+};
+
+#[derive(Debug)]
+pub struct AggregateFunctionPlanner;
+
+impl ExprPlanner for AggregateFunctionPlanner {
+    fn plan_aggregate(
+        &self,
+        expr: RawAggregateExpr,
+    ) -> Result<PlannerResult<RawAggregateExpr>> {
+        if expr.func.name() == "count"
+            && (expr.args.len() == 1 && matches!(expr.args[0], Expr::Wildcard 
{ .. })
+                || expr.args.is_empty())
+        {
+            let RawAggregateExpr {
+                func,
+                args: _,
+                distinct,
+                filter,
+                order_by,
+                null_treatment,
+            } = expr;
+            return Ok(PlannerResult::Planned(Expr::AggregateFunction(
+                AggregateFunction::new_udf(
+                    func,
+                    vec![lit(COUNT_STAR_EXPANSION)],
+                    distinct,
+                    filter,
+                    order_by,
+                    null_treatment,
+                ),
+            )));
+        }
+
+        Ok(PlannerResult::Original(expr))
+    }
+}
diff --git a/datafusion/functions-window/src/lib.rs 
b/datafusion/functions-window/src/lib.rs
index 0d932bf847..718b0bf158 100644
--- a/datafusion/functions-window/src/lib.rs
+++ b/datafusion/functions-window/src/lib.rs
@@ -45,6 +45,9 @@ pub mod nth_value;
 pub mod ntile;
 pub mod rank;
 pub mod row_number;
+
+pub mod planner;
+
 mod utils;
 
 /// Fluent-style API for creating `Expr`s
diff --git a/datafusion/functions-window/src/planner.rs 
b/datafusion/functions-window/src/planner.rs
new file mode 100644
index 0000000000..8f48ca8b18
--- /dev/null
+++ b/datafusion/functions-window/src/planner.rs
@@ -0,0 +1,61 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! SQL planning extensions like [`WindowFunctionPlanner`]
+
+use datafusion_common::Result;
+use datafusion_expr::{
+    expr::WindowFunction,
+    lit,
+    planner::{ExprPlanner, PlannerResult, RawWindowExpr},
+    utils::COUNT_STAR_EXPANSION,
+    Expr, ExprFunctionExt,
+};
+
+#[derive(Debug)]
+pub struct WindowFunctionPlanner;
+
+impl ExprPlanner for WindowFunctionPlanner {
+    fn plan_window(&self, expr: RawWindowExpr) -> Result<PlannerResult<RawWindowExpr>> {
+        if expr.func_def.name() == "count"
+            && (expr.args.len() == 1 && matches!(expr.args[0], Expr::Wildcard { .. })
+                || expr.args.is_empty())
+        {
+            let RawWindowExpr {
+                func_def,
+                args: _,
+                partition_by,
+                order_by,
+                window_frame,
+                null_treatment,
+            } = expr;
+            return Ok(PlannerResult::Planned(
+                Expr::WindowFunction(WindowFunction::new(
+                    func_def,
+                    vec![lit(COUNT_STAR_EXPANSION)],
+                ))
+                .partition_by(partition_by)
+                .order_by(order_by)
+                .window_frame(window_frame)
+                .null_treatment(null_treatment)
+                .build()?,
+            ));
+        }
+
+        Ok(PlannerResult::Original(expr))
+    }
+}
diff --git a/datafusion/optimizer/src/analyzer/count_wildcard_rule.rs b/datafusion/optimizer/src/analyzer/count_wildcard_rule.rs
deleted file mode 100644
index f517761b1e..0000000000
--- a/datafusion/optimizer/src/analyzer/count_wildcard_rule.rs
+++ /dev/null
@@ -1,277 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-use crate::analyzer::AnalyzerRule;
-
-use crate::utils::NamePreserver;
-use datafusion_common::config::ConfigOptions;
-use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
-use datafusion_common::Result;
-use datafusion_expr::expr::{AggregateFunction, AggregateFunctionParams, 
WindowFunction};
-use datafusion_expr::utils::COUNT_STAR_EXPANSION;
-use datafusion_expr::{lit, Expr, LogicalPlan, WindowFunctionDefinition};
-
-/// Rewrite `Count(Expr::Wildcard)` to `Count(Expr::Literal)`.
-///
-/// Resolves issue: <https://github.com/apache/datafusion/issues/5473>
-#[derive(Default, Debug)]
-pub struct CountWildcardRule {}
-
-impl CountWildcardRule {
-    pub fn new() -> Self {
-        Self {}
-    }
-}
-
-impl AnalyzerRule for CountWildcardRule {
-    fn analyze(&self, plan: LogicalPlan, _: &ConfigOptions) -> 
Result<LogicalPlan> {
-        plan.transform_down_with_subqueries(analyze_internal).data()
-    }
-
-    fn name(&self) -> &str {
-        "count_wildcard_rule"
-    }
-}
-
-fn is_wildcard(expr: &Expr) -> bool {
-    matches!(expr, Expr::Wildcard { .. })
-}
-
-fn is_count_star_aggregate(aggregate_function: &AggregateFunction) -> bool {
-    matches!(aggregate_function,
-        AggregateFunction {
-            func,
-            params: AggregateFunctionParams { args, .. },
-        } if func.name() == "count" && (args.len() == 1 && 
is_wildcard(&args[0]) || args.is_empty()))
-}
-
-fn is_count_star_window_aggregate(window_function: &WindowFunction) -> bool {
-    let args = &window_function.params.args;
-    matches!(window_function.fun,
-        WindowFunctionDefinition::AggregateUDF(ref udaf)
-            if udaf.name() == "count" && (args.len() == 1 && 
is_wildcard(&args[0]) || args.is_empty()))
-}
-
-fn analyze_internal(plan: LogicalPlan) -> Result<Transformed<LogicalPlan>> {
-    let name_preserver = NamePreserver::new(&plan);
-    plan.map_expressions(|expr| {
-        let original_name = name_preserver.save(&expr);
-        let transformed_expr = expr.transform_up(|expr| match expr {
-            Expr::WindowFunction(mut window_function)
-                if is_count_star_window_aggregate(&window_function) =>
-            {
-                window_function.params.args = vec![lit(COUNT_STAR_EXPANSION)];
-                Ok(Transformed::yes(Expr::WindowFunction(window_function)))
-            }
-            Expr::AggregateFunction(mut aggregate_function)
-                if is_count_star_aggregate(&aggregate_function) =>
-            {
-                aggregate_function.params.args = 
vec![lit(COUNT_STAR_EXPANSION)];
-                Ok(Transformed::yes(Expr::AggregateFunction(
-                    aggregate_function,
-                )))
-            }
-            _ => Ok(Transformed::no(expr)),
-        })?;
-        Ok(transformed_expr.update_data(|data| original_name.restore(data)))
-    })
-}
-
-#[cfg(test)]
-mod tests {
-    use super::*;
-    use crate::test::*;
-    use arrow::datatypes::DataType;
-    use datafusion_common::ScalarValue;
-    use datafusion_expr::expr::Sort;
-    use datafusion_expr::ExprFunctionExt;
-    use datafusion_expr::{
-        col, exists, in_subquery, logical_plan::LogicalPlanBuilder, 
out_ref_col,
-        scalar_subquery, wildcard, WindowFrame, WindowFrameBound, 
WindowFrameUnits,
-    };
-    use datafusion_functions_aggregate::count::count_udaf;
-    use datafusion_functions_aggregate::expr_fn::max;
-    use std::sync::Arc;
-
-    use datafusion_functions_aggregate::expr_fn::{count, sum};
-
-    fn assert_plan_eq(plan: LogicalPlan, expected: &str) -> Result<()> {
-        assert_analyzed_plan_eq_display_indent(
-            Arc::new(CountWildcardRule::new()),
-            plan,
-            expected,
-        )
-    }
-
-    #[test]
-    fn test_count_wildcard_on_sort() -> Result<()> {
-        let table_scan = test_table_scan()?;
-        let plan = LogicalPlanBuilder::from(table_scan)
-            .aggregate(vec![col("b")], vec![count(wildcard())])?
-            .project(vec![count(wildcard())])?
-            .sort(vec![count(wildcard()).sort(true, false)])?
-            .build()?;
-        let expected = "Sort: count(*) ASC NULLS LAST [count(*):Int64]\
-        \n  Projection: count(*) [count(*):Int64]\
-        \n    Aggregate: groupBy=[[test.b]], aggr=[[count(Int64(1)) AS 
count(*)]] [b:UInt32, count(*):Int64]\
-        \n      TableScan: test [a:UInt32, b:UInt32, c:UInt32]";
-        assert_plan_eq(plan, expected)
-    }
-
-    #[test]
-    fn test_count_wildcard_on_where_in() -> Result<()> {
-        let table_scan_t1 = test_table_scan_with_name("t1")?;
-        let table_scan_t2 = test_table_scan_with_name("t2")?;
-
-        let plan = LogicalPlanBuilder::from(table_scan_t1)
-            .filter(in_subquery(
-                col("a"),
-                Arc::new(
-                    LogicalPlanBuilder::from(table_scan_t2)
-                        .aggregate(Vec::<Expr>::new(), 
vec![count(wildcard())])?
-                        .project(vec![count(wildcard())])?
-                        .build()?,
-                ),
-            ))?
-            .build()?;
-
-        let expected = "Filter: t1.a IN (<subquery>) [a:UInt32, b:UInt32, 
c:UInt32]\
-        \n  Subquery: [count(*):Int64]\
-        \n    Projection: count(*) [count(*):Int64]\
-        \n      Aggregate: groupBy=[[]], aggr=[[count(Int64(1)) AS count(*)]] 
[count(*):Int64]\
-        \n        TableScan: t2 [a:UInt32, b:UInt32, c:UInt32]\
-        \n  TableScan: t1 [a:UInt32, b:UInt32, c:UInt32]";
-        assert_plan_eq(plan, expected)
-    }
-
-    #[test]
-    fn test_count_wildcard_on_where_exists() -> Result<()> {
-        let table_scan_t1 = test_table_scan_with_name("t1")?;
-        let table_scan_t2 = test_table_scan_with_name("t2")?;
-
-        let plan = LogicalPlanBuilder::from(table_scan_t1)
-            .filter(exists(Arc::new(
-                LogicalPlanBuilder::from(table_scan_t2)
-                    .aggregate(Vec::<Expr>::new(), vec![count(wildcard())])?
-                    .project(vec![count(wildcard())])?
-                    .build()?,
-            )))?
-            .build()?;
-
-        let expected = "Filter: EXISTS (<subquery>) [a:UInt32, b:UInt32, 
c:UInt32]\
-        \n  Subquery: [count(*):Int64]\
-        \n    Projection: count(*) [count(*):Int64]\
-        \n      Aggregate: groupBy=[[]], aggr=[[count(Int64(1)) AS count(*)]] 
[count(*):Int64]\
-        \n        TableScan: t2 [a:UInt32, b:UInt32, c:UInt32]\
-        \n  TableScan: t1 [a:UInt32, b:UInt32, c:UInt32]";
-        assert_plan_eq(plan, expected)
-    }
-
-    #[test]
-    fn test_count_wildcard_on_where_scalar_subquery() -> Result<()> {
-        let table_scan_t1 = test_table_scan_with_name("t1")?;
-        let table_scan_t2 = test_table_scan_with_name("t2")?;
-
-        let plan = LogicalPlanBuilder::from(table_scan_t1)
-            .filter(
-                scalar_subquery(Arc::new(
-                    LogicalPlanBuilder::from(table_scan_t2)
-                        .filter(out_ref_col(DataType::UInt32, 
"t1.a").eq(col("t2.a")))?
-                        .aggregate(
-                            Vec::<Expr>::new(),
-                            vec![count(lit(COUNT_STAR_EXPANSION))],
-                        )?
-                        .project(vec![count(lit(COUNT_STAR_EXPANSION))])?
-                        .build()?,
-                ))
-                .gt(lit(ScalarValue::UInt8(Some(0)))),
-            )?
-            .project(vec![col("t1.a"), col("t1.b")])?
-            .build()?;
-
-        let expected = "Projection: t1.a, t1.b [a:UInt32, b:UInt32]\
-              \n  Filter: (<subquery>) > UInt8(0) [a:UInt32, b:UInt32, 
c:UInt32]\
-              \n    Subquery: [count(Int64(1)):Int64]\
-              \n      Projection: count(Int64(1)) [count(Int64(1)):Int64]\
-              \n        Aggregate: groupBy=[[]], aggr=[[count(Int64(1))]] 
[count(Int64(1)):Int64]\
-              \n          Filter: outer_ref(t1.a) = t2.a [a:UInt32, b:UInt32, 
c:UInt32]\
-              \n            TableScan: t2 [a:UInt32, b:UInt32, c:UInt32]\
-              \n    TableScan: t1 [a:UInt32, b:UInt32, c:UInt32]";
-        assert_plan_eq(plan, expected)
-    }
-    #[test]
-    fn test_count_wildcard_on_window() -> Result<()> {
-        let table_scan = test_table_scan()?;
-
-        let plan = LogicalPlanBuilder::from(table_scan)
-            .window(vec![Expr::WindowFunction(WindowFunction::new(
-                WindowFunctionDefinition::AggregateUDF(count_udaf()),
-                vec![wildcard()],
-            ))
-            .order_by(vec![Sort::new(col("a"), false, true)])
-            .window_frame(WindowFrame::new_bounds(
-                WindowFrameUnits::Range,
-                WindowFrameBound::Preceding(ScalarValue::UInt32(Some(6))),
-                WindowFrameBound::Following(ScalarValue::UInt32(Some(2))),
-            ))
-            .build()?])?
-            .project(vec![count(wildcard())])?
-            .build()?;
-
-        let expected = "Projection: count(Int64(1)) AS count(*) 
[count(*):Int64]\
-        \n  WindowAggr: windowExpr=[[count(Int64(1)) ORDER BY [test.a DESC 
NULLS FIRST] RANGE BETWEEN 6 PRECEDING AND 2 FOLLOWING AS count(*) ORDER BY 
[test.a DESC NULLS FIRST] RANGE BETWEEN 6 PRECEDING AND 2 FOLLOWING]] 
[a:UInt32, b:UInt32, c:UInt32, count(*) ORDER BY [test.a DESC NULLS FIRST] 
RANGE BETWEEN 6 PRECEDING AND 2 FOLLOWING:Int64]\
-        \n    TableScan: test [a:UInt32, b:UInt32, c:UInt32]";
-        assert_plan_eq(plan, expected)
-    }
-
-    #[test]
-    fn test_count_wildcard_on_aggregate() -> Result<()> {
-        let table_scan = test_table_scan()?;
-        let plan = LogicalPlanBuilder::from(table_scan)
-            .aggregate(Vec::<Expr>::new(), vec![count(wildcard())])?
-            .project(vec![count(wildcard())])?
-            .build()?;
-
-        let expected = "Projection: count(*) [count(*):Int64]\
-        \n  Aggregate: groupBy=[[]], aggr=[[count(Int64(1)) AS count(*)]] 
[count(*):Int64]\
-        \n    TableScan: test [a:UInt32, b:UInt32, c:UInt32]";
-        assert_plan_eq(plan, expected)
-    }
-
-    #[test]
-    fn test_count_wildcard_on_non_count_aggregate() -> Result<()> {
-        let table_scan = test_table_scan()?;
-        let res = LogicalPlanBuilder::from(table_scan)
-            .aggregate(Vec::<Expr>::new(), vec![sum(wildcard())]);
-        assert!(res.is_err());
-        Ok(())
-    }
-
-    #[test]
-    fn test_count_wildcard_on_nesting() -> Result<()> {
-        let table_scan = test_table_scan()?;
-        let plan = LogicalPlanBuilder::from(table_scan)
-            .aggregate(Vec::<Expr>::new(), vec![max(count(wildcard()))])?
-            .project(vec![count(wildcard())])?
-            .build()?;
-
-        let expected = "Projection: count(Int64(1)) AS count(*) 
[count(*):Int64]\
-        \n  Aggregate: groupBy=[[]], aggr=[[max(count(Int64(1))) AS 
max(count(*))]] [max(count(*)):Int64;N]\
-        \n    TableScan: test [a:UInt32, b:UInt32, c:UInt32]";
-        assert_plan_eq(plan, expected)
-    }
-}
diff --git a/datafusion/optimizer/src/analyzer/mod.rs b/datafusion/optimizer/src/analyzer/mod.rs
index 9d0ac6b54c..c506616d14 100644
--- a/datafusion/optimizer/src/analyzer/mod.rs
+++ b/datafusion/optimizer/src/analyzer/mod.rs
@@ -28,7 +28,6 @@ use datafusion_common::Result;
 use datafusion_expr::expr_rewriter::FunctionRewrite;
 use datafusion_expr::{InvariantLevel, LogicalPlan};
 
-use crate::analyzer::count_wildcard_rule::CountWildcardRule;
 use crate::analyzer::expand_wildcard_rule::ExpandWildcardRule;
 use crate::analyzer::inline_table_scan::InlineTableScan;
 use crate::analyzer::resolve_grouping_function::ResolveGroupingFunction;
@@ -37,7 +36,6 @@ use crate::utils::log_plan;
 
 use self::function_rewrite::ApplyFunctionRewrites;
 
-pub mod count_wildcard_rule;
 pub mod expand_wildcard_rule;
 pub mod function_rewrite;
 pub mod inline_table_scan;
@@ -106,7 +104,6 @@ impl Analyzer {
             // [Expr::Wildcard] should be expanded before [TypeCoercion]
             Arc::new(ResolveGroupingFunction::new()),
             Arc::new(TypeCoercion::new()),
-            Arc::new(CountWildcardRule::new()),
         ];
         Self::with_rules(rules)
     }
diff --git a/datafusion/optimizer/tests/optimizer_integration.rs b/datafusion/optimizer/tests/optimizer_integration.rs
index a33ecbc3a1..b59acd72a2 100644
--- a/datafusion/optimizer/tests/optimizer_integration.rs
+++ b/datafusion/optimizer/tests/optimizer_integration.rs
@@ -23,11 +23,14 @@ use arrow::datatypes::{DataType, Field, Schema, SchemaRef, TimeUnit};
 
 use datafusion_common::config::ConfigOptions;
 use datafusion_common::{assert_contains, plan_err, Result, TableReference};
+use datafusion_expr::planner::ExprPlanner;
 use datafusion_expr::sqlparser::dialect::PostgreSqlDialect;
 use datafusion_expr::test::function_stub::sum_udaf;
 use datafusion_expr::{AggregateUDF, LogicalPlan, ScalarUDF, TableSource, 
WindowUDF};
 use datafusion_functions_aggregate::average::avg_udaf;
 use datafusion_functions_aggregate::count::count_udaf;
+use datafusion_functions_aggregate::planner::AggregateFunctionPlanner;
+use datafusion_functions_window::planner::WindowFunctionPlanner;
 use datafusion_optimizer::analyzer::type_coercion::TypeCoercionRewriter;
 use datafusion_optimizer::analyzer::Analyzer;
 use datafusion_optimizer::optimizer::Optimizer;
@@ -195,7 +198,7 @@ fn between_date32_plus_interval() -> Result<()> {
     WHERE col_date32 between '1998-03-18' AND cast('1998-03-18' as date) + 
INTERVAL '90 days'";
     let plan = test_sql(sql)?;
     let expected =
-        "Aggregate: groupBy=[[]], aggr=[[count(Int64(1))]]\
+        "Aggregate: groupBy=[[]], aggr=[[count(*)]]\
         \n  Projection: \
         \n    Filter: test.col_date32 >= Date32(\"1998-03-18\") AND 
test.col_date32 <= Date32(\"1998-06-16\")\
         \n      TableScan: test projection=[col_date32]";
@@ -209,7 +212,7 @@ fn between_date64_plus_interval() -> Result<()> {
     WHERE col_date64 between '1998-03-18T00:00:00' AND cast('1998-03-18' as 
date) + INTERVAL '90 days'";
     let plan = test_sql(sql)?;
     let expected =
-        "Aggregate: groupBy=[[]], aggr=[[count(Int64(1))]]\
+        "Aggregate: groupBy=[[]], aggr=[[count(*)]]\
         \n  Projection: \
         \n    Filter: test.col_date64 >= Date64(\"1998-03-18\") AND 
test.col_date64 <= Date64(\"1998-06-16\")\
         \n      TableScan: test projection=[col_date64]";
@@ -266,7 +269,7 @@ fn push_down_filter_groupby_expr_contains_alias() {
     let sql = "SELECT * FROM (SELECT (col_int32 + col_uint32) AS c, count(*) 
FROM test GROUP BY 1) where c > 3";
     let plan = test_sql(sql).unwrap();
     let expected = "Projection: test.col_int32 + test.col_uint32 AS c, 
count(*)\
-    \n  Aggregate: groupBy=[[test.col_int32 + CAST(test.col_uint32 AS 
Int32)]], aggr=[[count(Int64(1)) AS count(*)]]\
+    \n  Aggregate: groupBy=[[test.col_int32 + CAST(test.col_uint32 AS Int32)]], aggr=[[count(*)]]\
     \n    Filter: test.col_int32 + CAST(test.col_uint32 AS Int32) > Int32(3)\
     \n      TableScan: test projection=[col_int32, col_uint32]";
     assert_eq!(expected, format!("{plan}"));
@@ -311,7 +314,7 @@ fn eliminate_redundant_null_check_on_count() {
     let plan = test_sql(sql).unwrap();
     let expected = "\
         Projection: test.col_int32, count(*) AS c\
-        \n  Aggregate: groupBy=[[test.col_int32]], aggr=[[count(Int64(1)) AS 
count(*)]]\
+        \n  Aggregate: groupBy=[[test.col_int32]], aggr=[[count(*)]]\
         \n    TableScan: test projection=[col_int32]";
     assert_eq!(expected, format!("{plan}"));
 }
@@ -422,7 +425,12 @@ fn test_sql(sql: &str) -> Result<LogicalPlan> {
     let context_provider = MyContextProvider::default()
         .with_udaf(sum_udaf())
         .with_udaf(count_udaf())
-        .with_udaf(avg_udaf());
+        .with_udaf(avg_udaf())
+        .with_expr_planners(vec![
+            Arc::new(AggregateFunctionPlanner),
+            Arc::new(WindowFunctionPlanner),
+        ]);
+
     let sql_to_rel = SqlToRel::new(&context_provider);
     let plan = sql_to_rel.sql_statement_to_plan(statement.clone())?;
 
@@ -440,6 +448,7 @@ fn observe(_plan: &LogicalPlan, _rule: &dyn OptimizerRule) {}
 struct MyContextProvider {
     options: ConfigOptions,
     udafs: HashMap<String, Arc<AggregateUDF>>,
+    expr_planners: Vec<Arc<dyn ExprPlanner>>,
 }
 
 impl MyContextProvider {
@@ -448,6 +457,11 @@ impl MyContextProvider {
         self.udafs.insert(udaf.name().to_lowercase(), udaf);
         self
     }
+
+    fn with_expr_planners(mut self, expr_planners: Vec<Arc<dyn ExprPlanner>>) -> Self {
+        self.expr_planners = expr_planners;
+        self
+    }
 }
 
 impl ContextProvider for MyContextProvider {
@@ -516,6 +530,10 @@ impl ContextProvider for MyContextProvider {
     fn udwf_names(&self) -> Vec<String> {
         Vec::new()
     }
+
+    fn get_expr_planners(&self) -> &[Arc<dyn ExprPlanner>] {
+        &self.expr_planners
+    }
 }
 
 struct MyTableSource {
diff --git a/datafusion/sql/src/expr/function.rs b/datafusion/sql/src/expr/function.rs
index 1cf3dcb289..035749a789 100644
--- a/datafusion/sql/src/expr/function.rs
+++ b/datafusion/sql/src/expr/function.rs
@@ -23,7 +23,7 @@ use datafusion_common::{
     DFSchema, Dependency, Result,
 };
 use datafusion_expr::expr::{ScalarFunction, Unnest};
-use datafusion_expr::planner::PlannerResult;
+use datafusion_expr::planner::{PlannerResult, RawAggregateExpr, RawWindowExpr};
 use datafusion_expr::{
     expr, qualified_wildcard, wildcard, Expr, ExprFunctionExt, ExprSchemable,
     WindowFrame, WindowFunctionDefinition,
@@ -315,15 +315,38 @@ impl<S: ContextProvider> SqlToRel<'_, S> {
             };
 
             if let Ok(fun) = self.find_window_func(&name) {
-                return Expr::WindowFunction(expr::WindowFunction::new(
-                    fun,
-                    self.function_args_to_expr(args, schema, planner_context)?,
-                ))
-                .partition_by(partition_by)
-                .order_by(order_by)
-                .window_frame(window_frame)
-                .null_treatment(null_treatment)
-                .build();
+                let args = self.function_args_to_expr(args, schema, planner_context)?;
+                let mut window_expr = RawWindowExpr {
+                    func_def: fun,
+                    args,
+                    partition_by,
+                    order_by,
+                    window_frame,
+                    null_treatment,
+                };
+
+                for planner in self.context_provider.get_expr_planners().iter() {
+                    match planner.plan_window(window_expr)? {
+                        PlannerResult::Planned(expr) => return Ok(expr),
+                        PlannerResult::Original(expr) => window_expr = expr,
+                    }
+                }
+
+                let RawWindowExpr {
+                    func_def,
+                    args,
+                    partition_by,
+                    order_by,
+                    window_frame,
+                    null_treatment,
+                } = window_expr;
+
+                return Expr::WindowFunction(expr::WindowFunction::new(func_def, args))
+                    .partition_by(partition_by)
+                    .order_by(order_by)
+                    .window_frame(window_frame)
+                    .null_treatment(null_treatment)
+                    .build();
             }
         } else {
             // User defined aggregate functions (UDAF) have precedence in case 
it has the same name as a scalar built-in function
@@ -341,8 +364,33 @@ impl<S: ContextProvider> SqlToRel<'_, S> {
                     .map(|e| self.sql_expr_to_logical_expr(*e, schema, 
planner_context))
                     .transpose()?
                     .map(Box::new);
+
+                let mut aggregate_expr = RawAggregateExpr {
+                    func: fm,
+                    args,
+                    distinct,
+                    filter,
+                    order_by,
+                    null_treatment,
+                };
+                for planner in self.context_provider.get_expr_planners().iter() {
+                    match planner.plan_aggregate(aggregate_expr)? {
+                        PlannerResult::Planned(expr) => return Ok(expr),
+                        PlannerResult::Original(expr) => aggregate_expr = expr,
+                    }
+                }
+
+                let RawAggregateExpr {
+                    func,
+                    args,
+                    distinct,
+                    filter,
+                    order_by,
+                    null_treatment,
+                } = aggregate_expr;
+
                 return 
Ok(Expr::AggregateFunction(expr::AggregateFunction::new_udf(
-                    fm,
+                    func,
                     args,
                     distinct,
                     filter,
diff --git a/datafusion/sql/tests/sql_integration.rs b/datafusion/sql/tests/sql_integration.rs
index 1df1830268..9c0d6316ad 100644
--- a/datafusion/sql/tests/sql_integration.rs
+++ b/datafusion/sql/tests/sql_integration.rs
@@ -1461,14 +1461,14 @@ fn select_simple_aggregate_with_groupby_and_column_is_in_aggregate_and_groupby()
 fn select_simple_aggregate_with_groupby_can_use_positions() {
     quick_test(
         "SELECT state, age AS b, count(1) FROM person GROUP BY 1, 2",
-        "Projection: person.state, person.age AS b, count(Int64(1))\
-             \n  Aggregate: groupBy=[[person.state, person.age]], 
aggr=[[count(Int64(1))]]\
+        "Projection: person.state, person.age AS b, count(*)\
+             \n  Aggregate: groupBy=[[person.state, person.age]], aggr=[[count(*)]]\
              \n    TableScan: person",
     );
     quick_test(
         "SELECT state, age AS b, count(1) FROM person GROUP BY 2, 1",
-        "Projection: person.state, person.age AS b, count(Int64(1))\
-             \n  Aggregate: groupBy=[[person.age, person.state]], 
aggr=[[count(Int64(1))]]\
+        "Projection: person.state, person.age AS b, count(*)\
+             \n  Aggregate: groupBy=[[person.age, person.state]], aggr=[[count(*)]]\
              \n    TableScan: person",
     );
 }
@@ -1630,8 +1630,8 @@ fn test_wildcard() {
 #[test]
 fn select_count_one() {
     let sql = "SELECT count(1) FROM person";
-    let expected = "Projection: count(Int64(1))\
-                        \n  Aggregate: groupBy=[[]], aggr=[[count(Int64(1))]]\
+    let expected = "Projection: count(*)\
+                        \n  Aggregate: groupBy=[[]], aggr=[[count(*)]]\
                         \n    TableScan: person";
     quick_test(sql, expected);
 }
diff --git a/datafusion/sqllogictest/test_files/aggregate.slt b/datafusion/sqllogictest/test_files/aggregate.slt
index 7caa81d64e..f175973f92 100644
--- a/datafusion/sqllogictest/test_files/aggregate.slt
+++ b/datafusion/sqllogictest/test_files/aggregate.slt
@@ -6276,6 +6276,26 @@ physical_plan
 05)--------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
 06)----------DataSourceExec: file_groups={1 group: 
[[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c5], 
file_type=csv, has_header=true
 
+# test count(null) case (null with type)
+
+statement count 0
+create table t(a int, b int) as values (1, 3), (2, 4), (3, 5);
+
+query I
+select count(null::bigint) from t;
+----
+0
+
+query TT
+explain select count(null::bigint) from t;
+----
+logical_plan
+01)Aggregate: groupBy=[[]], aggr=[[count(Int64(NULL)) AS count(NULL)]]
+02)--TableScan: t projection=[]
+physical_plan
+01)AggregateExec: mode=Single, gby=[], aggr=[count(NULL)]
+02)--DataSourceExec: partitions=1, partition_sizes=[1]
+
 #######
 # Group median test
 #######
diff --git a/datafusion/sqllogictest/test_files/avro.slt b/datafusion/sqllogictest/test_files/avro.slt
index 80bf0bc2dd..20179e0c5b 100644
--- a/datafusion/sqllogictest/test_files/avro.slt
+++ b/datafusion/sqllogictest/test_files/avro.slt
@@ -243,7 +243,7 @@ query TT
 EXPLAIN SELECT count(*) from alltypes_plain
 ----
 logical_plan
-01)Aggregate: groupBy=[[]], aggr=[[count(Int64(1)) AS count(*)]]
+01)Aggregate: groupBy=[[]], aggr=[[count(*)]]
 02)--TableScan: alltypes_plain projection=[]
 physical_plan
 01)AggregateExec: mode=Final, gby=[], aggr=[count(*)]
diff --git a/datafusion/sqllogictest/test_files/coalesce.slt b/datafusion/sqllogictest/test_files/coalesce.slt
index 5f2d2f0d1d..e7cf31dc69 100644
--- a/datafusion/sqllogictest/test_files/coalesce.slt
+++ b/datafusion/sqllogictest/test_files/coalesce.slt
@@ -442,4 +442,4 @@ drop table test
 query T
 select coalesce(arrow_cast('', 'Utf8View'), arrow_cast('', 'Dictionary(UInt32, 
Utf8)'));
 ----
-(empty)
\ No newline at end of file
+(empty)
diff --git a/datafusion/sqllogictest/test_files/copy.slt b/datafusion/sqllogictest/test_files/copy.slt
index 7dd85b3ae2..f39ff56ce4 100644
--- a/datafusion/sqllogictest/test_files/copy.slt
+++ b/datafusion/sqllogictest/test_files/copy.slt
@@ -631,4 +631,3 @@ COPY source_table  to '/tmp/table.parquet' (row_group_size 55 + 102);
 # Copy using execution.keep_partition_by_columns with an invalid value
 query error DataFusion error: Invalid or Unsupported Configuration: provided 
value for 'execution.keep_partition_by_columns' was not recognized: 
"invalid_value"
 COPY source_table  to '/tmp/table.parquet' OPTIONS 
(execution.keep_partition_by_columns invalid_value);
-
diff --git a/datafusion/sqllogictest/test_files/count_star_rule.slt b/datafusion/sqllogictest/test_files/count_star_rule.slt
index d660257b60..0efd9e9988 100644
--- a/datafusion/sqllogictest/test_files/count_star_rule.slt
+++ b/datafusion/sqllogictest/test_files/count_star_rule.slt
@@ -31,44 +31,44 @@ query TT
 EXPLAIN SELECT COUNT() FROM (SELECT 1 AS a, 2 AS b) AS t;
 ----
 logical_plan
-01)Aggregate: groupBy=[[]], aggr=[[count(Int64(1)) AS count()]]
+01)Aggregate: groupBy=[[]], aggr=[[count(*)]]
 02)--SubqueryAlias: t
 03)----EmptyRelation
 physical_plan
-01)ProjectionExec: expr=[1 as count()]
+01)ProjectionExec: expr=[1 as count(*)]
 02)--PlaceholderRowExec
 
 query TT
 EXPLAIN SELECT t1.a, COUNT() FROM t1 GROUP BY t1.a;
 ----
 logical_plan
-01)Aggregate: groupBy=[[t1.a]], aggr=[[count(Int64(1)) AS count()]]
+01)Aggregate: groupBy=[[t1.a]], aggr=[[count(*)]]
 02)--TableScan: t1 projection=[a]
 physical_plan
-01)AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[count()]
+01)AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[count(*)]
 02)--CoalesceBatchesExec: target_batch_size=8192
 03)----RepartitionExec: partitioning=Hash([a@0], 4), input_partitions=4
 04)------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
-05)--------AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[count()]
+05)--------AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[count(*)]
 06)----------DataSourceExec: partitions=1, partition_sizes=[1]
 
 query TT
 EXPLAIN SELECT t1.a, COUNT() AS cnt FROM t1 GROUP BY t1.a HAVING COUNT() > 0;
 ----
 logical_plan
-01)Projection: t1.a, count() AS cnt
-02)--Filter: count() > Int64(0)
-03)----Aggregate: groupBy=[[t1.a]], aggr=[[count(Int64(1)) AS count()]]
+01)Projection: t1.a, count(*) AS cnt
+02)--Filter: count(*) > Int64(0)
+03)----Aggregate: groupBy=[[t1.a]], aggr=[[count(*)]]
 04)------TableScan: t1 projection=[a]
 physical_plan
-01)ProjectionExec: expr=[a@0 as a, count()@1 as cnt]
+01)ProjectionExec: expr=[a@0 as a, count(*)@1 as cnt]
 02)--CoalesceBatchesExec: target_batch_size=8192
-03)----FilterExec: count()@1 > 0
-04)------AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[count()]
+03)----FilterExec: count(*)@1 > 0
+04)------AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[count(*)]
 05)--------CoalesceBatchesExec: target_batch_size=8192
 06)----------RepartitionExec: partitioning=Hash([a@0], 4), input_partitions=4
 07)------------RepartitionExec: partitioning=RoundRobinBatch(4), 
input_partitions=1
-08)--------------AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[count()]
+08)--------------AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[count(*)]
 09)----------------DataSourceExec: partitions=1, partition_sizes=[1]
 
 query II
@@ -80,12 +80,12 @@ query TT
 EXPLAIN SELECT a, COUNT() OVER (PARTITION BY a) AS count_a FROM t1;
 ----
 logical_plan
-01)Projection: t1.a, count() PARTITION BY [t1.a] ROWS BETWEEN UNBOUNDED 
PRECEDING AND UNBOUNDED FOLLOWING AS count_a
-02)--WindowAggr: windowExpr=[[count(Int64(1)) PARTITION BY [t1.a] ROWS BETWEEN 
UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING AS count() PARTITION BY [t1.a] ROWS 
BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING]]
+01)Projection: t1.a, count(*) PARTITION BY [t1.a] ROWS BETWEEN UNBOUNDED 
PRECEDING AND UNBOUNDED FOLLOWING AS count_a
+02)--WindowAggr: windowExpr=[[count(*) PARTITION BY [t1.a] ROWS BETWEEN 
UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING]]
 03)----TableScan: t1 projection=[a]
 physical_plan
-01)ProjectionExec: expr=[a@0 as a, count() PARTITION BY [t1.a] ROWS BETWEEN 
UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING@1 as count_a]
-02)--WindowAggExec: wdw=[count() PARTITION BY [t1.a] ROWS BETWEEN UNBOUNDED 
PRECEDING AND UNBOUNDED FOLLOWING: Ok(Field { name: "count() PARTITION BY 
[t1.a] ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING", data_type: 
Int64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), 
frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(NULL)), 
end_bound: Following(UInt64(NULL)), is_causal: false }]
+01)ProjectionExec: expr=[a@0 as a, count(*) PARTITION BY [t1.a] ROWS BETWEEN 
UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING@1 as count_a]
+02)--WindowAggExec: wdw=[count(*) PARTITION BY [t1.a] ROWS BETWEEN UNBOUNDED 
PRECEDING AND UNBOUNDED FOLLOWING: Ok(Field { name: "count(*) PARTITION BY 
[t1.a] ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING", data_type: 
Int64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), 
frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(NULL)), 
end_bound: Following(UInt64(NULL)), is_causal: false }]
 03)----SortExec: expr=[a@0 ASC NULLS LAST], preserve_partitioning=[false]
 04)------DataSourceExec: partitions=1, partition_sizes=[1]
 
diff --git a/datafusion/sqllogictest/test_files/ddl.slt 
b/datafusion/sqllogictest/test_files/ddl.slt
index aefc2672b5..6f75a7d7f8 100644
--- a/datafusion/sqllogictest/test_files/ddl.slt
+++ b/datafusion/sqllogictest/test_files/ddl.slt
@@ -827,4 +827,3 @@ drop table table_with_pk;
 
 statement ok
 set datafusion.catalog.information_schema = false;
-
diff --git a/datafusion/sqllogictest/test_files/errors.slt 
b/datafusion/sqllogictest/test_files/errors.slt
index a35a4d6f28..dc7a53adf8 100644
--- a/datafusion/sqllogictest/test_files/errors.slt
+++ b/datafusion/sqllogictest/test_files/errors.slt
@@ -184,4 +184,4 @@ query error DataFusion error: Schema error: No field named 
ammp\. Did you mean '
 select ammp from a;
 
 statement ok
-drop table a;
\ No newline at end of file
+drop table a;
diff --git a/datafusion/sqllogictest/test_files/explain.slt 
b/datafusion/sqllogictest/test_files/explain.slt
index 037565ce05..0d5eab6cf5 100644
--- a/datafusion/sqllogictest/test_files/explain.slt
+++ b/datafusion/sqllogictest/test_files/explain.slt
@@ -178,7 +178,6 @@ logical_plan after inline_table_scan SAME TEXT AS ABOVE
 logical_plan after expand_wildcard_rule SAME TEXT AS ABOVE
 logical_plan after resolve_grouping_function SAME TEXT AS ABOVE
 logical_plan after type_coercion SAME TEXT AS ABOVE
-logical_plan after count_wildcard_rule SAME TEXT AS ABOVE
 analyzed_logical_plan SAME TEXT AS ABOVE
 logical_plan after eliminate_nested_union SAME TEXT AS ABOVE
 logical_plan after simplify_expressions SAME TEXT AS ABOVE
@@ -427,7 +426,7 @@ logical_plan
 02)--TableScan: t1 projection=[a]
 03)--SubqueryAlias: __correlated_sq_1
 04)----Projection: 
-05)------Aggregate: groupBy=[[]], aggr=[[count(Int64(1)) AS count(*)]]
+05)------Aggregate: groupBy=[[]], aggr=[[count(*)]]
 06)--------TableScan: t2 projection=[]
 physical_plan
 01)NestedLoopJoinExec: join_type=LeftSemi
diff --git a/datafusion/sqllogictest/test_files/insert.slt 
b/datafusion/sqllogictest/test_files/insert.slt
index ee76ee1c55..32428fdef7 100644
--- a/datafusion/sqllogictest/test_files/insert.slt
+++ b/datafusion/sqllogictest/test_files/insert.slt
@@ -61,7 +61,7 @@ logical_plan
 02)--Projection: sum(aggregate_test_100.c4) PARTITION BY 
[aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS 
BETWEEN 1 PRECEDING AND 1 FOLLOWING AS field1, count(*) PARTITION BY 
[aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS 
BETWEEN 1 PRECEDING AND 1 FOLLOWING AS field2
 03)----Sort: aggregate_test_100.c1 ASC NULLS LAST
 04)------Projection: sum(aggregate_test_100.c4) PARTITION BY 
[aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS 
BETWEEN 1 PRECEDING AND 1 FOLLOWING, count(*) PARTITION BY 
[aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS 
BETWEEN 1 PRECEDING AND 1 FOLLOWING, aggregate_test_100.c1
-05)--------WindowAggr: windowExpr=[[sum(CAST(aggregate_test_100.c4 AS Int64)) 
PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS 
LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING, count(Int64(1)) PARTITION BY 
[aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS 
BETWEEN 1 PRECEDING AND 1 FOLLOWING AS count(*) PARTITION BY 
[aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS 
BETWEEN 1 PRECEDING AND 1 FOLLOWING]]
+05)--------WindowAggr: windowExpr=[[sum(CAST(aggregate_test_100.c4 AS Int64)) 
PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS 
LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING, count(*) PARTITION BY 
[aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS 
BETWEEN 1 PRECEDING AND 1 FOLLOWING]]
 06)----------TableScan: aggregate_test_100 projection=[c1, c4, c9]
 physical_plan
 01)DataSinkExec: sink=MemoryTable (partitions=1)
@@ -122,7 +122,7 @@ FROM aggregate_test_100
 logical_plan
 01)Dml: op=[Insert Into] table=[table_without_values]
 02)--Projection: sum(aggregate_test_100.c4) PARTITION BY 
[aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS 
BETWEEN 1 PRECEDING AND 1 FOLLOWING AS field1, count(*) PARTITION BY 
[aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS 
BETWEEN 1 PRECEDING AND 1 FOLLOWING AS field2
-03)----WindowAggr: windowExpr=[[sum(CAST(aggregate_test_100.c4 AS Int64)) 
PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS 
LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING, count(Int64(1)) PARTITION BY 
[aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS 
BETWEEN 1 PRECEDING AND 1 FOLLOWING AS count(*) PARTITION BY 
[aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS 
BETWEEN 1 PRECEDING AND 1 FOLLOWING]]
+03)----WindowAggr: windowExpr=[[sum(CAST(aggregate_test_100.c4 AS Int64)) 
PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS 
LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING, count(*) PARTITION BY 
[aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS 
BETWEEN 1 PRECEDING AND 1 FOLLOWING]]
 04)------TableScan: aggregate_test_100 projection=[c1, c4, c9]
 physical_plan
 01)DataSinkExec: sink=MemoryTable (partitions=1)
@@ -172,7 +172,7 @@ logical_plan
 02)--Projection: a1 AS a1, a2 AS a2
 03)----Sort: aggregate_test_100.c1 ASC NULLS LAST
 04)------Projection: sum(aggregate_test_100.c4) PARTITION BY 
[aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS 
BETWEEN 1 PRECEDING AND 1 FOLLOWING AS a1, count(*) PARTITION BY 
[aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS 
BETWEEN 1 PRECEDING AND 1 FOLLOWING AS a2, aggregate_test_100.c1
-05)--------WindowAggr: windowExpr=[[sum(CAST(aggregate_test_100.c4 AS Int64)) 
PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS 
LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING, count(Int64(1)) PARTITION BY 
[aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS 
BETWEEN 1 PRECEDING AND 1 FOLLOWING AS count(*) PARTITION BY 
[aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS 
BETWEEN 1 PRECEDING AND 1 FOLLOWING]]
+05)--------WindowAggr: windowExpr=[[sum(CAST(aggregate_test_100.c4 AS Int64)) 
PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS 
LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING, count(*) PARTITION BY 
[aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS 
BETWEEN 1 PRECEDING AND 1 FOLLOWING]]
 06)----------TableScan: aggregate_test_100 projection=[c1, c4, c9]
 physical_plan
 01)DataSinkExec: sink=MemoryTable (partitions=8)
diff --git a/datafusion/sqllogictest/test_files/insert_to_external.slt 
b/datafusion/sqllogictest/test_files/insert_to_external.slt
index ee1d67c5e2..752e8ce0e4 100644
--- a/datafusion/sqllogictest/test_files/insert_to_external.slt
+++ b/datafusion/sqllogictest/test_files/insert_to_external.slt
@@ -352,7 +352,7 @@ logical_plan
 02)--Projection: sum(aggregate_test_100.c4) PARTITION BY 
[aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS 
BETWEEN 1 PRECEDING AND 1 FOLLOWING AS field1, count(*) PARTITION BY 
[aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS 
BETWEEN 1 PRECEDING AND 1 FOLLOWING AS field2
 03)----Sort: aggregate_test_100.c1 ASC NULLS LAST
 04)------Projection: sum(aggregate_test_100.c4) PARTITION BY 
[aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS 
BETWEEN 1 PRECEDING AND 1 FOLLOWING, count(*) PARTITION BY 
[aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS 
BETWEEN 1 PRECEDING AND 1 FOLLOWING, aggregate_test_100.c1
-05)--------WindowAggr: windowExpr=[[sum(CAST(aggregate_test_100.c4 AS Int64)) 
PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS 
LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING, count(Int64(1)) PARTITION BY 
[aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS 
BETWEEN 1 PRECEDING AND 1 FOLLOWING AS count(*) PARTITION BY 
[aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS 
BETWEEN 1 PRECEDING AND 1 FOLLOWING]]
+05)--------WindowAggr: windowExpr=[[sum(CAST(aggregate_test_100.c4 AS Int64)) 
PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS 
LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING, count(*) PARTITION BY 
[aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS 
BETWEEN 1 PRECEDING AND 1 FOLLOWING]]
 06)----------TableScan: aggregate_test_100 projection=[c1, c4, c9]
 physical_plan
 01)DataSinkExec: sink=ParquetSink(file_groups=[])
@@ -414,7 +414,7 @@ FROM aggregate_test_100
 logical_plan
 01)Dml: op=[Insert Into] table=[table_without_values]
 02)--Projection: sum(aggregate_test_100.c4) PARTITION BY 
[aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS 
BETWEEN 1 PRECEDING AND 1 FOLLOWING AS field1, count(*) PARTITION BY 
[aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS 
BETWEEN 1 PRECEDING AND 1 FOLLOWING AS field2
-03)----WindowAggr: windowExpr=[[sum(CAST(aggregate_test_100.c4 AS Int64)) 
PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS 
LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING, count(Int64(1)) PARTITION BY 
[aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS 
BETWEEN 1 PRECEDING AND 1 FOLLOWING AS count(*) PARTITION BY 
[aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS 
BETWEEN 1 PRECEDING AND 1 FOLLOWING]]
+03)----WindowAggr: windowExpr=[[sum(CAST(aggregate_test_100.c4 AS Int64)) 
PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS 
LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING, count(*) PARTITION BY 
[aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS 
BETWEEN 1 PRECEDING AND 1 FOLLOWING]]
 04)------TableScan: aggregate_test_100 projection=[c1, c4, c9]
 physical_plan
 01)DataSinkExec: sink=ParquetSink(file_groups=[])
diff --git a/datafusion/sqllogictest/test_files/joins.slt 
b/datafusion/sqllogictest/test_files/joins.slt
index 5d311bc432..5b5368f6b0 100644
--- a/datafusion/sqllogictest/test_files/joins.slt
+++ b/datafusion/sqllogictest/test_files/joins.slt
@@ -1396,7 +1396,7 @@ group by t1_id
 ----
 logical_plan
 01)Projection: count(*)
-02)--Aggregate: groupBy=[[join_t1.t1_id]], aggr=[[count(Int64(1)) AS count(*)]]
+02)--Aggregate: groupBy=[[join_t1.t1_id]], aggr=[[count(*)]]
 03)----Projection: join_t1.t1_id
 04)------Inner Join: join_t1.t1_id = join_t2.t2_id
 05)--------TableScan: join_t1 projection=[t1_id]
@@ -4442,7 +4442,7 @@ FROM my_catalog.my_schema.table_with_many_types AS l
 JOIN my_catalog.my_schema.table_with_many_types AS r ON l.binary_col = 
r.binary_col
 ----
 logical_plan
-01)Aggregate: groupBy=[[]], aggr=[[count(Int64(1)) AS count(*)]]
+01)Aggregate: groupBy=[[]], aggr=[[count(*)]]
 02)--Projection: 
 03)----Inner Join: l.binary_col = r.binary_col
 04)------SubqueryAlias: l
diff --git a/datafusion/sqllogictest/test_files/json.slt 
b/datafusion/sqllogictest/test_files/json.slt
index dd310f7f2b..466bba5566 100644
--- a/datafusion/sqllogictest/test_files/json.slt
+++ b/datafusion/sqllogictest/test_files/json.slt
@@ -54,7 +54,7 @@ query TT
 EXPLAIN SELECT count(*) from json_test
 ----
 logical_plan
-01)Aggregate: groupBy=[[]], aggr=[[count(Int64(1)) AS count(*)]]
+01)Aggregate: groupBy=[[]], aggr=[[count(*)]]
 02)--TableScan: json_test projection=[]
 physical_plan
 01)AggregateExec: mode=Final, gby=[], aggr=[count(*)]
diff --git a/datafusion/sqllogictest/test_files/limit.slt 
b/datafusion/sqllogictest/test_files/limit.slt
index 4e74b27b87..b4487be850 100644
--- a/datafusion/sqllogictest/test_files/limit.slt
+++ b/datafusion/sqllogictest/test_files/limit.slt
@@ -307,7 +307,7 @@ query TT
 EXPLAIN SELECT COUNT(*) FROM (SELECT a FROM t1 LIMIT 3 OFFSET 11);
 ----
 logical_plan
-01)Aggregate: groupBy=[[]], aggr=[[count(Int64(1)) AS count(*)]]
+01)Aggregate: groupBy=[[]], aggr=[[count(*)]]
 02)--Limit: skip=11, fetch=3
 03)----TableScan: t1 projection=[], fetch=14
 physical_plan
@@ -325,7 +325,7 @@ query TT
 EXPLAIN SELECT COUNT(*) FROM (SELECT a FROM t1 LIMIT 3 OFFSET 8);
 ----
 logical_plan
-01)Aggregate: groupBy=[[]], aggr=[[count(Int64(1)) AS count(*)]]
+01)Aggregate: groupBy=[[]], aggr=[[count(*)]]
 02)--Limit: skip=8, fetch=3
 03)----TableScan: t1 projection=[], fetch=11
 physical_plan
@@ -343,7 +343,7 @@ query TT
 EXPLAIN SELECT COUNT(*) FROM (SELECT a FROM t1 OFFSET 8);
 ----
 logical_plan
-01)Aggregate: groupBy=[[]], aggr=[[count(Int64(1)) AS count(*)]]
+01)Aggregate: groupBy=[[]], aggr=[[count(*)]]
 02)--Limit: skip=8, fetch=None
 03)----TableScan: t1 projection=[]
 physical_plan
@@ -360,7 +360,7 @@ query TT
 EXPLAIN SELECT COUNT(*) FROM (SELECT a FROM t1 WHERE a > 3 LIMIT 3 OFFSET 6);
 ----
 logical_plan
-01)Aggregate: groupBy=[[]], aggr=[[count(Int64(1)) AS count(*)]]
+01)Aggregate: groupBy=[[]], aggr=[[count(*)]]
 02)--Projection: 
 03)----Limit: skip=6, fetch=3
 04)------Filter: t1.a > Int32(3)
diff --git a/datafusion/sqllogictest/test_files/optimizer_group_by_constant.slt 
b/datafusion/sqllogictest/test_files/optimizer_group_by_constant.slt
index de6a153f58..8c87af75ed 100644
--- a/datafusion/sqllogictest/test_files/optimizer_group_by_constant.slt
+++ b/datafusion/sqllogictest/test_files/optimizer_group_by_constant.slt
@@ -48,8 +48,8 @@ FROM test_table t
 GROUP BY 1, 2, 3, 4
 ----
 logical_plan
-01)Projection: t.c1, Int64(99999), t.c5 + t.c8, Utf8("test"), count(Int64(1))
-02)--Aggregate: groupBy=[[t.c1, t.c5 + t.c8]], aggr=[[count(Int64(1))]]
+01)Projection: t.c1, Int64(99999), t.c5 + t.c8, Utf8("test"), count(*)
+02)--Aggregate: groupBy=[[t.c1, t.c5 + t.c8]], aggr=[[count(*)]]
 03)----SubqueryAlias: t
 04)------TableScan: test_table projection=[c1, c5, c8]
 
@@ -60,8 +60,8 @@ FROM test_table t
 group by 1, 2, 3
 ----
 logical_plan
-01)Projection: Int64(123), Int64(456), Int64(789), count(Int64(1)), avg(t.c12)
-02)--Aggregate: groupBy=[[]], aggr=[[count(Int64(1)), avg(t.c12)]]
+01)Projection: Int64(123), Int64(456), Int64(789), count(*), avg(t.c12)
+02)--Aggregate: groupBy=[[]], aggr=[[count(*), avg(t.c12)]]
 03)----SubqueryAlias: t
 04)------TableScan: test_table projection=[c12]
 
@@ -72,8 +72,8 @@ FROM test_table t
 GROUP BY 1, 2
 ----
 logical_plan
-01)Projection: Date32("2023-05-04") AS dt, Boolean(true) AS today_filter, 
count(Int64(1))
-02)--Aggregate: groupBy=[[]], aggr=[[count(Int64(1))]]
+01)Projection: Date32("2023-05-04") AS dt, Boolean(true) AS today_filter, 
count(*)
+02)--Aggregate: groupBy=[[]], aggr=[[count(*)]]
 03)----SubqueryAlias: t
 04)------TableScan: test_table projection=[]
 
@@ -90,8 +90,8 @@ FROM test_table t
 GROUP BY 1
 ----
 logical_plan
-01)Projection: Boolean(true) AS NOT date_part(Utf8("MONTH"),now()) BETWEEN 
Int64(50) AND Int64(60), count(Int64(1))
-02)--Aggregate: groupBy=[[]], aggr=[[count(Int64(1))]]
+01)Projection: Boolean(true) AS NOT date_part(Utf8("MONTH"),now()) BETWEEN 
Int64(50) AND Int64(60), count(*)
+02)--Aggregate: groupBy=[[]], aggr=[[count(*)]]
 03)----SubqueryAlias: t
 04)------TableScan: test_table projection=[]
 
diff --git a/datafusion/sqllogictest/test_files/select.slt 
b/datafusion/sqllogictest/test_files/select.slt
index e12bdca37e..dcd373546d 100644
--- a/datafusion/sqllogictest/test_files/select.slt
+++ b/datafusion/sqllogictest/test_files/select.slt
@@ -1541,7 +1541,7 @@ LIMIT 4)
 GROUP BY c2;
 ----
 logical_plan
-01)Aggregate: groupBy=[[aggregate_test_100.c2]], aggr=[[count(Int64(1)) AS 
count(*)]]
+01)Aggregate: groupBy=[[aggregate_test_100.c2]], aggr=[[count(*)]]
 02)--Projection: aggregate_test_100.c2
 03)----Sort: aggregate_test_100.c1 ASC NULLS LAST, aggregate_test_100.c2 ASC 
NULLS LAST, fetch=4
 04)------Projection: aggregate_test_100.c2, aggregate_test_100.c1
diff --git a/datafusion/sqllogictest/test_files/subquery.slt 
b/datafusion/sqllogictest/test_files/subquery.slt
index 264392fc10..c847f433f7 100644
--- a/datafusion/sqllogictest/test_files/subquery.slt
+++ b/datafusion/sqllogictest/test_files/subquery.slt
@@ -555,7 +555,7 @@ logical_plan
 03)----Subquery:
 04)------Projection: count(*)
 05)--------Filter: sum(outer_ref(t1.t1_int) + t2.t2_id) > Int64(0)
-06)----------Aggregate: groupBy=[[]], aggr=[[count(Int64(1)) AS count(*), 
sum(CAST(outer_ref(t1.t1_int) + t2.t2_id AS Int64))]]
+06)----------Aggregate: groupBy=[[]], aggr=[[count(*), 
sum(CAST(outer_ref(t1.t1_int) + t2.t2_id AS Int64))]]
 07)------------Filter: outer_ref(t1.t1_name) = t2.t2_name
 08)--------------TableScan: t2
 09)----TableScan: t1 projection=[t1_id, t1_name, t1_int]
@@ -738,7 +738,7 @@ explain select (select count(*) from t1) as b
 logical_plan
 01)Projection: __scalar_sq_1.count(*) AS b
 02)--SubqueryAlias: __scalar_sq_1
-03)----Aggregate: groupBy=[[]], aggr=[[count(Int64(1)) AS count(*)]]
+03)----Aggregate: groupBy=[[]], aggr=[[count(*)]]
 04)------TableScan: t1 projection=[]
 
 #simple_uncorrelated_scalar_subquery2
@@ -746,13 +746,13 @@ query TT
 explain select (select count(*) from t1) as b, (select count(1) from t2)
 ----
 logical_plan
-01)Projection: __scalar_sq_1.count(*) AS b, __scalar_sq_2.count(Int64(1)) AS 
count(Int64(1))
+01)Projection: __scalar_sq_1.count(*) AS b, __scalar_sq_2.count(*) AS count(*)
 02)--Left Join: 
 03)----SubqueryAlias: __scalar_sq_1
-04)------Aggregate: groupBy=[[]], aggr=[[count(Int64(1)) AS count(*)]]
+04)------Aggregate: groupBy=[[]], aggr=[[count(*)]]
 05)--------TableScan: t1 projection=[]
 06)----SubqueryAlias: __scalar_sq_2
-07)------Aggregate: groupBy=[[]], aggr=[[count(Int64(1))]]
+07)------Aggregate: groupBy=[[]], aggr=[[count(*)]]
 08)--------TableScan: t2 projection=[]
 
 statement ok
@@ -762,20 +762,20 @@ query TT
 explain select (select count(*) from t1) as b, (select count(1) from t2)
 ----
 logical_plan
-01)Projection: __scalar_sq_1.count(*) AS b, __scalar_sq_2.count(Int64(1)) AS 
count(Int64(1))
+01)Projection: __scalar_sq_1.count(*) AS b, __scalar_sq_2.count(*) AS count(*)
 02)--Left Join: 
 03)----SubqueryAlias: __scalar_sq_1
-04)------Aggregate: groupBy=[[]], aggr=[[count(Int64(1)) AS count(*)]]
+04)------Aggregate: groupBy=[[]], aggr=[[count(*)]]
 05)--------TableScan: t1 projection=[]
 06)----SubqueryAlias: __scalar_sq_2
-07)------Aggregate: groupBy=[[]], aggr=[[count(Int64(1))]]
+07)------Aggregate: groupBy=[[]], aggr=[[count(*)]]
 08)--------TableScan: t2 projection=[]
 physical_plan
-01)ProjectionExec: expr=[count(*)@0 as b, count(Int64(1))@1 as count(Int64(1))]
+01)ProjectionExec: expr=[count(*)@0 as b, count(*)@1 as count(*)]
 02)--NestedLoopJoinExec: join_type=Left
 03)----ProjectionExec: expr=[4 as count(*)]
 04)------PlaceholderRowExec
-05)----ProjectionExec: expr=[4 as count(Int64(1))]
+05)----ProjectionExec: expr=[4 as count(*)]
 06)------PlaceholderRowExec
 
 statement ok
@@ -796,7 +796,7 @@ logical_plan
 03)----TableScan: t1 projection=[t1_id, t1_int]
 04)----SubqueryAlias: __scalar_sq_1
 05)------Projection: count(*), t2.t2_int, Boolean(true) AS __always_true
-06)--------Aggregate: groupBy=[[t2.t2_int]], aggr=[[count(Int64(1)) AS 
count(*)]]
+06)--------Aggregate: groupBy=[[t2.t2_int]], aggr=[[count(*)]]
 07)----------TableScan: t2 projection=[t2_int]
 
 query II rowsort
@@ -818,7 +818,7 @@ logical_plan
 03)----TableScan: t1 projection=[t1_id, t1_int]
 04)----SubqueryAlias: __scalar_sq_1
 05)------Projection: count(*), t2.t2_int, Boolean(true) AS __always_true
-06)--------Aggregate: groupBy=[[t2.t2_int]], aggr=[[count(Int64(1)) AS 
count(*)]]
+06)--------Aggregate: groupBy=[[t2.t2_int]], aggr=[[count(*)]]
 07)----------TableScan: t2 projection=[t2_int]
 
 query II rowsort
@@ -839,7 +839,7 @@ logical_plan
 03)----TableScan: t1 projection=[t1_id, t1_int]
 04)----SubqueryAlias: __scalar_sq_1
 05)------Projection: count(*) AS _cnt, t2.t2_int, Boolean(true) AS 
__always_true
-06)--------Aggregate: groupBy=[[t2.t2_int]], aggr=[[count(Int64(1)) AS 
count(*)]]
+06)--------Aggregate: groupBy=[[t2.t2_int]], aggr=[[count(*)]]
 07)----------TableScan: t2 projection=[t2_int]
 
 query II rowsort
@@ -860,7 +860,7 @@ logical_plan
 03)----TableScan: t1 projection=[t1_id, t1_int]
 04)----SubqueryAlias: __scalar_sq_1
 05)------Projection: count(*) + Int64(2) AS _cnt, t2.t2_int, Boolean(true) AS 
__always_true
-06)--------Aggregate: groupBy=[[t2.t2_int]], aggr=[[count(Int64(1)) AS 
count(*)]]
+06)--------Aggregate: groupBy=[[t2.t2_int]], aggr=[[count(*)]]
 07)----------TableScan: t2 projection=[t2_int]
 
 query II rowsort
@@ -883,7 +883,7 @@ logical_plan
 05)--------TableScan: t1 projection=[t1_id, t1_int]
 06)--------SubqueryAlias: __scalar_sq_1
 07)----------Projection: count(*), t2.t2_id, Boolean(true) AS __always_true
-08)------------Aggregate: groupBy=[[t2.t2_id]], aggr=[[count(Int64(1)) AS 
count(*)]]
+08)------------Aggregate: groupBy=[[t2.t2_id]], aggr=[[count(*)]]
 09)--------------TableScan: t2 projection=[t2_id]
 
 query I rowsort
@@ -905,7 +905,7 @@ logical_plan
 04)----SubqueryAlias: __scalar_sq_1
 05)------Projection: count(*) + Int64(2) AS cnt_plus_2, t2.t2_int
 06)--------Filter: count(*) > Int64(1)
-07)----------Aggregate: groupBy=[[t2.t2_int]], aggr=[[count(Int64(1)) AS 
count(*)]]
+07)----------Aggregate: groupBy=[[t2.t2_int]], aggr=[[count(*)]]
 08)------------TableScan: t2 projection=[t2_int]
 
 query II rowsort
@@ -927,7 +927,7 @@ logical_plan
 03)----TableScan: t1 projection=[t1_id, t1_int]
 04)----SubqueryAlias: __scalar_sq_1
 05)------Projection: count(*) + Int64(2) AS cnt_plus_2, t2.t2_int, count(*), 
Boolean(true) AS __always_true
-06)--------Aggregate: groupBy=[[t2.t2_int]], aggr=[[count(Int64(1)) AS 
count(*)]]
+06)--------Aggregate: groupBy=[[t2.t2_int]], aggr=[[count(*)]]
 07)----------TableScan: t2 projection=[t2_int]
 
 query II rowsort
@@ -951,7 +951,7 @@ logical_plan
 06)----------TableScan: t1 projection=[t1_int]
 07)--------SubqueryAlias: __scalar_sq_1
 08)----------Projection: count(*), t2.t2_int, Boolean(true) AS __always_true
-09)------------Aggregate: groupBy=[[t2.t2_int]], aggr=[[count(Int64(1)) AS 
count(*)]]
+09)------------Aggregate: groupBy=[[t2.t2_int]], aggr=[[count(*)]]
 10)--------------TableScan: t2 projection=[t2_int]
 
 query I rowsort
@@ -972,7 +972,7 @@ logical_plan
 05)--------TableScan: t1 projection=[t1_int]
 06)--------SubqueryAlias: __scalar_sq_1
 07)----------Projection: count(*) AS cnt, t2.t2_int, Boolean(true) AS 
__always_true
-08)------------Aggregate: groupBy=[[t2.t2_int]], aggr=[[count(Int64(1)) AS 
count(*)]]
+08)------------Aggregate: groupBy=[[t2.t2_int]], aggr=[[count(*)]]
 09)--------------TableScan: t2 projection=[t2_int]
 
 
@@ -1002,7 +1002,7 @@ logical_plan
 05)--------TableScan: t1 projection=[t1_int]
 06)--------SubqueryAlias: __scalar_sq_1
 07)----------Projection: count(*) + Int64(1) + Int64(1) AS cnt_plus_two, 
t2.t2_int, count(*), Boolean(true) AS __always_true
-08)------------Aggregate: groupBy=[[t2.t2_int]], aggr=[[count(Int64(1)) AS 
count(*)]]
+08)------------Aggregate: groupBy=[[t2.t2_int]], aggr=[[count(*)]]
 09)--------------TableScan: t2 projection=[t2_int]
 
 query I rowsort
@@ -1031,7 +1031,7 @@ logical_plan
 05)--------TableScan: t1 projection=[t1_int]
 06)--------SubqueryAlias: __scalar_sq_1
 07)----------Projection: CASE WHEN count(*) = Int64(1) THEN Int64(NULL) ELSE 
count(*) END AS cnt, t2.t2_int, Boolean(true) AS __always_true
-08)------------Aggregate: groupBy=[[t2.t2_int]], aggr=[[count(Int64(1)) AS 
count(*)]]
+08)------------Aggregate: groupBy=[[t2.t2_int]], aggr=[[count(*)]]
 09)--------------TableScan: t2 projection=[t2_int]
 
 
diff --git a/datafusion/sqllogictest/test_files/tpch/plans/q1.slt.part 
b/datafusion/sqllogictest/test_files/tpch/plans/q1.slt.part
index 2616b7b75b..6a41ecb51b 100644
--- a/datafusion/sqllogictest/test_files/tpch/plans/q1.slt.part
+++ b/datafusion/sqllogictest/test_files/tpch/plans/q1.slt.part
@@ -42,7 +42,7 @@ explain select
 logical_plan
 01)Sort: lineitem.l_returnflag ASC NULLS LAST, lineitem.l_linestatus ASC NULLS 
LAST
 02)--Projection: lineitem.l_returnflag, lineitem.l_linestatus, 
sum(lineitem.l_quantity) AS sum_qty, sum(lineitem.l_extendedprice) AS 
sum_base_price, sum(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount) 
AS sum_disc_price, sum(lineitem.l_extendedprice * Int64(1) - 
lineitem.l_discount * Int64(1) + lineitem.l_tax) AS sum_charge, 
avg(lineitem.l_quantity) AS avg_qty, avg(lineitem.l_extendedprice) AS 
avg_price, avg(lineitem.l_discount) AS avg_disc, count(*) AS count_order
-03)----Aggregate: groupBy=[[lineitem.l_returnflag, lineitem.l_linestatus]], 
aggr=[[sum(lineitem.l_quantity), sum(lineitem.l_extendedprice), 
sum(__common_expr_1) AS sum(lineitem.l_extendedprice * Int64(1) - 
lineitem.l_discount), sum(__common_expr_1 * (Decimal128(Some(1),20,0) + 
lineitem.l_tax)) AS sum(lineitem.l_extendedprice * Int64(1) - 
lineitem.l_discount * Int64(1) + lineitem.l_tax), avg(lineitem.l_quantity), 
avg(lineitem.l_extendedprice), avg(lineitem.l_discount), count(Int64(1)) AS  
[...]
+03)----Aggregate: groupBy=[[lineitem.l_returnflag, lineitem.l_linestatus]], 
aggr=[[sum(lineitem.l_quantity), sum(lineitem.l_extendedprice), 
sum(__common_expr_1) AS sum(lineitem.l_extendedprice * Int64(1) - 
lineitem.l_discount), sum(__common_expr_1 * (Decimal128(Some(1),20,0) + 
lineitem.l_tax)) AS sum(lineitem.l_extendedprice * Int64(1) - 
lineitem.l_discount * Int64(1) + lineitem.l_tax), avg(lineitem.l_quantity), 
avg(lineitem.l_extendedprice), avg(lineitem.l_discount), count(*)]]
 04)------Projection: lineitem.l_extendedprice * (Decimal128(Some(1),20,0) - 
lineitem.l_discount) AS __common_expr_1, lineitem.l_quantity, 
lineitem.l_extendedprice, lineitem.l_discount, lineitem.l_tax, 
lineitem.l_returnflag, lineitem.l_linestatus
 05)--------Filter: lineitem.l_shipdate <= Date32("1998-09-02")
 06)----------TableScan: lineitem projection=[l_quantity, l_extendedprice, 
l_discount, l_tax, l_returnflag, l_linestatus, l_shipdate], 
partial_filters=[lineitem.l_shipdate <= Date32("1998-09-02")]
diff --git a/datafusion/sqllogictest/test_files/tpch/plans/q13.slt.part 
b/datafusion/sqllogictest/test_files/tpch/plans/q13.slt.part
index eb41445c3c..68532733c6 100644
--- a/datafusion/sqllogictest/test_files/tpch/plans/q13.slt.part
+++ b/datafusion/sqllogictest/test_files/tpch/plans/q13.slt.part
@@ -42,7 +42,7 @@ limit 10;
 logical_plan
 01)Sort: custdist DESC NULLS FIRST, c_orders.c_count DESC NULLS FIRST, fetch=10
 02)--Projection: c_orders.c_count, count(*) AS custdist
-03)----Aggregate: groupBy=[[c_orders.c_count]], aggr=[[count(Int64(1)) AS 
count(*)]]
+03)----Aggregate: groupBy=[[c_orders.c_count]], aggr=[[count(*)]]
 04)------SubqueryAlias: c_orders
 05)--------Projection: count(orders.o_orderkey) AS c_count
 06)----------Aggregate: groupBy=[[customer.c_custkey]], 
aggr=[[count(orders.o_orderkey)]]
diff --git a/datafusion/sqllogictest/test_files/tpch/plans/q21.slt.part 
b/datafusion/sqllogictest/test_files/tpch/plans/q21.slt.part
index 9e39732689..eb10f4c8d1 100644
--- a/datafusion/sqllogictest/test_files/tpch/plans/q21.slt.part
+++ b/datafusion/sqllogictest/test_files/tpch/plans/q21.slt.part
@@ -60,7 +60,7 @@ order by
 logical_plan
 01)Sort: numwait DESC NULLS FIRST, supplier.s_name ASC NULLS LAST
 02)--Projection: supplier.s_name, count(*) AS numwait
-03)----Aggregate: groupBy=[[supplier.s_name]], aggr=[[count(Int64(1)) AS 
count(*)]]
+03)----Aggregate: groupBy=[[supplier.s_name]], aggr=[[count(*)]]
 04)------Projection: supplier.s_name
 05)--------LeftAnti Join: l1.l_orderkey = __correlated_sq_2.l_orderkey Filter: 
__correlated_sq_2.l_suppkey != l1.l_suppkey
 06)----------LeftSemi Join: l1.l_orderkey = __correlated_sq_1.l_orderkey 
Filter: __correlated_sq_1.l_suppkey != l1.l_suppkey
diff --git a/datafusion/sqllogictest/test_files/tpch/plans/q22.slt.part 
b/datafusion/sqllogictest/test_files/tpch/plans/q22.slt.part
index 9ad9936125..af8b7948c1 100644
--- a/datafusion/sqllogictest/test_files/tpch/plans/q22.slt.part
+++ b/datafusion/sqllogictest/test_files/tpch/plans/q22.slt.part
@@ -58,7 +58,7 @@ order by
 logical_plan
 01)Sort: custsale.cntrycode ASC NULLS LAST
 02)--Projection: custsale.cntrycode, count(*) AS numcust, 
sum(custsale.c_acctbal) AS totacctbal
-03)----Aggregate: groupBy=[[custsale.cntrycode]], aggr=[[count(Int64(1)) AS 
count(*), sum(custsale.c_acctbal)]]
+03)----Aggregate: groupBy=[[custsale.cntrycode]], aggr=[[count(*), 
sum(custsale.c_acctbal)]]
 04)------SubqueryAlias: custsale
 05)--------Projection: substr(customer.c_phone, Int64(1), Int64(2)) AS 
cntrycode, customer.c_acctbal
 06)----------Inner Join:  Filter: CAST(customer.c_acctbal AS Decimal128(19, 
6)) > __scalar_sq_2.avg(customer.c_acctbal)
diff --git a/datafusion/sqllogictest/test_files/tpch/plans/q4.slt.part 
b/datafusion/sqllogictest/test_files/tpch/plans/q4.slt.part
index fb93850ab0..766b21c22f 100644
--- a/datafusion/sqllogictest/test_files/tpch/plans/q4.slt.part
+++ b/datafusion/sqllogictest/test_files/tpch/plans/q4.slt.part
@@ -42,7 +42,7 @@ order by
 logical_plan
 01)Sort: orders.o_orderpriority ASC NULLS LAST
 02)--Projection: orders.o_orderpriority, count(*) AS order_count
-03)----Aggregate: groupBy=[[orders.o_orderpriority]], aggr=[[count(Int64(1)) 
AS count(*)]]
+03)----Aggregate: groupBy=[[orders.o_orderpriority]], aggr=[[count(*)]]
 04)------Projection: orders.o_orderpriority
 05)--------LeftSemi Join: orders.o_orderkey = __correlated_sq_1.l_orderkey
 06)----------Projection: orders.o_orderkey, orders.o_orderpriority
diff --git a/datafusion/sqllogictest/test_files/union.slt 
b/datafusion/sqllogictest/test_files/union.slt
index dfac9c0310..57207f00f7 100644
--- a/datafusion/sqllogictest/test_files/union.slt
+++ b/datafusion/sqllogictest/test_files/union.slt
@@ -449,7 +449,7 @@ SELECT count(*) FROM (
 ----
 logical_plan
 01)Projection: count(*)
-02)--Aggregate: groupBy=[[t1.name]], aggr=[[count(Int64(1)) AS count(*)]]
+02)--Aggregate: groupBy=[[t1.name]], aggr=[[count(*)]]
 03)----Union
 04)------Aggregate: groupBy=[[t1.name]], aggr=[[]]
 05)--------TableScan: t1 projection=[name]
@@ -493,7 +493,7 @@ logical_plan
 02)--Union
 03)----Projection: count(*) AS cnt
 04)------Limit: skip=0, fetch=3
-05)--------Aggregate: groupBy=[[]], aggr=[[count(Int64(1)) AS count(*)]]
+05)--------Aggregate: groupBy=[[]], aggr=[[count(*)]]
 06)----------SubqueryAlias: a
 07)------------Projection: 
 08)--------------Aggregate: groupBy=[[aggregate_test_100.c1]], aggr=[[]]
@@ -651,7 +651,7 @@ select x, y from (select 1 as x , max(10) as y) b
 logical_plan
 01)Union
 02)--Projection: count(*) AS count, a.n
-03)----Aggregate: groupBy=[[a.n]], aggr=[[count(Int64(1)) AS count(*)]]
+03)----Aggregate: groupBy=[[a.n]], aggr=[[count(*)]]
 04)------SubqueryAlias: a
 05)--------Projection: Int64(5) AS n
 06)----------EmptyRelation
diff --git a/datafusion/sqllogictest/test_files/window.slt 
b/datafusion/sqllogictest/test_files/window.slt
index ca4713e7d5..6c00af879e 100644
--- a/datafusion/sqllogictest/test_files/window.slt
+++ b/datafusion/sqllogictest/test_files/window.slt
@@ -1305,7 +1305,7 @@ EXPLAIN SELECT
 ----
 logical_plan
 01)Projection: sum(aggregate_test_100.c4) PARTITION BY [aggregate_test_100.c1, 
aggregate_test_100.c2] ORDER BY [aggregate_test_100.c2 ASC NULLS LAST] ROWS 
BETWEEN 1 PRECEDING AND 1 FOLLOWING, count(*) PARTITION BY 
[aggregate_test_100.c1] ORDER BY [aggregate_test_100.c2 ASC NULLS LAST] ROWS 
BETWEEN 1 PRECEDING AND 1 FOLLOWING
-02)--WindowAggr: windowExpr=[[count(Int64(1)) PARTITION BY 
[aggregate_test_100.c1] ORDER BY [aggregate_test_100.c2 ASC NULLS LAST] ROWS 
BETWEEN 1 PRECEDING AND 1 FOLLOWING AS count(*) PARTITION BY 
[aggregate_test_100.c1] ORDER BY [aggregate_test_100.c2 ASC NULLS LAST] ROWS 
BETWEEN 1 PRECEDING AND 1 FOLLOWING]]
+02)--WindowAggr: windowExpr=[[count(*) PARTITION BY [aggregate_test_100.c1] 
ORDER BY [aggregate_test_100.c2 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 
FOLLOWING]]
 03)----Projection: aggregate_test_100.c1, aggregate_test_100.c2, 
sum(aggregate_test_100.c4) PARTITION BY [aggregate_test_100.c1, 
aggregate_test_100.c2] ORDER BY [aggregate_test_100.c2 ASC NULLS LAST] ROWS 
BETWEEN 1 PRECEDING AND 1 FOLLOWING
 04)------WindowAggr: windowExpr=[[sum(CAST(aggregate_test_100.c4 AS Int64)) 
PARTITION BY [aggregate_test_100.c1, aggregate_test_100.c2] ORDER BY 
[aggregate_test_100.c2 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 
FOLLOWING]]
 05)--------TableScan: aggregate_test_100 projection=[c1, c2, c4]
@@ -1765,7 +1765,7 @@ EXPLAIN SELECT count(*) as global_count FROM
 ----
 logical_plan
 01)Projection: count(*) AS global_count
-02)--Aggregate: groupBy=[[]], aggr=[[count(Int64(1)) AS count(*)]]
+02)--Aggregate: groupBy=[[]], aggr=[[count(*)]]
 03)----SubqueryAlias: a
 04)------Projection: 
 05)--------Aggregate: groupBy=[[aggregate_test_100.c1]], aggr=[[]]
@@ -2571,10 +2571,10 @@ logical_plan
 01)Projection: sum1, sum2, sum3, min1, min2, min3, max1, max2, max3, cnt1, 
cnt2, sumr1, sumr2, sumr3, minr1, minr2, minr3, maxr1, maxr2, maxr3, cntr1, 
cntr2, sum4, cnt3
 02)--Sort: annotated_data_finite.inc_col DESC NULLS FIRST, fetch=5
 03)----Projection: sum(annotated_data_finite.inc_col) ORDER BY 
[annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND 1 
FOLLOWING AS sum1, sum(annotated_data_finite.desc_col) ORDER BY 
[annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 5 PRECEDING AND 1 
FOLLOWING AS sum2, sum(annotated_data_finite.inc_col) ORDER BY 
[annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 10 
FOLLOWING AS sum3, min(annotated_data_finite.inc_col) ORDER BY [annotated_data_ 
[...]
-04)------WindowAggr: windowExpr=[[sum(__common_expr_1 AS 
annotated_data_finite.desc_col) ROWS BETWEEN 8 PRECEDING AND 1 FOLLOWING, 
count(Int64(1)) ROWS BETWEEN 8 PRECEDING AND 1 FOLLOWING AS count(*) ROWS 
BETWEEN 8 PRECEDING AND 1 FOLLOWING]]
+04)------WindowAggr: windowExpr=[[sum(__common_expr_1 AS 
annotated_data_finite.desc_col) ROWS BETWEEN 8 PRECEDING AND 1 FOLLOWING, 
count(*) ROWS BETWEEN 8 PRECEDING AND 1 FOLLOWING]]
 05)--------Projection: __common_expr_1, annotated_data_finite.inc_col, 
sum(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC 
NULLS FIRST] RANGE BETWEEN 1 PRECEDING AND 4 FOLLOWING, 
sum(annotated_data_finite.desc_col) ORDER BY [annotated_data_finite.ts DESC 
NULLS FIRST] RANGE BETWEEN 1 PRECEDING AND 8 FOLLOWING, 
sum(annotated_data_finite.desc_col) ORDER BY [annotated_data_finite.ts DESC 
NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING, 
min(annotated_data_finite.i [...]
-06)----------WindowAggr: windowExpr=[[sum(__common_expr_2 AS 
annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS 
LAST] RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING, sum(__common_expr_1 AS 
annotated_data_finite.desc_col) ORDER BY [annotated_data_finite.ts ASC NULLS 
LAST] RANGE BETWEEN 5 PRECEDING AND 1 FOLLOWING, sum(__common_expr_2 AS 
annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS 
LAST] ROWS BETWEEN 1 PRECEDING AND 10 FOLLOWING, min(anno [...]
-07)------------WindowAggr: windowExpr=[[sum(__common_expr_2 AS 
annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC NULLS 
FIRST] RANGE BETWEEN 1 PRECEDING AND 4 FOLLOWING, sum(__common_expr_1 AS 
annotated_data_finite.desc_col) ORDER BY [annotated_data_finite.ts DESC NULLS 
FIRST] RANGE BETWEEN 1 PRECEDING AND 8 FOLLOWING, sum(__common_expr_1 AS 
annotated_data_finite.desc_col) ORDER BY [annotated_data_finite.ts DESC NULLS 
FIRST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING, m [...]
+06)----------WindowAggr: windowExpr=[[sum(__common_expr_2 AS 
annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS 
LAST] RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING, sum(__common_expr_1 AS 
annotated_data_finite.desc_col) ORDER BY [annotated_data_finite.ts ASC NULLS 
LAST] RANGE BETWEEN 5 PRECEDING AND 1 FOLLOWING, sum(__common_expr_2 AS 
annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS 
LAST] ROWS BETWEEN 1 PRECEDING AND 10 FOLLOWING, min(anno [...]
+07)------------WindowAggr: windowExpr=[[sum(__common_expr_2 AS 
annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC NULLS 
FIRST] RANGE BETWEEN 1 PRECEDING AND 4 FOLLOWING, sum(__common_expr_1 AS 
annotated_data_finite.desc_col) ORDER BY [annotated_data_finite.ts DESC NULLS 
FIRST] RANGE BETWEEN 1 PRECEDING AND 8 FOLLOWING, sum(__common_expr_1 AS 
annotated_data_finite.desc_col) ORDER BY [annotated_data_finite.ts DESC NULLS 
FIRST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING, m [...]
 08)--------------Projection: CAST(annotated_data_finite.desc_col AS Int64) AS 
__common_expr_1, CAST(annotated_data_finite.inc_col AS Int64) AS 
__common_expr_2, annotated_data_finite.ts, annotated_data_finite.inc_col, 
annotated_data_finite.desc_col
 09)----------------TableScan: annotated_data_finite projection=[ts, inc_col, 
desc_col]
 physical_plan
@@ -4112,7 +4112,7 @@ EXPLAIN select count(*) over (partition by a order by a) 
from (select * from a w
 ----
 logical_plan
 01)Projection: count(*) PARTITION BY [a.a] ORDER BY [a.a ASC NULLS LAST] RANGE 
BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
-02)--WindowAggr: windowExpr=[[count(Int64(1)) PARTITION BY [a.a] ORDER BY [a.a 
ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS count(*) 
PARTITION BY [a.a] ORDER BY [a.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED 
PRECEDING AND CURRENT ROW]]
+02)--WindowAggr: windowExpr=[[count(*) PARTITION BY [a.a] ORDER BY [a.a ASC 
NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]]
 03)----Filter: a.a = Int64(1)
 04)------TableScan: a projection=[a]
 physical_plan
diff --git a/datafusion/substrait/tests/cases/consumer_integration.rs 
b/datafusion/substrait/tests/cases/consumer_integration.rs
index 219f656bb4..086c085811 100644
--- a/datafusion/substrait/tests/cases/consumer_integration.rs
+++ b/datafusion/substrait/tests/cases/consumer_integration.rs
@@ -50,9 +50,9 @@ mod tests {
         let plan_str = tpch_plan_to_string(1).await?;
         assert_eq!(
             plan_str,
-            "Projection: LINEITEM.L_RETURNFLAG, LINEITEM.L_LINESTATUS, 
sum(LINEITEM.L_QUANTITY) AS SUM_QTY, sum(LINEITEM.L_EXTENDEDPRICE) AS 
SUM_BASE_PRICE, sum(LINEITEM.L_EXTENDEDPRICE * Int32(1) - LINEITEM.L_DISCOUNT) 
AS SUM_DISC_PRICE, sum(LINEITEM.L_EXTENDEDPRICE * Int32(1) - 
LINEITEM.L_DISCOUNT * Int32(1) + LINEITEM.L_TAX) AS SUM_CHARGE, 
avg(LINEITEM.L_QUANTITY) AS AVG_QTY, avg(LINEITEM.L_EXTENDEDPRICE) AS 
AVG_PRICE, avg(LINEITEM.L_DISCOUNT) AS AVG_DISC, count(Int64(1)) AS COUNT_ORDER\
+            "Projection: LINEITEM.L_RETURNFLAG, LINEITEM.L_LINESTATUS, 
sum(LINEITEM.L_QUANTITY) AS SUM_QTY, sum(LINEITEM.L_EXTENDEDPRICE) AS 
SUM_BASE_PRICE, sum(LINEITEM.L_EXTENDEDPRICE * Int32(1) - LINEITEM.L_DISCOUNT) 
AS SUM_DISC_PRICE, sum(LINEITEM.L_EXTENDEDPRICE * Int32(1) - 
LINEITEM.L_DISCOUNT * Int32(1) + LINEITEM.L_TAX) AS SUM_CHARGE, 
avg(LINEITEM.L_QUANTITY) AS AVG_QTY, avg(LINEITEM.L_EXTENDEDPRICE) AS 
AVG_PRICE, avg(LINEITEM.L_DISCOUNT) AS AVG_DISC, count(*) AS COUNT_ORDER\
             \n  Sort: LINEITEM.L_RETURNFLAG ASC NULLS LAST, 
LINEITEM.L_LINESTATUS ASC NULLS LAST\
-            \n    Aggregate: groupBy=[[LINEITEM.L_RETURNFLAG, 
LINEITEM.L_LINESTATUS]], aggr=[[sum(LINEITEM.L_QUANTITY), 
sum(LINEITEM.L_EXTENDEDPRICE), sum(LINEITEM.L_EXTENDEDPRICE * Int32(1) - 
LINEITEM.L_DISCOUNT), sum(LINEITEM.L_EXTENDEDPRICE * Int32(1) - 
LINEITEM.L_DISCOUNT * Int32(1) + LINEITEM.L_TAX), avg(LINEITEM.L_QUANTITY), 
avg(LINEITEM.L_EXTENDEDPRICE), avg(LINEITEM.L_DISCOUNT), count(Int64(1))]]\
+            \n    Aggregate: groupBy=[[LINEITEM.L_RETURNFLAG, 
LINEITEM.L_LINESTATUS]], aggr=[[sum(LINEITEM.L_QUANTITY), 
sum(LINEITEM.L_EXTENDEDPRICE), sum(LINEITEM.L_EXTENDEDPRICE * Int32(1) - 
LINEITEM.L_DISCOUNT), sum(LINEITEM.L_EXTENDEDPRICE * Int32(1) - 
LINEITEM.L_DISCOUNT * Int32(1) + LINEITEM.L_TAX), avg(LINEITEM.L_QUANTITY), 
avg(LINEITEM.L_EXTENDEDPRICE), avg(LINEITEM.L_DISCOUNT), count(*)]]\
             \n      Projection: LINEITEM.L_RETURNFLAG, LINEITEM.L_LINESTATUS, 
LINEITEM.L_QUANTITY, LINEITEM.L_EXTENDEDPRICE, LINEITEM.L_EXTENDEDPRICE * 
(CAST(Int32(1) AS Decimal128(15, 2)) - LINEITEM.L_DISCOUNT), 
LINEITEM.L_EXTENDEDPRICE * (CAST(Int32(1) AS Decimal128(15, 2)) - 
LINEITEM.L_DISCOUNT) * (CAST(Int32(1) AS Decimal128(15, 2)) + LINEITEM.L_TAX), 
LINEITEM.L_DISCOUNT\
             \n        Filter: LINEITEM.L_SHIPDATE <= Date32(\"1998-12-01\") - 
IntervalDayTime(\"IntervalDayTime { days: 0, milliseconds: 10368000 }\")\
             \n          TableScan: LINEITEM"
@@ -119,9 +119,9 @@ mod tests {
         let plan_str = tpch_plan_to_string(4).await?;
         assert_eq!(
             plan_str,
-            "Projection: ORDERS.O_ORDERPRIORITY, count(Int64(1)) AS 
ORDER_COUNT\
+            "Projection: ORDERS.O_ORDERPRIORITY, count(*) AS ORDER_COUNT\
             \n  Sort: ORDERS.O_ORDERPRIORITY ASC NULLS LAST\
-            \n    Aggregate: groupBy=[[ORDERS.O_ORDERPRIORITY]], 
aggr=[[count(Int64(1))]]\
+            \n    Aggregate: groupBy=[[ORDERS.O_ORDERPRIORITY]], 
aggr=[[count(*)]]\
             \n      Projection: ORDERS.O_ORDERPRIORITY\
             \n        Filter: ORDERS.O_ORDERDATE >= CAST(Utf8(\"1993-07-01\") 
AS Date32) AND ORDERS.O_ORDERDATE < CAST(Utf8(\"1993-10-01\") AS Date32) AND 
EXISTS (<subquery>)\
             \n          Subquery:\
@@ -269,10 +269,10 @@ mod tests {
         let plan_str = tpch_plan_to_string(13).await?;
         assert_eq!(
             plan_str,
-            "Projection: count(ORDERS.O_ORDERKEY) AS C_COUNT, count(Int64(1)) 
AS CUSTDIST\
-            \n  Sort: count(Int64(1)) DESC NULLS FIRST, 
count(ORDERS.O_ORDERKEY) DESC NULLS FIRST\
-            \n    Projection: count(ORDERS.O_ORDERKEY), count(Int64(1))\
-            \n      Aggregate: groupBy=[[count(ORDERS.O_ORDERKEY)]], 
aggr=[[count(Int64(1))]]\
+            "Projection: count(ORDERS.O_ORDERKEY) AS C_COUNT, count(*) AS 
CUSTDIST\
+            \n  Sort: count(*) DESC NULLS FIRST, count(ORDERS.O_ORDERKEY) DESC 
NULLS FIRST\
+            \n    Projection: count(ORDERS.O_ORDERKEY), count(*)\
+            \n      Aggregate: groupBy=[[count(ORDERS.O_ORDERKEY)]], 
aggr=[[count(*)]]\
             \n        Projection: count(ORDERS.O_ORDERKEY)\
             \n          Aggregate: groupBy=[[CUSTOMER.C_CUSTKEY]], 
aggr=[[count(ORDERS.O_ORDERKEY)]]\
             \n            Projection: CUSTOMER.C_CUSTKEY, ORDERS.O_ORDERKEY\
@@ -410,10 +410,10 @@ mod tests {
         let plan_str = tpch_plan_to_string(21).await?;
         assert_eq!(
             plan_str,
-            "Projection: SUPPLIER.S_NAME, count(Int64(1)) AS NUMWAIT\
+            "Projection: SUPPLIER.S_NAME, count(*) AS NUMWAIT\
             \n  Limit: skip=0, fetch=100\
-            \n    Sort: count(Int64(1)) DESC NULLS FIRST, SUPPLIER.S_NAME ASC 
NULLS LAST\
-            \n      Aggregate: groupBy=[[SUPPLIER.S_NAME]], 
aggr=[[count(Int64(1))]]\
+            \n    Sort: count(*) DESC NULLS FIRST, SUPPLIER.S_NAME ASC NULLS 
LAST\
+            \n      Aggregate: groupBy=[[SUPPLIER.S_NAME]], aggr=[[count(*)]]\
             \n        Projection: SUPPLIER.S_NAME\
             \n          Filter: SUPPLIER.S_SUPPKEY = LINEITEM.L_SUPPKEY AND 
ORDERS.O_ORDERKEY = LINEITEM.L_ORDERKEY AND ORDERS.O_ORDERSTATUS = Utf8(\"F\") 
AND LINEITEM.L_RECEIPTDATE > LINEITEM.L_COMMITDATE AND EXISTS (<subquery>) AND 
NOT EXISTS (<subquery>) AND SUPPLIER.S_NATIONKEY = NATION.N_NATIONKEY AND 
NATION.N_NAME = Utf8(\"SAUDI ARABIA\")\
             \n            Subquery:\
@@ -438,9 +438,9 @@ mod tests {
         let plan_str = tpch_plan_to_string(22).await?;
         assert_eq!(
             plan_str,
-            "Projection: substr(CUSTOMER.C_PHONE,Int32(1),Int32(2)) AS 
CNTRYCODE, count(Int64(1)) AS NUMCUST, sum(CUSTOMER.C_ACCTBAL) AS TOTACCTBAL\
+            "Projection: substr(CUSTOMER.C_PHONE,Int32(1),Int32(2)) AS 
CNTRYCODE, count(*) AS NUMCUST, sum(CUSTOMER.C_ACCTBAL) AS TOTACCTBAL\
             \n  Sort: substr(CUSTOMER.C_PHONE,Int32(1),Int32(2)) ASC NULLS 
LAST\
-            \n    Aggregate: 
groupBy=[[substr(CUSTOMER.C_PHONE,Int32(1),Int32(2))]], aggr=[[count(Int64(1)), 
sum(CUSTOMER.C_ACCTBAL)]]\
+            \n    Aggregate: 
groupBy=[[substr(CUSTOMER.C_PHONE,Int32(1),Int32(2))]], aggr=[[count(*), 
sum(CUSTOMER.C_ACCTBAL)]]\
             \n      Projection: substr(CUSTOMER.C_PHONE, Int32(1), Int32(2)), 
CUSTOMER.C_ACCTBAL\
             \n        Filter: (substr(CUSTOMER.C_PHONE, Int32(1), Int32(2)) = 
CAST(Utf8(\"13\") AS Utf8) OR substr(CUSTOMER.C_PHONE, Int32(1), Int32(2)) = 
CAST(Utf8(\"31\") AS Utf8) OR substr(CUSTOMER.C_PHONE, Int32(1), Int32(2)) = 
CAST(Utf8(\"23\") AS Utf8) OR substr(CUSTOMER.C_PHONE, Int32(1), Int32(2)) = 
CAST(Utf8(\"29\") AS Utf8) OR substr(CUSTOMER.C_PHONE, Int32(1), Int32(2)) = 
CAST(Utf8(\"30\") AS Utf8) OR substr(CUSTOMER.C_PHONE, Int32(1), Int32(2)) = 
CAST(Utf8(\"18\") AS Utf8) OR [...]
             \n          Subquery:\
diff --git a/datafusion/substrait/tests/cases/roundtrip_logical_plan.rs 
b/datafusion/substrait/tests/cases/roundtrip_logical_plan.rs
index 5fb357dfcd..68856117a3 100644
--- a/datafusion/substrait/tests/cases/roundtrip_logical_plan.rs
+++ b/datafusion/substrait/tests/cases/roundtrip_logical_plan.rs
@@ -687,7 +687,7 @@ async fn simple_intersect() -> Result<()> {
     // Substrait treats both count(*) and count(1) the same
     assert_expected_plan(
         "SELECT count(*) FROM (SELECT data.a FROM data INTERSECT SELECT 
data2.a FROM data2);",
-        "Aggregate: groupBy=[[]], aggr=[[count(Int64(1)) AS count(*)]]\
+        "Aggregate: groupBy=[[]], aggr=[[count(*)]]\
          \n  Projection: \
          \n    LeftSemi Join: data.a = data2.a\
          \n      Aggregate: groupBy=[[data.a]], aggr=[[]]\
@@ -822,7 +822,7 @@ async fn simple_intersect_table_reuse() -> Result<()> {
     // Schema check works because we set aliases to what the Substrait 
consumer will generate.
     assert_expected_plan(
         "SELECT count(1) FROM (SELECT left.a FROM data AS left INTERSECT 
SELECT right.a FROM data AS right);",
-        "Aggregate: groupBy=[[]], aggr=[[count(Int64(1))]]\
+        "Aggregate: groupBy=[[]], aggr=[[count(*)]]\
         \n  Projection: \
         \n    LeftSemi Join: left.a = right.a\
         \n      SubqueryAlias: left\


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to