This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 01d7dbaf39 fix: correct count(*) alias (#7081)
01d7dbaf39 is described below

commit 01d7dbaf392b4a217e1a27c96bbcd1126b763828
Author: jakevin <[email protected]>
AuthorDate: Sun Jul 30 19:17:43 2023 +0800

    fix: correct count(*) alias (#7081)
    
    * remove count_wildcard_rule.rs column
    
    * fix test
    
    * fix ut
    
    * fix avro
    
    * fix tpch
    
    * fix subquery
---
 datafusion/core/src/execution/context.rs           |  10 +-
 datafusion/core/tests/path_partition.rs            |  12 +-
 datafusion/core/tests/sql/explain_analyze.rs       |   6 +-
 datafusion/core/tests/sql/references.rs            |  10 +-
 datafusion/core/tests/sql/select.rs                |  14 +--
 .../core/tests/sqllogictests/test_files/avro.slt   |   6 +-
 .../core/tests/sqllogictests/test_files/insert.slt |  30 ++---
 .../core/tests/sqllogictests/test_files/joins.slt  |   8 +-
 .../core/tests/sqllogictests/test_files/json.slt   |   6 +-
 .../tests/sqllogictests/test_files/subquery.slt    |  78 ++++++------
 .../sqllogictests/test_files/tpch/q1.slt.part      |  10 +-
 .../sqllogictests/test_files/tpch/q13.slt.part     |  10 +-
 .../sqllogictests/test_files/tpch/q21.slt.part     |  10 +-
 .../sqllogictests/test_files/tpch/q22.slt.part     |  10 +-
 .../sqllogictests/test_files/tpch/q4.slt.part      |  10 +-
 .../core/tests/sqllogictests/test_files/union.slt  |  18 +--
 .../core/tests/sqllogictests/test_files/window.slt |  66 +++++-----
 .../optimizer/src/analyzer/count_wildcard_rule.rs  | 139 ++++++---------------
 datafusion/optimizer/src/decorrelate.rs            |   6 +-
 datafusion/optimizer/src/eliminate_project.rs      |   4 +-
 .../optimizer/tests/optimizer_integration.rs       |   4 +-
 21 files changed, 202 insertions(+), 265 deletions(-)

diff --git a/datafusion/core/src/execution/context.rs 
b/datafusion/core/src/execution/context.rs
index 9a518940e2..622da6f5a7 100644
--- a/datafusion/core/src/execution/context.rs
+++ b/datafusion/core/src/execution/context.rs
@@ -2304,11 +2304,11 @@ mod tests {
 
         assert_eq!(results.len(), 1);
         let expected = vec![
-            "+--------------+--------------+-----------------+",
-            "| SUM(test.c1) | SUM(test.c2) | COUNT(UInt8(1)) |",
-            "+--------------+--------------+-----------------+",
-            "| 10           | 110          | 20              |",
-            "+--------------+--------------+-----------------+",
+            "+--------------+--------------+----------+",
+            "| SUM(test.c1) | SUM(test.c2) | COUNT(*) |",
+            "+--------------+--------------+----------+",
+            "| 10           | 110          | 20       |",
+            "+--------------+--------------+----------+",
         ];
         assert_batches_eq!(expected, &results);
 
diff --git a/datafusion/core/tests/path_partition.rs 
b/datafusion/core/tests/path_partition.rs
index 894ceb1b98..d4aa6c7e82 100644
--- a/datafusion/core/tests/path_partition.rs
+++ b/datafusion/core/tests/path_partition.rs
@@ -327,12 +327,12 @@ async fn csv_grouping_by_partition() -> Result<()> {
         .await?;
 
     let expected = vec![
-        "+------------+-----------------+----------------------+",
-        "| date       | COUNT(UInt8(1)) | COUNT(DISTINCT t.c1) |",
-        "+------------+-----------------+----------------------+",
-        "| 2021-10-26 | 100             | 5                    |",
-        "| 2021-10-27 | 100             | 5                    |",
-        "+------------+-----------------+----------------------+",
+        "+------------+----------+----------------------+",
+        "| date       | COUNT(*) | COUNT(DISTINCT t.c1) |",
+        "+------------+----------+----------------------+",
+        "| 2021-10-26 | 100      | 5                    |",
+        "| 2021-10-27 | 100      | 5                    |",
+        "+------------+----------+----------------------+",
     ];
     assert_batches_sorted_eq!(expected, &result);
 
diff --git a/datafusion/core/tests/sql/explain_analyze.rs 
b/datafusion/core/tests/sql/explain_analyze.rs
index 9125dc0ba7..12dff15dc0 100644
--- a/datafusion/core/tests/sql/explain_analyze.rs
+++ b/datafusion/core/tests/sql/explain_analyze.rs
@@ -78,7 +78,7 @@ async fn explain_analyze_baseline_metrics() {
     );
     assert_metrics!(
         &formatted,
-        "ProjectionExec: expr=[COUNT(UInt8(1))",
+        "ProjectionExec: expr=[COUNT(*)",
         "metrics=[output_rows=1, elapsed_compute="
     );
     assert_metrics!(
@@ -695,7 +695,7 @@ async fn csv_explain_analyze() {
     // Only test basic plumbing and try to avoid having to change too
     // many things. explain_analyze_baseline_metrics covers the values
     // in greater depth
-    let needle = "AggregateExec: mode=FinalPartitioned, gby=[c1@0 as c1], 
aggr=[COUNT(UInt8(1))], metrics=[output_rows=5";
+    let needle = "AggregateExec: mode=FinalPartitioned, gby=[c1@0 as c1], 
aggr=[COUNT(*)], metrics=[output_rows=5";
     assert_contains!(&formatted, needle);
 
     let verbose_needle = "Output Rows";
@@ -782,7 +782,7 @@ async fn explain_logical_plan_only() {
     let expected = vec![
         vec![
             "logical_plan",
-            "Aggregate: groupBy=[[]], aggr=[[COUNT(UInt8(1))]]\
+            "Aggregate: groupBy=[[]], aggr=[[COUNT(UInt8(1)) AS COUNT(*)]]\
             \n  SubqueryAlias: t\
             \n    Projection: column1\
             \n      Values: (Utf8(\"a\"), Int64(1), Int64(100)), (Utf8(\"a\"), 
Int64(2), Int64(150))"
diff --git a/datafusion/core/tests/sql/references.rs 
b/datafusion/core/tests/sql/references.rs
index 60191e5213..495d071600 100644
--- a/datafusion/core/tests/sql/references.rs
+++ b/datafusion/core/tests/sql/references.rs
@@ -30,11 +30,11 @@ async fn qualified_table_references() -> Result<()> {
         let sql = format!("SELECT COUNT(*) FROM {table_ref}");
         let actual = execute_to_batches(&ctx, &sql).await;
         let expected = vec![
-            "+-----------------+",
-            "| COUNT(UInt8(1)) |",
-            "+-----------------+",
-            "| 100             |",
-            "+-----------------+",
+            "+----------+",
+            "| COUNT(*) |",
+            "+----------+",
+            "| 100      |",
+            "+----------+",
         ];
         assert_batches_eq!(expected, &actual);
     }
diff --git a/datafusion/core/tests/sql/select.rs 
b/datafusion/core/tests/sql/select.rs
index 52d338d9e9..72cd10f195 100644
--- a/datafusion/core/tests/sql/select.rs
+++ b/datafusion/core/tests/sql/select.rs
@@ -372,13 +372,13 @@ async fn query_on_string_dictionary() -> Result<()> {
     let sql = "SELECT d1, COUNT(*) FROM test group by d1";
     let actual = execute_to_batches(&ctx, sql).await;
     let expected = vec![
-        "+-------+-----------------+",
-        "| d1    | COUNT(UInt8(1)) |",
-        "+-------+-----------------+",
-        "| one   | 1               |",
-        "|       | 1               |",
-        "| three | 1               |",
-        "+-------+-----------------+",
+        "+-------+----------+",
+        "| d1    | COUNT(*) |",
+        "+-------+----------+",
+        "|       | 1        |",
+        "| one   | 1        |",
+        "| three | 1        |",
+        "+-------+----------+",
     ];
     assert_batches_sorted_eq!(expected, &actual);
 
diff --git a/datafusion/core/tests/sqllogictests/test_files/avro.slt 
b/datafusion/core/tests/sqllogictests/test_files/avro.slt
index 5a01ae72cb..83b8a22ab7 100644
--- a/datafusion/core/tests/sqllogictests/test_files/avro.slt
+++ b/datafusion/core/tests/sqllogictests/test_files/avro.slt
@@ -87,11 +87,11 @@ query TT
 EXPLAIN SELECT count(*) from alltypes_plain
 ----
 logical_plan
-Aggregate: groupBy=[[]], aggr=[[COUNT(UInt8(1))]]
+Aggregate: groupBy=[[]], aggr=[[COUNT(UInt8(1)) AS COUNT(*)]]
 --TableScan: alltypes_plain projection=[id]
 physical_plan
-AggregateExec: mode=Final, gby=[], aggr=[COUNT(UInt8(1))]
+AggregateExec: mode=Final, gby=[], aggr=[COUNT(*)]
 --CoalescePartitionsExec
-----AggregateExec: mode=Partial, gby=[], aggr=[COUNT(UInt8(1))]
+----AggregateExec: mode=Partial, gby=[], aggr=[COUNT(*)]
 ------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
 --------AvroExec: file_groups={1 group: 
[[WORKSPACE_ROOT/testing/data/avro/alltypes_plain.avro]]}, projection=[id]
diff --git a/datafusion/core/tests/sqllogictests/test_files/insert.slt 
b/datafusion/core/tests/sqllogictests/test_files/insert.slt
index 9f4122ac5b..90a33bd1c5 100644
--- a/datafusion/core/tests/sqllogictests/test_files/insert.slt
+++ b/datafusion/core/tests/sqllogictests/test_files/insert.slt
@@ -58,17 +58,17 @@ ORDER by c1
 ----
 logical_plan
 Dml: op=[Insert] table=[table_without_values]
---Projection: SUM(aggregate_test_100.c4) PARTITION BY [aggregate_test_100.c1] 
ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 
FOLLOWING AS field1, COUNT(UInt8(1)) PARTITION BY [aggregate_test_100.c1] ORDER 
BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 
FOLLOWING AS field2
+--Projection: SUM(aggregate_test_100.c4) PARTITION BY [aggregate_test_100.c1] 
ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 
FOLLOWING AS field1, COUNT(*) PARTITION BY [aggregate_test_100.c1] ORDER BY 
[aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING 
AS field2
 ----Sort: aggregate_test_100.c1 ASC NULLS LAST
-------Projection: SUM(aggregate_test_100.c4) PARTITION BY 
[aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS 
BETWEEN 1 PRECEDING AND 1 FOLLOWING, COUNT(UInt8(1)) PARTITION BY 
[aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS 
BETWEEN 1 PRECEDING AND 1 FOLLOWING, aggregate_test_100.c1
---------WindowAggr: windowExpr=[[SUM(aggregate_test_100.c4) PARTITION BY 
[aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS 
BETWEEN 1 PRECEDING AND 1 FOLLOWING, COUNT(UInt8(1)) PARTITION BY 
[aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS 
BETWEEN 1 PRECEDING AND 1 FOLLOWING]]
+------Projection: SUM(aggregate_test_100.c4) PARTITION BY 
[aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS 
BETWEEN 1 PRECEDING AND 1 FOLLOWING, COUNT(*) PARTITION BY 
[aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS 
BETWEEN 1 PRECEDING AND 1 FOLLOWING, aggregate_test_100.c1
+--------WindowAggr: windowExpr=[[SUM(aggregate_test_100.c4) PARTITION BY 
[aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS 
BETWEEN 1 PRECEDING AND 1 FOLLOWING, COUNT(UInt8(1)) PARTITION BY 
[aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS 
BETWEEN 1 PRECEDING AND 1 FOLLOWING AS COUNT(*) PARTITION BY 
[aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS 
BETWEEN 1 PRECEDING AND 1 FOLLOWING]]
 ----------TableScan: aggregate_test_100 projection=[c1, c4, c9]
 physical_plan
 InsertExec: sink=MemoryTable (partitions=1)
---ProjectionExec: expr=[SUM(aggregate_test_100.c4) PARTITION BY 
[aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS 
BETWEEN 1 PRECEDING AND 1 FOLLOWING@0 as field1, COUNT(UInt8(1)) PARTITION BY 
[aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS 
BETWEEN 1 PRECEDING AND 1 FOLLOWING@1 as field2]
+--ProjectionExec: expr=[SUM(aggregate_test_100.c4) PARTITION BY 
[aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS 
BETWEEN 1 PRECEDING AND 1 FOLLOWING@0 as field1, COUNT(*) PARTITION BY 
[aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS 
BETWEEN 1 PRECEDING AND 1 FOLLOWING@1 as field2]
 ----SortPreservingMergeExec: [c1@2 ASC NULLS LAST]
-------ProjectionExec: expr=[SUM(aggregate_test_100.c4) PARTITION BY 
[aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS 
BETWEEN 1 PRECEDING AND 1 FOLLOWING@3 as SUM(aggregate_test_100.c4) PARTITION 
BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS 
BETWEEN 1 PRECEDING AND 1 FOLLOWING, COUNT(UInt8(1)) PARTITION BY 
[aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS 
BETWEEN 1 PRECEDING AND 1 FOLLOWING@4 as COUNT [...]
---------BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c4) PARTITION BY 
[aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS 
BETWEEN 1 PRECEDING AND 1 FOLLOWING: Ok(Field { name: 
"SUM(aggregate_test_100.c4) PARTITION BY [aggregate_test_100.c1] ORDER BY 
[aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 
FOLLOWING", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: 
false, metadata: {} }), frame: WindowFrame { units: Rows, start_bou [...]
+------ProjectionExec: expr=[SUM(aggregate_test_100.c4) PARTITION BY 
[aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS 
BETWEEN 1 PRECEDING AND 1 FOLLOWING@3 as SUM(aggregate_test_100.c4) PARTITION 
BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS 
BETWEEN 1 PRECEDING AND 1 FOLLOWING, COUNT(*) PARTITION BY 
[aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS 
BETWEEN 1 PRECEDING AND 1 FOLLOWING@4 as COUNT(*) PAR [...]
+--------BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c4) PARTITION BY 
[aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS 
BETWEEN 1 PRECEDING AND 1 FOLLOWING: Ok(Field { name: 
"SUM(aggregate_test_100.c4) PARTITION BY [aggregate_test_100.c1] ORDER BY 
[aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 
FOLLOWING", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: 
false, metadata: {} }), frame: WindowFrame { units: Rows, start_bou [...]
 ----------SortExec: expr=[c1@0 ASC NULLS LAST,c9@2 ASC NULLS LAST]
 ------------CoalesceBatchesExec: target_batch_size=8192
 --------------RepartitionExec: partitioning=Hash([c1@0], 8), input_partitions=8
@@ -121,14 +121,14 @@ FROM aggregate_test_100
 ----
 logical_plan
 Dml: op=[Insert] table=[table_without_values]
---Projection: SUM(aggregate_test_100.c4) PARTITION BY [aggregate_test_100.c1] 
ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 
FOLLOWING AS field1, COUNT(UInt8(1)) PARTITION BY [aggregate_test_100.c1] ORDER 
BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 
FOLLOWING AS field2
-----WindowAggr: windowExpr=[[SUM(aggregate_test_100.c4) PARTITION BY 
[aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS 
BETWEEN 1 PRECEDING AND 1 FOLLOWING, COUNT(UInt8(1)) PARTITION BY 
[aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS 
BETWEEN 1 PRECEDING AND 1 FOLLOWING]]
+--Projection: SUM(aggregate_test_100.c4) PARTITION BY [aggregate_test_100.c1] 
ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 
FOLLOWING AS field1, COUNT(*) PARTITION BY [aggregate_test_100.c1] ORDER BY 
[aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING 
AS field2
+----WindowAggr: windowExpr=[[SUM(aggregate_test_100.c4) PARTITION BY 
[aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS 
BETWEEN 1 PRECEDING AND 1 FOLLOWING, COUNT(UInt8(1)) PARTITION BY 
[aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS 
BETWEEN 1 PRECEDING AND 1 FOLLOWING AS COUNT(*) PARTITION BY 
[aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS 
BETWEEN 1 PRECEDING AND 1 FOLLOWING]]
 ------TableScan: aggregate_test_100 projection=[c1, c4, c9]
 physical_plan
 InsertExec: sink=MemoryTable (partitions=1)
 --CoalescePartitionsExec
-----ProjectionExec: expr=[SUM(aggregate_test_100.c4) PARTITION BY 
[aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS 
BETWEEN 1 PRECEDING AND 1 FOLLOWING@3 as field1, COUNT(UInt8(1)) PARTITION BY 
[aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS 
BETWEEN 1 PRECEDING AND 1 FOLLOWING@4 as field2]
-------BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c4) PARTITION BY 
[aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS 
BETWEEN 1 PRECEDING AND 1 FOLLOWING: Ok(Field { name: 
"SUM(aggregate_test_100.c4) PARTITION BY [aggregate_test_100.c1] ORDER BY 
[aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 
FOLLOWING", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: 
false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound [...]
+----ProjectionExec: expr=[SUM(aggregate_test_100.c4) PARTITION BY 
[aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS 
BETWEEN 1 PRECEDING AND 1 FOLLOWING@3 as field1, COUNT(*) PARTITION BY 
[aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS 
BETWEEN 1 PRECEDING AND 1 FOLLOWING@4 as field2]
+------BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c4) PARTITION BY 
[aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS 
BETWEEN 1 PRECEDING AND 1 FOLLOWING: Ok(Field { name: 
"SUM(aggregate_test_100.c4) PARTITION BY [aggregate_test_100.c1] ORDER BY 
[aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 
FOLLOWING", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: 
false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound [...]
 --------SortExec: expr=[c1@0 ASC NULLS LAST,c9@2 ASC NULLS LAST]
 ----------CoalesceBatchesExec: target_batch_size=8192
 ------------RepartitionExec: partitioning=Hash([c1@0], 8), input_partitions=8
@@ -171,16 +171,16 @@ logical_plan
 Dml: op=[Insert] table=[table_without_values]
 --Projection: a1 AS a1, a2 AS a2
 ----Sort: aggregate_test_100.c1 ASC NULLS LAST
-------Projection: SUM(aggregate_test_100.c4) PARTITION BY 
[aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS 
BETWEEN 1 PRECEDING AND 1 FOLLOWING AS a1, COUNT(UInt8(1)) PARTITION BY 
[aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS 
BETWEEN 1 PRECEDING AND 1 FOLLOWING AS a2, aggregate_test_100.c1
---------WindowAggr: windowExpr=[[SUM(aggregate_test_100.c4) PARTITION BY 
[aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS 
BETWEEN 1 PRECEDING AND 1 FOLLOWING, COUNT(UInt8(1)) PARTITION BY 
[aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS 
BETWEEN 1 PRECEDING AND 1 FOLLOWING]]
+------Projection: SUM(aggregate_test_100.c4) PARTITION BY 
[aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS 
BETWEEN 1 PRECEDING AND 1 FOLLOWING AS a1, COUNT(*) PARTITION BY 
[aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS 
BETWEEN 1 PRECEDING AND 1 FOLLOWING AS a2, aggregate_test_100.c1
+--------WindowAggr: windowExpr=[[SUM(aggregate_test_100.c4) PARTITION BY 
[aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS 
BETWEEN 1 PRECEDING AND 1 FOLLOWING, COUNT(UInt8(1)) PARTITION BY 
[aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS 
BETWEEN 1 PRECEDING AND 1 FOLLOWING AS COUNT(*) PARTITION BY 
[aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS 
BETWEEN 1 PRECEDING AND 1 FOLLOWING]]
 ----------TableScan: aggregate_test_100 projection=[c1, c4, c9]
 physical_plan
 InsertExec: sink=MemoryTable (partitions=8)
 --ProjectionExec: expr=[a1@0 as a1, a2@1 as a2]
 ----SortPreservingMergeExec: [c1@2 ASC NULLS LAST]
-------ProjectionExec: expr=[SUM(aggregate_test_100.c4) PARTITION BY 
[aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS 
BETWEEN 1 PRECEDING AND 1 FOLLOWING@3 as a1, COUNT(UInt8(1)) PARTITION BY 
[aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS 
BETWEEN 1 PRECEDING AND 1 FOLLOWING@4 as a2, c1@0 as c1]
- --------BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c4) PARTITION BY 
[aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS 
BETWEEN 1 PRECEDING AND 1 FOLLOWING: Ok(Field { name: 
"SUM(aggregate_test_100.c4) PARTITION BY [aggregate_test_100.c1] ORDER BY 
[aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 
FOLLOWING", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: 
false, metadata: {} }), frame: WindowFrame { units: Rows, start_bo [...]
- ----------SortExec: expr=[c1@0 ASC NULLS LAST,c9@2 ASC NULLS LAST]
+------ProjectionExec: expr=[SUM(aggregate_test_100.c4) PARTITION BY 
[aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS 
BETWEEN 1 PRECEDING AND 1 FOLLOWING@3 as a1, COUNT(*) PARTITION BY 
[aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS 
BETWEEN 1 PRECEDING AND 1 FOLLOWING@4 as a2, c1@0 as c1]
+--------BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c4) PARTITION BY 
[aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS 
BETWEEN 1 PRECEDING AND 1 FOLLOWING: Ok(Field { name: 
"SUM(aggregate_test_100.c4) PARTITION BY [aggregate_test_100.c1] ORDER BY 
[aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 
FOLLOWING", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: 
false, metadata: {} }), frame: WindowFrame { units: Rows, start_bou [...]
+----------SortExec: expr=[c1@0 ASC NULLS LAST,c9@2 ASC NULLS LAST]
 ------------CoalesceBatchesExec: target_batch_size=8192
 --------------RepartitionExec: partitioning=Hash([c1@0], 8), input_partitions=8
 ----------------RepartitionExec: partitioning=RoundRobinBatch(8), 
input_partitions=1
diff --git a/datafusion/core/tests/sqllogictests/test_files/joins.slt 
b/datafusion/core/tests/sqllogictests/test_files/joins.slt
index fdb76064ab..9baf33bdf8 100644
--- a/datafusion/core/tests/sqllogictests/test_files/joins.slt
+++ b/datafusion/core/tests/sqllogictests/test_files/joins.slt
@@ -1329,15 +1329,15 @@ from (select * from join_t1 inner join join_t2 on 
join_t1.t1_id = join_t2.t2_id)
 group by t1_id
 ----
 logical_plan
-Projection: COUNT(UInt8(1))
---Aggregate: groupBy=[[join_t1.t1_id]], aggr=[[COUNT(UInt8(1))]]
+Projection: COUNT(*)
+--Aggregate: groupBy=[[join_t1.t1_id]], aggr=[[COUNT(UInt8(1)) AS COUNT(*)]]
 ----Projection: join_t1.t1_id
 ------Inner Join: join_t1.t1_id = join_t2.t2_id
 --------TableScan: join_t1 projection=[t1_id]
 --------TableScan: join_t2 projection=[t2_id]
 physical_plan
-ProjectionExec: expr=[COUNT(UInt8(1))@1 as COUNT(UInt8(1))]
---AggregateExec: mode=SinglePartitioned, gby=[t1_id@0 as t1_id], 
aggr=[COUNT(UInt8(1))]
+ProjectionExec: expr=[COUNT(*)@1 as COUNT(*)]
+--AggregateExec: mode=SinglePartitioned, gby=[t1_id@0 as t1_id], 
aggr=[COUNT(*)]
 ----ProjectionExec: expr=[t1_id@0 as t1_id]
 ------CoalesceBatchesExec: target_batch_size=4096
 --------HashJoinExec: mode=Partitioned, join_type=Inner, on=[(t1_id@0, 
t2_id@0)]
diff --git a/datafusion/core/tests/sqllogictests/test_files/json.slt 
b/datafusion/core/tests/sqllogictests/test_files/json.slt
index 7092127a79..edbe6912e6 100644
--- a/datafusion/core/tests/sqllogictests/test_files/json.slt
+++ b/datafusion/core/tests/sqllogictests/test_files/json.slt
@@ -49,12 +49,12 @@ query TT
 EXPLAIN SELECT count(*) from json_test
 ----
 logical_plan
-Aggregate: groupBy=[[]], aggr=[[COUNT(UInt8(1))]]
+Aggregate: groupBy=[[]], aggr=[[COUNT(UInt8(1)) AS COUNT(*)]]
 --TableScan: json_test projection=[a]
 physical_plan
-AggregateExec: mode=Final, gby=[], aggr=[COUNT(UInt8(1))]
+AggregateExec: mode=Final, gby=[], aggr=[COUNT(*)]
 --CoalescePartitionsExec
-----AggregateExec: mode=Partial, gby=[], aggr=[COUNT(UInt8(1))]
+----AggregateExec: mode=Partial, gby=[], aggr=[COUNT(*)]
 ------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
 --------JsonExec: file_groups={1 group: 
[[WORKSPACE_ROOT/datafusion/core/tests/data/2.json]]}, projection=[a]
 
diff --git a/datafusion/core/tests/sqllogictests/test_files/subquery.slt 
b/datafusion/core/tests/sqllogictests/test_files/subquery.slt
index 3ab8d83c69..2857f4381d 100644
--- a/datafusion/core/tests/sqllogictests/test_files/subquery.slt
+++ b/datafusion/core/tests/sqllogictests/test_files/subquery.slt
@@ -495,9 +495,9 @@ explain SELECT t1_id, t1_name FROM t1 WHERE EXISTS (SELECT 
count(*) FROM t2 WHER
 logical_plan
 Filter: EXISTS (<subquery>)
 --Subquery:
-----Projection: COUNT(UInt8(1))
+----Projection: COUNT(*)
 ------Filter: SUM(outer_ref(t1.t1_int) + t2.t2_id) > Int64(0)
---------Aggregate: groupBy=[[]], aggr=[[COUNT(UInt8(1)), 
SUM(outer_ref(t1.t1_int) + t2.t2_id)]]
+--------Aggregate: groupBy=[[]], aggr=[[COUNT(UInt8(1)) AS COUNT(*), 
SUM(outer_ref(t1.t1_int) + t2.t2_id)]]
 ----------Filter: outer_ref(t1.t1_name) = t2.t2_name
 ------------TableScan: t2
 --TableScan: t1 projection=[t1_id, t1_name]
@@ -685,9 +685,9 @@ query TT
 explain select (select count(*) from t1) as b
 ----
 logical_plan
-Projection: __scalar_sq_1.COUNT(UInt8(1)) AS b
+Projection: __scalar_sq_1.COUNT(*) AS b
 --SubqueryAlias: __scalar_sq_1
-----Aggregate: groupBy=[[]], aggr=[[COUNT(UInt8(1))]]
+----Aggregate: groupBy=[[]], aggr=[[COUNT(UInt8(1)) AS COUNT(*)]]
 ------TableScan: t1 projection=[t1_id]
 
 #simple_uncorrelated_scalar_subquery2
@@ -695,10 +695,10 @@ query TT
 explain select (select count(*) from t1) as b, (select count(1) from t2)
 ----
 logical_plan
-Projection: __scalar_sq_1.COUNT(UInt8(1)) AS b, __scalar_sq_2.COUNT(Int64(1)) 
AS COUNT(Int64(1))
+Projection: __scalar_sq_1.COUNT(*) AS b, __scalar_sq_2.COUNT(Int64(1)) AS 
COUNT(Int64(1))
 --Left Join: 
 ----SubqueryAlias: __scalar_sq_1
-------Aggregate: groupBy=[[]], aggr=[[COUNT(UInt8(1))]]
+------Aggregate: groupBy=[[]], aggr=[[COUNT(UInt8(1)) AS COUNT(*)]]
 --------TableScan: t1 projection=[t1_id]
 ----SubqueryAlias: __scalar_sq_2
 ------Aggregate: groupBy=[[]], aggr=[[COUNT(Int64(1))]]
@@ -714,12 +714,12 @@ query TT
 explain SELECT t1_id, (SELECT count(*) FROM t2 WHERE t2.t2_int = t1.t1_int) 
from t1
 ----
 logical_plan
-Projection: t1.t1_id, CASE WHEN __scalar_sq_1.__always_true IS NULL THEN 
Int64(0) ELSE __scalar_sq_1.COUNT(UInt8(1)) END AS COUNT(UInt8(1))
+Projection: t1.t1_id, CASE WHEN __scalar_sq_1.__always_true IS NULL THEN 
Int64(0) ELSE __scalar_sq_1.COUNT(*) END AS COUNT(*)
 --Left Join: t1.t1_int = __scalar_sq_1.t2_int
 ----TableScan: t1 projection=[t1_id, t1_int]
 ----SubqueryAlias: __scalar_sq_1
-------Projection: COUNT(UInt8(1)), t2.t2_int, __always_true
---------Aggregate: groupBy=[[t2.t2_int, Boolean(true) AS __always_true]], 
aggr=[[COUNT(UInt8(1))]]
+------Projection: COUNT(*), t2.t2_int, __always_true
+--------Aggregate: groupBy=[[t2.t2_int, Boolean(true) AS __always_true]], 
aggr=[[COUNT(UInt8(1)) AS COUNT(*)]]
 ----------TableScan: t2 projection=[t2_int]
 
 query II rowsort
@@ -736,12 +736,12 @@ query TT
 explain SELECT t1_id, (SELECT count(*) FROM t2 WHERE t2.t2_int = t1.t1_int) as 
cnt from t1
 ----
 logical_plan
-Projection: t1.t1_id, CASE WHEN __scalar_sq_1.__always_true IS NULL THEN 
Int64(0) ELSE __scalar_sq_1.COUNT(UInt8(1)) END AS cnt
+Projection: t1.t1_id, CASE WHEN __scalar_sq_1.__always_true IS NULL THEN 
Int64(0) ELSE __scalar_sq_1.COUNT(*) END AS cnt
 --Left Join: t1.t1_int = __scalar_sq_1.t2_int
 ----TableScan: t1 projection=[t1_id, t1_int]
 ----SubqueryAlias: __scalar_sq_1
-------Projection: COUNT(UInt8(1)), t2.t2_int, __always_true
---------Aggregate: groupBy=[[t2.t2_int, Boolean(true) AS __always_true]], 
aggr=[[COUNT(UInt8(1))]]
+------Projection: COUNT(*), t2.t2_int, __always_true
+--------Aggregate: groupBy=[[t2.t2_int, Boolean(true) AS __always_true]], 
aggr=[[COUNT(UInt8(1)) AS COUNT(*)]]
 ----------TableScan: t2 projection=[t2_int]
 
 query II rowsort
@@ -761,8 +761,8 @@ Projection: t1.t1_id, CASE WHEN __scalar_sq_1.__always_true 
IS NULL THEN Int64(0
 --Left Join: t1.t1_int = __scalar_sq_1.t2_int
 ----TableScan: t1 projection=[t1_id, t1_int]
 ----SubqueryAlias: __scalar_sq_1
-------Projection: COUNT(UInt8(1)) AS _cnt, t2.t2_int, __always_true
---------Aggregate: groupBy=[[t2.t2_int, Boolean(true) AS __always_true]], 
aggr=[[COUNT(UInt8(1))]]
+------Projection: COUNT(*) AS _cnt, t2.t2_int, __always_true
+--------Aggregate: groupBy=[[t2.t2_int, Boolean(true) AS __always_true]], 
aggr=[[COUNT(UInt8(1)) AS COUNT(*)]]
 ----------TableScan: t2 projection=[t2_int]
 
 query II rowsort
@@ -782,8 +782,8 @@ Projection: t1.t1_id, CASE WHEN __scalar_sq_1.__always_true 
IS NULL THEN Int64(2
 --Left Join: t1.t1_int = __scalar_sq_1.t2_int
 ----TableScan: t1 projection=[t1_id, t1_int]
 ----SubqueryAlias: __scalar_sq_1
-------Projection: COUNT(UInt8(1)) + Int64(2) AS _cnt, t2.t2_int, __always_true
---------Aggregate: groupBy=[[t2.t2_int, Boolean(true) AS __always_true]], 
aggr=[[COUNT(UInt8(1))]]
+------Projection: COUNT(*) + Int64(2) AS _cnt, t2.t2_int, __always_true
+--------Aggregate: groupBy=[[t2.t2_int, Boolean(true) AS __always_true]], 
aggr=[[COUNT(UInt8(1)) AS COUNT(*)]]
 ----------TableScan: t2 projection=[t2_int]
 
 query II rowsort
@@ -800,13 +800,13 @@ explain select t1.t1_int from t1 where (select count(*) 
from t2 where t1.t1_id =
 ----
 logical_plan
 Projection: t1.t1_int
---Filter: CASE WHEN __scalar_sq_1.__always_true IS NULL THEN Int64(0) ELSE 
__scalar_sq_1.COUNT(UInt8(1)) END < CAST(t1.t1_int AS Int64)
-----Projection: t1.t1_int, __scalar_sq_1.COUNT(UInt8(1)), 
__scalar_sq_1.__always_true
+--Filter: CASE WHEN __scalar_sq_1.__always_true IS NULL THEN Int64(0) ELSE 
__scalar_sq_1.COUNT(*) END < CAST(t1.t1_int AS Int64)
+----Projection: t1.t1_int, __scalar_sq_1.COUNT(*), __scalar_sq_1.__always_true
 ------Left Join: t1.t1_id = __scalar_sq_1.t2_id
 --------TableScan: t1 projection=[t1_id, t1_int]
 --------SubqueryAlias: __scalar_sq_1
-----------Projection: COUNT(UInt8(1)), t2.t2_id, __always_true
-------------Aggregate: groupBy=[[t2.t2_id, Boolean(true) AS __always_true]], 
aggr=[[COUNT(UInt8(1))]]
+----------Projection: COUNT(*), t2.t2_id, __always_true
+------------Aggregate: groupBy=[[t2.t2_id, Boolean(true) AS __always_true]], 
aggr=[[COUNT(UInt8(1)) AS COUNT(*)]]
 --------------TableScan: t2 projection=[t2_id]
 
 query I rowsort
@@ -826,10 +826,10 @@ Projection: t1.t1_id, __scalar_sq_1.cnt_plus_2 AS 
cnt_plus_2
 --Left Join: t1.t1_int = __scalar_sq_1.t2_int
 ----TableScan: t1 projection=[t1_id, t1_int]
 ----SubqueryAlias: __scalar_sq_1
-------Projection: COUNT(UInt8(1)) + Int64(2) AS cnt_plus_2, t2.t2_int
---------Filter: COUNT(UInt8(1)) > Int64(1)
-----------Projection: t2.t2_int, COUNT(UInt8(1))
-------------Aggregate: groupBy=[[t2.t2_int, Boolean(true) AS __always_true]], 
aggr=[[COUNT(UInt8(1))]]
+------Projection: COUNT(*) + Int64(2) AS cnt_plus_2, t2.t2_int
+--------Filter: COUNT(*) > Int64(1)
+----------Projection: t2.t2_int, COUNT(*)
+------------Aggregate: groupBy=[[t2.t2_int, Boolean(true) AS __always_true]], 
aggr=[[COUNT(UInt8(1)) AS COUNT(*)]]
 --------------TableScan: t2 projection=[t2_int]
 
 query II rowsort
@@ -846,12 +846,12 @@ query TT
 explain SELECT t1_id, (SELECT count(*) + 2 as cnt_plus_2 FROM t2 WHERE 
t2.t2_int = t1.t1_int having count(*) = 0) from t1
 ----
 logical_plan
-Projection: t1.t1_id, CASE WHEN __scalar_sq_1.__always_true IS NULL THEN 
Int64(2) AS cnt_plus_2 WHEN __scalar_sq_1.COUNT(UInt8(1)) != Int64(0) THEN NULL 
ELSE __scalar_sq_1.cnt_plus_2 END AS cnt_plus_2
+Projection: t1.t1_id, CASE WHEN __scalar_sq_1.__always_true IS NULL THEN 
Int64(2) AS cnt_plus_2 WHEN __scalar_sq_1.COUNT(*) != Int64(0) THEN NULL ELSE 
__scalar_sq_1.cnt_plus_2 END AS cnt_plus_2
 --Left Join: t1.t1_int = __scalar_sq_1.t2_int
 ----TableScan: t1 projection=[t1_id, t1_int]
 ----SubqueryAlias: __scalar_sq_1
-------Projection: COUNT(UInt8(1)) + Int64(2) AS cnt_plus_2, t2.t2_int, 
COUNT(UInt8(1)), __always_true
---------Aggregate: groupBy=[[t2.t2_int, Boolean(true) AS __always_true]], 
aggr=[[COUNT(UInt8(1))]]
+------Projection: COUNT(*) + Int64(2) AS cnt_plus_2, t2.t2_int, COUNT(*), 
__always_true
+--------Aggregate: groupBy=[[t2.t2_int, Boolean(true) AS __always_true]], 
aggr=[[COUNT(UInt8(1)) AS COUNT(*)]]
 ----------TableScan: t2 projection=[t2_int]
 
 query II rowsort
@@ -868,14 +868,14 @@ explain select t1.t1_int from t1 group by t1.t1_int 
having (select count(*) from
 ----
 logical_plan
 Projection: t1.t1_int
---Filter: CASE WHEN __scalar_sq_1.__always_true IS NULL THEN Int64(0) ELSE 
__scalar_sq_1.COUNT(UInt8(1)) END = Int64(0)
-----Projection: t1.t1_int, __scalar_sq_1.COUNT(UInt8(1)), 
__scalar_sq_1.__always_true
+--Filter: CASE WHEN __scalar_sq_1.__always_true IS NULL THEN Int64(0) ELSE 
__scalar_sq_1.COUNT(*) END = Int64(0)
+----Projection: t1.t1_int, __scalar_sq_1.COUNT(*), __scalar_sq_1.__always_true
 ------Left Join: t1.t1_int = __scalar_sq_1.t2_int
 --------Aggregate: groupBy=[[t1.t1_int]], aggr=[[]]
 ----------TableScan: t1 projection=[t1_int]
 --------SubqueryAlias: __scalar_sq_1
-----------Projection: COUNT(UInt8(1)), t2.t2_int, __always_true
-------------Aggregate: groupBy=[[t2.t2_int, Boolean(true) AS __always_true]], 
aggr=[[COUNT(UInt8(1))]]
+----------Projection: COUNT(*), t2.t2_int, __always_true
+------------Aggregate: groupBy=[[t2.t2_int, Boolean(true) AS __always_true]], 
aggr=[[COUNT(UInt8(1)) AS COUNT(*)]]
 --------------TableScan: t2 projection=[t2_int]
 
 query I rowsort
@@ -895,8 +895,8 @@ Projection: t1.t1_int
 ------Left Join: t1.t1_int = __scalar_sq_1.t2_int
 --------TableScan: t1 projection=[t1_int]
 --------SubqueryAlias: __scalar_sq_1
-----------Projection: COUNT(UInt8(1)) AS cnt, t2.t2_int, __always_true
-------------Aggregate: groupBy=[[t2.t2_int, Boolean(true) AS __always_true]], 
aggr=[[COUNT(UInt8(1))]]
+----------Projection: COUNT(*) AS cnt, t2.t2_int, __always_true
+------------Aggregate: groupBy=[[t2.t2_int, Boolean(true) AS __always_true]], 
aggr=[[COUNT(UInt8(1)) AS COUNT(*)]]
 --------------TableScan: t2 projection=[t2_int]
 
 
@@ -920,13 +920,13 @@ select t1.t1_int from t1 where (
 ----
 logical_plan
 Projection: t1.t1_int
---Filter: CASE WHEN __scalar_sq_1.__always_true IS NULL THEN Int64(2) WHEN 
__scalar_sq_1.COUNT(UInt8(1)) != Int64(0) THEN NULL ELSE 
__scalar_sq_1.cnt_plus_two END = Int64(2)
-----Projection: t1.t1_int, __scalar_sq_1.cnt_plus_two, 
__scalar_sq_1.COUNT(UInt8(1)), __scalar_sq_1.__always_true
+--Filter: CASE WHEN __scalar_sq_1.__always_true IS NULL THEN Int64(2) WHEN 
__scalar_sq_1.COUNT(*) != Int64(0) THEN NULL ELSE __scalar_sq_1.cnt_plus_two 
END = Int64(2)
+----Projection: t1.t1_int, __scalar_sq_1.cnt_plus_two, __scalar_sq_1.COUNT(*), 
__scalar_sq_1.__always_true
 ------Left Join: t1.t1_int = __scalar_sq_1.t2_int
 --------TableScan: t1 projection=[t1_int]
 --------SubqueryAlias: __scalar_sq_1
-----------Projection: COUNT(UInt8(1)) + Int64(1) + Int64(1) AS cnt_plus_two, 
t2.t2_int, COUNT(UInt8(1)), __always_true
-------------Aggregate: groupBy=[[t2.t2_int, Boolean(true) AS __always_true]], 
aggr=[[COUNT(UInt8(1))]]
+----------Projection: COUNT(*) + Int64(1) + Int64(1) AS cnt_plus_two, 
t2.t2_int, COUNT(*), __always_true
+------------Aggregate: groupBy=[[t2.t2_int, Boolean(true) AS __always_true]], 
aggr=[[COUNT(UInt8(1)) AS COUNT(*)]]
 --------------TableScan: t2 projection=[t2_int]
 
 query I rowsort
@@ -954,8 +954,8 @@ Projection: t1.t1_int
 ------Left Join: t1.t1_int = __scalar_sq_1.t2_int
 --------TableScan: t1 projection=[t1_int]
 --------SubqueryAlias: __scalar_sq_1
-----------Projection: CASE WHEN COUNT(UInt8(1)) = Int64(1) THEN Int64(NULL) 
ELSE COUNT(UInt8(1)) END AS cnt, t2.t2_int, __always_true
-------------Aggregate: groupBy=[[t2.t2_int, Boolean(true) AS __always_true]], 
aggr=[[COUNT(UInt8(1))]]
+----------Projection: CASE WHEN COUNT(*) = Int64(1) THEN Int64(NULL) ELSE 
COUNT(*) END AS cnt, t2.t2_int, __always_true
+------------Aggregate: groupBy=[[t2.t2_int, Boolean(true) AS __always_true]], 
aggr=[[COUNT(UInt8(1)) AS COUNT(*)]]
 --------------TableScan: t2 projection=[t2_int]
 
 
diff --git a/datafusion/core/tests/sqllogictests/test_files/tpch/q1.slt.part 
b/datafusion/core/tests/sqllogictests/test_files/tpch/q1.slt.part
index 050f14aaed..25485225e7 100644
--- a/datafusion/core/tests/sqllogictests/test_files/tpch/q1.slt.part
+++ b/datafusion/core/tests/sqllogictests/test_files/tpch/q1.slt.part
@@ -41,19 +41,19 @@ explain select
 ----
 logical_plan
 Sort: lineitem.l_returnflag ASC NULLS LAST, lineitem.l_linestatus ASC NULLS 
LAST
---Projection: lineitem.l_returnflag, lineitem.l_linestatus, 
SUM(lineitem.l_quantity) AS sum_qty, SUM(lineitem.l_extendedprice) AS 
sum_base_price, SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount) 
AS sum_disc_price, SUM(lineitem.l_extendedprice * Int64(1) - 
lineitem.l_discount * Int64(1) + lineitem.l_tax) AS sum_charge, 
AVG(lineitem.l_quantity) AS avg_qty, AVG(lineitem.l_extendedprice) AS 
avg_price, AVG(lineitem.l_discount) AS avg_disc, COUNT(UInt8(1)) AS count_order
-----Aggregate: groupBy=[[lineitem.l_returnflag, lineitem.l_linestatus]], 
aggr=[[SUM(lineitem.l_quantity), SUM(lineitem.l_extendedprice), 
SUM(lineitem.l_extendedprice * (Decimal128(Some(1),20,0) - 
lineitem.l_discount)Decimal128(Some(1),20,0) - 
lineitem.l_discountlineitem.l_discountDecimal128(Some(1),20,0)lineitem.l_extendedprice
 AS lineitem.l_extendedprice * Decimal128(Some(1),20,0) - lineitem.l_discount) 
AS SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount), 
SUM(lineitem.l_ex [...]
+--Projection: lineitem.l_returnflag, lineitem.l_linestatus, 
SUM(lineitem.l_quantity) AS sum_qty, SUM(lineitem.l_extendedprice) AS 
sum_base_price, SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount) 
AS sum_disc_price, SUM(lineitem.l_extendedprice * Int64(1) - 
lineitem.l_discount * Int64(1) + lineitem.l_tax) AS sum_charge, 
AVG(lineitem.l_quantity) AS avg_qty, AVG(lineitem.l_extendedprice) AS 
avg_price, AVG(lineitem.l_discount) AS avg_disc, COUNT(*) AS count_order
+----Aggregate: groupBy=[[lineitem.l_returnflag, lineitem.l_linestatus]], 
aggr=[[SUM(lineitem.l_quantity), SUM(lineitem.l_extendedprice), 
SUM(lineitem.l_extendedprice * (Decimal128(Some(1),20,0) - 
lineitem.l_discount)Decimal128(Some(1),20,0) - 
lineitem.l_discountlineitem.l_discountDecimal128(Some(1),20,0)lineitem.l_extendedprice
 AS lineitem.l_extendedprice * Decimal128(Some(1),20,0) - lineitem.l_discount) 
AS SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount), 
SUM(lineitem.l_ex [...]
 ------Projection: lineitem.l_extendedprice * (Decimal128(Some(1),20,0) - 
lineitem.l_discount) AS lineitem.l_extendedprice * (Decimal128(Some(1),20,0) - 
lineitem.l_discount)Decimal128(Some(1),20,0) - 
lineitem.l_discountlineitem.l_discountDecimal128(Some(1),20,0)lineitem.l_extendedprice,
 lineitem.l_quantity, lineitem.l_extendedprice, lineitem.l_discount, 
lineitem.l_tax, lineitem.l_returnflag, lineitem.l_linestatus
 --------Filter: lineitem.l_shipdate <= Date32("10471")
 ----------TableScan: lineitem projection=[l_quantity, l_extendedprice, 
l_discount, l_tax, l_returnflag, l_linestatus, l_shipdate], 
partial_filters=[lineitem.l_shipdate <= Date32("10471")]
 physical_plan
 SortPreservingMergeExec: [l_returnflag@0 ASC NULLS LAST,l_linestatus@1 ASC 
NULLS LAST]
 --SortExec: expr=[l_returnflag@0 ASC NULLS LAST,l_linestatus@1 ASC NULLS LAST]
-----ProjectionExec: expr=[l_returnflag@0 as l_returnflag, l_linestatus@1 as 
l_linestatus, SUM(lineitem.l_quantity)@2 as sum_qty, 
SUM(lineitem.l_extendedprice)@3 as sum_base_price, SUM(lineitem.l_extendedprice 
* Int64(1) - lineitem.l_discount)@4 as sum_disc_price, 
SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount * Int64(1) + 
lineitem.l_tax)@5 as sum_charge, AVG(lineitem.l_quantity)@6 as avg_qty, 
AVG(lineitem.l_extendedprice)@7 as avg_price, AVG(lineitem.l_discount)@8 as 
avg_d [...]
-------AggregateExec: mode=FinalPartitioned, gby=[l_returnflag@0 as 
l_returnflag, l_linestatus@1 as l_linestatus], aggr=[SUM(lineitem.l_quantity), 
SUM(lineitem.l_extendedprice), SUM(lineitem.l_extendedprice * Int64(1) - 
lineitem.l_discount), SUM(lineitem.l_extendedprice * Int64(1) - 
lineitem.l_discount * Int64(1) + lineitem.l_tax), AVG(lineitem.l_quantity), 
AVG(lineitem.l_extendedprice), AVG(lineitem.l_discount), COUNT(UInt8(1))]
+----ProjectionExec: expr=[l_returnflag@0 as l_returnflag, l_linestatus@1 as 
l_linestatus, SUM(lineitem.l_quantity)@2 as sum_qty, 
SUM(lineitem.l_extendedprice)@3 as sum_base_price, SUM(lineitem.l_extendedprice 
* Int64(1) - lineitem.l_discount)@4 as sum_disc_price, 
SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount * Int64(1) + 
lineitem.l_tax)@5 as sum_charge, AVG(lineitem.l_quantity)@6 as avg_qty, 
AVG(lineitem.l_extendedprice)@7 as avg_price, AVG(lineitem.l_discount)@8 as 
avg_d [...]
+------AggregateExec: mode=FinalPartitioned, gby=[l_returnflag@0 as 
l_returnflag, l_linestatus@1 as l_linestatus], aggr=[SUM(lineitem.l_quantity), 
SUM(lineitem.l_extendedprice), SUM(lineitem.l_extendedprice * Int64(1) - 
lineitem.l_discount), SUM(lineitem.l_extendedprice * Int64(1) - 
lineitem.l_discount * Int64(1) + lineitem.l_tax), AVG(lineitem.l_quantity), 
AVG(lineitem.l_extendedprice), AVG(lineitem.l_discount), COUNT(*)]
 --------CoalesceBatchesExec: target_batch_size=8192
 ----------RepartitionExec: partitioning=Hash([l_returnflag@0, l_linestatus@1], 
4), input_partitions=4
-------------AggregateExec: mode=Partial, gby=[l_returnflag@5 as l_returnflag, 
l_linestatus@6 as l_linestatus], aggr=[SUM(lineitem.l_quantity), 
SUM(lineitem.l_extendedprice), SUM(lineitem.l_extendedprice * Int64(1) - 
lineitem.l_discount), SUM(lineitem.l_extendedprice * Int64(1) - 
lineitem.l_discount * Int64(1) + lineitem.l_tax), AVG(lineitem.l_quantity), 
AVG(lineitem.l_extendedprice), AVG(lineitem.l_discount), COUNT(UInt8(1))]
+------------AggregateExec: mode=Partial, gby=[l_returnflag@5 as l_returnflag, 
l_linestatus@6 as l_linestatus], aggr=[SUM(lineitem.l_quantity), 
SUM(lineitem.l_extendedprice), SUM(lineitem.l_extendedprice * Int64(1) - 
lineitem.l_discount), SUM(lineitem.l_extendedprice * Int64(1) - 
lineitem.l_discount * Int64(1) + lineitem.l_tax), AVG(lineitem.l_quantity), 
AVG(lineitem.l_extendedprice), AVG(lineitem.l_discount), COUNT(*)]
 --------------ProjectionExec: expr=[l_extendedprice@1 * (Some(1),20,0 - 
l_discount@2) as lineitem.l_extendedprice * (Decimal128(Some(1),20,0) - 
lineitem.l_discount)Decimal128(Some(1),20,0) - 
lineitem.l_discountlineitem.l_discountDecimal128(Some(1),20,0)lineitem.l_extendedprice,
 l_quantity@0 as l_quantity, l_extendedprice@1 as l_extendedprice, l_discount@2 
as l_discount, l_tax@3 as l_tax, l_returnflag@4 as l_returnflag, l_linestatus@5 
as l_linestatus]
 ----------------CoalesceBatchesExec: target_batch_size=8192
 ------------------FilterExec: l_shipdate@6 <= 10471
diff --git a/datafusion/core/tests/sqllogictests/test_files/tpch/q13.slt.part 
b/datafusion/core/tests/sqllogictests/test_files/tpch/q13.slt.part
index b65f56c891..890c4da60b 100644
--- a/datafusion/core/tests/sqllogictests/test_files/tpch/q13.slt.part
+++ b/datafusion/core/tests/sqllogictests/test_files/tpch/q13.slt.part
@@ -42,8 +42,8 @@ limit 10;
 logical_plan
 Limit: skip=0, fetch=10
 --Sort: custdist DESC NULLS FIRST, c_orders.c_count DESC NULLS FIRST, fetch=10
-----Projection: c_orders.c_count, COUNT(UInt8(1)) AS custdist
-------Aggregate: groupBy=[[c_orders.c_count]], aggr=[[COUNT(UInt8(1))]]
+----Projection: c_orders.c_count, COUNT(*) AS custdist
+------Aggregate: groupBy=[[c_orders.c_count]], aggr=[[COUNT(UInt8(1)) AS 
COUNT(*)]]
 --------SubqueryAlias: c_orders
 ----------Projection: COUNT(orders.o_orderkey) AS c_count
 ------------Aggregate: groupBy=[[customer.c_custkey]], 
aggr=[[COUNT(orders.o_orderkey)]]
@@ -57,11 +57,11 @@ physical_plan
 GlobalLimitExec: skip=0, fetch=10
 --SortPreservingMergeExec: [custdist@1 DESC,c_count@0 DESC], fetch=10
 ----SortExec: fetch=10, expr=[custdist@1 DESC,c_count@0 DESC]
-------ProjectionExec: expr=[c_count@0 as c_count, COUNT(UInt8(1))@1 as 
custdist]
---------AggregateExec: mode=FinalPartitioned, gby=[c_count@0 as c_count], 
aggr=[COUNT(UInt8(1))]
+------ProjectionExec: expr=[c_count@0 as c_count, COUNT(*)@1 as custdist]
+--------AggregateExec: mode=FinalPartitioned, gby=[c_count@0 as c_count], 
aggr=[COUNT(*)]
 ----------CoalesceBatchesExec: target_batch_size=8192
 ------------RepartitionExec: partitioning=Hash([c_count@0], 4), 
input_partitions=4
---------------AggregateExec: mode=Partial, gby=[c_count@0 as c_count], 
aggr=[COUNT(UInt8(1))]
+--------------AggregateExec: mode=Partial, gby=[c_count@0 as c_count], 
aggr=[COUNT(*)]
 ----------------ProjectionExec: expr=[COUNT(orders.o_orderkey)@1 as c_count]
 ------------------AggregateExec: mode=SinglePartitioned, gby=[c_custkey@0 as 
c_custkey], aggr=[COUNT(orders.o_orderkey)]
 --------------------ProjectionExec: expr=[c_custkey@0 as c_custkey, 
o_orderkey@1 as o_orderkey]
diff --git a/datafusion/core/tests/sqllogictests/test_files/tpch/q21.slt.part 
b/datafusion/core/tests/sqllogictests/test_files/tpch/q21.slt.part
index a43c761d88..24004d187d 100644
--- a/datafusion/core/tests/sqllogictests/test_files/tpch/q21.slt.part
+++ b/datafusion/core/tests/sqllogictests/test_files/tpch/q21.slt.part
@@ -59,8 +59,8 @@ order by
 ----
 logical_plan
 Sort: numwait DESC NULLS FIRST, supplier.s_name ASC NULLS LAST
---Projection: supplier.s_name, COUNT(UInt8(1)) AS numwait
-----Aggregate: groupBy=[[supplier.s_name]], aggr=[[COUNT(UInt8(1))]]
+--Projection: supplier.s_name, COUNT(*) AS numwait
+----Aggregate: groupBy=[[supplier.s_name]], aggr=[[COUNT(UInt8(1)) AS 
COUNT(*)]]
 ------Projection: supplier.s_name
 --------LeftAnti Join: l1.l_orderkey = __correlated_sq_2.l_orderkey Filter: 
__correlated_sq_2.l_suppkey != l1.l_suppkey
 ----------LeftSemi Join: l1.l_orderkey = __correlated_sq_1.l_orderkey Filter: 
__correlated_sq_1.l_suppkey != l1.l_suppkey
@@ -92,11 +92,11 @@ Sort: numwait DESC NULLS FIRST, supplier.s_name ASC NULLS 
LAST
 physical_plan
 SortPreservingMergeExec: [numwait@1 DESC,s_name@0 ASC NULLS LAST]
 --SortExec: expr=[numwait@1 DESC,s_name@0 ASC NULLS LAST]
-----ProjectionExec: expr=[s_name@0 as s_name, COUNT(UInt8(1))@1 as numwait]
-------AggregateExec: mode=FinalPartitioned, gby=[s_name@0 as s_name], 
aggr=[COUNT(UInt8(1))]
+----ProjectionExec: expr=[s_name@0 as s_name, COUNT(*)@1 as numwait]
+------AggregateExec: mode=FinalPartitioned, gby=[s_name@0 as s_name], 
aggr=[COUNT(*)]
 --------CoalesceBatchesExec: target_batch_size=8192
 ----------RepartitionExec: partitioning=Hash([s_name@0], 4), input_partitions=4
-------------AggregateExec: mode=Partial, gby=[s_name@0 as s_name], 
aggr=[COUNT(UInt8(1))]
+------------AggregateExec: mode=Partial, gby=[s_name@0 as s_name], 
aggr=[COUNT(*)]
 --------------ProjectionExec: expr=[s_name@0 as s_name]
 ----------------CoalesceBatchesExec: target_batch_size=8192
 ------------------HashJoinExec: mode=Partitioned, join_type=LeftAnti, 
on=[(l_orderkey@1, l_orderkey@0)], filter=l_suppkey@1 != l_suppkey@0
diff --git a/datafusion/core/tests/sqllogictests/test_files/tpch/q22.slt.part 
b/datafusion/core/tests/sqllogictests/test_files/tpch/q22.slt.part
index 3fb1a1d1fd..7ab66d1783 100644
--- a/datafusion/core/tests/sqllogictests/test_files/tpch/q22.slt.part
+++ b/datafusion/core/tests/sqllogictests/test_files/tpch/q22.slt.part
@@ -57,8 +57,8 @@ order by
 ----
 logical_plan
 Sort: custsale.cntrycode ASC NULLS LAST
---Projection: custsale.cntrycode, COUNT(UInt8(1)) AS numcust, 
SUM(custsale.c_acctbal) AS totacctbal
-----Aggregate: groupBy=[[custsale.cntrycode]], aggr=[[COUNT(UInt8(1)), 
SUM(custsale.c_acctbal)]]
+--Projection: custsale.cntrycode, COUNT(*) AS numcust, SUM(custsale.c_acctbal) 
AS totacctbal
+----Aggregate: groupBy=[[custsale.cntrycode]], aggr=[[COUNT(UInt8(1)) AS 
COUNT(*), SUM(custsale.c_acctbal)]]
 ------SubqueryAlias: custsale
 --------Projection: substr(customer.c_phone, Int64(1), Int64(2)) AS cntrycode, 
customer.c_acctbal
 ----------Inner Join:  Filter: CAST(customer.c_acctbal AS Decimal128(19, 6)) > 
__scalar_sq_2.AVG(customer.c_acctbal)
@@ -76,11 +76,11 @@ Sort: custsale.cntrycode ASC NULLS LAST
 physical_plan
 SortPreservingMergeExec: [cntrycode@0 ASC NULLS LAST]
 --SortExec: expr=[cntrycode@0 ASC NULLS LAST]
-----ProjectionExec: expr=[cntrycode@0 as cntrycode, COUNT(UInt8(1))@1 as 
numcust, SUM(custsale.c_acctbal)@2 as totacctbal]
-------AggregateExec: mode=FinalPartitioned, gby=[cntrycode@0 as cntrycode], 
aggr=[COUNT(UInt8(1)), SUM(custsale.c_acctbal)]
+----ProjectionExec: expr=[cntrycode@0 as cntrycode, COUNT(*)@1 as numcust, 
SUM(custsale.c_acctbal)@2 as totacctbal]
+------AggregateExec: mode=FinalPartitioned, gby=[cntrycode@0 as cntrycode], 
aggr=[COUNT(*), SUM(custsale.c_acctbal)]
 --------CoalesceBatchesExec: target_batch_size=8192
 ----------RepartitionExec: partitioning=Hash([cntrycode@0], 4), 
input_partitions=4
-------------AggregateExec: mode=Partial, gby=[cntrycode@0 as cntrycode], 
aggr=[COUNT(UInt8(1)), SUM(custsale.c_acctbal)]
+------------AggregateExec: mode=Partial, gby=[cntrycode@0 as cntrycode], 
aggr=[COUNT(*), SUM(custsale.c_acctbal)]
 --------------ProjectionExec: expr=[substr(c_phone@0, 1, 2) as cntrycode, 
c_acctbal@1 as c_acctbal]
 ----------------NestedLoopJoinExec: join_type=Inner, filter=CAST(c_acctbal@0 
AS Decimal128(19, 6)) > AVG(customer.c_acctbal)@1
 ------------------ProjectionExec: expr=[c_phone@1 as c_phone, c_acctbal@2 as 
c_acctbal]
diff --git a/datafusion/core/tests/sqllogictests/test_files/tpch/q4.slt.part 
b/datafusion/core/tests/sqllogictests/test_files/tpch/q4.slt.part
index e4660cf904..d609b1488e 100644
--- a/datafusion/core/tests/sqllogictests/test_files/tpch/q4.slt.part
+++ b/datafusion/core/tests/sqllogictests/test_files/tpch/q4.slt.part
@@ -41,8 +41,8 @@ order by
 ----
 logical_plan
 Sort: orders.o_orderpriority ASC NULLS LAST
---Projection: orders.o_orderpriority, COUNT(UInt8(1)) AS order_count
-----Aggregate: groupBy=[[orders.o_orderpriority]], aggr=[[COUNT(UInt8(1))]]
+--Projection: orders.o_orderpriority, COUNT(*) AS order_count
+----Aggregate: groupBy=[[orders.o_orderpriority]], aggr=[[COUNT(UInt8(1)) AS 
COUNT(*)]]
 ------Projection: orders.o_orderpriority
 --------LeftSemi Join: orders.o_orderkey = __correlated_sq_1.l_orderkey
 ----------Projection: orders.o_orderkey, orders.o_orderpriority
@@ -55,11 +55,11 @@ Sort: orders.o_orderpriority ASC NULLS LAST
 physical_plan
 SortPreservingMergeExec: [o_orderpriority@0 ASC NULLS LAST]
 --SortExec: expr=[o_orderpriority@0 ASC NULLS LAST]
-----ProjectionExec: expr=[o_orderpriority@0 as o_orderpriority, 
COUNT(UInt8(1))@1 as order_count]
-------AggregateExec: mode=FinalPartitioned, gby=[o_orderpriority@0 as 
o_orderpriority], aggr=[COUNT(UInt8(1))]
+----ProjectionExec: expr=[o_orderpriority@0 as o_orderpriority, COUNT(*)@1 as 
order_count]
+------AggregateExec: mode=FinalPartitioned, gby=[o_orderpriority@0 as 
o_orderpriority], aggr=[COUNT(*)]
 --------CoalesceBatchesExec: target_batch_size=8192
 ----------RepartitionExec: partitioning=Hash([o_orderpriority@0], 4), 
input_partitions=4
-------------AggregateExec: mode=Partial, gby=[o_orderpriority@0 as 
o_orderpriority], aggr=[COUNT(UInt8(1))]
+------------AggregateExec: mode=Partial, gby=[o_orderpriority@0 as 
o_orderpriority], aggr=[COUNT(*)]
 --------------ProjectionExec: expr=[o_orderpriority@1 as o_orderpriority]
 ----------------CoalesceBatchesExec: target_batch_size=8192
 ------------------HashJoinExec: mode=Partitioned, join_type=LeftSemi, 
on=[(o_orderkey@0, l_orderkey@0)]
diff --git a/datafusion/core/tests/sqllogictests/test_files/union.slt 
b/datafusion/core/tests/sqllogictests/test_files/union.slt
index 1f0fa1d456..c8aaa36fbc 100644
--- a/datafusion/core/tests/sqllogictests/test_files/union.slt
+++ b/datafusion/core/tests/sqllogictests/test_files/union.slt
@@ -338,16 +338,16 @@ SELECT count(*) FROM (
 ) GROUP BY name
 ----
 logical_plan
-Projection: COUNT(UInt8(1))
---Aggregate: groupBy=[[t1.name]], aggr=[[COUNT(UInt8(1))]]
+Projection: COUNT(*)
+--Aggregate: groupBy=[[t1.name]], aggr=[[COUNT(UInt8(1)) AS COUNT(*)]]
 ----Union
 ------Aggregate: groupBy=[[t1.name]], aggr=[[]]
 --------TableScan: t1 projection=[name]
 ------Aggregate: groupBy=[[t2.name]], aggr=[[]]
 --------TableScan: t2 projection=[name]
 physical_plan
-ProjectionExec: expr=[COUNT(UInt8(1))@1 as COUNT(UInt8(1))]
---AggregateExec: mode=SinglePartitioned, gby=[name@0 as name], 
aggr=[COUNT(UInt8(1))]
+ProjectionExec: expr=[COUNT(*)@1 as COUNT(*)]
+--AggregateExec: mode=SinglePartitioned, gby=[name@0 as name], aggr=[COUNT(*)]
 ----InterleaveExec
 ------AggregateExec: mode=FinalPartitioned, gby=[name@0 as name], aggr=[]
 --------CoalesceBatchesExec: target_batch_size=8192
@@ -482,8 +482,8 @@ select x, y from (select 1 as x , max(10) as y) b
 ----
 logical_plan
 Union
---Projection: COUNT(UInt8(1)) AS count, a.n
-----Aggregate: groupBy=[[a.n]], aggr=[[COUNT(UInt8(1))]]
+--Projection: COUNT(*) AS count, a.n
+----Aggregate: groupBy=[[a.n]], aggr=[[COUNT(UInt8(1)) AS COUNT(*)]]
 ------SubqueryAlias: a
 --------Projection: Int64(5) AS n
 ----------EmptyRelation
@@ -494,12 +494,12 @@ Union
 ----------EmptyRelation
 physical_plan
 UnionExec
---ProjectionExec: expr=[COUNT(UInt8(1))@1 as count, n@0 as n]
-----AggregateExec: mode=FinalPartitioned, gby=[n@0 as n], 
aggr=[COUNT(UInt8(1))]
+--ProjectionExec: expr=[COUNT(*)@1 as count, n@0 as n]
+----AggregateExec: mode=FinalPartitioned, gby=[n@0 as n], aggr=[COUNT(*)]
 ------CoalesceBatchesExec: target_batch_size=8192
 --------RepartitionExec: partitioning=Hash([n@0], 4), input_partitions=4
 ----------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
-------------AggregateExec: mode=Partial, gby=[n@0 as n], aggr=[COUNT(UInt8(1))]
+------------AggregateExec: mode=Partial, gby=[n@0 as n], aggr=[COUNT(*)]
 --------------ProjectionExec: expr=[5 as n]
 ----------------EmptyExec: produce_one_row=true
 --ProjectionExec: expr=[x@0 as count, y@1 as n]
diff --git a/datafusion/core/tests/sqllogictests/test_files/window.slt 
b/datafusion/core/tests/sqllogictests/test_files/window.slt
index 444ba73386..cd257aaa92 100644
--- a/datafusion/core/tests/sqllogictests/test_files/window.slt
+++ b/datafusion/core/tests/sqllogictests/test_files/window.slt
@@ -1271,14 +1271,14 @@ EXPLAIN SELECT
     FROM aggregate_test_100
 ----
 logical_plan
-Projection: SUM(aggregate_test_100.c4) PARTITION BY [aggregate_test_100.c1, 
aggregate_test_100.c2] ORDER BY [aggregate_test_100.c2 ASC NULLS LAST] ROWS 
BETWEEN 1 PRECEDING AND 1 FOLLOWING, COUNT(UInt8(1)) PARTITION BY 
[aggregate_test_100.c1] ORDER BY [aggregate_test_100.c2 ASC NULLS LAST] ROWS 
BETWEEN 1 PRECEDING AND 1 FOLLOWING
---WindowAggr: windowExpr=[[COUNT(UInt8(1)) PARTITION BY 
[aggregate_test_100.c1] ORDER BY [aggregate_test_100.c2 ASC NULLS LAST] ROWS 
BETWEEN 1 PRECEDING AND 1 FOLLOWING]]
+Projection: SUM(aggregate_test_100.c4) PARTITION BY [aggregate_test_100.c1, 
aggregate_test_100.c2] ORDER BY [aggregate_test_100.c2 ASC NULLS LAST] ROWS 
BETWEEN 1 PRECEDING AND 1 FOLLOWING, COUNT(*) PARTITION BY 
[aggregate_test_100.c1] ORDER BY [aggregate_test_100.c2 ASC NULLS LAST] ROWS 
BETWEEN 1 PRECEDING AND 1 FOLLOWING
+--WindowAggr: windowExpr=[[COUNT(UInt8(1)) PARTITION BY 
[aggregate_test_100.c1] ORDER BY [aggregate_test_100.c2 ASC NULLS LAST] ROWS 
BETWEEN 1 PRECEDING AND 1 FOLLOWING AS COUNT(*) PARTITION BY 
[aggregate_test_100.c1] ORDER BY [aggregate_test_100.c2 ASC NULLS LAST] ROWS 
BETWEEN 1 PRECEDING AND 1 FOLLOWING]]
 ----Projection: aggregate_test_100.c1, aggregate_test_100.c2, 
SUM(aggregate_test_100.c4) PARTITION BY [aggregate_test_100.c1, 
aggregate_test_100.c2] ORDER BY [aggregate_test_100.c2 ASC NULLS LAST] ROWS 
BETWEEN 1 PRECEDING AND 1 FOLLOWING
 ------WindowAggr: windowExpr=[[SUM(aggregate_test_100.c4) PARTITION BY 
[aggregate_test_100.c1, aggregate_test_100.c2] ORDER BY [aggregate_test_100.c2 
ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING]]
 --------TableScan: aggregate_test_100 projection=[c1, c2, c4]
 physical_plan
-ProjectionExec: expr=[SUM(aggregate_test_100.c4) PARTITION BY 
[aggregate_test_100.c1, aggregate_test_100.c2] ORDER BY [aggregate_test_100.c2 
ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING@2 as 
SUM(aggregate_test_100.c4) PARTITION BY [aggregate_test_100.c1, 
aggregate_test_100.c2] ORDER BY [aggregate_test_100.c2 ASC NULLS LAST] ROWS 
BETWEEN 1 PRECEDING AND 1 FOLLOWING, COUNT(UInt8(1)) PARTITION BY 
[aggregate_test_100.c1] ORDER BY [aggregate_test_100.c2 ASC NULLS LAST] ROWS 
BETWEE [...]
---BoundedWindowAggExec: wdw=[COUNT(UInt8(1)) PARTITION BY 
[aggregate_test_100.c1] ORDER BY [aggregate_test_100.c2 ASC NULLS LAST] ROWS 
BETWEEN 1 PRECEDING AND 1 FOLLOWING: Ok(Field { name: "COUNT(UInt8(1)) 
PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c2 ASC NULLS 
LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING", data_type: Int64, nullable: 
true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { 
units: Rows, start_bound: Preceding(UInt64(1)), en [...]
+ProjectionExec: expr=[SUM(aggregate_test_100.c4) PARTITION BY 
[aggregate_test_100.c1, aggregate_test_100.c2] ORDER BY [aggregate_test_100.c2 
ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING@2 as 
SUM(aggregate_test_100.c4) PARTITION BY [aggregate_test_100.c1, 
aggregate_test_100.c2] ORDER BY [aggregate_test_100.c2 ASC NULLS LAST] ROWS 
BETWEEN 1 PRECEDING AND 1 FOLLOWING, COUNT(*) PARTITION BY 
[aggregate_test_100.c1] ORDER BY [aggregate_test_100.c2 ASC NULLS LAST] ROWS 
BETWEEN 1 PRE [...]
+--BoundedWindowAggExec: wdw=[COUNT(*) PARTITION BY [aggregate_test_100.c1] 
ORDER BY [aggregate_test_100.c2 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 
FOLLOWING: Ok(Field { name: "COUNT(*) PARTITION BY [aggregate_test_100.c1] 
ORDER BY [aggregate_test_100.c2 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 
FOLLOWING", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: 
false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: 
Preceding(UInt64(1)), end_bound: Follo [...]
 ----SortExec: expr=[c1@0 ASC NULLS LAST,c2@1 ASC NULLS LAST]
 ------CoalesceBatchesExec: target_batch_size=4096
 --------RepartitionExec: partitioning=Hash([c1@0], 2), input_partitions=2
@@ -1727,31 +1727,29 @@ EXPLAIN SELECT count(*) as global_count FROM
                   ORDER BY c1 ) AS a
 ----
 logical_plan
-Projection: COUNT(UInt8(1)) AS global_count
---Aggregate: groupBy=[[]], aggr=[[COUNT(UInt8(1))]]
+Projection: COUNT(*) AS global_count
+--Aggregate: groupBy=[[]], aggr=[[COUNT(UInt8(1)) AS COUNT(*)]]
 ----SubqueryAlias: a
 ------Sort: aggregate_test_100.c1 ASC NULLS LAST
---------Projection: aggregate_test_100.c1
-----------Aggregate: groupBy=[[aggregate_test_100.c1]], 
aggr=[[COUNT(UInt8(1))]]
-------------Projection: aggregate_test_100.c1
---------------Filter: aggregate_test_100.c13 != 
Utf8("C2GT5KVyOPZpgKVl110TyZO0NcJ434")
-----------------TableScan: aggregate_test_100 projection=[c1, c13], 
partial_filters=[aggregate_test_100.c13 != 
Utf8("C2GT5KVyOPZpgKVl110TyZO0NcJ434")]
+--------Aggregate: groupBy=[[aggregate_test_100.c1]], aggr=[[]]
+----------Projection: aggregate_test_100.c1
+------------Filter: aggregate_test_100.c13 != 
Utf8("C2GT5KVyOPZpgKVl110TyZO0NcJ434")
+--------------TableScan: aggregate_test_100 projection=[c1, c13], 
partial_filters=[aggregate_test_100.c13 != 
Utf8("C2GT5KVyOPZpgKVl110TyZO0NcJ434")]
 physical_plan
-ProjectionExec: expr=[COUNT(UInt8(1))@0 as global_count]
---AggregateExec: mode=Final, gby=[], aggr=[COUNT(UInt8(1))]
+ProjectionExec: expr=[COUNT(*)@0 as global_count]
+--AggregateExec: mode=Final, gby=[], aggr=[COUNT(*)]
 ----CoalescePartitionsExec
-------AggregateExec: mode=Partial, gby=[], aggr=[COUNT(UInt8(1))]
+------AggregateExec: mode=Partial, gby=[], aggr=[COUNT(*)]
 --------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=2
-----------ProjectionExec: expr=[c1@0 as c1]
-------------AggregateExec: mode=FinalPartitioned, gby=[c1@0 as c1], 
aggr=[COUNT(UInt8(1))]
---------------CoalesceBatchesExec: target_batch_size=4096
-----------------RepartitionExec: partitioning=Hash([c1@0], 2), 
input_partitions=2
-------------------AggregateExec: mode=Partial, gby=[c1@0 as c1], 
aggr=[COUNT(UInt8(1))]
---------------------ProjectionExec: expr=[c1@0 as c1]
-----------------------CoalesceBatchesExec: target_batch_size=4096
-------------------------FilterExec: c13@1 != C2GT5KVyOPZpgKVl110TyZO0NcJ434
---------------------------RepartitionExec: partitioning=RoundRobinBatch(2), 
input_partitions=1
-----------------------------CsvExec: file_groups={1 group: 
[[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c1, 
c13], has_header=true
+----------AggregateExec: mode=FinalPartitioned, gby=[c1@0 as c1], aggr=[]
+------------CoalesceBatchesExec: target_batch_size=4096
+--------------RepartitionExec: partitioning=Hash([c1@0], 2), input_partitions=2
+----------------AggregateExec: mode=Partial, gby=[c1@0 as c1], aggr=[]
+------------------ProjectionExec: expr=[c1@0 as c1]
+--------------------CoalesceBatchesExec: target_batch_size=4096
+----------------------FilterExec: c13@1 != C2GT5KVyOPZpgKVl110TyZO0NcJ434
+------------------------RepartitionExec: partitioning=RoundRobinBatch(2), 
input_partitions=1
+--------------------------CsvExec: file_groups={1 group: 
[[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c1, 
c13], has_header=true
 
 query I
 SELECT count(*) as global_count FROM
@@ -2541,21 +2539,21 @@ logical_plan
 Projection: sum1, sum2, sum3, min1, min2, min3, max1, max2, max3, cnt1, cnt2, 
sumr1, sumr2, sumr3, minr1, minr2, minr3, maxr1, maxr2, maxr3, cntr1, cntr2, 
sum4, cnt3
 --Limit: skip=0, fetch=5
 ----Sort: annotated_data_finite.inc_col DESC NULLS FIRST, fetch=5
-------Projection: SUM(annotated_data_finite.inc_col) ORDER BY 
[annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND 1 
FOLLOWING AS sum1, SUM(annotated_data_finite.desc_col) ORDER BY 
[annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 5 PRECEDING AND 1 
FOLLOWING AS sum2, SUM(annotated_data_finite.inc_col) ORDER BY 
[annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 10 
FOLLOWING AS sum3, MIN(annotated_data_finite.inc_col) ORDER BY 
[annotated_data_f [...]
---------WindowAggr: windowExpr=[[SUM(annotated_data_finite.desc_col) ROWS 
BETWEEN 8 PRECEDING AND 1 FOLLOWING, COUNT(UInt8(1)) ROWS BETWEEN 8 PRECEDING 
AND 1 FOLLOWING]]
-----------Projection: annotated_data_finite.inc_col, 
annotated_data_finite.desc_col, SUM(annotated_data_finite.inc_col) ORDER BY 
[annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 1 PRECEDING AND 4 
FOLLOWING, SUM(annotated_data_finite.desc_col) ORDER BY 
[annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 1 PRECEDING AND 8 
FOLLOWING, SUM(annotated_data_finite.desc_col) ORDER BY 
[annotated_data_finite.ts DESC NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 5 
FOLLOWING, MIN(annotated [...]
-------------WindowAggr: windowExpr=[[SUM(annotated_data_finite.inc_col) ORDER 
BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND 1 
FOLLOWING, SUM(annotated_data_finite.desc_col) ORDER BY 
[annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 5 PRECEDING AND 1 
FOLLOWING, SUM(annotated_data_finite.inc_col) ORDER BY 
[annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 10 
FOLLOWING, MIN(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite 
[...]
---------------WindowAggr: windowExpr=[[SUM(annotated_data_finite.inc_col) 
ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 1 PRECEDING 
AND 4 FOLLOWING, SUM(annotated_data_finite.desc_col) ORDER BY 
[annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 1 PRECEDING AND 8 
FOLLOWING, SUM(annotated_data_finite.desc_col) ORDER BY 
[annotated_data_finite.ts DESC NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 5 
FOLLOWING, MIN(annotated_data_finite.inc_col) ORDER BY [annotated_data [...]
+------Projection: SUM(annotated_data_finite.inc_col) ORDER BY 
[annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND 1 
FOLLOWING AS sum1, SUM(annotated_data_finite.desc_col) ORDER BY 
[annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 5 PRECEDING AND 1 
FOLLOWING AS sum2, SUM(annotated_data_finite.inc_col) ORDER BY 
[annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 10 
FOLLOWING AS sum3, MIN(annotated_data_finite.inc_col) ORDER BY 
[annotated_data_f [...]
+--------WindowAggr: windowExpr=[[SUM(annotated_data_finite.desc_col) ROWS 
BETWEEN 8 PRECEDING AND 1 FOLLOWING, COUNT(UInt8(1)) ROWS BETWEEN 8 PRECEDING 
AND 1 FOLLOWING AS COUNT(*) ROWS BETWEEN 8 PRECEDING AND 1 FOLLOWING]]
+----------Projection: annotated_data_finite.inc_col, 
annotated_data_finite.desc_col, SUM(annotated_data_finite.inc_col) ORDER BY 
[annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 1 PRECEDING AND 4 
FOLLOWING, SUM(annotated_data_finite.desc_col) ORDER BY 
[annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 1 PRECEDING AND 8 
FOLLOWING, SUM(annotated_data_finite.desc_col) ORDER BY 
[annotated_data_finite.ts DESC NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 5 
FOLLOWING, MIN(annotated [...]
+------------WindowAggr: windowExpr=[[SUM(annotated_data_finite.inc_col) ORDER 
BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND 1 
FOLLOWING, SUM(annotated_data_finite.desc_col) ORDER BY 
[annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 5 PRECEDING AND 1 
FOLLOWING, SUM(annotated_data_finite.inc_col) ORDER BY 
[annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 10 
FOLLOWING, MIN(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite 
[...]
+--------------WindowAggr: windowExpr=[[SUM(annotated_data_finite.inc_col) 
ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 1 PRECEDING 
AND 4 FOLLOWING, SUM(annotated_data_finite.desc_col) ORDER BY 
[annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 1 PRECEDING AND 8 
FOLLOWING, SUM(annotated_data_finite.desc_col) ORDER BY 
[annotated_data_finite.ts DESC NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 5 
FOLLOWING, MIN(annotated_data_finite.inc_col) ORDER BY [annotated_data [...]
 ----------------TableScan: annotated_data_finite projection=[ts, inc_col, 
desc_col]
 physical_plan
 ProjectionExec: expr=[sum1@0 as sum1, sum2@1 as sum2, sum3@2 as sum3, min1@3 
as min1, min2@4 as min2, min3@5 as min3, max1@6 as max1, max2@7 as max2, max3@8 
as max3, cnt1@9 as cnt1, cnt2@10 as cnt2, sumr1@11 as sumr1, sumr2@12 as sumr2, 
sumr3@13 as sumr3, minr1@14 as minr1, minr2@15 as minr2, minr3@16 as minr3, 
maxr1@17 as maxr1, maxr2@18 as maxr2, maxr3@19 as maxr3, cntr1@20 as cntr1, 
cntr2@21 as cntr2, sum4@22 as sum4, cnt3@23 as cnt3]
 --GlobalLimitExec: skip=0, fetch=5
 ----SortExec: fetch=5, expr=[inc_col@24 DESC]
-------ProjectionExec: expr=[SUM(annotated_data_finite.inc_col) ORDER BY 
[annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND 1 
FOLLOWING@13 as sum1, SUM(annotated_data_finite.desc_col) ORDER BY 
[annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 5 PRECEDING AND 1 
FOLLOWING@14 as sum2, SUM(annotated_data_finite.inc_col) ORDER BY 
[annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 10 
FOLLOWING@15 as sum3, MIN(annotated_data_finite.inc_col) ORDER B [...]
---------BoundedWindowAggExec: wdw=[SUM(annotated_data_finite.desc_col) ROWS 
BETWEEN 8 PRECEDING AND 1 FOLLOWING: Ok(Field { name: 
"SUM(annotated_data_finite.desc_col) ROWS BETWEEN 8 PRECEDING AND 1 FOLLOWING", 
data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: 
{} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(8)), 
end_bound: Following(UInt64(1)) }, COUNT(UInt8(1)) ROWS BETWEEN 8 PRECEDING AND 
1 FOLLOWING: Ok(Field { name: "COUNT(UInt8(1)) [...]
-----------ProjectionExec: expr=[inc_col@1 as inc_col, desc_col@2 as desc_col, 
SUM(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC 
NULLS FIRST] RANGE BETWEEN 1 PRECEDING AND 4 FOLLOWING@3 as 
SUM(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC 
NULLS FIRST] RANGE BETWEEN 1 PRECEDING AND 4 FOLLOWING, 
SUM(annotated_data_finite.desc_col) ORDER BY [annotated_data_finite.ts DESC 
NULLS FIRST] RANGE BETWEEN 1 PRECEDING AND 8 FOLLOWING@4 as SUM(annotate [...]
-------------BoundedWindowAggExec: wdw=[SUM(annotated_data_finite.inc_col) 
ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING 
AND 1 FOLLOWING: Ok(Field { name: "SUM(annotated_data_finite.inc_col) ORDER BY 
[annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND 1 
FOLLOWING", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: 
false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: 
Preceding(Int32(10)), end_bound: Follow [...]
---------------BoundedWindowAggExec: wdw=[SUM(annotated_data_finite.inc_col) 
ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 1 PRECEDING 
AND 4 FOLLOWING: Ok(Field { name: "SUM(annotated_data_finite.inc_col) ORDER BY 
[annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 1 PRECEDING AND 4 
FOLLOWING", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: 
false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: 
Preceding(Int32(4)), end_bound: Fol [...]
+------ProjectionExec: expr=[SUM(annotated_data_finite.inc_col) ORDER BY 
[annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND 1 
FOLLOWING@13 as sum1, SUM(annotated_data_finite.desc_col) ORDER BY 
[annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 5 PRECEDING AND 1 
FOLLOWING@14 as sum2, SUM(annotated_data_finite.inc_col) ORDER BY 
[annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 10 
FOLLOWING@15 as sum3, MIN(annotated_data_finite.inc_col) ORDER B [...]
+--------BoundedWindowAggExec: wdw=[SUM(annotated_data_finite.desc_col) ROWS 
BETWEEN 8 PRECEDING AND 1 FOLLOWING: Ok(Field { name: 
"SUM(annotated_data_finite.desc_col) ROWS BETWEEN 8 PRECEDING AND 1 FOLLOWING", 
data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: 
{} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(8)), 
end_bound: Following(UInt64(1)) }, COUNT(*) ROWS BETWEEN 8 PRECEDING AND 1 
FOLLOWING: Ok(Field { name: "COUNT(*) ROWS BETWEEN  [...]
+----------ProjectionExec: expr=[inc_col@1 as inc_col, desc_col@2 as desc_col, 
SUM(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC 
NULLS FIRST] RANGE BETWEEN 1 PRECEDING AND 4 FOLLOWING@3 as 
SUM(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC 
NULLS FIRST] RANGE BETWEEN 1 PRECEDING AND 4 FOLLOWING, 
SUM(annotated_data_finite.desc_col) ORDER BY [annotated_data_finite.ts DESC 
NULLS FIRST] RANGE BETWEEN 1 PRECEDING AND 8 FOLLOWING@4 as SUM(annotate [...]
+------------BoundedWindowAggExec: wdw=[SUM(annotated_data_finite.inc_col) 
ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING 
AND 1 FOLLOWING: Ok(Field { name: "SUM(annotated_data_finite.inc_col) ORDER BY 
[annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND 1 
FOLLOWING", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: 
false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: 
Preceding(Int32(10)), end_bound: Follow [...]
+--------------BoundedWindowAggExec: wdw=[SUM(annotated_data_finite.inc_col) 
ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 1 PRECEDING 
AND 4 FOLLOWING: Ok(Field { name: "SUM(annotated_data_finite.inc_col) ORDER BY 
[annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 1 PRECEDING AND 4 
FOLLOWING", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: 
false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: 
Preceding(Int32(4)), end_bound: Fol [...]
 ----------------CsvExec: file_groups={1 group: 
[[WORKSPACE_ROOT/datafusion/core/tests/data/window_1.csv]]}, projection=[ts, 
inc_col, desc_col], output_ordering=[ts@0 ASC NULLS LAST], has_header=true
 
 query IIIIIIIIIIIIIIIIIIIIIIII
diff --git a/datafusion/optimizer/src/analyzer/count_wildcard_rule.rs 
b/datafusion/optimizer/src/analyzer/count_wildcard_rule.rs
index 95061e3854..ce798781c6 100644
--- a/datafusion/optimizer/src/analyzer/count_wildcard_rule.rs
+++ b/datafusion/optimizer/src/analyzer/count_wildcard_rule.rs
@@ -15,23 +15,20 @@
 // specific language governing permissions and limitations
 // under the License.
 
+use crate::analyzer::AnalyzerRule;
 use datafusion_common::config::ConfigOptions;
 use datafusion_common::tree_node::{Transformed, TreeNode, TreeNodeRewriter};
-use datafusion_common::{Column, DFField, DFSchema, DFSchemaRef, Result};
-use datafusion_expr::expr::{AggregateFunction, Alias, InSubquery};
+use datafusion_common::Result;
+use datafusion_expr::expr::{AggregateFunction, InSubquery};
+use datafusion_expr::expr_rewriter::rewrite_preserving_name;
 use datafusion_expr::utils::COUNT_STAR_EXPANSION;
 use datafusion_expr::Expr::ScalarSubquery;
 use datafusion_expr::{
-    aggregate_function, count, expr, lit, window_function, Aggregate, Expr, 
Filter,
-    LogicalPlan, Projection, Sort, Subquery, Window,
+    aggregate_function, expr, lit, window_function, Aggregate, Expr, Filter, 
LogicalPlan,
+    LogicalPlanBuilder, Projection, Sort, Subquery,
 };
-use std::string::ToString;
 use std::sync::Arc;
 
-use crate::analyzer::AnalyzerRule;
-
-pub const COUNT_STAR: &str = "COUNT(*)";
-
 /// Rewrite `Count(Expr:Wildcard)` to `Count(Expr:Literal)`.
 ///
 /// Resolves issue: <https://github.com/apache/arrow-datafusion/issues/5473>
@@ -61,35 +58,30 @@ fn analyze_internal(plan: LogicalPlan) -> 
Result<Transformed<LogicalPlan>> {
             let window_expr = window
                 .window_expr
                 .iter()
-                .map(|expr| expr.clone().rewrite(&mut rewriter))
+                .map(|expr| rewrite_preserving_name(expr.clone(), &mut 
rewriter))
                 .collect::<Result<Vec<_>>>()?;
 
-            Ok(Transformed::Yes(LogicalPlan::Window(Window {
-                input: window.input.clone(),
-                window_expr,
-                schema: rewrite_schema(&window.schema),
-            })))
+            Ok(Transformed::Yes(
+                LogicalPlanBuilder::from((*window.input).clone())
+                    .window(window_expr)?
+                    .build()?,
+            ))
         }
         LogicalPlan::Aggregate(agg) => {
             let aggr_expr = agg
                 .aggr_expr
                 .iter()
-                .map(|expr| expr.clone().rewrite(&mut rewriter))
+                .map(|expr| rewrite_preserving_name(expr.clone(), &mut 
rewriter))
                 .collect::<Result<Vec<_>>>()?;
 
             Ok(Transformed::Yes(LogicalPlan::Aggregate(
-                Aggregate::try_new_with_schema(
-                    agg.input.clone(),
-                    agg.group_expr.clone(),
-                    aggr_expr,
-                    rewrite_schema(&agg.schema),
-                )?,
+                Aggregate::try_new(agg.input.clone(), agg.group_expr.clone(), 
aggr_expr)?,
             )))
         }
         LogicalPlan::Sort(Sort { expr, input, fetch }) => {
             let sort_expr = expr
                 .iter()
-                .map(|expr| expr.clone().rewrite(&mut rewriter))
+                .map(|expr| rewrite_preserving_name(expr.clone(), &mut 
rewriter))
                 .collect::<Result<Vec<_>>>()?;
             Ok(Transformed::Yes(LogicalPlan::Sort(Sort {
                 expr: sort_expr,
@@ -101,21 +93,16 @@ fn analyze_internal(plan: LogicalPlan) -> 
Result<Transformed<LogicalPlan>> {
             let projection_expr = projection
                 .expr
                 .iter()
-                .map(|expr| expr.clone().rewrite(&mut rewriter))
+                .map(|expr| rewrite_preserving_name(expr.clone(), &mut 
rewriter))
                 .collect::<Result<Vec<_>>>()?;
             Ok(Transformed::Yes(LogicalPlan::Projection(
-                Projection::try_new_with_schema(
-                    projection_expr,
-                    projection.input,
-                    // rewrite_schema(projection.schema.clone()),
-                    rewrite_schema(&projection.schema),
-                )?,
+                Projection::try_new(projection_expr, projection.input)?,
             )))
         }
         LogicalPlan::Filter(Filter {
             predicate, input, ..
         }) => {
-            let predicate = predicate.rewrite(&mut rewriter)?;
+            let predicate = rewrite_preserving_name(predicate, &mut rewriter)?;
             Ok(Transformed::Yes(LogicalPlan::Filter(Filter::try_new(
                 predicate, input,
             )?)))
@@ -132,24 +119,6 @@ impl TreeNodeRewriter for CountWildcardRewriter {
 
     fn mutate(&mut self, old_expr: Expr) -> Result<Expr> {
         let new_expr = match old_expr.clone() {
-            Expr::Alias(Alias { expr, name, .. }) if name.contains(COUNT_STAR) 
=> {
-                Expr::Alias(Alias::new(
-                    *expr,
-                    name.replace(
-                        COUNT_STAR,
-                        count(lit(COUNT_STAR_EXPANSION)).to_string().as_str(),
-                    ),
-                ))
-            }
-            Expr::Column(Column { name, relation }) if 
name.contains(COUNT_STAR) => {
-                Expr::Column(Column {
-                    name: name.replace(
-                        COUNT_STAR,
-                        count(lit(COUNT_STAR_EXPANSION)).to_string().as_str(),
-                    ),
-                    relation: relation.clone(),
-                })
-            }
             Expr::WindowFunction(expr::WindowFunction {
                 fun:
                     window_function::WindowFunction::AggregateFunction(
@@ -242,32 +211,6 @@ impl TreeNodeRewriter for CountWildcardRewriter {
         Ok(new_expr)
     }
 }
-fn rewrite_schema(schema: &DFSchema) -> DFSchemaRef {
-    let new_fields = schema
-        .fields()
-        .iter()
-        .map(|field| {
-            let mut name = field.field().name().clone();
-            if name.contains(COUNT_STAR) {
-                name = name.replace(
-                    COUNT_STAR,
-                    count(lit(COUNT_STAR_EXPANSION)).to_string().as_str(),
-                );
-            }
-            DFField::new(
-                field.qualifier().cloned(),
-                &name,
-                field.data_type().clone(),
-                field.is_nullable(),
-            )
-        })
-        .collect::<Vec<DFField>>();
-    Arc::new(
-        DFSchema::new_with_metadata(new_fields, schema.metadata().clone())
-            .unwrap()
-            
.with_functional_dependencies(schema.functional_dependencies().clone()),
-    )
-}
 
 #[cfg(test)]
 mod tests {
@@ -298,10 +241,10 @@ mod tests {
             .project(vec![count(Expr::Wildcard)])?
             .sort(vec![count(Expr::Wildcard).sort(true, false)])?
             .build()?;
-        let expected = "Sort: COUNT(UInt8(1)) ASC NULLS LAST 
[COUNT(UInt8(1)):Int64;N]\
-          \n  Projection: COUNT(UInt8(1)) [COUNT(UInt8(1)):Int64;N]\
-          \n    Aggregate: groupBy=[[test.b]], aggr=[[COUNT(UInt8(1))]] 
[b:UInt32, COUNT(UInt8(1)):Int64;N]\
-          \n      TableScan: test [a:UInt32, b:UInt32, c:UInt32]";
+        let expected = "Sort: COUNT(*) ASC NULLS LAST [COUNT(*):Int64;N]\
+        \n  Projection: COUNT(*) [COUNT(*):Int64;N]\
+        \n    Aggregate: groupBy=[[test.b]], aggr=[[COUNT(UInt8(1)) AS 
COUNT(*)]] [b:UInt32, COUNT(*):Int64;N]\
+        \n      TableScan: test [a:UInt32, b:UInt32, c:UInt32]";
         assert_plan_eq(&plan, expected)
     }
 
@@ -323,11 +266,11 @@ mod tests {
             .build()?;
 
         let expected = "Filter: t1.a IN (<subquery>) [a:UInt32, b:UInt32, 
c:UInt32]\
-              \n  Subquery: [COUNT(UInt8(1)):Int64;N]\
-              \n    Projection: COUNT(UInt8(1)) [COUNT(UInt8(1)):Int64;N]\
-              \n      Aggregate: groupBy=[[]], aggr=[[COUNT(UInt8(1))]] 
[COUNT(UInt8(1)):Int64;N]\
-              \n        TableScan: t2 [a:UInt32, b:UInt32, c:UInt32]\
-              \n  TableScan: t1 [a:UInt32, b:UInt32, c:UInt32]";
+        \n  Subquery: [COUNT(*):Int64;N]\
+        \n    Projection: COUNT(*) [COUNT(*):Int64;N]\
+        \n      Aggregate: groupBy=[[]], aggr=[[COUNT(UInt8(1)) AS COUNT(*)]] 
[COUNT(*):Int64;N]\
+        \n        TableScan: t2 [a:UInt32, b:UInt32, c:UInt32]\
+        \n  TableScan: t1 [a:UInt32, b:UInt32, c:UInt32]";
         assert_plan_eq(&plan, expected)
     }
 
@@ -346,11 +289,11 @@ mod tests {
             .build()?;
 
         let expected = "Filter: EXISTS (<subquery>) [a:UInt32, b:UInt32, 
c:UInt32]\
-          \n  Subquery: [COUNT(UInt8(1)):Int64;N]\
-          \n    Projection: COUNT(UInt8(1)) [COUNT(UInt8(1)):Int64;N]\
-          \n      Aggregate: groupBy=[[]], aggr=[[COUNT(UInt8(1))]] 
[COUNT(UInt8(1)):Int64;N]\
-          \n        TableScan: t2 [a:UInt32, b:UInt32, c:UInt32]\
-          \n  TableScan: t1 [a:UInt32, b:UInt32, c:UInt32]";
+        \n  Subquery: [COUNT(*):Int64;N]\
+        \n    Projection: COUNT(*) [COUNT(*):Int64;N]\
+        \n      Aggregate: groupBy=[[]], aggr=[[COUNT(UInt8(1)) AS COUNT(*)]] 
[COUNT(*):Int64;N]\
+        \n        TableScan: t2 [a:UInt32, b:UInt32, c:UInt32]\
+        \n  TableScan: t1 [a:UInt32, b:UInt32, c:UInt32]";
         assert_plan_eq(&plan, expected)
     }
 
@@ -407,9 +350,9 @@ mod tests {
             .project(vec![count(Expr::Wildcard)])?
             .build()?;
 
-        let expected = "Projection: COUNT(UInt8(1)) [COUNT(UInt8(1)):Int64;N]\
-              \n  WindowAggr: windowExpr=[[COUNT(UInt8(1)) ORDER BY [test.a 
DESC NULLS FIRST] RANGE BETWEEN 6 PRECEDING AND 2 FOLLOWING]] [a:UInt32, 
b:UInt32, c:UInt32, COUNT(UInt8(1)) ORDER BY [test.a DESC NULLS FIRST] RANGE 
BETWEEN 6 PRECEDING AND 2 FOLLOWING:Int64;N]\
-              \n    TableScan: test [a:UInt32, b:UInt32, c:UInt32]";
+        let expected = "Projection: COUNT(UInt8(1)) AS COUNT(*) 
[COUNT(*):Int64;N]\
+        \n  WindowAggr: windowExpr=[[COUNT(UInt8(1)) ORDER BY [test.a DESC 
NULLS FIRST] RANGE BETWEEN 6 PRECEDING AND 2 FOLLOWING AS COUNT(*) ORDER BY 
[test.a DESC NULLS FIRST] RANGE BETWEEN 6 PRECEDING AND 2 FOLLOWING]] 
[a:UInt32, b:UInt32, c:UInt32, COUNT(*) ORDER BY [test.a DESC NULLS FIRST] 
RANGE BETWEEN 6 PRECEDING AND 2 FOLLOWING:Int64;N]\
+        \n    TableScan: test [a:UInt32, b:UInt32, c:UInt32]";
         assert_plan_eq(&plan, expected)
     }
 
@@ -421,9 +364,9 @@ mod tests {
             .project(vec![count(Expr::Wildcard)])?
             .build()?;
 
-        let expected = "Projection: COUNT(UInt8(1)) [COUNT(UInt8(1)):Int64;N]\
-              \n  Aggregate: groupBy=[[]], aggr=[[COUNT(UInt8(1))]] 
[COUNT(UInt8(1)):Int64;N]\
-              \n    TableScan: test [a:UInt32, b:UInt32, c:UInt32]";
+        let expected = "Projection: COUNT(*) [COUNT(*):Int64;N]\
+        \n  Aggregate: groupBy=[[]], aggr=[[COUNT(UInt8(1)) AS COUNT(*)]] 
[COUNT(*):Int64;N]\
+        \n    TableScan: test [a:UInt32, b:UInt32, c:UInt32]";
         assert_plan_eq(&plan, expected)
     }
 
@@ -435,9 +378,9 @@ mod tests {
             .project(vec![count(Expr::Wildcard)])?
             .build()?;
 
-        let expected = "Projection: COUNT(UInt8(1)) [COUNT(UInt8(1)):Int64;N]\
-          \n  Aggregate: groupBy=[[]], aggr=[[MAX(COUNT(UInt8(1)))]] 
[MAX(COUNT(UInt8(1))):Int64;N]\
-          \n    TableScan: test [a:UInt32, b:UInt32, c:UInt32]";
+        let expected = "Projection: COUNT(UInt8(1)) AS COUNT(*) 
[COUNT(*):Int64;N]\
+        \n  Aggregate: groupBy=[[]], aggr=[[MAX(COUNT(UInt8(1))) AS 
MAX(COUNT(*))]] [MAX(COUNT(*)):Int64;N]\
+        \n    TableScan: test [a:UInt32, b:UInt32, c:UInt32]";
         assert_plan_eq(&plan, expected)
     }
 }
diff --git a/datafusion/optimizer/src/decorrelate.rs 
b/datafusion/optimizer/src/decorrelate.rs
index f335592eb3..8cd90072ff 100644
--- a/datafusion/optimizer/src/decorrelate.rs
+++ b/datafusion/optimizer/src/decorrelate.rs
@@ -376,10 +376,7 @@ fn agg_exprs_evaluation_result_on_empty_batch(
     for e in agg_expr.iter() {
         let result_expr = e.clone().transform_up(&|expr| {
             let new_expr = match expr {
-                
Expr::AggregateFunction(datafusion_expr::expr::AggregateFunction {
-                    fun,
-                    ..
-                }) => {
+                Expr::AggregateFunction(expr::AggregateFunction { fun, .. }) 
=> {
                     if matches!(fun, 
datafusion_expr::AggregateFunction::Count) {
                         
Transformed::Yes(Expr::Literal(ScalarValue::Int64(Some(0))))
                     } else {
@@ -394,6 +391,7 @@ fn agg_exprs_evaluation_result_on_empty_batch(
             Ok(new_expr)
         })?;
 
+        let result_expr = result_expr.unalias();
         let props = ExecutionProps::new();
         let info = SimplifyContext::new(&props).with_schema(schema.clone());
         let simplifier = ExprSimplifier::new(info);
diff --git a/datafusion/optimizer/src/eliminate_project.rs 
b/datafusion/optimizer/src/eliminate_project.rs
index 5c43b8d12c..d3226eaa78 100644
--- a/datafusion/optimizer/src/eliminate_project.rs
+++ b/datafusion/optimizer/src/eliminate_project.rs
@@ -42,9 +42,7 @@ impl OptimizerRule for EliminateProjection {
             LogicalPlan::Projection(projection) => {
                 let child_plan = projection.input.as_ref();
                 match child_plan {
-                    LogicalPlan::Join(_)
-                    | LogicalPlan::CrossJoin(_)
-                    | LogicalPlan::Union(_)
+                    LogicalPlan::Union(_)
                     | LogicalPlan::Filter(_)
                     | LogicalPlan::TableScan(_)
                     | LogicalPlan::SubqueryAlias(_)
diff --git a/datafusion/optimizer/tests/optimizer_integration.rs 
b/datafusion/optimizer/tests/optimizer_integration.rs
index e2e10428ad..8027e6882e 100644
--- a/datafusion/optimizer/tests/optimizer_integration.rs
+++ b/datafusion/optimizer/tests/optimizer_integration.rs
@@ -310,8 +310,8 @@ fn join_keys_in_subquery_alias_1() {
 fn push_down_filter_groupby_expr_contains_alias() {
     let sql = "SELECT * FROM (SELECT (col_int32 + col_uint32) AS c, count(*) 
FROM test GROUP BY 1) where c > 3";
     let plan = test_sql(sql).unwrap();
-    let expected = "Projection: test.col_int32 + test.col_uint32 AS c, 
COUNT(UInt8(1))\
-    \n  Aggregate: groupBy=[[test.col_int32 + CAST(test.col_uint32 AS 
Int32)]], aggr=[[COUNT(UInt8(1))]]\
+    let expected = "Projection: test.col_int32 + test.col_uint32 AS c, 
COUNT(*)\
+    \n  Aggregate: groupBy=[[test.col_int32 + CAST(test.col_uint32 AS 
Int32)]], aggr=[[COUNT(UInt8(1)) AS COUNT(*)]]\
     \n    Filter: test.col_int32 + CAST(test.col_uint32 AS Int32) > Int32(3)\
     \n      TableScan: test projection=[col_int32, col_uint32]";
     assert_eq!(expected, format!("{plan:?}"));

Reply via email to