[ https://issues.apache.org/jira/browse/FLINK-39285?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
ASF GitHub Bot updated FLINK-39285:
-----------------------------------
Labels: pull-request-available (was: )
> Consider the result of filter pushdown rules in the MultiJoin rule
> ------------------------------------------------------------------
>
> Key: FLINK-39285
> URL: https://issues.apache.org/jira/browse/FLINK-39285
> Project: Flink
> Issue Type: Improvement
> Reporter: Dmitriy Linevich
> Priority: Major
> Labels: pull-request-available
>
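> Filter pushdown rules run before the MultiJoin rules. The Calc nodes that these
> rules push down end up between Join nodes in the plan, and such Calc nodes
> prevent adjacent MultiJoin nodes from being merged into a single MultiJoin. In
> the actual plan below, Calc(select=[id, f1, CAST(id0 AS INTEGER) AS id0, f2])
> sits between the two MultiJoin nodes, so the query ends up with two MultiJoin
> nodes instead of one.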
> SQL:
> {code:sql}
> CREATE TABLE datagen1 (id int, f1 int, PRIMARY KEY (id) NOT ENFORCED)
>   WITH ('connector' = 'datagen', 'rows-per-second' = '1');
> CREATE TABLE datagen2 (id int, f2 int, PRIMARY KEY (id) NOT ENFORCED)
>   WITH ('connector' = 'datagen', 'rows-per-second' = '1');
> CREATE TABLE datagen3 (id int, f3 int, PRIMARY KEY (id) NOT ENFORCED)
>   WITH ('connector' = 'datagen', 'rows-per-second' = '1');
> CREATE TABLE print (id int, f1 int, f2 int, f3 int)
>   WITH ('connector' = 'print');
> EXPLAIN SELECT datagen1.id, datagen1.f1, datagen2.f2, datagen3.f3
> FROM datagen1
> LEFT JOIN datagen2 ON datagen1.id = datagen2.id AND datagen2.id > 2
> JOIN datagen3
> {code}
> ACTUAL PLAN:
> {code:java}
> Calc(select=[id, f1, f2, f3])
> +- MultiJoin(commonJoinKey=[id0], joinTypes=[INNER], inputUniqueKeys=[noUniqueKey, (id)], joinConditions=[=(id0, id1)], select=[id,f1,id0,f2,id1,f3], rowType=[RecordType(INTEGER id, INTEGER f1, INTEGER id0, INTEGER f2, INTEGER id1, INTEGER f3)])
>    :- Exchange(distribution=[hash[id0]])
>    :  +- Calc(select=[id, f1, CAST(id0 AS INTEGER) AS id0, f2])
>    :     +- MultiJoin(commonJoinKey=[id], joinTypes=[INNER], inputUniqueKeys=[(id), (id)], joinConditions=[=(id, id0)], select=[id,f1,id0,f2], rowType=[RecordType(INTEGER id, INTEGER f1, INTEGER id0, INTEGER f2)])
>    :        :- Exchange(distribution=[hash[id]])
>    :        :  +- Calc(select=[id, f1], where=[>(id, 3)])
>    :        :     +- TableSourceScan(table=[[default_catalog, default_database, datagen1]], fields=[id, f1])
>    :        +- Exchange(distribution=[hash[id]])
>    :           +- Calc(select=[id, f2], where=[>(id, 3)])
>    :              +- TableSourceScan(table=[[default_catalog, default_database, datagen2]], fields=[id, f2])
>    +- Exchange(distribution=[hash[id]])
>       +- Calc(select=[id, f3], where=[>(id, 3)])
>          +- TableSourceScan(table=[[default_catalog, default_database, datagen3]], fields=[id, f3])
> {code}
> EXPECTED PLAN:
> {code:java}
> Calc(select=[id, f1, f2, f3])
> +- MultiJoin(commonJoinKey=[id], joinTypes=[INNER, INNER], inputUniqueKeys=[(id), (id), (id)], joinConditions=[=(id, id0), =(id0, id1)], select=[id,f1,id0,f2,id1,f3], rowType=[RecordType(INTEGER id, INTEGER f1, INTEGER id0, INTEGER f2, INTEGER id1, INTEGER f3)])
>    :- Exchange(distribution=[hash[id]])
>    :  +- Calc(select=[id, f1], where=[>(id, 3)])
>    :     +- TableSourceScan(table=[[default_catalog, default_database, datagen1]], fields=[id, f1])
>    :- Exchange(distribution=[hash[id]])
>    :  +- Calc(select=[id, f2], where=[>(id, 3)])
>    :     +- TableSourceScan(table=[[default_catalog, default_database, datagen2]], fields=[id, f2])
>    +- Exchange(distribution=[hash[id]])
>       +- Calc(select=[id, f3], where=[>(id, 3)])
>          +- TableSourceScan(table=[[default_catalog, default_database, datagen3]], fields=[id, f3])
> {code}
> I propose modifying JoinToMultiJoinRule to take these Calc nodes between
> MultiJoin nodes into account, so that all three inputs are merged into a
> single MultiJoin as in the expected plan.
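A minimal sketch of the proposed idea, assuming a toy plan model: the `Scan`, `Calc` and `Join` records, the `hasFilter` flag and `collectJoinInputs` are hypothetical and are not Flink or Calcite classes. It shows how flattening inner joins into the input list of one MultiJoin can step through a projection-only Calc sitting between two joins instead of treating it as an opaque input. A real change to JoinToMultiJoinRule would also have to rewrite field references and expressions (such as the CAST(id0 AS INTEGER) in the actual plan) through the Calc, which this sketch ignores.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model (hypothetical, not Flink/Calcite classes) of MultiJoin input flattening. */
public class MultiJoinFlattenSketch {

    public interface Node {}

    /** A leaf input, standing in for an Exchange/Calc/TableSourceScan chain over one table. */
    public record Scan(String table) implements Node {}

    /** A Calc node; hasFilter == false means it only projects (e.g. renames or casts fields). */
    public record Calc(Node input, boolean hasFilter) implements Node {}

    /** A binary join; inner == false stands for any outer join. */
    public record Join(Node left, Node right, boolean inner) implements Node {}

    /**
     * Collects the inputs a single MultiJoin would get. Inner joins are expanded
     * recursively. A projection-only Calc directly over an inner join is looked
     * through (assumption: its expressions could be rewritten into the parent
     * MultiJoin). Any other node is kept as an opaque input.
     */
    public static List<Node> collectJoinInputs(Node node) {
        List<Node> inputs = new ArrayList<>();
        collect(node, inputs);
        return inputs;
    }

    private static void collect(Node node, List<Node> inputs) {
        if (node instanceof Join join && join.inner()) {
            collect(join.left(), inputs);
            collect(join.right(), inputs);
        } else if (node instanceof Calc calc
                && !calc.hasFilter()
                && calc.input() instanceof Join child
                && child.inner()) {
            // Step through the Calc instead of stopping the merge at it.
            collect(child, inputs);
        } else {
            inputs.add(node);
        }
    }

    public static void main(String[] args) {
        // Same shape as the ACTUAL PLAN: Join(Calc(Join(datagen1, datagen2)), datagen3).
        Node plan = new Join(
                new Calc(new Join(new Scan("datagen1"), new Scan("datagen2"), true), false),
                new Scan("datagen3"),
                true);
        // Prints three Scan inputs, i.e. one MultiJoin as in the EXPECTED PLAN.
        System.out.println(collectJoinInputs(plan));
    }
}
```

Setting `hasFilter` to `true` on the Calc in this example makes it an opaque input again, so only two inputs are collected. Whether Calc nodes that filter can also be looked through (for example by moving the predicate into the condition of an inner join) is left open in this sketch.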
--
This message was sent by Atlassian Jira
(v8.20.10#820010)