This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new abb2ae76d9 Revert "Minor: remove unnecessary projection in `single_distinct_to_g… (#8176)
abb2ae76d9 is described below

commit abb2ae76d963ee50b08730928302aa9e20031919
Author: Nga Tran <[email protected]>
AuthorDate: Tue Nov 14 16:27:31 2023 -0500

    Revert "Minor: remove unnecessary projection in `single_distinct_to_g… (#8176)
    
    * Revert "Minor: remove unnecessary projection in `single_distinct_to_group_by` rule (#8061)"
    
    This reverts commit 15d8c9bf48a56ae9de34d18becab13fd1942dc4a.
    
    * Add regression test
    
    ---------
    
    Co-authored-by: Andrew Lamb <[email protected]>
---
 .../optimizer/src/single_distinct_to_groupby.rs    | 95 ++++++++++++++++------
 datafusion/sqllogictest/test_files/groupby.slt     | 33 ++++++--
 datafusion/sqllogictest/test_files/joins.slt       | 47 ++++++-----
 .../sqllogictest/test_files/tpch/q16.slt.part      | 10 +--
 4 files changed, 130 insertions(+), 55 deletions(-)

diff --git a/datafusion/optimizer/src/single_distinct_to_groupby.rs 
b/datafusion/optimizer/src/single_distinct_to_groupby.rs
index 414217612d..be76c069f0 100644
--- a/datafusion/optimizer/src/single_distinct_to_groupby.rs
+++ b/datafusion/optimizer/src/single_distinct_to_groupby.rs
@@ -22,12 +22,13 @@ use std::sync::Arc;
 use crate::optimizer::ApplyOrder;
 use crate::{OptimizerConfig, OptimizerRule};
 
-use datafusion_common::Result;
+use datafusion_common::{DFSchema, Result};
 use datafusion_expr::{
     col,
     expr::AggregateFunction,
-    logical_plan::{Aggregate, LogicalPlan},
-    Expr,
+    logical_plan::{Aggregate, LogicalPlan, Projection},
+    utils::columnize_expr,
+    Expr, ExprSchemable,
 };
 
 use hashbrown::HashSet;
@@ -152,7 +153,7 @@ impl OptimizerRule for SingleDistinctToGroupBy {
 
                     // replace the distinct arg with alias
                     let mut group_fields_set = HashSet::new();
-                    let outer_aggr_exprs = aggr_expr
+                    let new_aggr_exprs = aggr_expr
                         .iter()
                         .map(|aggr_expr| match aggr_expr {
                             Expr::AggregateFunction(AggregateFunction {
@@ -174,24 +175,67 @@ impl OptimizerRule for SingleDistinctToGroupBy {
                                     false, // intentional to remove distinct 
here
                                     filter.clone(),
                                     order_by.clone(),
-                                ))
-                                .alias(aggr_expr.display_name()?))
+                                )))
                             }
                             _ => Ok(aggr_expr.clone()),
                         })
                         .collect::<Result<Vec<_>>>()?;
 
                     // construct the inner AggrPlan
+                    let inner_fields = inner_group_exprs
+                        .iter()
+                        .map(|expr| expr.to_field(input.schema()))
+                        .collect::<Result<Vec<_>>>()?;
+                    let inner_schema = DFSchema::new_with_metadata(
+                        inner_fields,
+                        input.schema().metadata().clone(),
+                    )?;
                     let inner_agg = LogicalPlan::Aggregate(Aggregate::try_new(
                         input.clone(),
                         inner_group_exprs,
                         Vec::new(),
                     )?);
 
