Stepan Stepanishchev created FLINK-39324:
--------------------------------------------
Summary: Allow MultiJoin to respect IS_NOT_DISTINCT_FROM when extracting join keys
Key: FLINK-39324
URL: https://issues.apache.org/jira/browse/FLINK-39324
Project: Flink
Issue Type: Improvement
Components: Table SQL / Planner
Reporter: Stepan Stepanishchev
h2. Example
Consider the following query:
{code:java}
INSERT INTO ResultSink
SELECT u.user_id, u.name, o.order_id, p.payment_id, p.price
FROM Users u
JOIN Orders o
ON u.user_id IS NOT DISTINCT FROM o.user_id
JOIN Payments p
ON o.user_id IS NOT DISTINCT FROM p.user_id{code}
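For readers less familiar with the operator: a IS NOT DISTINCT FROM b is null-safe equality, so two NULL user_id values match, whereas a = b never evaluates to TRUE when either side is NULL. The Java 16+ sketch below illustrates those semantics for the query above with a simple in-memory hash join. The row types, sample data, and method names are hypothetical and unrelated to Flink's MultiJoin operator.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

public class NullSafeJoinSketch {

    // Hypothetical row types holding only the columns the example query reads.
    record User(String userId, String name) {}
    record Order(String orderId, String userId) {}
    record Payment(String paymentId, int price, String userId) {}
    record Result(String userId, String name, String orderId, String paymentId, int price) {}

    // a IS NOT DISTINCT FROM b: two NULLs match; NULL vs. a non-NULL value does not.
    static boolean isNotDistinctFrom(String a, String b) {
        return Objects.equals(a, b);
    }

    // a = b under SQL three-valued logic: a NULL on either side never yields TRUE.
    static boolean sqlEquals(String a, String b) {
        return a != null && b != null && a.equals(b);
    }

    // Hash join on user_id with IS NOT DISTINCT FROM semantics. HashMap permits a
    // null key, so all rows with a NULL user_id share one bucket and join together.
    static List<Result> join(List<User> users, List<Order> orders, List<Payment> payments) {
        Map<String, List<Order>> ordersByUser = new HashMap<>();
        for (Order o : orders) {
            ordersByUser.computeIfAbsent(o.userId(), k -> new ArrayList<>()).add(o);
        }
        Map<String, List<Payment>> paymentsByUser = new HashMap<>();
        for (Payment p : payments) {
            paymentsByUser.computeIfAbsent(p.userId(), k -> new ArrayList<>()).add(p);
        }
        List<Result> out = new ArrayList<>();
        for (User u : users) {
            // u.user_id IS NOT DISTINCT FROM o.user_id
            for (Order o : ordersByUser.getOrDefault(u.userId(), List.of())) {
                // o.user_id IS NOT DISTINCT FROM p.user_id
                for (Payment p : paymentsByUser.getOrDefault(o.userId(), List.of())) {
                    out.add(new Result(u.userId(), u.name(), o.orderId(), p.paymentId(), p.price()));
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<User> users = List.of(new User("u1", "Alice"), new User(null, "Anonymous"));
        List<Order> orders = List.of(new Order("o1", "u1"), new Order("o2", null));
        List<Payment> payments = List.of(new Payment("p1", 10, "u1"), new Payment("p2", 5, null));
        // Prints two rows: one for "u1" and one for the NULL user_id.
        join(users, orders, payments).forEach(System.out::println);
        System.out.println(isNotDistinctFrom(null, null) + " " + sqlEquals(null, null)); // true false
    }
}
```

With plain = conditions, the row whose user_id is NULL would be dropped; with IS NOT DISTINCT FROM it survives the whole join chain.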
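h2. Before fix:
MultiJoin does not treat IS NOT DISTINCT FROM as a join-key condition when extracting join keys, so it cannot find a common join key. The planner therefore produces two nested MultiJoin nodes, and every input is sent to a single task (Exchange(distribution=[single])):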
{code:java}
== Optimized Execution Plan ==
Sink(table=[default_catalog.default_database.ResultSink], fields=[user_id, name, order_id, payment_id, price])
+- Calc(select=[user_id, name, order_id, payment_id, price])
   +- MultiJoin(commonJoinKey=[noCommonJoinKey], joinTypes=[INNER], inputUniqueKeys=[noUniqueKey, noUniqueKey], joinConditions=[IS NOT DISTINCT FROM(user_id0, user_id1)], select=[user_id,name,order_id,user_id0,payment_id,price,user_id1], rowType=[RecordType(VARCHAR(2147483647) user_id, VARCHAR(2147483647) name, VARCHAR(2147483647) order_id, VARCHAR(2147483647) user_id0, VARCHAR(2147483647) payment_id, INTEGER price, VARCHAR(2147483647) user_id1)])
      :- Exchange(distribution=[single])
      :  +- MultiJoin(commonJoinKey=[noCommonJoinKey], joinTypes=[INNER], inputUniqueKeys=[noUniqueKey, noUniqueKey], joinConditions=[IS NOT DISTINCT FROM(user_id, user_id0)], select=[user_id,name,order_id,user_id0], rowType=[RecordType(VARCHAR(2147483647) user_id, VARCHAR(2147483647) name, VARCHAR(2147483647) order_id, VARCHAR(2147483647) user_id0)])
      :     :- Exchange(distribution=[single])
      :     :  +- Calc(select=[user_id, name])
      :     :     +- MiniBatchAssigner(interval=[1000ms], mode=[ProcTime])
      :     :        +- TableSourceScan(table=[[default_catalog, default_database, Users]], fields=[user_id, name, cash])
      :     +- Exchange(distribution=[single])
      :        +- Calc(select=[order_id, user_id])
      :           +- MiniBatchAssigner(interval=[1000ms], mode=[ProcTime])
      :              +- TableSourceScan(table=[[default_catalog, default_database, Orders]], fields=[order_id, user_id, product])
      +- Exchange(distribution=[single])
         +- MiniBatchAssigner(interval=[1000ms], mode=[ProcTime])
            +- TableSourceScan(table=[[default_catalog, default_database, Payments]], fields=[payment_id, price, user_id]){code}
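The difference between the two plans comes down to which predicates may contribute to a shared key. The following is a simplified, hypothetical model of common-join-key extraction, not Flink's planner code (Kind, Condition, and commonJoinKey are made-up names): equi-style predicates merge columns into equivalence classes via union-find, and a common key exists if one class contains a column from every input. With acceptNotDistinct set to false, both IS NOT DISTINCT FROM predicates are ignored and no key is found, mirroring noCommonJoinKey; with it set to true, Users.user_id, Orders.user_id, and Payments.user_id form a single class, mirroring commonJoinKey=[user_id].

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

public class CommonJoinKeySketch {

    // Hypothetical predicate kinds; not Flink's or Calcite's SqlKind.
    enum Kind { EQUALS, IS_NOT_DISTINCT_FROM, OTHER }

    // One predicate between a column of input leftInput and a column of input rightInput.
    record Condition(Kind kind, int leftInput, String leftField, int rightInput, String rightField) {}

    // Minimal union-find over "input:field" attribute names.
    static final class UnionFind {
        private final Map<String, String> parent = new HashMap<>();

        String find(String x) {
            parent.putIfAbsent(x, x);
            String p = parent.get(x);
            if (!p.equals(x)) {
                p = find(p);
                parent.put(x, p); // path compression
            }
            return p;
        }

        void union(String a, String b) {
            parent.put(find(a), find(b));
        }
    }

    // Returns the attributes of an equivalence class covering every input, if one exists.
    // acceptNotDistinct = false models the "before fix" behaviour described in the issue.
    static Optional<Set<String>> commonJoinKey(int numInputs, List<Condition> conds, boolean acceptNotDistinct) {
        UnionFind uf = new UnionFind();
        for (Condition c : conds) {
            boolean usable = c.kind() == Kind.EQUALS
                    || (acceptNotDistinct && c.kind() == Kind.IS_NOT_DISTINCT_FROM);
            if (usable) {
                uf.union(c.leftInput() + ":" + c.leftField(), c.rightInput() + ":" + c.rightField());
            }
        }
        // Group attributes by root and record which inputs each class touches.
        Map<String, Set<String>> members = new HashMap<>();
        Map<String, Set<Integer>> inputs = new HashMap<>();
        for (String attr : Set.copyOf(uf.parent.keySet())) {
            String root = uf.find(attr);
            members.computeIfAbsent(root, k -> new HashSet<>()).add(attr);
            inputs.computeIfAbsent(root, k -> new HashSet<>()).add(Integer.parseInt(attr.split(":")[0]));
        }
        for (Map.Entry<String, Set<Integer>> e : inputs.entrySet()) {
            if (e.getValue().size() == numInputs) {
                return Optional.of(members.get(e.getKey()));
            }
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        // Users = input 0, Orders = input 1, Payments = input 2, as in the example query.
        List<Condition> conds = List.of(
                new Condition(Kind.IS_NOT_DISTINCT_FROM, 0, "user_id", 1, "user_id"),
                new Condition(Kind.IS_NOT_DISTINCT_FROM, 1, "user_id", 2, "user_id"));
        System.out.println("before: " + commonJoinKey(3, conds, false));
        System.out.println("after:  " + commonJoinKey(3, conds, true));
    }
}
```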
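h2. After fix:
With IS NOT DISTINCT FROM recognized during join-key extraction, MultiJoin finds user_id as the common join key, flattens both joins into a single MultiJoin node, and hash-distributes every input on user_id: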
{code:java}
== Optimized Execution Plan ==
Sink(table=[default_catalog.default_database.ResultSink], fields=[user_id, name, order_id, payment_id, price])
+- Calc(select=[user_id, name, order_id, payment_id, price])
   +- MultiJoin(commonJoinKey=[user_id], joinTypes=[INNER, INNER], inputUniqueKeys=[noUniqueKey, noUniqueKey, noUniqueKey], joinConditions=[IS NOT DISTINCT FROM(user_id, user_id0), IS NOT DISTINCT FROM(user_id0, user_id1)], select=[user_id,name,order_id,user_id0,payment_id,price,user_id1], rowType=[RecordType(VARCHAR(2147483647) user_id, VARCHAR(2147483647) name, VARCHAR(2147483647) order_id, VARCHAR(2147483647) user_id0, VARCHAR(2147483647) payment_id, INTEGER price, VARCHAR(2147483647) user_id1)])
      :- Exchange(distribution=[hash[user_id]])
      :  +- Calc(select=[user_id, name])
      :     +- MiniBatchAssigner(interval=[1000ms], mode=[ProcTime])
      :        +- TableSourceScan(table=[[default_catalog, default_database, Users]], fields=[user_id, name, cash])
      :- Exchange(distribution=[hash[user_id]])
      :  +- Calc(select=[order_id, user_id])
      :     +- MiniBatchAssigner(interval=[1000ms], mode=[ProcTime])
      :        +- TableSourceScan(table=[[default_catalog, default_database, Orders]], fields=[order_id, user_id, product])
      +- Exchange(distribution=[hash[user_id]])
         +- MiniBatchAssigner(interval=[1000ms], mode=[ProcTime])
            +- TableSourceScan(table=[[default_catalog, default_database, Payments]], fields=[payment_id, price, user_id]){code}
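Hash distribution on user_id works for IS NOT DISTINCT FROM because rows with equal keys, NULL keys included, are routed to the same subtask, so each subtask can evaluate the join locally. The sketch below illustrates that property with a deliberately simplified partitioner (Objects.hashCode modulo parallelism). This partitioner and the method names are assumptions for illustration only; it is not how Flink computes the target subtask, but any deterministic hash has the same property.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.TreeMap;

public class NullSafeHashPartitionSketch {

    // Simplified, hypothetical stand-in for hash[user_id] distribution.
    // Objects.hashCode(null) == 0, so every NULL key maps to the same subtask.
    static int subtaskFor(String userId, int parallelism) {
        return Math.floorMod(Objects.hashCode(userId), parallelism);
    }

    // Routes (table, user_id) rows to subtasks; returns subtask -> "table/user_id" tags.
    static Map<Integer, List<String>> route(List<String[]> rows, int parallelism) {
        Map<Integer, List<String>> bySubtask = new TreeMap<>();
        for (String[] row : rows) {
            String table = row[0];
            String userId = row[1];
            bySubtask.computeIfAbsent(subtaskFor(userId, parallelism), k -> new ArrayList<>())
                    .add(table + "/" + userId);
        }
        return bySubtask;
    }

    public static void main(String[] args) {
        List<String[]> rows = List.of(
                new String[] {"Users", "u1"},
                new String[] {"Orders", "u1"},
                new String[] {"Payments", "u1"},
                new String[] {"Users", null},
                new String[] {"Orders", null},
                new String[] {"Payments", null});
        // All three "u1" rows share a subtask, and all three NULL rows share a subtask.
        route(rows, 4).forEach((subtask, tags) -> System.out.println(subtask + " -> " + tags));
    }
}
```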