[
https://issues.apache.org/jira/browse/FLINK-39324?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Stepan Stepanishchev updated FLINK-39324:
-----------------------------------------
Description:
h2. Example
Consider the following query, which chains two inner joins on user_id using the null-safe comparison IS NOT DISTINCT FROM:
{code:sql}
INSERT INTO ResultSink
SELECT u.user_id, u.name, o.order_id, p.payment_id, p.price
FROM Users u
JOIN Orders o
ON u.user_id IS NOT DISTINCT FROM o.user_id
JOIN Payments p
ON o.user_id IS NOT DISTINCT FROM p.user_id{code}
h2. Before fix:
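MultiJoin does not recognize IS NOT DISTINCT FROM as a join-key condition, so it fails to extract a common join key. The planner therefore produces two nested MultiJoin nodes, and every input Exchange uses singleton distribution (distribution=[single]):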
{noformat}
== Optimized Execution Plan ==
Sink(table=[default_catalog.default_database.ResultSink], fields=[user_id, name, order_id, payment_id, price])
+- Calc(select=[user_id, name, order_id, payment_id, price])
   +- MultiJoin(commonJoinKey=[noCommonJoinKey], joinTypes=[INNER], inputUniqueKeys=[noUniqueKey, noUniqueKey], joinConditions=[IS NOT DISTINCT FROM(user_id0, user_id1)], select=[user_id,name,order_id,user_id0,payment_id,price,user_id1], rowType=[RecordType(VARCHAR(2147483647) user_id, VARCHAR(2147483647) name, VARCHAR(2147483647) order_id, VARCHAR(2147483647) user_id0, VARCHAR(2147483647) payment_id, INTEGER price, VARCHAR(2147483647) user_id1)])
      :- Exchange(distribution=[single])
      :  +- MultiJoin(commonJoinKey=[noCommonJoinKey], joinTypes=[INNER], inputUniqueKeys=[noUniqueKey, noUniqueKey], joinConditions=[IS NOT DISTINCT FROM(user_id, user_id0)], select=[user_id,name,order_id,user_id0], rowType=[RecordType(VARCHAR(2147483647) user_id, VARCHAR(2147483647) name, VARCHAR(2147483647) order_id, VARCHAR(2147483647) user_id0)])
      :     :- Exchange(distribution=[single])
      :     :  +- Calc(select=[user_id, name])
      :     :     +- MiniBatchAssigner(interval=[1000ms], mode=[ProcTime])
      :     :        +- TableSourceScan(table=[[default_catalog, default_database, Users]], fields=[user_id, name, cash])
      :     +- Exchange(distribution=[single])
      :        +- Calc(select=[order_id, user_id])
      :           +- MiniBatchAssigner(interval=[1000ms], mode=[ProcTime])
      :              +- TableSourceScan(table=[[default_catalog, default_database, Orders]], fields=[order_id, user_id, product])
      +- Exchange(distribution=[single])
         +- MiniBatchAssigner(interval=[1000ms], mode=[ProcTime])
            +- TableSourceScan(table=[[default_catalog, default_database, Payments]], fields=[payment_id, price, user_id])
{noformat}
h2. After fix:
Both joins are flattened into a single MultiJoin node with commonJoinKey=[user_id], and every input is hash-distributed on user_id:
{noformat}
== Optimized Execution Plan ==
Sink(table=[default_catalog.default_database.ResultSink], fields=[user_id, name, order_id, payment_id, price])
+- Calc(select=[user_id, name, order_id, payment_id, price])
   +- MultiJoin(commonJoinKey=[user_id], joinTypes=[INNER, INNER], inputUniqueKeys=[noUniqueKey, noUniqueKey, noUniqueKey], joinConditions=[IS NOT DISTINCT FROM(user_id, user_id0), IS NOT DISTINCT FROM(user_id0, user_id1)], select=[user_id,name,order_id,user_id0,payment_id,price,user_id1], rowType=[RecordType(VARCHAR(2147483647) user_id, VARCHAR(2147483647) name, VARCHAR(2147483647) order_id, VARCHAR(2147483647) user_id0, VARCHAR(2147483647) payment_id, INTEGER price, VARCHAR(2147483647) user_id1)])
      :- Exchange(distribution=[hash[user_id]])
      :  +- Calc(select=[user_id, name])
      :     +- MiniBatchAssigner(interval=[1000ms], mode=[ProcTime])
      :        +- TableSourceScan(table=[[default_catalog, default_database, Users]], fields=[user_id, name, cash])
      :- Exchange(distribution=[hash[user_id]])
      :  +- Calc(select=[order_id, user_id])
      :     +- MiniBatchAssigner(interval=[1000ms], mode=[ProcTime])
      :        +- TableSourceScan(table=[[default_catalog, default_database, Orders]], fields=[order_id, user_id, product])
      +- Exchange(distribution=[hash[user_id]])
         +- MiniBatchAssigner(interval=[1000ms], mode=[ProcTime])
            +- TableSourceScan(table=[[default_catalog, default_database, Payments]], fields=[payment_id, price, user_id])
{noformat}
was:
h2. Example
Consider the following query:
{code:java}
INSERT INTO ResultSink
SELECT u.user_id, u.name, o.order_id, p.payment_id, p.price
FROM Users u
JOIN Orders o
ON u.user_id IS NOT DISTINCT FROM o.user_id
JOIN Payments p
ON o.user_id IS NOT DISTINCT FROM p.user_id{code}
h2.
Before fix:
MultiJoin fails to extract a common join key, resulting in two MultiJoin nodes
and singleton distribution:
{code:java}
== Optimized Execution Plan ==
Sink(table=[default_catalog.default_database.ResultSink], fields=[user_id,
name, order_id, payment_id, price])
+- Calc(select=[user_id, name, order_id, payment_id, price])
+- MultiJoin(commonJoinKey=[noCommonJoinKey], joinTypes=[INNER],
inputUniqueKeys=[noUniqueKey, noUniqueKey], joinConditions=[IS NOT DISTINCT
FROM(user_id0, user_id1)],
select=[user_id,name,order_id,user_id0,payment_id,price,user_id1],
rowType=[RecordType(VARCHAR(2147483647) user_id, VARCHAR(2147483647) name,
VARCHAR(2147483647) order_id, VARCHAR(2147483647) user_id0, VARCHAR(2147483647)
payment_id, INTEGER price, VARCHAR(2147483647) user_id1)])
:- Exchange(distribution=[single])
: +- MultiJoin(commonJoinKey=[noCommonJoinKey], joinTypes=[INNER],
inputUniqueKeys=[noUniqueKey, noUniqueKey], joinConditions=[IS NOT DISTINCT
FROM(user_id, user_id0)], select=[user_id,name,order_id,user_id0],
rowType=[RecordType(VARCHAR(2147483647) user_id, VARCHAR(2147483647) name,
VARCHAR(2147483647) order_id, VARCHAR(2147483647) user_id0)])
: :- Exchange(distribution=[single])
: : +- Calc(select=[user_id, name])
: : +- MiniBatchAssigner(interval=[1000ms], mode=[ProcTime])
: : +- TableSourceScan(table=[[default_catalog,
default_database, Users]], fields=[user_id, name, cash])
: +- Exchange(distribution=[single])
: +- Calc(select=[order_id, user_id])
: +- MiniBatchAssigner(interval=[1000ms], mode=[ProcTime])
: +- TableSourceScan(table=[[default_catalog,
default_database, Orders]], fields=[order_id, user_id, product])
+- Exchange(distribution=[single])
+- MiniBatchAssigner(interval=[1000ms], mode=[ProcTime])
+- TableSourceScan(table=[[default_catalog, default_database,
Payments]], fields=[payment_id, price, user_id]){code}
h2. After fix:
Joins are flattened into a single MultiJoin node:
{code:java}
== Optimized Execution Plan ==
Sink(table=[default_catalog.default_database.ResultSink], fields=[user_id,
name, order_id, payment_id, price])
+- Calc(select=[user_id, name, order_id, payment_id, price])
+- MultiJoin(commonJoinKey=[user_id], joinTypes=[INNER, INNER],
inputUniqueKeys=[noUniqueKey, noUniqueKey, noUniqueKey], joinConditions=[IS NOT
DISTINCT FROM(user_id, user_id0), IS NOT DISTINCT FROM(user_id0, user_id1)],
select=[user_id,name,order_id,user_id0,payment_id,price,user_id1],
rowType=[RecordType(VARCHAR(2147483647) user_id, VARCHAR(2147483647) name,
VARCHAR(2147483647) order_id, VARCHAR(2147483647) user_id0, VARCHAR(2147483647)
payment_id, INTEGER price, VARCHAR(2147483647) user_id1)])
:- Exchange(distribution=[hash[user_id]])
: +- Calc(select=[user_id, name])
: +- MiniBatchAssigner(interval=[1000ms], mode=[ProcTime])
: +- TableSourceScan(table=[[default_catalog, default_database,
Users]], fields=[user_id, name, cash])
:- Exchange(distribution=[hash[user_id]])
: +- Calc(select=[order_id, user_id])
: +- MiniBatchAssigner(interval=[1000ms], mode=[ProcTime])
: +- TableSourceScan(table=[[default_catalog, default_database,
Orders]], fields=[order_id, user_id, product])
+- Exchange(distribution=[hash[user_id]])
+- MiniBatchAssigner(interval=[1000ms], mode=[ProcTime])
+- TableSourceScan(table=[[default_catalog, default_database,
Payments]], fields=[payment_id, price, user_id]){code}
> Allow MultiJoin to respect IS_NOT_DISTINCT_FROM when extracting join keys
> -------------------------------------------------------------------------
>
> Key: FLINK-39324
> URL: https://issues.apache.org/jira/browse/FLINK-39324
> Project: Flink
> Issue Type: Improvement
> Components: Table SQL / Planner
> Reporter: Stepan Stepanishchev
> Priority: Minor
>
> h2. Example
> Consider the following query:
> {code:java}
> INSERT INTO ResultSink
> SELECT u.user_id, u.name, o.order_id, p.payment_id, p.price
> FROM Users u
> JOIN Orders o
> ON u.user_id IS NOT DISTINCT FROM o.user_id
> JOIN Payments p
> ON o.user_id IS NOT DISTINCT FROM p.user_id{code}
> h2. Before fix:
> MultiJoin fails to extract a common join key, resulting in two MultiJoin
> nodes and singleton distribution:
> {code:java}
> == Optimized Execution Plan ==
> Sink(table=[default_catalog.default_database.ResultSink], fields=[user_id,
> name, order_id, payment_id, price])
> +- Calc(select=[user_id, name, order_id, payment_id, price])
> +- MultiJoin(commonJoinKey=[noCommonJoinKey], joinTypes=[INNER],
> inputUniqueKeys=[noUniqueKey, noUniqueKey], joinConditions=[IS NOT DISTINCT
> FROM(user_id0, user_id1)],
> select=[user_id,name,order_id,user_id0,payment_id,price,user_id1],
> rowType=[RecordType(VARCHAR(2147483647) user_id, VARCHAR(2147483647) name,
> VARCHAR(2147483647) order_id, VARCHAR(2147483647) user_id0,
> VARCHAR(2147483647) payment_id, INTEGER price, VARCHAR(2147483647) user_id1)])
> :- Exchange(distribution=[single])
> : +- MultiJoin(commonJoinKey=[noCommonJoinKey], joinTypes=[INNER],
> inputUniqueKeys=[noUniqueKey, noUniqueKey], joinConditions=[IS NOT DISTINCT
> FROM(user_id, user_id0)], select=[user_id,name,order_id,user_id0],
> rowType=[RecordType(VARCHAR(2147483647) user_id, VARCHAR(2147483647) name,
> VARCHAR(2147483647) order_id, VARCHAR(2147483647) user_id0)])
> : :- Exchange(distribution=[single])
> : : +- Calc(select=[user_id, name])
> : : +- MiniBatchAssigner(interval=[1000ms], mode=[ProcTime])
> : : +- TableSourceScan(table=[[default_catalog,
> default_database, Users]], fields=[user_id, name, cash])
> : +- Exchange(distribution=[single])
> : +- Calc(select=[order_id, user_id])
> : +- MiniBatchAssigner(interval=[1000ms], mode=[ProcTime])
> : +- TableSourceScan(table=[[default_catalog,
> default_database, Orders]], fields=[order_id, user_id, product])
> +- Exchange(distribution=[single])
> +- MiniBatchAssigner(interval=[1000ms], mode=[ProcTime])
> +- TableSourceScan(table=[[default_catalog, default_database,
> Payments]], fields=[payment_id, price, user_id]){code}
> h2. After fix:
> Joins are flattened into a single MultiJoin node:
> {code:java}
> == Optimized Execution Plan ==
> Sink(table=[default_catalog.default_database.ResultSink], fields=[user_id,
> name, order_id, payment_id, price])
> +- Calc(select=[user_id, name, order_id, payment_id, price])
> +- MultiJoin(commonJoinKey=[user_id], joinTypes=[INNER, INNER],
> inputUniqueKeys=[noUniqueKey, noUniqueKey, noUniqueKey], joinConditions=[IS
> NOT DISTINCT FROM(user_id, user_id0), IS NOT DISTINCT FROM(user_id0,
> user_id1)],
> select=[user_id,name,order_id,user_id0,payment_id,price,user_id1],
> rowType=[RecordType(VARCHAR(2147483647) user_id, VARCHAR(2147483647) name,
> VARCHAR(2147483647) order_id, VARCHAR(2147483647) user_id0,
> VARCHAR(2147483647) payment_id, INTEGER price, VARCHAR(2147483647) user_id1)])
> :- Exchange(distribution=[hash[user_id]])
> : +- Calc(select=[user_id, name])
> : +- MiniBatchAssigner(interval=[1000ms], mode=[ProcTime])
> : +- TableSourceScan(table=[[default_catalog, default_database,
> Users]], fields=[user_id, name, cash])
> :- Exchange(distribution=[hash[user_id]])
> : +- Calc(select=[order_id, user_id])
> : +- MiniBatchAssigner(interval=[1000ms], mode=[ProcTime])
> : +- TableSourceScan(table=[[default_catalog, default_database,
> Orders]], fields=[order_id, user_id, product])
> +- Exchange(distribution=[hash[user_id]])
> +- MiniBatchAssigner(interval=[1000ms], mode=[ProcTime])
> +- TableSourceScan(table=[[default_catalog, default_database,
> Payments]], fields=[payment_id, price, user_id]){code}
--
This message was sent by Atlassian Jira
(v8.20.10#820010)