-                    Ok(Some(LogicalPlan::Aggregate(Aggregate::try_new(
+                    let outer_fields = outer_group_exprs
+                        .iter()
+                        .chain(new_aggr_exprs.iter())
+                        .map(|expr| expr.to_field(&inner_schema))
+                        .collect::<Result<Vec<_>>>()?;
+                    let outer_aggr_schema = 
Arc::new(DFSchema::new_with_metadata(
+                        outer_fields,
+                        input.schema().metadata().clone(),
+                    )?);
+
+                    // so the aggregates are displayed in the same way even 
after the rewrite
+                    // this optimizer has two kinds of alias:
+                    // - group_by aggr
+                    // - aggr expr
+                    let group_size = group_expr.len();
+                    let alias_expr = out_group_expr_with_alias
+                        .into_iter()
+                        .map(|(group_expr, original_field)| {
+                            if let Some(name) = original_field {
+                                group_expr.alias(name)
+                            } else {
+                                group_expr
+                            }
+                        })
+                        .chain(new_aggr_exprs.iter().enumerate().map(|(idx, 
expr)| {
+                            let idx = idx + group_size;
+                            let name = fields[idx].qualified_name();
+                            columnize_expr(expr.clone().alias(name), 
&outer_aggr_schema)
+                        }))
+                        .collect();
+
+                    let outer_aggr = LogicalPlan::Aggregate(Aggregate::try_new(
                         Arc::new(inner_agg),
                         outer_group_exprs,
-                        outer_aggr_exprs,
+                        new_aggr_exprs,
+                    )?);
+
+                    Ok(Some(LogicalPlan::Projection(Projection::try_new(
+                        alias_expr,
+                        Arc::new(outer_aggr),
                     )?)))
                 } else {
                     Ok(None)
@@ -255,9 +299,10 @@ mod tests {
             .build()?;
 
         // Should work
-        let expected = "Aggregate: groupBy=[[]], aggr=[[COUNT(alias1) AS 
COUNT(DISTINCT test.b)]] [COUNT(DISTINCT test.b):Int64;N]\
-                            \n  Aggregate: groupBy=[[test.b AS alias1]], 
aggr=[[]] [alias1:UInt32]\
-                            \n    TableScan: test [a:UInt32, b:UInt32, 
c:UInt32]";
+        let expected = "Projection: COUNT(alias1) AS COUNT(DISTINCT test.b) 
[COUNT(DISTINCT test.b):Int64;N]\
+                            \n  Aggregate: groupBy=[[]], 
aggr=[[COUNT(alias1)]] [COUNT(alias1):Int64;N]\
+                            \n    Aggregate: groupBy=[[test.b AS alias1]], 
aggr=[[]] [alias1:UInt32]\
+                            \n      TableScan: test [a:UInt32, b:UInt32, 
c:UInt32]";
 
         assert_optimized_plan_equal(&plan, expected)
     }
@@ -328,9 +373,10 @@ mod tests {
             .aggregate(Vec::<Expr>::new(), vec![count_distinct(lit(2) * 
col("b"))])?
             .build()?;
 
-        let expected = "Aggregate: groupBy=[[]], aggr=[[COUNT(alias1) AS 
COUNT(DISTINCT Int32(2) * test.b)]] [COUNT(DISTINCT Int32(2) * test.b):Int64;N]\
-                            \n  Aggregate: groupBy=[[Int32(2) * test.b AS 
alias1]], aggr=[[]] [alias1:Int32]\
-                            \n    TableScan: test [a:UInt32, b:UInt32, 
c:UInt32]";
+        let expected = "Projection: COUNT(alias1) AS COUNT(DISTINCT Int32(2) * 
test.b) [COUNT(DISTINCT Int32(2) * test.b):Int64;N]\
+                            \n  Aggregate: groupBy=[[]], 
aggr=[[COUNT(alias1)]] [COUNT(alias1):Int64;N]\
+                            \n    Aggregate: groupBy=[[Int32(2) * test.b AS 
alias1]], aggr=[[]] [alias1:Int32]\
+                            \n      TableScan: test [a:UInt32, b:UInt32, 
c:UInt32]";
 
         assert_optimized_plan_equal(&plan, expected)
     }
