This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new abb2ae76d9 Revert "Minor: remove unnecessary projection in
`single_distinct_to_groupby` rule (#8061)" (#8176)
abb2ae76d9 is described below
commit abb2ae76d963ee50b08730928302aa9e20031919
Author: Nga Tran <[email protected]>
AuthorDate: Tue Nov 14 16:27:31 2023 -0500
Revert "Minor: remove unnecessary projection in `single_distinct_to_groupby`
rule (#8061)" (#8176)
* Revert "Minor: remove unnecessary projection in
`single_distinct_to_group_by` rule (#8061)"
This reverts commit 15d8c9bf48a56ae9de34d18becab13fd1942dc4a.
* Add regression test
---------
Co-authored-by: Andrew Lamb <[email protected]>
---
.../optimizer/src/single_distinct_to_groupby.rs | 95 ++++++++++++++++------
datafusion/sqllogictest/test_files/groupby.slt | 33 ++++++--
datafusion/sqllogictest/test_files/joins.slt | 47 ++++++-----
.../sqllogictest/test_files/tpch/q16.slt.part | 10 +--
4 files changed, 130 insertions(+), 55 deletions(-)
diff --git a/datafusion/optimizer/src/single_distinct_to_groupby.rs
b/datafusion/optimizer/src/single_distinct_to_groupby.rs
index 414217612d..be76c069f0 100644
--- a/datafusion/optimizer/src/single_distinct_to_groupby.rs
+++ b/datafusion/optimizer/src/single_distinct_to_groupby.rs
@@ -22,12 +22,13 @@ use std::sync::Arc;
use crate::optimizer::ApplyOrder;
use crate::{OptimizerConfig, OptimizerRule};
-use datafusion_common::Result;
+use datafusion_common::{DFSchema, Result};
use datafusion_expr::{
col,
expr::AggregateFunction,
- logical_plan::{Aggregate, LogicalPlan},
- Expr,
+ logical_plan::{Aggregate, LogicalPlan, Projection},
+ utils::columnize_expr,
+ Expr, ExprSchemable,
};
use hashbrown::HashSet;
@@ -152,7 +153,7 @@ impl OptimizerRule for SingleDistinctToGroupBy {
// replace the distinct arg with alias
let mut group_fields_set = HashSet::new();
- let outer_aggr_exprs = aggr_expr
+ let new_aggr_exprs = aggr_expr
.iter()
.map(|aggr_expr| match aggr_expr {
Expr::AggregateFunction(AggregateFunction {
@@ -174,24 +175,67 @@ impl OptimizerRule for SingleDistinctToGroupBy {
false, // intentional to remove distinct
here
filter.clone(),
order_by.clone(),
- ))
- .alias(aggr_expr.display_name()?))
+ )))
}
_ => Ok(aggr_expr.clone()),
})
.collect::<Result<Vec<_>>>()?;
// construct the inner AggrPlan
+ let inner_fields = inner_group_exprs
+ .iter()
+ .map(|expr| expr.to_field(input.schema()))
+ .collect::<Result<Vec<_>>>()?;
+ let inner_schema = DFSchema::new_with_metadata(
+ inner_fields,
+ input.schema().metadata().clone(),
+ )?;
let inner_agg = LogicalPlan::Aggregate(Aggregate::try_new(
input.clone(),
inner_group_exprs,
Vec::new(),
)?);
- Ok(Some(LogicalPlan::Aggregate(Aggregate::try_new(
+ let outer_fields = outer_group_exprs
+ .iter()
+ .chain(new_aggr_exprs.iter())
+ .map(|expr| expr.to_field(&inner_schema))
+ .collect::<Result<Vec<_>>>()?;
+ let outer_aggr_schema =
Arc::new(DFSchema::new_with_metadata(
+ outer_fields,
+ input.schema().metadata().clone(),
+ )?);
+
+ // so the aggregates are displayed in the same way even
after the rewrite
+ // this optimizer has two kinds of alias:
+ // - group_by aggr
+ // - aggr expr
+ let group_size = group_expr.len();
+ let alias_expr = out_group_expr_with_alias
+ .into_iter()
+ .map(|(group_expr, original_field)| {
+ if let Some(name) = original_field {
+ group_expr.alias(name)
+ } else {
+ group_expr
+ }
+ })
+ .chain(new_aggr_exprs.iter().enumerate().map(|(idx,
expr)| {
+ let idx = idx + group_size;
+ let name = fields[idx].qualified_name();
+ columnize_expr(expr.clone().alias(name),
&outer_aggr_schema)
+ }))
+ .collect();
+
+ let outer_aggr = LogicalPlan::Aggregate(Aggregate::try_new(
Arc::new(inner_agg),
outer_group_exprs,
- outer_aggr_exprs,
+ new_aggr_exprs,
+ )?);
+
+ Ok(Some(LogicalPlan::Projection(Projection::try_new(
+ alias_expr,
+ Arc::new(outer_aggr),
)?)))
} else {
Ok(None)
@@ -255,9 +299,10 @@ mod tests {
.build()?;
// Should work
- let expected = "Aggregate: groupBy=[[]], aggr=[[COUNT(alias1) AS
COUNT(DISTINCT test.b)]] [COUNT(DISTINCT test.b):Int64;N]\
- \n Aggregate: groupBy=[[test.b AS alias1]],
aggr=[[]] [alias1:UInt32]\
- \n TableScan: test [a:UInt32, b:UInt32,
c:UInt32]";
+ let expected = "Projection: COUNT(alias1) AS COUNT(DISTINCT test.b)
[COUNT(DISTINCT test.b):Int64;N]\
+ \n Aggregate: groupBy=[[]],
aggr=[[COUNT(alias1)]] [COUNT(alias1):Int64;N]\
+ \n Aggregate: groupBy=[[test.b AS alias1]],
aggr=[[]] [alias1:UInt32]\
+ \n TableScan: test [a:UInt32, b:UInt32,
c:UInt32]";
assert_optimized_plan_equal(&plan, expected)
}
@@ -328,9 +373,10 @@ mod tests {
.aggregate(Vec::<Expr>::new(), vec![count_distinct(lit(2) *
col("b"))])?
.build()?;
- let expected = "Aggregate: groupBy=[[]], aggr=[[COUNT(alias1) AS
COUNT(DISTINCT Int32(2) * test.b)]] [COUNT(DISTINCT Int32(2) * test.b):Int64;N]\
- \n Aggregate: groupBy=[[Int32(2) * test.b AS
alias1]], aggr=[[]] [alias1:Int32]\
- \n TableScan: test [a:UInt32, b:UInt32,
c:UInt32]";
+ let expected = "Projection: COUNT(alias1) AS COUNT(DISTINCT Int32(2) *
test.b) [COUNT(DISTINCT Int32(2) * test.b):Int64;N]\
+ \n Aggregate: groupBy=[[]],
aggr=[[COUNT(alias1)]] [COUNT(alias1):Int64;N]\
+ \n Aggregate: groupBy=[[Int32(2) * test.b AS
alias1]], aggr=[[]] [alias1:Int32]\
+ \n TableScan: test [a:UInt32, b:UInt32,
c:UInt32]";
assert_optimized_plan_equal(&plan, expected)
}
@@ -344,9 +390,10 @@ mod tests {
.build()?;
// Should work
- let expected = "Aggregate: groupBy=[[test.a]], aggr=[[COUNT(alias1) AS
COUNT(DISTINCT test.b)]] [a:UInt32, COUNT(DISTINCT test.b):Int64;N]\
- \n Aggregate: groupBy=[[test.a, test.b AS
alias1]], aggr=[[]] [a:UInt32, alias1:UInt32]\
- \n TableScan: test [a:UInt32, b:UInt32,
c:UInt32]";
+ let expected = "Projection: test.a, COUNT(alias1) AS COUNT(DISTINCT
test.b) [a:UInt32, COUNT(DISTINCT test.b):Int64;N]\
+ \n Aggregate: groupBy=[[test.a]],
aggr=[[COUNT(alias1)]] [a:UInt32, COUNT(alias1):Int64;N]\
+ \n Aggregate: groupBy=[[test.a, test.b AS
alias1]], aggr=[[]] [a:UInt32, alias1:UInt32]\
+ \n TableScan: test [a:UInt32, b:UInt32,
c:UInt32]";
assert_optimized_plan_equal(&plan, expected)
}
@@ -389,9 +436,10 @@ mod tests {
)?
.build()?;
// Should work
- let expected = "Aggregate: groupBy=[[test.a]], aggr=[[COUNT(alias1) AS
COUNT(DISTINCT test.b), MAX(alias1) AS MAX(DISTINCT test.b)]] [a:UInt32,
COUNT(DISTINCT test.b):Int64;N, MAX(DISTINCT test.b):UInt32;N]\
- \n Aggregate: groupBy=[[test.a, test.b AS
alias1]], aggr=[[]] [a:UInt32, alias1:UInt32]\
- \n TableScan: test [a:UInt32, b:UInt32,
c:UInt32]";
+ let expected = "Projection: test.a, COUNT(alias1) AS COUNT(DISTINCT
test.b), MAX(alias1) AS MAX(DISTINCT test.b) [a:UInt32, COUNT(DISTINCT
test.b):Int64;N, MAX(DISTINCT test.b):UInt32;N]\
+ \n Aggregate: groupBy=[[test.a]],
aggr=[[COUNT(alias1), MAX(alias1)]] [a:UInt32, COUNT(alias1):Int64;N,
MAX(alias1):UInt32;N]\
+ \n Aggregate: groupBy=[[test.a, test.b AS
alias1]], aggr=[[]] [a:UInt32, alias1:UInt32]\
+ \n TableScan: test [a:UInt32, b:UInt32,
c:UInt32]";
assert_optimized_plan_equal(&plan, expected)
}
@@ -423,9 +471,10 @@ mod tests {
.build()?;
// Should work
- let expected = "Aggregate: groupBy=[[group_alias_0]],
aggr=[[COUNT(alias1) AS COUNT(DISTINCT test.c)]] [group_alias_0:Int32,
COUNT(DISTINCT test.c):Int64;N]\
- \n Aggregate: groupBy=[[test.a + Int32(1) AS
group_alias_0, test.c AS alias1]], aggr=[[]] [group_alias_0:Int32,
alias1:UInt32]\
- \n TableScan: test [a:UInt32, b:UInt32,
c:UInt32]";
+ let expected = "Projection: group_alias_0 AS test.a + Int32(1),
COUNT(alias1) AS COUNT(DISTINCT test.c) [test.a + Int32(1):Int32,
COUNT(DISTINCT test.c):Int64;N]\
+ \n Aggregate: groupBy=[[group_alias_0]],
aggr=[[COUNT(alias1)]] [group_alias_0:Int32, COUNT(alias1):Int64;N]\
+ \n Aggregate: groupBy=[[test.a + Int32(1) AS
group_alias_0, test.c AS alias1]], aggr=[[]] [group_alias_0:Int32,
alias1:UInt32]\
+ \n TableScan: test [a:UInt32, b:UInt32,
c:UInt32]";
assert_optimized_plan_equal(&plan, expected)
}
diff --git a/datafusion/sqllogictest/test_files/groupby.slt
b/datafusion/sqllogictest/test_files/groupby.slt
index 105f11f216..300e92a735 100644
--- a/datafusion/sqllogictest/test_files/groupby.slt
+++ b/datafusion/sqllogictest/test_files/groupby.slt
@@ -3823,17 +3823,17 @@ query TT
EXPLAIN SELECT SUM(DISTINCT CAST(x AS DOUBLE)), MAX(DISTINCT CAST(x AS
DOUBLE)) FROM t1 GROUP BY y;
----
logical_plan
-Projection: SUM(DISTINCT t1.x), MAX(DISTINCT t1.x)
---Aggregate: groupBy=[[t1.y]], aggr=[[SUM(alias1) AS SUM(DISTINCT t1.x),
MAX(alias1) AS MAX(DISTINCT t1.x)]]
+Projection: SUM(alias1) AS SUM(DISTINCT t1.x), MAX(alias1) AS MAX(DISTINCT
t1.x)
+--Aggregate: groupBy=[[t1.y]], aggr=[[SUM(alias1), MAX(alias1)]]
----Aggregate: groupBy=[[t1.y, CAST(t1.x AS Float64)t1.x AS t1.x AS alias1]],
aggr=[[]]
------Projection: CAST(t1.x AS Float64) AS CAST(t1.x AS Float64)t1.x, t1.y
--------TableScan: t1 projection=[x, y]
physical_plan
-ProjectionExec: expr=[SUM(DISTINCT t1.x)@1 as SUM(DISTINCT t1.x), MAX(DISTINCT
t1.x)@2 as MAX(DISTINCT t1.x)]
---AggregateExec: mode=FinalPartitioned, gby=[y@0 as y], aggr=[SUM(DISTINCT
t1.x), MAX(DISTINCT t1.x)]
+ProjectionExec: expr=[SUM(alias1)@1 as SUM(DISTINCT t1.x), MAX(alias1)@2 as
MAX(DISTINCT t1.x)]
+--AggregateExec: mode=FinalPartitioned, gby=[y@0 as y], aggr=[SUM(alias1),
MAX(alias1)]
----CoalesceBatchesExec: target_batch_size=2
------RepartitionExec: partitioning=Hash([y@0], 8), input_partitions=8
---------AggregateExec: mode=Partial, gby=[y@0 as y], aggr=[SUM(DISTINCT t1.x),
MAX(DISTINCT t1.x)]
+--------AggregateExec: mode=Partial, gby=[y@0 as y], aggr=[SUM(alias1),
MAX(alias1)]
----------AggregateExec: mode=FinalPartitioned, gby=[y@0 as y, alias1@1 as
alias1], aggr=[]
------------CoalesceBatchesExec: target_batch_size=2
--------------RepartitionExec: partitioning=Hash([y@0, alias1@1], 8),
input_partitions=8
@@ -3841,3 +3841,26 @@ ProjectionExec: expr=[SUM(DISTINCT t1.x)@1 as
SUM(DISTINCT t1.x), MAX(DISTINCT t
------------------AggregateExec: mode=Partial, gby=[y@1 as y, CAST(t1.x AS
Float64)t1.x@0 as alias1], aggr=[]
--------------------ProjectionExec: expr=[CAST(x@0 AS Float64) as CAST(t1.x AS
Float64)t1.x, y@1 as y]
----------------------MemoryExec: partitions=1, partition_sizes=[1]
+
+statement ok
+drop table t1
+
+# Reproducer for https://github.com/apache/arrow-datafusion/issues/8175
+
+statement ok
+create table t1(state string, city string, min_temp float, area int, time
timestamp) as values
+ ('MA', 'Boston', 70.4, 1, 50),
+ ('MA', 'Bedford', 71.59, 2, 150);
+
+query RI
+select date_part('year', time) as bla, count(distinct state) as count from t1
group by bla;
+----
+1970 1
+
+query PI
+select date_bin(interval '1 year', time) as bla, count(distinct state) as
count from t1 group by bla;
+----
+1970-01-01T00:00:00 1
+
+statement ok
+drop table t1
diff --git a/datafusion/sqllogictest/test_files/joins.slt
b/datafusion/sqllogictest/test_files/joins.slt
index 24893297f1..fa3a6cff8c 100644
--- a/datafusion/sqllogictest/test_files/joins.slt
+++ b/datafusion/sqllogictest/test_files/joins.slt
@@ -1361,29 +1361,31 @@ from join_t1
inner join join_t2 on join_t1.t1_id = join_t2.t2_id
----
logical_plan
-Aggregate: groupBy=[[]], aggr=[[COUNT(alias1) AS COUNT(DISTINCT
join_t1.t1_id)]]
---Aggregate: groupBy=[[join_t1.t1_id AS alias1]], aggr=[[]]
-----Projection: join_t1.t1_id
-------Inner Join: join_t1.t1_id = join_t2.t2_id
---------TableScan: join_t1 projection=[t1_id]
---------TableScan: join_t2 projection=[t2_id]
+Projection: COUNT(alias1) AS COUNT(DISTINCT join_t1.t1_id)
+--Aggregate: groupBy=[[]], aggr=[[COUNT(alias1)]]
+----Aggregate: groupBy=[[join_t1.t1_id AS alias1]], aggr=[[]]
+------Projection: join_t1.t1_id
+--------Inner Join: join_t1.t1_id = join_t2.t2_id
+----------TableScan: join_t1 projection=[t1_id]
+----------TableScan: join_t2 projection=[t2_id]
physical_plan
-AggregateExec: mode=Final, gby=[], aggr=[COUNT(DISTINCT join_t1.t1_id)]
---CoalescePartitionsExec
-----AggregateExec: mode=Partial, gby=[], aggr=[COUNT(DISTINCT join_t1.t1_id)]
-------AggregateExec: mode=FinalPartitioned, gby=[alias1@0 as alias1], aggr=[]
---------AggregateExec: mode=Partial, gby=[t1_id@0 as alias1], aggr=[]
-----------ProjectionExec: expr=[t1_id@0 as t1_id]
-------------CoalesceBatchesExec: target_batch_size=2
---------------HashJoinExec: mode=Partitioned, join_type=Inner, on=[(t1_id@0,
t2_id@0)]
-----------------CoalesceBatchesExec: target_batch_size=2
-------------------RepartitionExec: partitioning=Hash([t1_id@0], 2),
input_partitions=2
---------------------RepartitionExec: partitioning=RoundRobinBatch(2),
input_partitions=1
-----------------------MemoryExec: partitions=1, partition_sizes=[1]
-----------------CoalesceBatchesExec: target_batch_size=2
-------------------RepartitionExec: partitioning=Hash([t2_id@0], 2),
input_partitions=2
---------------------RepartitionExec: partitioning=RoundRobinBatch(2),
input_partitions=1
-----------------------MemoryExec: partitions=1, partition_sizes=[1]
+ProjectionExec: expr=[COUNT(alias1)@0 as COUNT(DISTINCT join_t1.t1_id)]
+--AggregateExec: mode=Final, gby=[], aggr=[COUNT(alias1)]
+----CoalescePartitionsExec
+------AggregateExec: mode=Partial, gby=[], aggr=[COUNT(alias1)]
+--------AggregateExec: mode=FinalPartitioned, gby=[alias1@0 as alias1], aggr=[]
+----------AggregateExec: mode=Partial, gby=[t1_id@0 as alias1], aggr=[]
+------------ProjectionExec: expr=[t1_id@0 as t1_id]
+--------------CoalesceBatchesExec: target_batch_size=2
+----------------HashJoinExec: mode=Partitioned, join_type=Inner, on=[(t1_id@0,
t2_id@0)]
+------------------CoalesceBatchesExec: target_batch_size=2
+--------------------RepartitionExec: partitioning=Hash([t1_id@0], 2),
input_partitions=2
+----------------------RepartitionExec: partitioning=RoundRobinBatch(2),
input_partitions=1
+------------------------MemoryExec: partitions=1, partition_sizes=[1]
+------------------CoalesceBatchesExec: target_batch_size=2
+--------------------RepartitionExec: partitioning=Hash([t2_id@0], 2),
input_partitions=2
+----------------------RepartitionExec: partitioning=RoundRobinBatch(2),
input_partitions=1
+------------------------MemoryExec: partitions=1, partition_sizes=[1]
statement ok
set datafusion.explain.logical_plan_only = true;
@@ -3407,3 +3409,4 @@ set datafusion.optimizer.prefer_existing_sort = false;
statement ok
drop table annotated_data;
+
diff --git a/datafusion/sqllogictest/test_files/tpch/q16.slt.part
b/datafusion/sqllogictest/test_files/tpch/q16.slt.part
index c047829589..b93872929f 100644
--- a/datafusion/sqllogictest/test_files/tpch/q16.slt.part
+++ b/datafusion/sqllogictest/test_files/tpch/q16.slt.part
@@ -52,8 +52,8 @@ limit 10;
logical_plan
Limit: skip=0, fetch=10
--Sort: supplier_cnt DESC NULLS FIRST, part.p_brand ASC NULLS LAST,
part.p_type ASC NULLS LAST, part.p_size ASC NULLS LAST, fetch=10
-----Projection: part.p_brand, part.p_type, part.p_size, COUNT(DISTINCT
partsupp.ps_suppkey) AS supplier_cnt
-------Aggregate: groupBy=[[part.p_brand, part.p_type, part.p_size]],
aggr=[[COUNT(alias1) AS COUNT(DISTINCT partsupp.ps_suppkey)]]
+----Projection: part.p_brand, part.p_type, part.p_size, COUNT(alias1) AS
supplier_cnt
+------Aggregate: groupBy=[[part.p_brand, part.p_type, part.p_size]],
aggr=[[COUNT(alias1)]]
--------Aggregate: groupBy=[[part.p_brand, part.p_type, part.p_size,
partsupp.ps_suppkey AS alias1]], aggr=[[]]
----------LeftAnti Join: partsupp.ps_suppkey = __correlated_sq_1.s_suppkey
------------Projection: partsupp.ps_suppkey, part.p_brand, part.p_type,
part.p_size
@@ -69,11 +69,11 @@ physical_plan
GlobalLimitExec: skip=0, fetch=10
--SortPreservingMergeExec: [supplier_cnt@3 DESC,p_brand@0 ASC NULLS
LAST,p_type@1 ASC NULLS LAST,p_size@2 ASC NULLS LAST], fetch=10
----SortExec: TopK(fetch=10), expr=[supplier_cnt@3 DESC,p_brand@0 ASC NULLS
LAST,p_type@1 ASC NULLS LAST,p_size@2 ASC NULLS LAST]
-------ProjectionExec: expr=[p_brand@0 as p_brand, p_type@1 as p_type, p_size@2
as p_size, COUNT(DISTINCT partsupp.ps_suppkey)@3 as supplier_cnt]
---------AggregateExec: mode=FinalPartitioned, gby=[p_brand@0 as p_brand,
p_type@1 as p_type, p_size@2 as p_size], aggr=[COUNT(DISTINCT
partsupp.ps_suppkey)]
+------ProjectionExec: expr=[p_brand@0 as p_brand, p_type@1 as p_type, p_size@2
as p_size, COUNT(alias1)@3 as supplier_cnt]
+--------AggregateExec: mode=FinalPartitioned, gby=[p_brand@0 as p_brand,
p_type@1 as p_type, p_size@2 as p_size], aggr=[COUNT(alias1)]
----------CoalesceBatchesExec: target_batch_size=8192
------------RepartitionExec: partitioning=Hash([p_brand@0, p_type@1,
p_size@2], 4), input_partitions=4
---------------AggregateExec: mode=Partial, gby=[p_brand@0 as p_brand, p_type@1
as p_type, p_size@2 as p_size], aggr=[COUNT(DISTINCT partsupp.ps_suppkey)]
+--------------AggregateExec: mode=Partial, gby=[p_brand@0 as p_brand, p_type@1
as p_type, p_size@2 as p_size], aggr=[COUNT(alias1)]
----------------AggregateExec: mode=FinalPartitioned, gby=[p_brand@0 as
p_brand, p_type@1 as p_type, p_size@2 as p_size, alias1@3 as alias1], aggr=[]
------------------CoalesceBatchesExec: target_batch_size=8192
--------------------RepartitionExec: partitioning=Hash([p_brand@0, p_type@1,
p_size@2, alias1@3], 4), input_partitions=4