[ 
https://issues.apache.org/jira/browse/CALCITE-6893?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17936081#comment-17936081
 ] 

Zhen Chen commented on CALCITE-6893:
------------------------------------

[~julianhyde] Sorry, maybe I didn't describe it clearly. Two rules are involved 
here: INTERSECT_TO_DISTINCT, which I modified, and AGGREGATE_UNION_TRANSPOSE. 
Both are executed in the Volcano planner.

The test is:
{code:java}
@Test void testIntersectToDistinctVolcanol() {
  final String sql = "select ename from emp where deptno = 10\n"
          + "intersect\n"
          + "select ename from emp where deptno = 20\n";
  sql(sql).withVolcanoPlanner(true, p -> {
    // My modified version: the Union children do not contain an agg.
    p.addRule(CoreRules.INTERSECT_TO_DISTINCT);
    p.addRule(CoreRules.AGGREGATE_UNION_TRANSPOSE);
    p.addRule(EnumerableRules.ENUMERABLE_TABLE_SCAN_RULE);
    p.addRule(EnumerableRules.ENUMERABLE_FILTER_RULE);
    p.addRule(EnumerableRules.ENUMERABLE_PROJECT_RULE);
    p.addRule(EnumerableRules.ENUMERABLE_AGGREGATE_RULE);
    p.addRule(EnumerableRules.ENUMERABLE_UNION_RULE);
  }).check();
} {code}
Expressed as SQL, the original query (call it SQL0) is:
{code:java}
select ename from emp where deptno = 10
intersect
select ename from emp where deptno = 20 {code}
Applying my modified INTERSECT_TO_DISTINCT to SQL0 gives SQL1:
{code:java}
select ename
from (
  select ename, count(*) as c
  from (
    select ename from emp
    where deptno = 10
    union all
    select ename from emp
    where deptno = 20)
  group by ename)
where c = 2 {code}
Applying AGGREGATE_UNION_TRANSPOSE to SQL1 gives SQL2:
{code:java}
select ename
from (
  select ename, sum(c) as c
  from (
    select ename, count(*) as c from emp
    where deptno = 10 group by ename
    union all
    select ename, count(*) as c from emp
    where deptno = 20 group by ename)
  group by ename)
where c = 2 {code}
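
To make the comparison concrete, here is a small plain-Java sketch (not Calcite code) of the two query shapes. The class name, method names, and sample rows are hypothetical and only for illustration. The sample data keeps each name unique within a branch, which is the assumption under which the "c = 2" test coincides with INTERSECT in this sketch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class IntersectRewriteSketch {
  /** Models "select ename, count(*) as c ... group by ename". */
  static Map<String, Long> countByName(List<String> names) {
    Map<String, Long> counts = new TreeMap<>();
    for (String n : names) {
      counts.merge(n, 1L, Long::sum);
    }
    return counts;
  }

  /** SQL1 shape: union all first, a single aggregate on top, keep c = 2. */
  static List<String> sql1(List<String> left, List<String> right) {
    List<String> unionAll = new ArrayList<>(left);
    unionAll.addAll(right);
    return namesWithTotal(countByName(unionAll), 2L);
  }

  /** SQL2 shape: count inside each branch, union all, sum the counts on top. */
  static List<String> sql2(List<String> left, List<String> right) {
    Map<String, Long> totals = new TreeMap<>(countByName(left));
    countByName(right).forEach((name, c) -> totals.merge(name, c, Long::sum));
    return namesWithTotal(totals, 2L);
  }

  /** Models the outer "where c = 2" filter followed by the projection on ename. */
  static List<String> namesWithTotal(Map<String, Long> totals, long wanted) {
    List<String> out = new ArrayList<>();
    totals.forEach((name, c) -> {
      if (c == wanted) {
        out.add(name);
      }
    });
    return out;
  }

  public static void main(String[] args) {
    // Hypothetical rows; names are unique within each branch.
    List<String> dept10 = List.of("CLARK", "KING", "MILLER");
    List<String> dept20 = List.of("KING", "SMITH", "MILLER");
    System.out.println(sql1(dept10, dept20));
    System.out.println(sql2(dept10, dept20));
  }
}
```

Both methods return the same names for the same input, because summing the per-branch counts gives the same total as counting once over the union. The two shapes differ only in where the counting work is done.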
Both SQL1 and SQL2 are then registered in Volcano, and Volcano picks the best 
plan by cost. I think the plan whose Union children do not contain an agg 
(SQL1) is better than SQL2, and the result given by Volcano is consistent with 
that: adding an agg to each Union child increases the cost. So for a 
stand-alone plan this choice is reasonable. I mentioned the distributed plan 
because a distributed plan has exchanges, and each exchange has an additional 
cost, so there it is very meaningful to push the agg down and reduce the 
number of intermediate results.
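
As a rough illustration of the distributed case, the sketch below counts how many rows would cross an exchange with and without the agg pushed below the Union. It is plain Java, not Calcite code; the class name, method names, and sample rows are hypothetical, and it assumes the exchange sits directly above each Union child.

```java
import java.util.HashSet;
import java.util.List;

public class ExchangeRowsSketch {
  /** No pushdown (SQL1 shape): every raw row of every branch is shuffled. */
  static int rowsWithoutPushdown(List<List<String>> branches) {
    int rows = 0;
    for (List<String> branch : branches) {
      rows += branch.size();
    }
    return rows;
  }

  /** Pushdown (SQL2 shape): each branch sends one row per distinct name. */
  static int rowsWithPushdown(List<List<String>> branches) {
    int rows = 0;
    for (List<String> branch : branches) {
      rows += new HashSet<>(branch).size();
    }
    return rows;
  }

  public static void main(String[] args) {
    // Hypothetical branches with repeated names, so pre-aggregation helps.
    List<List<String>> branches = List.of(
        List.of("A", "A", "A", "B"),
        List.of("A", "B", "B"));
    System.out.println(rowsWithoutPushdown(branches)); // 7 raw rows
    System.out.println(rowsWithPushdown(branches));    // 4 grouped rows
  }
}
```

When the names in a branch are already distinct, both numbers are equal and the pushed-down agg only adds work, which matches the stand-alone result above.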

> Remove agg from Union children in IntersectToDistinctRule
> ---------------------------------------------------------
>
>                 Key: CALCITE-6893
>                 URL: https://issues.apache.org/jira/browse/CALCITE-6893
>             Project: Calcite
>          Issue Type: Improvement
>            Reporter: Zhen Chen
>            Assignee: Zhen Chen
>            Priority: Major
>              Labels: pull-request-available
>         Attachments: image-2025-03-16-11-02-04-725.png
>
>
> Whether agg should be pushed down should be determined by the cost.
> SQL:
>  
> {code:java}
> select ename from emp where deptno = 10
> intersect
> select ename from emp where deptno = 20{code}
>  
> Apply the rules INTERSECT_TO_DISTINCT (updated version) and 
> AGGREGATE_UNION_TRANSPOSE in the HEP planner.
> We get this logical plan:
>  
> {code:java}
> LogicalProject(ENAME=[$0])
>   LogicalFilter(condition=[=($1, 2)])
>     LogicalAggregate(group=[{0}], agg#0=[$SUM0($1)])
>       LogicalUnion(all=[true])
>         LogicalAggregate(group=[{0}], agg#0=[COUNT()])
>           LogicalProject(ENAME=[$1])
>             LogicalFilter(condition=[=($7, 10)])
>               LogicalTableScan(table=[[CATALOG, SALES, EMP]])
>         LogicalAggregate(group=[{0}], agg#0=[COUNT()])
>           LogicalProject(ENAME=[$1])
>             LogicalFilter(condition=[=($7, 20)])
>               LogicalTableScan(table=[[CATALOG, SALES, EMP]]) {code}
> Then we use the same two rules in the Volcano planner.
>  
> Final physical plan:
>  
> {code:java}
> EnumerableProject(ENAME=[$0]): rowcount = 1.0, cumulative cost = {43.72500000000001 rows, 68.4 cpu, 0.0 io}, id = 85
>   EnumerableFilter(condition=[=($1, 2)]): rowcount = 1.0, cumulative cost = {42.72500000000001 rows, 67.4 cpu, 0.0 io}, id = 84
>     EnumerableAggregate(group=[{0}], agg#0=[COUNT()]): rowcount = 1.0, cumulative cost = {41.72500000000001 rows, 66.4 cpu, 0.0 io}, id = 83
>       EnumerableUnion(all=[true]): rowcount = 4.2, cumulative cost = {40.60000000000001 rows, 66.4 cpu, 0.0 io}, id = 82
>         EnumerableProject(ENAME=[$1]): rowcount = 2.1, cumulative cost = {18.200000000000003 rows, 31.1 cpu, 0.0 io}, id = 79
>           EnumerableFilter(condition=[=($7, 10)]): rowcount = 2.1, cumulative cost = {16.1 rows, 29.0 cpu, 0.0 io}, id = 78
>             EnumerableTableScan(table=[[CATALOG, SALES, EMP]]): rowcount = 14.0, cumulative cost = {14.0 rows, 15.0 cpu, 0.0 io}, id = 69
>         EnumerableProject(ENAME=[$1]): rowcount = 2.1, cumulative cost = {18.200000000000003 rows, 31.1 cpu, 0.0 io}, id = 81
>           EnumerableFilter(condition=[=($7, 20)]): rowcount = 2.1, cumulative cost = {16.1 rows, 29.0 cpu, 0.0 io}, id = 80
>             EnumerableTableScan(table=[[CATALOG, SALES, EMP]]): rowcount = 14.0, cumulative cost = {14.0 rows, 15.0 cpu, 0.0 io}, id = 69
>  {code}
>  
> In the best plan, the children of the Union do not have an agg.
> DAG:
> !image-2025-03-16-11-02-04-725.png|width=545,height=348!
> Currently, Calcite does not support distributed planning. In a distributed 
> plan, the agg is split into two stages. If the first stage reduces the data 
> well, pushing the agg down is meaningful because it reduces the network 
> traffic of the shuffle. However, optimizing the current rule is also 
> meaningful: Calcite already has rules that can push the agg down, so we can 
> leave the choice to Volcano.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)