@@ -344,9 +390,10 @@ mod tests {
             .build()?;
 
         // Should work
-        let expected = "Aggregate: groupBy=[[test.a]], aggr=[[COUNT(alias1) AS 
COUNT(DISTINCT test.b)]] [a:UInt32, COUNT(DISTINCT test.b):Int64;N]\
-                            \n  Aggregate: groupBy=[[test.a, test.b AS 
alias1]], aggr=[[]] [a:UInt32, alias1:UInt32]\
-                            \n    TableScan: test [a:UInt32, b:UInt32, 
c:UInt32]";
+        let expected = "Projection: test.a, COUNT(alias1) AS COUNT(DISTINCT 
test.b) [a:UInt32, COUNT(DISTINCT test.b):Int64;N]\
+                            \n  Aggregate: groupBy=[[test.a]], 
aggr=[[COUNT(alias1)]] [a:UInt32, COUNT(alias1):Int64;N]\
+                            \n    Aggregate: groupBy=[[test.a, test.b AS 
alias1]], aggr=[[]] [a:UInt32, alias1:UInt32]\
+                            \n      TableScan: test [a:UInt32, b:UInt32, 
c:UInt32]";
 
         assert_optimized_plan_equal(&plan, expected)
     }
@@ -389,9 +436,10 @@ mod tests {
             )?
             .build()?;
         // Should work
-        let expected = "Aggregate: groupBy=[[test.a]], aggr=[[COUNT(alias1) AS 
COUNT(DISTINCT test.b), MAX(alias1) AS MAX(DISTINCT test.b)]] [a:UInt32, 
COUNT(DISTINCT test.b):Int64;N, MAX(DISTINCT test.b):UInt32;N]\
-                            \n  Aggregate: groupBy=[[test.a, test.b AS 
alias1]], aggr=[[]] [a:UInt32, alias1:UInt32]\
-                            \n    TableScan: test [a:UInt32, b:UInt32, 
c:UInt32]";
+        let expected = "Projection: test.a, COUNT(alias1) AS COUNT(DISTINCT 
test.b), MAX(alias1) AS MAX(DISTINCT test.b) [a:UInt32, COUNT(DISTINCT 
test.b):Int64;N, MAX(DISTINCT test.b):UInt32;N]\
+                            \n  Aggregate: groupBy=[[test.a]], 
aggr=[[COUNT(alias1), MAX(alias1)]] [a:UInt32, COUNT(alias1):Int64;N, 
MAX(alias1):UInt32;N]\
+                            \n    Aggregate: groupBy=[[test.a, test.b AS 
alias1]], aggr=[[]] [a:UInt32, alias1:UInt32]\
+                            \n      TableScan: test [a:UInt32, b:UInt32, 
c:UInt32]";
 
         assert_optimized_plan_equal(&plan, expected)
     }
@@ -423,9 +471,10 @@ mod tests {
             .build()?;
 
         // Should work
-        let expected = "Aggregate: groupBy=[[group_alias_0]], 
aggr=[[COUNT(alias1) AS COUNT(DISTINCT test.c)]] [group_alias_0:Int32, 
COUNT(DISTINCT test.c):Int64;N]\
-                            \n  Aggregate: groupBy=[[test.a + Int32(1) AS 
group_alias_0, test.c AS alias1]], aggr=[[]] [group_alias_0:Int32, 
alias1:UInt32]\
-                            \n    TableScan: test [a:UInt32, b:UInt32, 
c:UInt32]";
+        let expected = "Projection: group_alias_0 AS test.a + Int32(1), 
COUNT(alias1) AS COUNT(DISTINCT test.c) [test.a + Int32(1):Int32, 
COUNT(DISTINCT test.c):Int64;N]\
+                            \n  Aggregate: groupBy=[[group_alias_0]], 
aggr=[[COUNT(alias1)]] [group_alias_0:Int32, COUNT(alias1):Int64;N]\
+                            \n    Aggregate: groupBy=[[test.a + Int32(1) AS 
group_alias_0, test.c AS alias1]], aggr=[[]] [group_alias_0:Int32, 
alias1:UInt32]\
+                            \n      TableScan: test [a:UInt32, b:UInt32, 
c:UInt32]";
 
         assert_optimized_plan_equal(&plan, expected)
     }
diff --git a/datafusion/sqllogictest/test_files/groupby.slt 
b/datafusion/sqllogictest/test_files/groupby.slt
index 105f11f216..300e92a735 100644
--- a/datafusion/sqllogictest/test_files/groupby.slt
+++ b/datafusion/sqllogictest/test_files/groupby.slt
@@ -3823,17 +3823,17 @@ query TT
 EXPLAIN SELECT SUM(DISTINCT CAST(x AS DOUBLE)), MAX(DISTINCT CAST(x AS 
DOUBLE)) FROM t1 GROUP BY y;
 ----
 logical_plan
