Dmitriy Linevich created FLINK-39285:
----------------------------------------
Summary: Consider the result of filter pushdown rules in the MultiJoin rule
Key: FLINK-39285
URL: https://issues.apache.org/jira/browse/FLINK-39285
Project: Flink
Issue Type: Improvement
Reporter: Dmitriy Linevich
The MultiJoin rules run after a set of filter pushdown rules. Pushing filters down causes Calc nodes to appear between the Join nodes in the plan, and these Calc nodes prevent adjacent MultiJoin nodes from being merged into a single MultiJoin.
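To illustrate why a Calc blocks the merge, here is a toy Java model. It is not Flink's planner code: `Scan`, `Calc`, `Join`, `MultiJoin`, `toMultiJoin`, and `countMultiJoins` are hypothetical names. It assumes a bottom-up rewrite that turns every binary Join into a MultiJoin and flattens a child only when that child is itself a MultiJoin, so a Calc sitting between two joins leaves two MultiJoin nodes, mirroring the reproduction below.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of the problem (hypothetical classes, NOT Flink's planner API).
public class NaiveMultiJoinDemo {
    interface Node {}
    record Scan(String table) implements Node {}
    record Calc(Node input, String expr) implements Node {}
    record Join(Node left, Node right) implements Node {}
    record MultiJoin(List<Node> inputs) implements Node {}

    // Bottom-up rewrite: each Join becomes a MultiJoin; a child is flattened
    // only if it is directly a MultiJoin, so a Calc in between blocks the merge.
    static Node toMultiJoin(Node node) {
        if (node instanceof Join j) {
            List<Node> inputs = new ArrayList<>();
            for (Node child : List.of(toMultiJoin(j.left()), toMultiJoin(j.right()))) {
                if (child instanceof MultiJoin mj) {
                    inputs.addAll(mj.inputs()); // direct MultiJoin child: flatten it
                } else {
                    inputs.add(child); // anything else, including a Calc, is an opaque input
                }
            }
            return new MultiJoin(inputs);
        }
        if (node instanceof Calc c) {
            return new Calc(toMultiJoin(c.input()), c.expr());
        }
        return node; // Scan
    }

    // Counts MultiJoin nodes anywhere in the tree.
    static int countMultiJoins(Node node) {
        if (node instanceof MultiJoin mj) {
            int n = 1;
            for (Node in : mj.inputs()) {
                n += countMultiJoins(in);
            }
            return n;
        }
        if (node instanceof Calc c) {
            return countMultiJoins(c.input());
        }
        return 0;
    }

    public static void main(String[] args) {
        // Shape of the reported plan: filtered scans, with a Calc between the two joins.
        Node d1 = new Calc(new Scan("datagen1"), "id > 3");
        Node d2 = new Calc(new Scan("datagen2"), "id > 3");
        Node d3 = new Calc(new Scan("datagen3"), "id > 3");
        Node plan = new Join(new Calc(new Join(d1, d2), "CAST(id0 AS INTEGER)"), d3);
        System.out.println(countMultiJoins(toMultiJoin(plan))); // prints 2
    }
}
```

Without the intermediate Calc, the same rewrite would produce a single MultiJoin over three inputs.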
SQL:
{code:java}
CREATE TABLE datagen1 (id int, f1 int, PRIMARY KEY (id) NOT ENFORCED) WITH ('connector' = 'datagen', 'rows-per-second' = '1');
CREATE TABLE datagen2 (id int, f2 int, PRIMARY KEY (id) NOT ENFORCED) WITH ('connector' = 'datagen', 'rows-per-second' = '1');
CREATE TABLE datagen3 (id int, f3 int, PRIMARY KEY (id) NOT ENFORCED) WITH ('connector' = 'datagen', 'rows-per-second' = '1');
CREATE TABLE print (id int, f1 int, f2 int, f3 int) WITH ('connector' = 'print');
EXPLAIN SELECT datagen1.id, datagen1.f1, datagen2.f2, datagen3.f3
FROM datagen1
LEFT JOIN datagen2 on datagen1.id = datagen2.id and datagen2.id > 2
JOIN datagen3
{code}
PLAN:
{code:java}
Calc(select=[id, f1, f2, f3])
+- MultiJoin(commonJoinKey=[id0], joinTypes=[INNER], inputUniqueKeys=[noUniqueKey, (id)], joinConditions=[=(id0, id1)], select=[id,f1,id0,f2,id1,f3], rowType=[RecordType(INTEGER id, INTEGER f1, INTEGER id0, INTEGER f2, INTEGER id1, INTEGER f3)])
   :- Exchange(distribution=[hash[id0]])
   :  +- Calc(select=[id, f1, CAST(id0 AS INTEGER) AS id0, f2])
   :     +- MultiJoin(commonJoinKey=[id], joinTypes=[INNER], inputUniqueKeys=[(id), (id)], joinConditions=[=(id, id0)], select=[id,f1,id0,f2], rowType=[RecordType(INTEGER id, INTEGER f1, INTEGER id0, INTEGER f2)])
   :        :- Exchange(distribution=[hash[id]])
   :        :  +- Calc(select=[id, f1], where=[>(id, 3)])
   :        :     +- TableSourceScan(table=[[default_catalog, default_database, datagen1]], fields=[id, f1])
   :        +- Exchange(distribution=[hash[id]])
   :           +- Calc(select=[id, f2], where=[>(id, 3)])
   :              +- TableSourceScan(table=[[default_catalog, default_database, datagen2]], fields=[id, f2])
   +- Exchange(distribution=[hash[id]])
      +- Calc(select=[id, f3], where=[>(id, 3)])
         +- TableSourceScan(table=[[default_catalog, default_database, datagen3]], fields=[id, f3])
{code}
I propose modifying JoinToMultiJoinRule to take these Calc nodes between MultiJoin nodes into account, so that the two MultiJoin nodes in the plan above can be merged into a single MultiJoin.
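A minimal sketch of the proposed direction, using hypothetical toy classes that are not Flink's actual JoinToMultiJoinRule or its API: when a join input is a chain of Calc nodes over a MultiJoin, the rewrite looks through the Calcs, flattens the MultiJoin's inputs, and re-applies the Calc expressions above the merged node. It assumes those Calcs can be moved above the join safely; a real rule would also have to remap field references, respect outer-join semantics, and reject Calcs it cannot pull up (for example, ones with non-deterministic expressions).

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of the proposal (hypothetical classes, NOT Flink's JoinToMultiJoinRule).
// Assumes every Calc above a MultiJoin can be pulled up; field remapping and
// outer-join semantics are ignored.
public class CalcAwareMultiJoinDemo {
    interface Node {}
    record Scan(String table) implements Node {}
    record Calc(Node input, String expr) implements Node {}
    record Join(Node left, Node right) implements Node {}
    record MultiJoin(List<Node> inputs) implements Node {}

    static Node toMultiJoin(Node node) {
        if (node instanceof Join j) {
            List<Node> inputs = new ArrayList<>();
            List<String> pulledUp = new ArrayList<>();
            for (Node child : List.of(toMultiJoin(j.left()), toMultiJoin(j.right()))) {
                absorb(child, inputs, pulledUp);
            }
            Node result = new MultiJoin(inputs);
            // Re-apply looked-through Calcs in collected order (innermost first per input).
            for (String expr : pulledUp) {
                result = new Calc(result, expr);
            }
            return result;
        }
        if (node instanceof Calc c) {
            return new Calc(toMultiJoin(c.input()), c.expr());
        }
        return node; // Scan
    }

    // Looks through a chain of Calcs; if a MultiJoin sits underneath, flattens it
    // and records the Calc expressions so they can be re-applied above the merge.
    private static void absorb(Node child, List<Node> inputs, List<String> pulledUp) {
        List<String> exprs = new ArrayList<>();
        Node cur = child;
        while (cur instanceof Calc c) {
            exprs.add(0, c.expr()); // keep the innermost expression first
            cur = c.input();
        }
        if (cur instanceof MultiJoin mj) {
            inputs.addAll(mj.inputs());
            pulledUp.addAll(exprs);
        } else {
            inputs.add(child); // e.g. a filter Calc over a Scan stays an ordinary input
        }
    }

    // Counts MultiJoin nodes anywhere in the tree.
    static int countMultiJoins(Node node) {
        if (node instanceof MultiJoin mj) {
            int n = 1;
            for (Node in : mj.inputs()) {
                n += countMultiJoins(in);
            }
            return n;
        }
        if (node instanceof Calc c) {
            return countMultiJoins(c.input());
        }
        return 0;
    }

    public static void main(String[] args) {
        Node d1 = new Calc(new Scan("datagen1"), "id > 3");
        Node d2 = new Calc(new Scan("datagen2"), "id > 3");
        Node d3 = new Calc(new Scan("datagen3"), "id > 3");
        Node plan = new Join(new Calc(new Join(d1, d2), "CAST(id0 AS INTEGER)"), d3);
        Node result = toMultiJoin(plan);
        System.out.println(countMultiJoins(result)); // prints 1
        System.out.println(((MultiJoin) ((Calc) result).input()).inputs().size()); // prints 3
    }
}
```

Calc nodes over a plain scan, such as the pushed-down `>(id, 3)` filters, are left alone and remain ordinary MultiJoin inputs; only a Calc chain that ends in a MultiJoin is looked through.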