[ 
https://issues.apache.org/jira/browse/FLINK-39285?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dmitriy Linevich updated FLINK-39285:
-------------------------------------
    Description: 
Before the MultiJoin rules run, a set of filter-pushdown rules is applied. 
These rules push Calc nodes down so that they end up between the Join nodes in 
the plan. These Calc nodes prevent the MultiJoin nodes from being merged into a 
single MultiJoin. 

SQL:
{code:sql}
CREATE TABLE datagen1 (id int, f1 int, PRIMARY KEY (id) NOT ENFORCED)
  WITH ('connector' = 'datagen', 'rows-per-second' = '1');
CREATE TABLE datagen2 (id int, f2 int, PRIMARY KEY (id) NOT ENFORCED)
  WITH ('connector' = 'datagen', 'rows-per-second' = '1');
CREATE TABLE datagen3 (id int, f3 int, PRIMARY KEY (id) NOT ENFORCED)
  WITH ('connector' = 'datagen', 'rows-per-second' = '1');
CREATE TABLE print (id int, f1 int, f2 int, f3 int)
  WITH ('connector' = 'print');
EXPLAIN SELECT datagen1.id, datagen1.f1, datagen2.f2, datagen3.f3
FROM datagen1
LEFT JOIN datagen2 ON datagen1.id = datagen2.id AND datagen2.id > 2
JOIN datagen3
{code}
ACTUAL PLAN:
{code:java}
Calc(select=[id, f1, f2, f3])
+- MultiJoin(commonJoinKey=[id0], joinTypes=[INNER], inputUniqueKeys=[noUniqueKey, (id)], joinConditions=[=(id0, id1)], select=[id,f1,id0,f2,id1,f3], rowType=[RecordType(INTEGER id, INTEGER f1, INTEGER id0, INTEGER f2, INTEGER id1, INTEGER f3)])
   :- Exchange(distribution=[hash[id0]])
   :  +- Calc(select=[id, f1, CAST(id0 AS INTEGER) AS id0, f2])
   :     +- MultiJoin(commonJoinKey=[id], joinTypes=[INNER], inputUniqueKeys=[(id), (id)], joinConditions=[=(id, id0)], select=[id,f1,id0,f2], rowType=[RecordType(INTEGER id, INTEGER f1, INTEGER id0, INTEGER f2)])
   :        :- Exchange(distribution=[hash[id]])
   :        :  +- Calc(select=[id, f1], where=[>(id, 3)])
   :        :     +- TableSourceScan(table=[[default_catalog, default_database, datagen1]], fields=[id, f1])
   :        +- Exchange(distribution=[hash[id]])
   :           +- Calc(select=[id, f2], where=[>(id, 3)])
   :              +- TableSourceScan(table=[[default_catalog, default_database, datagen2]], fields=[id, f2])
   +- Exchange(distribution=[hash[id]])
      +- Calc(select=[id, f3], where=[>(id, 3)])
         +- TableSourceScan(table=[[default_catalog, default_database, datagen3]], fields=[id, f3])
{code}
EXPECTED PLAN:
{code:java}
Calc(select=[id, f1, f2, f3])
+- MultiJoin(commonJoinKey=[id], joinTypes=[INNER, INNER], inputUniqueKeys=[(id), (id), (id)], joinConditions=[=(id, id0), =(id0, id1)], select=[id,f1,id0,f2,id1,f3], rowType=[RecordType(INTEGER id, INTEGER f1, INTEGER id0, INTEGER f2, INTEGER id1, INTEGER f3)])
   :- Exchange(distribution=[hash[id]])
   :  +- Calc(select=[id, f1], where=[>(id, 3)])
   :     +- TableSourceScan(table=[[default_catalog, default_database, datagen1]], fields=[id, f1])
   :- Exchange(distribution=[hash[id]])
   :  +- Calc(select=[id, f2], where=[>(id, 3)])
   :     +- TableSourceScan(table=[[default_catalog, default_database, datagen2]], fields=[id, f2])
   +- Exchange(distribution=[hash[id]])
      +- Calc(select=[id, f3], where=[>(id, 3)])
         +- TableSourceScan(table=[[default_catalog, default_database, datagen3]], fields=[id, f3])
{code}
I propose modifying JoinToMultiJoinRule so that it takes this Calc node between 
the MultiJoin nodes into account.
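
To illustrate the idea behind the proposal, here is a minimal sketch on a toy 
plan model. It is not the actual JoinToMultiJoinRule and does not use Flink or 
Calcite classes: Node, lookThroughCalc, merge and examplePlan are hypothetical 
names introduced only for this example. It assumes that a Calc may be looked 
through only when it has no filter (projection or cast only), and it ignores 
the field-reference remapping and the Exchange nodes that a real rule would 
have to deal with.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Toy plan model; none of these types are Flink or Calcite classes. */
public class MultiJoinMergeSketch {

    /** Hypothetical plan node; kind is "MultiJoin", "Calc" or "Scan". */
    public static final class Node {
        public final String kind;
        public final String label;
        public final boolean hasFilter; // only meaningful for Calc nodes
        public final List<Node> inputs;

        public Node(String kind, String label, boolean hasFilter, List<Node> inputs) {
            this.kind = kind;
            this.label = label;
            this.hasFilter = hasFilter;
            this.inputs = inputs;
        }
    }

    public static Node scan(String table) {
        return new Node("Scan", table, false, new ArrayList<>());
    }

    public static Node calc(String label, boolean hasFilter, Node input) {
        return new Node("Calc", label, hasFilter, Arrays.asList(input));
    }

    public static Node multiJoin(Node... inputs) {
        return new Node("MultiJoin", "MultiJoin", false, Arrays.asList(inputs));
    }

    static boolean isMultiJoin(Node n) {
        return n.kind.equals("MultiJoin");
    }

    /** Returns the MultiJoin below a filter-free Calc, otherwise the node itself. */
    public static Node lookThroughCalc(Node n) {
        boolean projectionOnlyCalc = n.kind.equals("Calc") && !n.hasFilter;
        if (projectionOnlyCalc && isMultiJoin(n.inputs.get(0))) {
            return n.inputs.get(0);
        }
        return n;
    }

    /** Flattens nested MultiJoins, also when a filter-free Calc sits between them. */
    public static Node merge(Node node) {
        if (!isMultiJoin(node)) {
            return node;
        }
        List<Node> flat = new ArrayList<>();
        for (Node input : node.inputs) {
            Node candidate = lookThroughCalc(input);
            if (isMultiJoin(candidate)) {
                flat.addAll(merge(candidate).inputs); // absorb the nested join's inputs
            } else {
                flat.add(input); // keep everything else unchanged
            }
        }
        return new Node("MultiJoin", "MultiJoin", false, flat);
    }

    /** Mirrors the shape of the ACTUAL PLAN, with Exchange nodes left out. */
    public static Node examplePlan() {
        Node inner = multiJoin(
                calc("id > 3", true, scan("datagen1")),
                calc("id > 3", true, scan("datagen2")));
        return multiJoin(
                calc("CAST(id0 AS INTEGER)", false, inner),
                calc("id > 3", true, scan("datagen3")));
    }

    public static void main(String[] args) {
        Node merged = merge(examplePlan());
        for (Node in : merged.inputs) {
            System.out.println(in.kind + "(" + in.label + ") <- " + in.inputs.get(0).label);
        }
    }
}
```

In this toy model the merged node has three inputs, matching the shape of the 
EXPECTED PLAN. A Calc that carries a filter is deliberately left in place, 
because looking through it would drop the filter.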

  was:
Before the MultiJoin rules, there is a set of rules about "filter pushdown," 
which causes Calc nodes to appear between Join nodes in the plan (they were 
pushed down). These "Calc" nodes prevent the merging of MultiJoin nodes into a 
single MultiJoin. 

SQL:
{code:java}
CREATE TABLE datagen1 (id int, f1 int, PRIMARY KEY (id) NOT ENFORCED) WITH  ( 
'connector' = 'datagen', 'rows-per-second' = '1');
CREATE TABLE datagen2 (id int, f2 int, PRIMARY KEY (id) NOT ENFORCED) WITH  
('connector' = 'datagen', 'rows-per-second' = '1');
CREATE TABLE datagen3 (id int, f3 int, PRIMARY KEY (id) NOT ENFORCED) WITH  ( 
'connector' = 'datagen', 'rows-per-second' = '1');
CREATE TABLE print (id int, f1 int, f2 int, f3 int) WITH ('connector' = 'print' 
 );
EXPLAIN SELECT datagen1.id, datagen1.f1, datagen2.f2, datagen3.f3 FROM  
datagen1  LEFT JOIN datagen2 on datagen1.id = datagen2.id  and datagen2.id > 2  
JOIN datagen3 {code}
ACTUAL PLAN:
{code:java}
Calc(select=[id, f1, f2, f3])
+- MultiJoin(commonJoinKey=[id0], joinTypes=[INNER], 
inputUniqueKeys=[noUniqueKey, (id)], joinConditions=[=(id0, id1)], 
select=[id,f1,id0,f2,id1,f3], rowType=[RecordType(INTEGER id, INTEGER f1, 
INTEGER id0, INTEGER f2, INTEGER id1, INTEGER f3)])
   :- Exchange(distribution=[hash[id0]])
   :  +- Calc(select=[id, f1, CAST(id0 AS INTEGER) AS id0, f2])
   :     +- MultiJoin(commonJoinKey=[id], joinTypes=[INNER], 
inputUniqueKeys=[(id), (id)], joinConditions=[=(id, id0)], 
select=[id,f1,id0,f2], rowType=[RecordType(INTEGER id, INTEGER f1, INTEGER id0, 
INTEGER f2)])
   :        :- Exchange(distribution=[hash[id]])
   :        :  +- Calc(select=[id, f1], where=[>(id, 3)])
   :        :     +- TableSourceScan(table=[[default_catalog, default_database, 
datagen1]], fields=[id, f1])
   :        +- Exchange(distribution=[hash[id]])
   :           +- Calc(select=[id, f2], where=[>(id, 3)])
   :              +- TableSourceScan(table=[[default_catalog, default_database, 
datagen2]], fields=[id, f2])
   +- Exchange(distribution=[hash[id]])
      +- Calc(select=[id, f3], where=[>(id, 3)])
         +- TableSourceScan(table=[[default_catalog, default_database, 
datagen3]], fields=[id, f3]) {code}
EXPECTED PLAN:
{code:java}
Calc(select=[id, f1, f2, f3])
+- MultiJoin(commonJoinKey=[id], joinTypes=[INNER, INNER], 
inputUniqueKeys=[(id), (id), (id)], joinConditions=[=(id, id0), =(id0, id1)], 
select=[id,f1,id0,f2,id1,f3], rowType=[RecordType(INTEGER id, INTEGER f1, 
INTEGER id0, INTEGER f2, INTEGER id1, INTEGER f3)])
   :- Exchange(distribution=[hash[id]])
   :  +- Calc(select=[id, f1], where=[>(id, 3)])
   :     +- TableSourceScan(table=[[default_catalog, default_database, 
datagen1]], fields=[id, f1])
   :- Exchange(distribution=[hash[id]])
   :  +- Calc(select=[id, f2], where=[>(id, 3)])
   :     +- TableSourceScan(table=[[default_catalog, default_database, 
datagen2]], fields=[id, f2])
   +- Exchange(distribution=[hash[id]])
      +- Calc(select=[id, f3], where=[>(id, 3)])
         +- TableSourceScan(table=[[default_catalog, default_database, 
datagen3]], fields=[id, f3]) {code}
I propose to modify the JoinToMultiJoinRule to take into account these Calc 
nodes between the MultiJoin nodes.


> Consider the result of filter pushdown rules in the MultiJoin rule
> ------------------------------------------------------------------
>
>                 Key: FLINK-39285
>                 URL: https://issues.apache.org/jira/browse/FLINK-39285
>             Project: Flink
>          Issue Type: Improvement
>            Reporter: Dmitriy Linevich
>            Priority: Major
>              Labels: pull-request-available



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
