Dmitriy Linevich created FLINK-39285:
----------------------------------------

             Summary: Consider the result of filter pushdown rules in the MultiJoin rule
                 Key: FLINK-39285
                 URL: https://issues.apache.org/jira/browse/FLINK-39285
             Project: Flink
          Issue Type: Improvement
            Reporter: Dmitriy Linevich


The MultiJoin rules run after a set of filter pushdown rules. Pushing filters down can leave Calc nodes between Join nodes in the plan, and these Calc nodes prevent the MultiJoin nodes on either side of them from being merged into a single MultiJoin.

SQL:
{code:java}
CREATE TABLE datagen1 (id int, f1 int, PRIMARY KEY (id) NOT ENFORCED) WITH ('connector' = 'datagen', 'rows-per-second' = '1');
CREATE TABLE datagen2 (id int, f2 int, PRIMARY KEY (id) NOT ENFORCED) WITH ('connector' = 'datagen', 'rows-per-second' = '1');
CREATE TABLE datagen3 (id int, f3 int, PRIMARY KEY (id) NOT ENFORCED) WITH ('connector' = 'datagen', 'rows-per-second' = '1');
CREATE TABLE print (id int, f1 int, f2 int, f3 int) WITH ('connector' = 'print');
EXPLAIN SELECT datagen1.id, datagen1.f1, datagen2.f2, datagen3.f3
FROM datagen1
LEFT JOIN datagen2 ON datagen1.id = datagen2.id AND datagen2.id > 2
JOIN datagen3
{code}
PLAN:
{code:java}
Calc(select=[id, f1, f2, f3])
+- MultiJoin(commonJoinKey=[id0], joinTypes=[INNER], inputUniqueKeys=[noUniqueKey, (id)], joinConditions=[=(id0, id1)], select=[id,f1,id0,f2,id1,f3], rowType=[RecordType(INTEGER id, INTEGER f1, INTEGER id0, INTEGER f2, INTEGER id1, INTEGER f3)])
   :- Exchange(distribution=[hash[id0]])
   :  +- Calc(select=[id, f1, CAST(id0 AS INTEGER) AS id0, f2])
   :     +- MultiJoin(commonJoinKey=[id], joinTypes=[INNER], inputUniqueKeys=[(id), (id)], joinConditions=[=(id, id0)], select=[id,f1,id0,f2], rowType=[RecordType(INTEGER id, INTEGER f1, INTEGER id0, INTEGER f2)])
   :        :- Exchange(distribution=[hash[id]])
   :        :  +- Calc(select=[id, f1], where=[>(id, 3)])
   :        :     +- TableSourceScan(table=[[default_catalog, default_database, datagen1]], fields=[id, f1])
   :        +- Exchange(distribution=[hash[id]])
   :           +- Calc(select=[id, f2], where=[>(id, 3)])
   :              +- TableSourceScan(table=[[default_catalog, default_database, datagen2]], fields=[id, f2])
   +- Exchange(distribution=[hash[id]])
      +- Calc(select=[id, f3], where=[>(id, 3)])
         +- TableSourceScan(table=[[default_catalog, default_database, datagen3]], fields=[id, f3])
{code}
I propose modifying JoinToMultiJoinRule so that it takes these Calc nodes between MultiJoin nodes into account, allowing the joins on both sides of such a Calc to be merged into a single MultiJoin.
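A rough sketch of the idea, using a toy plan model instead of Flink's planner classes: `PlanNode`, `Scan`, `Calc`, `Join`, `Flattened`, `flattenNaive` and `flattenThroughCalc` are hypothetical names, not Flink or Calcite APIs. It assumes the only Calc worth looking through is a projection-only Calc sitting directly on a Join (like the `CAST(id0 AS INTEGER)` Calc in the plan above), and that its projection can be re-applied above the merged MultiJoin. It ignores join types, join keys and the field-reference rewriting a real rule would need.

{code:java}
import java.util.ArrayList;
import java.util.List;

// Toy model of the plan shapes in this issue. All names here are hypothetical
// illustrations, not Flink planner classes. Join types, join keys and field
// references are deliberately ignored.
public class MultiJoinFlattenSketch {

    interface PlanNode {}

    // Leaf input, e.g. a TableSourceScan (with or without a pushed-down filter).
    record Scan(String table) implements PlanNode {}

    // Calc node; filter == null means the Calc only projects (e.g. a CAST).
    record Calc(PlanNode input, String projection, String filter) implements PlanNode {}

    // Binary join between two inputs.
    record Join(PlanNode left, PlanNode right) implements PlanNode {}

    // Inputs of the would-be MultiJoin plus projections to re-apply above it.
    record Flattened(List<PlanNode> inputs, List<String> pulledUpProjections) {}

    // Roughly the current behavior: only Join children are flattened, so a Calc
    // sitting on top of a Join becomes one opaque input.
    static Flattened flattenNaive(PlanNode root) {
        Flattened acc = new Flattened(new ArrayList<>(), new ArrayList<>());
        collect(root, acc, false);
        return acc;
    }

    // Proposed behavior (assumption): look through a projection-only Calc over a
    // Join and remember its projection so it can be re-applied above the MultiJoin.
    static Flattened flattenThroughCalc(PlanNode root) {
        Flattened acc = new Flattened(new ArrayList<>(), new ArrayList<>());
        collect(root, acc, true);
        return acc;
    }

    private static void collect(PlanNode node, Flattened acc, boolean lookThroughCalc) {
        if (node instanceof Join join) {
            collect(join.left(), acc, lookThroughCalc);
            collect(join.right(), acc, lookThroughCalc);
        } else if (lookThroughCalc
                && node instanceof Calc calc
                && calc.filter() == null
                && calc.input() instanceof Join) {
            acc.pulledUpProjections().add(calc.projection());
            collect(calc.input(), acc, true);
        } else {
            // Scans and filtered Calcs stay as separate MultiJoin inputs.
            acc.inputs().add(node);
        }
    }

    public static void main(String[] args) {
        // Shape of the plan in this issue, with Exchange nodes omitted.
        PlanNode plan = new Join(
                new Calc(
                        new Join(
                                new Calc(new Scan("datagen1"), "id, f1", ">(id, 3)"),
                                new Calc(new Scan("datagen2"), "id, f2", ">(id, 3)")),
                        "id, f1, CAST(id0 AS INTEGER) AS id0, f2",
                        null),
                new Calc(new Scan("datagen3"), "id, f3", ">(id, 3)"));

        System.out.println("naive inputs:        " + flattenNaive(plan).inputs().size());
        Flattened merged = flattenThroughCalc(plan);
        System.out.println("through-Calc inputs: " + merged.inputs().size());
        System.out.println("pulled up:           " + merged.pulledUpProjections());
    }
}
{code}

Running `main` prints 2 inputs for the naive flattening (the nested MultiJoin stays separate, as in the plan above) and 3 for the Calc-aware one. Calcs that carry a filter are deliberately left opaque in this sketch, since moving a filter across an outer-join boundary is not always semantics-preserving.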



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
