ldadima commented on PR #27508:
URL: https://github.com/apache/flink/pull/27508#issuecomment-4009426166

   Hi, @gustavodemorais 
   I've changed the code according to your suggestions.
   
   I noticed some problems and also want to make some suggestions:
   
   1. In some cases the joins are not merged into a single MultiJoin, and several MultiJoin nodes remain, each with only 2 inputs. A MultiJoin operator with 2 inputs may perform worse than a regular Join, although this performance assumption should be verified first. If the problem is confirmed, I suggest adding a rule that converts such a MultiJoin back to a regular Join (the rule could also be enabled via an option).
   2. Before the MultiJoin rules run, a set of "predicate pushdown" rules is applied. These rules push filters down, which causes Calc nodes to appear between Join nodes in the plan. The Calc nodes then prevent the MultiJoin nodes from being merged into a single MultiJoin. Therefore, I propose moving the MultiJoin rule ahead of the "predicate pushdown" rule set and adding a similar rule set that pushes filters down from MultiJoin (not just from regular Join).
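
To make both points concrete, here is a minimal Python sketch over a toy plan tree. Everything in it is hypothetical: the node classes (`Scan`, `Calc`, `Join`, `MultiJoin`) and the two rewrite functions are not Flink or Calcite planner APIs, and join keys, join types, and conditions are ignored on purpose. It only illustrates why a Calc between two MultiJoin nodes blocks merging, and what a fallback rule for 2-input MultiJoin nodes might do.

```python
from dataclasses import dataclass
from typing import Tuple, Union


# Toy plan nodes (hypothetical, NOT Flink planner classes).
@dataclass(frozen=True)
class Scan:
    table: str


@dataclass(frozen=True)
class Calc:
    input: "Node"


@dataclass(frozen=True)
class Join:
    left: "Node"
    right: "Node"


@dataclass(frozen=True)
class MultiJoin:
    inputs: Tuple["Node", ...]


Node = Union[Scan, Calc, Join, MultiJoin]


def _map_children(node, fn):
    """Rebuild a node after applying fn to each of its inputs."""
    if isinstance(node, Calc):
        return Calc(fn(node.input))
    if isinstance(node, Join):
        return Join(fn(node.left), fn(node.right))
    if isinstance(node, MultiJoin):
        return MultiJoin(tuple(fn(child) for child in node.inputs))
    return node  # Scan has no inputs


def merge_multi_joins(node):
    """Inline a MultiJoin only when it is a DIRECT input of another MultiJoin."""
    node = _map_children(node, merge_multi_joins)
    if isinstance(node, MultiJoin):
        flat = []
        for child in node.inputs:
            flat.extend(child.inputs if isinstance(child, MultiJoin) else (child,))
        return MultiJoin(tuple(flat))
    return node


def binary_multi_join_to_join(node):
    """Assumed fallback rule: a MultiJoin with exactly 2 inputs becomes a Join."""
    node = _map_children(node, binary_multi_join_to_join)
    if isinstance(node, MultiJoin) and len(node.inputs) == 2:
        return Join(*node.inputs)
    return node


a, b, c = Scan("datagen1"), Scan("datagen2"), Scan("datagen3")

# No Calc in between: the two binary MultiJoins collapse into one 3-input MultiJoin.
direct = MultiJoin((MultiJoin((a, b)), c))
merged_direct = merge_multi_joins(direct)

# A Calc sits between the MultiJoins, so nothing is merged.
blocked = MultiJoin((Calc(MultiJoin((a, b))), c))
merged_blocked = merge_multi_joins(blocked)

# The fallback turns the remaining binary MultiJoins into regular Joins.
fallback = binary_multi_join_to_join(merged_blocked)
```

In this toy model `merged_direct` has 3 inputs, `merged_blocked` is unchanged, and `fallback` contains only regular Join nodes. A real rule would also have to check the common join key and the join types before merging or converting.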
   
   SQL EXAMPLE
   ```
   CREATE TABLE datagen1 (id int, f1 int, PRIMARY KEY (id) NOT ENFORCED) WITH ('connector' = 'datagen', 'rows-per-second' = '1');
   CREATE TABLE datagen2 (id int, f2 int, PRIMARY KEY (id) NOT ENFORCED) WITH ('connector' = 'datagen', 'rows-per-second' = '1');
   CREATE TABLE datagen3 (id int, f3 int, PRIMARY KEY (id) NOT ENFORCED) WITH ('connector' = 'datagen', 'rows-per-second' = '1');
   CREATE TABLE print (id int, f1 int, f2 int, f3 int) WITH ('connector' = 'print');
   EXPLAIN SELECT datagen1.id, datagen1.f1, datagen2.f2, datagen3.f3
   FROM datagen1
   LEFT JOIN datagen2 on datagen1.id = datagen2.id and datagen2.id > 2
   JOIN datagen3
   ```
   
   PLAN EXAMPLE 
   ```
   == Optimized Physical Plan ==
   Calc(select=[id, f1, f2, f3])
   +- MultiJoin(commonJoinKey=[id0], joinTypes=[INNER], inputUniqueKeys=[noUniqueKey, (id)], joinConditions=[=(id0, id1)], select=[id,f1,id0,f2,id1,f3], rowType=[RecordType(INTEGER id, INTEGER f1, INTEGER id0, INTEGER f2, INTEGER id1, INTEGER f3)])
      :- Exchange(distribution=[hash[id0]])
      :  +- Calc(select=[id, f1, CAST(id0 AS INTEGER) AS id0, f2])
      :     +- MultiJoin(commonJoinKey=[id], joinTypes=[INNER], inputUniqueKeys=[(id), (id)], joinConditions=[=(id, id0)], select=[id,f1,id0,f2], rowType=[RecordType(INTEGER id, INTEGER f1, INTEGER id0, INTEGER f2)])
      :        :- Exchange(distribution=[hash[id]])
      :        :  +- Calc(select=[id, f1], where=[>(id, 3)])
      :        :     +- TableSourceScan(table=[[default_catalog, default_database, datagen1]], fields=[id, f1])
      :        +- Exchange(distribution=[hash[id]])
      :           +- Calc(select=[id, f2], where=[>(id, 3)])
      :              +- TableSourceScan(table=[[default_catalog, default_database, datagen2]], fields=[id, f2])
      +- Exchange(distribution=[hash[id]])
         +- Calc(select=[id, f3], where=[>(id, 3)])
            +- TableSourceScan(table=[[default_catalog, default_database, datagen3]], fields=[id, f3])
   ```
   
   As a result, it may make sense to create 3 Jira tickets:
   
   1. Generic/Binary row data problem in MultiJoinStateViews
   2. Transform binary MultiJoin (with 2 inputs) back to regular Join
   3. Add predicate pushdown rules for MultiJoin case


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

