[
https://issues.apache.org/jira/browse/CALCITE-6893?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17936081#comment-17936081
]
Zhen Chen commented on CALCITE-6893:
------------------------------------
[~julianhyde] Sorry, maybe I didn't describe it clearly. Two rules are
involved here: INTERSECT_TO_DISTINCT, which I modified, and
AGGREGATE_UNION_TRANSPOSE. Both are executed in the Volcano planner.
The test is:
{code:java}
@Test void testIntersectToDistinctVolcanol() {
  final String sql = "select ename from emp where deptno = 10\n"
      + "intersect\n"
      + "select ename from emp where deptno = 20\n";
  sql(sql).withVolcanoPlanner(true, p -> {
    // My modified version: the Union children do not contain an Aggregate.
    p.addRule(CoreRules.INTERSECT_TO_DISTINCT);
    p.addRule(CoreRules.AGGREGATE_UNION_TRANSPOSE);
    p.addRule(EnumerableRules.ENUMERABLE_TABLE_SCAN_RULE);
    p.addRule(EnumerableRules.ENUMERABLE_FILTER_RULE);
    p.addRule(EnumerableRules.ENUMERABLE_PROJECT_RULE);
    p.addRule(EnumerableRules.ENUMERABLE_AGGREGATE_RULE);
    p.addRule(EnumerableRules.ENUMERABLE_UNION_RULE);
  }).check();
} {code}
Expressed as SQL, the original query (call it SQL0) is:
{code:java}
select ename from emp where deptno = 10
intersect
select ename from emp where deptno = 20 {code}
Applying my modified INTERSECT_TO_DISTINCT to SQL0 gives the following, called SQL1:
{code:java}
select ename
from (
select ename, count(*) as c
from (
select ename from emp
where deptno = 10
union all
select ename from emp
where deptno = 20)
group by ename)
where c = 2 {code}
and applying AGGREGATE_UNION_TRANSPOSE to SQL1 gives the following, called SQL2:
{code:java}
select ename
from (
select ename, sum(c) as c
from (
select ename, count(*) as c from emp
where deptno = 10 group by ename
union all
select ename, count(*) as c from emp
where deptno = 20 group by ename)
group by ename)
where c = 2 {code}
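To make the relationship between the two shapes concrete, here is a minimal sketch in plain Java (not Calcite code) that evaluates the SQL1 shape and the SQL2 shape over in-memory lists. The class name, method names, and sample names are hypothetical; the sample data is chosen so that one name appears in both branches and no name repeats within a branch.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical illustration only; not part of Calcite.
final class IntersectRewriteSketch {

  /** SQL1 shape: UNION ALL of the raw rows, then a single COUNT(*) per ename. */
  public static List<String> sql1(List<String> dept10, List<String> dept20) {
    List<String> unionAll = new ArrayList<>(dept10);
    unionAll.addAll(dept20);
    return keepWhereCountIsTwo(countPerName(unionAll));
  }

  /** SQL2 shape: COUNT(*) inside each branch, UNION ALL, then SUM of the partial counts. */
  public static List<String> sql2(List<String> dept10, List<String> dept20) {
    List<Map.Entry<String, Long>> unionAll = new ArrayList<>();
    unionAll.addAll(countPerName(dept10).entrySet());
    unionAll.addAll(countPerName(dept20).entrySet());
    Map<String, Long> summed = new TreeMap<>();
    for (Map.Entry<String, Long> partial : unionAll) {
      summed.merge(partial.getKey(), partial.getValue(), Long::sum);
    }
    return keepWhereCountIsTwo(summed);
  }

  /** GROUP BY ename with COUNT(*); a TreeMap keeps the output order deterministic. */
  private static Map<String, Long> countPerName(List<String> names) {
    Map<String, Long> counts = new TreeMap<>();
    for (String ename : names) {
      counts.merge(ename, 1L, Long::sum);
    }
    return counts;
  }

  /** The outer filter "where c = 2". */
  private static List<String> keepWhereCountIsTwo(Map<String, Long> counts) {
    List<String> result = new ArrayList<>();
    for (Map.Entry<String, Long> e : counts.entrySet()) {
      if (e.getValue() == 2L) {
        result.add(e.getKey());
      }
    }
    return result;
  }

  public static void main(String[] args) {
    // Hypothetical sample data: only KING appears in both branches.
    List<String> dept10 = Arrays.asList("CLARK", "KING", "MILLER");
    List<String> dept20 = Arrays.asList("SMITH", "KING", "ADAMS");
    System.out.println(sql1(dept10, dept20)); // prints [KING]
    System.out.println(sql2(dept10, dept20)); // prints [KING]
  }
}
```

Both shapes count the same rows in total, so they always agree with each other; the only difference is where the counting happens.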
Both SQL1 and SQL2 are then registered in the Volcano planner, which picks the
cheapest plan. I think a Union whose children do not contain an Aggregate is
better (SQL1 is better than SQL2), and the result given by Volcano is
consistent with that: adding an Aggregate increases the cost, which is
reasonable for a stand-alone (single-node) plan. I mentioned the distributed
plan because it has an exchange, and the exchange has additional cost, so
pushing the Aggregate down to reduce the number of intermediate rows is very
meaningful there.
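As a rough illustration of that trade-off, the following plain-Java sketch (not Calcite code) counts how many rows reach the Union with and without the Aggregate pushed into each branch. Using this row count as a stand-in for exchange traffic is my simplifying assumption, not Calcite's cost model, and the class name, method names, and sample data are hypothetical.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;

// Hypothetical illustration only; not part of Calcite.
final class UnionInputRows {

  /** SQL1 shape: every raw row of both branches flows into the Union. */
  public static int withoutPushDown(List<String> left, List<String> right) {
    return left.size() + right.size();
  }

  /** SQL2 shape: each branch is grouped by ename first, so one row per distinct name. */
  public static int withPushDown(List<String> left, List<String> right) {
    return new HashSet<>(left).size() + new HashSet<>(right).size();
  }

  public static void main(String[] args) {
    // Names already distinct per branch: the extra Aggregate removes nothing.
    List<String> a = Arrays.asList("CLARK", "KING", "MILLER");
    List<String> b = Arrays.asList("SMITH", "KING", "ADAMS");
    System.out.println(withoutPushDown(a, b) + " vs " + withPushDown(a, b)); // prints 6 vs 6

    // Many repeated names: the pushed-down Aggregate shrinks the Union input.
    List<String> c = Arrays.asList("KING", "KING", "KING", "CLARK");
    List<String> d = Arrays.asList("KING", "KING", "SMITH", "SMITH");
    System.out.println(withoutPushDown(c, d) + " vs " + withPushDown(c, d)); // prints 8 vs 4
  }
}
```

When the first-stage Aggregate removes few rows it is pure overhead, and when it removes many rows it reduces what an exchange would have to move, which is why leaving the decision to a cost-based planner seems reasonable.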
> Remove agg from Union children in IntersectToDistinctRule
> ---------------------------------------------------------
>
> Key: CALCITE-6893
> URL: https://issues.apache.org/jira/browse/CALCITE-6893
> Project: Calcite
> Issue Type: Improvement
> Reporter: Zhen Chen
> Assignee: Zhen Chen
> Priority: Major
> Labels: pull-request-available
> Attachments: image-2025-03-16-11-02-04-725.png
>
>
> Whether agg should be pushed down should be determined by the cost.
> SQL:
>
> {code:java}
> select ename from emp where deptno = 10
> intersect
> select ename from emp where deptno = 20{code}
>
> Applying the rules INTERSECT_TO_DISTINCT (updated version) and
> AGGREGATE_UNION_TRANSPOSE in the Hep planner, we get this logical plan:
>
> {code:java}
> LogicalProject(ENAME=[$0])
>   LogicalFilter(condition=[=($1, 2)])
>     LogicalAggregate(group=[{0}], agg#0=[$SUM0($1)])
>       LogicalUnion(all=[true])
>         LogicalAggregate(group=[{0}], agg#0=[COUNT()])
>           LogicalProject(ENAME=[$1])
>             LogicalFilter(condition=[=($7, 10)])
>               LogicalTableScan(table=[[CATALOG, SALES, EMP]])
>         LogicalAggregate(group=[{0}], agg#0=[COUNT()])
>           LogicalProject(ENAME=[$1])
>             LogicalFilter(condition=[=($7, 20)])
>               LogicalTableScan(table=[[CATALOG, SALES, EMP]]) {code}
> Then we use the same two rules in the Volcano planner.
>
> Final physical plan:
>
> {code:java}
> EnumerableProject(ENAME=[$0]): rowcount = 1.0, cumulative cost = {43.72500000000001 rows, 68.4 cpu, 0.0 io}, id = 85
>   EnumerableFilter(condition=[=($1, 2)]): rowcount = 1.0, cumulative cost = {42.72500000000001 rows, 67.4 cpu, 0.0 io}, id = 84
>     EnumerableAggregate(group=[{0}], agg#0=[COUNT()]): rowcount = 1.0, cumulative cost = {41.72500000000001 rows, 66.4 cpu, 0.0 io}, id = 83
>       EnumerableUnion(all=[true]): rowcount = 4.2, cumulative cost = {40.60000000000001 rows, 66.4 cpu, 0.0 io}, id = 82
>         EnumerableProject(ENAME=[$1]): rowcount = 2.1, cumulative cost = {18.200000000000003 rows, 31.1 cpu, 0.0 io}, id = 79
>           EnumerableFilter(condition=[=($7, 10)]): rowcount = 2.1, cumulative cost = {16.1 rows, 29.0 cpu, 0.0 io}, id = 78
>             EnumerableTableScan(table=[[CATALOG, SALES, EMP]]): rowcount = 14.0, cumulative cost = {14.0 rows, 15.0 cpu, 0.0 io}, id = 69
>         EnumerableProject(ENAME=[$1]): rowcount = 2.1, cumulative cost = {18.200000000000003 rows, 31.1 cpu, 0.0 io}, id = 81
>           EnumerableFilter(condition=[=($7, 20)]): rowcount = 2.1, cumulative cost = {16.1 rows, 29.0 cpu, 0.0 io}, id = 80
>             EnumerableTableScan(table=[[CATALOG, SALES, EMP]]): rowcount = 14.0, cumulative cost = {14.0 rows, 15.0 cpu, 0.0 io}, id = 69
> {code}
>
> In the best plan, the children of the Union do not have an Aggregate.
> DAG:
> !image-2025-03-16-11-02-04-725.png|width=545,height=348!
> Currently, Calcite does not support distributed planning. In a distributed
> plan, the Aggregate would be split into two stages. If the first stage
> reduces the data well, pushing the Aggregate down is meaningful because it
> reduces the network traffic of the shuffle. However, optimizing the current
> rule is also meaningful: Calcite already has rules that can push the
> Aggregate down, so we can leave the choice to the Volcano planner.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)