[ 
https://issues.apache.org/jira/browse/FLINK-39285?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-39285:
-----------------------------------
    Labels: pull-request-available  (was: )

> Consider the result of filter pushdown rules in the MultiJoin rule
> ------------------------------------------------------------------
>
>                 Key: FLINK-39285
>                 URL: https://issues.apache.org/jira/browse/FLINK-39285
>             Project: Flink
>          Issue Type: Improvement
>            Reporter: Dmitriy Linevich
>            Priority: Major
>              Labels: pull-request-available
>
> The "filter pushdown" rules run before the MultiJoin rules. They push filters 
> below the joins, which leaves Calc nodes between the Join nodes in the plan. 
> These Calc nodes prevent adjacent MultiJoin nodes from being merged into 
> a single MultiJoin. 
> SQL:
> {code:java}
> CREATE TABLE datagen1 (id int, f1 int, PRIMARY KEY (id) NOT ENFORCED) WITH ('connector' = 'datagen', 'rows-per-second' = '1');
> CREATE TABLE datagen2 (id int, f2 int, PRIMARY KEY (id) NOT ENFORCED) WITH ('connector' = 'datagen', 'rows-per-second' = '1');
> CREATE TABLE datagen3 (id int, f3 int, PRIMARY KEY (id) NOT ENFORCED) WITH ('connector' = 'datagen', 'rows-per-second' = '1');
> CREATE TABLE print (id int, f1 int, f2 int, f3 int) WITH ('connector' = 'print');
> EXPLAIN SELECT datagen1.id, datagen1.f1, datagen2.f2, datagen3.f3
> FROM datagen1
> LEFT JOIN datagen2 ON datagen1.id = datagen2.id AND datagen2.id > 2
> JOIN datagen3
> {code}
> ACTUAL PLAN:
> {code:java}
> Calc(select=[id, f1, f2, f3])
> +- MultiJoin(commonJoinKey=[id0], joinTypes=[INNER], inputUniqueKeys=[noUniqueKey, (id)], joinConditions=[=(id0, id1)], select=[id,f1,id0,f2,id1,f3], rowType=[RecordType(INTEGER id, INTEGER f1, INTEGER id0, INTEGER f2, INTEGER id1, INTEGER f3)])
>    :- Exchange(distribution=[hash[id0]])
>    :  +- Calc(select=[id, f1, CAST(id0 AS INTEGER) AS id0, f2])
>    :     +- MultiJoin(commonJoinKey=[id], joinTypes=[INNER], inputUniqueKeys=[(id), (id)], joinConditions=[=(id, id0)], select=[id,f1,id0,f2], rowType=[RecordType(INTEGER id, INTEGER f1, INTEGER id0, INTEGER f2)])
>    :        :- Exchange(distribution=[hash[id]])
>    :        :  +- Calc(select=[id, f1], where=[>(id, 3)])
>    :        :     +- TableSourceScan(table=[[default_catalog, default_database, datagen1]], fields=[id, f1])
>    :        +- Exchange(distribution=[hash[id]])
>    :           +- Calc(select=[id, f2], where=[>(id, 3)])
>    :              +- TableSourceScan(table=[[default_catalog, default_database, datagen2]], fields=[id, f2])
>    +- Exchange(distribution=[hash[id]])
>       +- Calc(select=[id, f3], where=[>(id, 3)])
>          +- TableSourceScan(table=[[default_catalog, default_database, datagen3]], fields=[id, f3])
> {code}
> EXPECTED PLAN:
> {code:java}
> Calc(select=[id, f1, f2, f3])
> +- MultiJoin(commonJoinKey=[id], joinTypes=[INNER, INNER], inputUniqueKeys=[(id), (id), (id)], joinConditions=[=(id, id0), =(id0, id1)], select=[id,f1,id0,f2,id1,f3], rowType=[RecordType(INTEGER id, INTEGER f1, INTEGER id0, INTEGER f2, INTEGER id1, INTEGER f3)])
>    :- Exchange(distribution=[hash[id]])
>    :  +- Calc(select=[id, f1], where=[>(id, 3)])
>    :     +- TableSourceScan(table=[[default_catalog, default_database, datagen1]], fields=[id, f1])
>    :- Exchange(distribution=[hash[id]])
>    :  +- Calc(select=[id, f2], where=[>(id, 3)])
>    :     +- TableSourceScan(table=[[default_catalog, default_database, datagen2]], fields=[id, f2])
>    +- Exchange(distribution=[hash[id]])
>       +- Calc(select=[id, f3], where=[>(id, 3)])
>          +- TableSourceScan(table=[[default_catalog, default_database, datagen3]], fields=[id, f3])
> {code}
> I propose modifying JoinToMultiJoinRule so that it takes these Calc nodes 
> between the MultiJoin nodes into account.
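
To illustrate the idea of merging joins across an intermediate Calc, here is a minimal, self-contained sketch on a toy plan model. It is not the Flink or Calcite implementation: the types Scan, Calc, Join and MultiJoin and the method flatten are hypothetical stand-ins, and the sketch assumes that a Calc without a filter can simply be looked through, ignoring the field remapping and casts a real rule would have to handle.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Toy plan model. None of these types are Flink or Calcite classes. */
final class MultiJoinSketch {

    static class Node {}

    /** Leaf input, e.g. a table scan together with its own pushed-down filter. */
    static final class Scan extends Node {
        final String table;
        Scan(String table) { this.table = table; }
    }

    /** Calc sitting above an input; filter is null when it only projects or casts. */
    static final class Calc extends Node {
        final Node input;
        final String filter;
        Calc(Node input, String filter) { this.input = input; this.filter = filter; }
    }

    /** Binary inner join with a textual condition. */
    static final class Join extends Node {
        final Node left;
        final Node right;
        final String condition;
        Join(Node left, Node right, String condition) {
            this.left = left; this.right = right; this.condition = condition;
        }
    }

    /** N-ary join produced by flattening nested binary joins. */
    static final class MultiJoin extends Node {
        final List<Node> inputs;
        final List<String> conditions;
        MultiJoin(List<Node> inputs, List<String> conditions) {
            this.inputs = Collections.unmodifiableList(inputs);
            this.conditions = Collections.unmodifiableList(conditions);
        }
    }

    /** Flattens a join tree, looking through filter-free Calc nodes that sit directly on a join. */
    static MultiJoin flatten(Join root) {
        List<Node> inputs = new ArrayList<>();
        List<String> conditions = new ArrayList<>();
        collect(root, inputs, conditions);
        return new MultiJoin(inputs, conditions);
    }

    private static void collect(Node node, List<Node> inputs, List<String> conditions) {
        if (node instanceof Join) {
            Join join = (Join) node;
            collect(join.left, inputs, conditions);
            collect(join.right, inputs, conditions);
            conditions.add(join.condition);
        } else if (node instanceof Calc
                && ((Calc) node).filter == null
                && ((Calc) node).input instanceof Join) {
            // Assumption: a projection-only Calc can be pulled above the merged join.
            // A real rule would also have to remap field references and casts.
            collect(((Calc) node).input, inputs, conditions);
        } else {
            // Scans, filtering Calcs and anything else stay as opaque inputs.
            inputs.add(node);
        }
    }

    public static void main(String[] args) {
        Join inner = new Join(new Scan("datagen1"), new Scan("datagen2"), "id = id0");
        Join outer = new Join(new Calc(inner, null), new Scan("datagen3"), "id0 = id1");
        MultiJoin merged = flatten(outer);
        System.out.println("inputs=" + merged.inputs.size() + " conditions=" + merged.conditions);
    }
}
```

In this toy model the nested join under the projection-only Calc is merged, giving three inputs and both join conditions, which mirrors the shape of the expected plan. A Calc that carries a filter is left in place as an opaque input, so the joins beneath it are not merged.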



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