-Projection: SUM(DISTINCT t1.x), MAX(DISTINCT t1.x)
---Aggregate: groupBy=[[t1.y]], aggr=[[SUM(alias1) AS SUM(DISTINCT t1.x), 
MAX(alias1) AS MAX(DISTINCT t1.x)]]
+Projection: SUM(alias1) AS SUM(DISTINCT t1.x), MAX(alias1) AS MAX(DISTINCT 
t1.x)
+--Aggregate: groupBy=[[t1.y]], aggr=[[SUM(alias1), MAX(alias1)]]
 ----Aggregate: groupBy=[[t1.y, CAST(t1.x AS Float64)t1.x AS t1.x AS alias1]], 
aggr=[[]]
 ------Projection: CAST(t1.x AS Float64) AS CAST(t1.x AS Float64)t1.x, t1.y
 --------TableScan: t1 projection=[x, y]
 physical_plan
-ProjectionExec: expr=[SUM(DISTINCT t1.x)@1 as SUM(DISTINCT t1.x), MAX(DISTINCT 
t1.x)@2 as MAX(DISTINCT t1.x)]
---AggregateExec: mode=FinalPartitioned, gby=[y@0 as y], aggr=[SUM(DISTINCT 
t1.x), MAX(DISTINCT t1.x)]
+ProjectionExec: expr=[SUM(alias1)@1 as SUM(DISTINCT t1.x), MAX(alias1)@2 as 
MAX(DISTINCT t1.x)]
+--AggregateExec: mode=FinalPartitioned, gby=[y@0 as y], aggr=[SUM(alias1), 
MAX(alias1)]
 ----CoalesceBatchesExec: target_batch_size=2
 ------RepartitionExec: partitioning=Hash([y@0], 8), input_partitions=8
---------AggregateExec: mode=Partial, gby=[y@0 as y], aggr=[SUM(DISTINCT t1.x), 
MAX(DISTINCT t1.x)]
+--------AggregateExec: mode=Partial, gby=[y@0 as y], aggr=[SUM(alias1), 
MAX(alias1)]
 ----------AggregateExec: mode=FinalPartitioned, gby=[y@0 as y, alias1@1 as 
alias1], aggr=[]
 ------------CoalesceBatchesExec: target_batch_size=2
 --------------RepartitionExec: partitioning=Hash([y@0, alias1@1], 8), 
input_partitions=8
@@ -3841,3 +3841,26 @@ ProjectionExec: expr=[SUM(DISTINCT t1.x)@1 as 
SUM(DISTINCT t1.x), MAX(DISTINCT t
 ------------------AggregateExec: mode=Partial, gby=[y@1 as y, CAST(t1.x AS 
Float64)t1.x@0 as alias1], aggr=[]
 --------------------ProjectionExec: expr=[CAST(x@0 AS Float64) as CAST(t1.x AS 
Float64)t1.x, y@1 as y]
 ----------------------MemoryExec: partitions=1, partition_sizes=[1]
+
+statement ok
+drop table t1
+
+# Reproducer for https://github.com/apache/arrow-datafusion/issues/8175
+
+statement ok
+create table t1(state string, city string, min_temp float, area int, time 
timestamp) as values
+    ('MA', 'Boston', 70.4, 1, 50),
+    ('MA', 'Bedford', 71.59, 2, 150);
+
+query RI
+select date_part('year', time) as bla, count(distinct state) as count from t1 
group by bla;
+----
+1970 1
+
+query PI
+select date_bin(interval '1 year', time) as bla, count(distinct state) as 
count from t1 group by bla;
+----
+1970-01-01T00:00:00 1
+
+statement ok
+drop table t1
diff --git a/datafusion/sqllogictest/test_files/joins.slt 
b/datafusion/sqllogictest/test_files/joins.slt
index 24893297f1..fa3a6cff8c 100644
--- a/datafusion/sqllogictest/test_files/joins.slt
+++ b/datafusion/sqllogictest/test_files/joins.slt
@@ -1361,29 +1361,31 @@ from join_t1
 inner join join_t2 on join_t1.t1_id = join_t2.t2_id
 ----
 logical_plan
-Aggregate: groupBy=[[]], aggr=[[COUNT(alias1) AS COUNT(DISTINCT 
join_t1.t1_id)]]
---Aggregate: groupBy=[[join_t1.t1_id AS alias1]], aggr=[[]]
-----Projection: join_t1.t1_id
-------Inner Join: join_t1.t1_id = join_t2.t2_id
---------TableScan: join_t1 projection=[t1_id]
---------TableScan: join_t2 projection=[t2_id]
+Projection: COUNT(alias1) AS COUNT(DISTINCT join_t1.t1_id)
+--Aggregate: groupBy=[[]], aggr=[[COUNT(alias1)]]
+----Aggregate: groupBy=[[join_t1.t1_id AS alias1]], aggr=[[]]
+------Projection: join_t1.t1_id
+--------Inner Join: join_t1.t1_id = join_t2.t2_id
+----------TableScan: join_t1 projection=[t1_id]
+----------TableScan: join_t2 projection=[t2_id]
 physical_plan
-AggregateExec: mode=Final, gby=[], aggr=[COUNT(DISTINCT join_t1.t1_id)]
---CoalescePartitionsExec
-----AggregateExec: mode=Partial, gby=[], aggr=[COUNT(DISTINCT join_t1.t1_id)]
-------AggregateExec: mode=FinalPartitioned, gby=[alias1@0 as alias1], aggr=[]
---------AggregateExec: mode=Partial, gby=[t1_id@0 as alias1], aggr=[]
-----------ProjectionExec: expr=[t1_id@0 as t1_id]
-------------CoalesceBatchesExec: target_batch_size=2
---------------HashJoinExec: mode=Partitioned, join_type=Inner, on=[(t1_id@0, 
t2_id@0)]
-----------------CoalesceBatchesExec: target_batch_size=2
-------------------RepartitionExec: partitioning=Hash([t1_id@0], 2), 
input_partitions=2
---------------------RepartitionExec: partitioning=RoundRobinBatch(2), 
input_partitions=1
-----------------------MemoryExec: partitions=1, partition_sizes=[1]
-----------------CoalesceBatchesExec: target_batch_size=2
-------------------RepartitionExec: partitioning=Hash([t2_id@0], 2), 
input_partitions=2
---------------------RepartitionExec: partitioning=RoundRobinBatch(2), 
input_partitions=1
-----------------------MemoryExec: partitions=1, partition_sizes=[1]
+ProjectionExec: expr=[COUNT(alias1)@0 as COUNT(DISTINCT join_t1.t1_id)]
+--AggregateExec: mode=Final, gby=[], aggr=[COUNT(alias1)]
+----CoalescePartitionsExec
+------AggregateExec: mode=Partial, gby=[], aggr=[COUNT(alias1)]
+--------AggregateExec: mode=FinalPartitioned, gby=[alias1@0 as alias1], aggr=[]
+----------AggregateExec: mode=Partial, gby=[t1_id@0 as alias1], aggr=[]
+------------ProjectionExec: expr=[t1_id@0 as t1_id]
+--------------CoalesceBatchesExec: target_batch_size=2
+----------------HashJoinExec: mode=Partitioned, join_type=Inner, on=[(t1_id@0, 
t2_id@0)]
+------------------CoalesceBatchesExec: target_batch_size=2
+--------------------RepartitionExec: partitioning=Hash([t1_id@0], 2), 
input_partitions=2
+----------------------RepartitionExec: partitioning=RoundRobinBatch(2), 
input_partitions=1
+------------------------MemoryExec: partitions=1, partition_sizes=[1]
+------------------CoalesceBatchesExec: target_batch_size=2
+--------------------RepartitionExec: partitioning=Hash([t2_id@0], 2), 
input_partitions=2
+----------------------RepartitionExec: partitioning=RoundRobinBatch(2), 
input_partitions=1
+------------------------MemoryExec: partitions=1, partition_sizes=[1]
 
 statement ok
 set datafusion.explain.logical_plan_only = true;
@@ -3407,3 +3409,4 @@ set datafusion.optimizer.prefer_existing_sort = false;
 
 statement ok
 drop table annotated_data;
+
diff --git a/datafusion/sqllogictest/test_files/tpch/q16.slt.part 
b/datafusion/sqllogictest/test_files/tpch/q16.slt.part
index c047829589..b93872929f 100644
--- a/datafusion/sqllogictest/test_files/tpch/q16.slt.part
+++ b/datafusion/sqllogictest/test_files/tpch/q16.slt.part
@@ -52,8 +52,8 @@ limit 10;
 logical_plan
 Limit: skip=0, fetch=10
 --Sort: supplier_cnt DESC NULLS FIRST, part.p_brand ASC NULLS LAST, 
part.p_type ASC NULLS LAST, part.p_size ASC NULLS LAST, fetch=10
-----Projection: part.p_brand, part.p_type, part.p_size, COUNT(DISTINCT 
partsupp.ps_suppkey) AS supplier_cnt
-------Aggregate: groupBy=[[part.p_brand, part.p_type, part.p_size]], 
aggr=[[COUNT(alias1) AS COUNT(DISTINCT partsupp.ps_suppkey)]]
+----Projection: part.p_brand, part.p_type, part.p_size, COUNT(alias1) AS 
supplier_cnt
+------Aggregate: groupBy=[[part.p_brand, part.p_type, part.p_size]], 
aggr=[[COUNT(alias1)]]
 --------Aggregate: groupBy=[[part.p_brand, part.p_type, part.p_size, 
partsupp.ps_suppkey AS alias1]], aggr=[[]]
 ----------LeftAnti Join: partsupp.ps_suppkey = __correlated_sq_1.s_suppkey
 ------------Projection: partsupp.ps_suppkey, part.p_brand, part.p_type, 
part.p_size
@@ -69,11 +69,11 @@ physical_plan
 GlobalLimitExec: skip=0, fetch=10
 --SortPreservingMergeExec: [supplier_cnt@3 DESC,p_brand@0 ASC NULLS 
LAST,p_type@1 ASC NULLS LAST,p_size@2 ASC NULLS LAST], fetch=10
 ----SortExec: TopK(fetch=10), expr=[supplier_cnt@3 DESC,p_brand@0 ASC NULLS 
LAST,p_type@1 ASC NULLS LAST,p_size@2 ASC NULLS LAST]
-------ProjectionExec: expr=[p_brand@0 as p_brand, p_type@1 as p_type, p_size@2 
as p_size, COUNT(DISTINCT partsupp.ps_suppkey)@3 as supplier_cnt]
---------AggregateExec: mode=FinalPartitioned, gby=[p_brand@0 as p_brand, 
p_type@1 as p_type, p_size@2 as p_size], aggr=[COUNT(DISTINCT 
partsupp.ps_suppkey)]
+------ProjectionExec: expr=[p_brand@0 as p_brand, p_type@1 as p_type, p_size@2 
as p_size, COUNT(alias1)@3 as supplier_cnt]
+--------AggregateExec: mode=FinalPartitioned, gby=[p_brand@0 as p_brand, 
p_type@1 as p_type, p_size@2 as p_size], aggr=[COUNT(alias1)]
 ----------CoalesceBatchesExec: target_batch_size=8192
 ------------RepartitionExec: partitioning=Hash([p_brand@0, p_type@1, 
p_size@2], 4), input_partitions=4
---------------AggregateExec: mode=Partial, gby=[p_brand@0 as p_brand, p_type@1 
as p_type, p_size@2 as p_size], aggr=[COUNT(DISTINCT partsupp.ps_suppkey)]
+--------------AggregateExec: mode=Partial, gby=[p_brand@0 as p_brand, p_type@1 
as p_type, p_size@2 as p_size], aggr=[COUNT(alias1)]
 ----------------AggregateExec: mode=FinalPartitioned, gby=[p_brand@0 as 
p_brand, p_type@1 as p_type, p_size@2 as p_size, alias1@3 as alias1], aggr=[]
 ------------------CoalesceBatchesExec: target_batch_size=8192
 --------------------RepartitionExec: partitioning=Hash([p_brand@0, p_type@1, 
p_size@2, alias1@3], 4), input_partitions=4

Reply via email to