[ https://issues.apache.org/jira/browse/FLINK-39324?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

ASF GitHub Bot updated FLINK-39324:
-----------------------------------
    Labels: pull-request-available  (was: )

> Allow MultiJoin to respect IS_NOT_DISTINCT_FROM when extracting join keys
> -------------------------------------------------------------------------
>
>                 Key: FLINK-39324
>                 URL: https://issues.apache.org/jira/browse/FLINK-39324
>             Project: Flink
>          Issue Type: Improvement
>          Components: Table SQL / Planner
>            Reporter: Stepan Stepanishchev
>            Priority: Minor
>              Labels: pull-request-available
>
> h2. Example
> Consider the following query:
> {code:java}
> INSERT INTO ResultSink
> SELECT u.user_id, u.name, o.order_id, p.payment_id, p.price
> FROM Users u
>   JOIN Orders o 
>     ON u.user_id IS NOT DISTINCT FROM o.user_id
>   JOIN Payments p 
>     ON o.user_id IS NOT DISTINCT FROM p.user_id{code}
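> For context, {{IS NOT DISTINCT FROM}} is SQL's NULL-safe equality: unlike {{=}}, it evaluates to TRUE when both operands are NULL. The following self-contained Java sketch (illustrative only, not Flink planner code) models the difference:

```java
import java.util.Objects;

public class NullSafeEquality {

    // SQL "a = b" yields UNKNOWN when either operand is NULL, and a join
    // predicate filters UNKNOWN rows out, so we model that case as false.
    static boolean sqlEquals(String a, String b) {
        if (a == null || b == null) {
            return false;
        }
        return a.equals(b);
    }

    // SQL "a IS NOT DISTINCT FROM b" is NULL-safe: two NULLs compare equal.
    static boolean isNotDistinctFrom(String a, String b) {
        return Objects.equals(a, b);
    }

    public static void main(String[] args) {
        System.out.println(sqlEquals(null, null));          // false
        System.out.println(isNotDistinctFrom(null, null));  // true
        System.out.println(isNotDistinctFrom("u1", "u1"));  // true
    }
}
```

> Because the two predicates agree on all non-NULL keys, an {{IS NOT DISTINCT FROM}} condition can still drive hash partitioning on the key column.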
> h2. Before fix:
> MultiJoin fails to extract a common join key, resulting in two nested MultiJoin nodes and singleton distribution:
> {code:java}
> == Optimized Execution Plan ==
> Sink(table=[default_catalog.default_database.ResultSink], fields=[user_id, name, order_id, payment_id, price])
> +- Calc(select=[user_id, name, order_id, payment_id, price])
>    +- MultiJoin(commonJoinKey=[noCommonJoinKey], joinTypes=[INNER], inputUniqueKeys=[noUniqueKey, noUniqueKey], joinConditions=[IS NOT DISTINCT FROM(user_id0, user_id1)], select=[user_id,name,order_id,user_id0,payment_id,price,user_id1], rowType=[RecordType(VARCHAR(2147483647) user_id, VARCHAR(2147483647) name, VARCHAR(2147483647) order_id, VARCHAR(2147483647) user_id0, VARCHAR(2147483647) payment_id, INTEGER price, VARCHAR(2147483647) user_id1)])
>       :- Exchange(distribution=[single])
>       :  +- MultiJoin(commonJoinKey=[noCommonJoinKey], joinTypes=[INNER], inputUniqueKeys=[noUniqueKey, noUniqueKey], joinConditions=[IS NOT DISTINCT FROM(user_id, user_id0)], select=[user_id,name,order_id,user_id0], rowType=[RecordType(VARCHAR(2147483647) user_id, VARCHAR(2147483647) name, VARCHAR(2147483647) order_id, VARCHAR(2147483647) user_id0)])
>       :     :- Exchange(distribution=[single])
>       :     :  +- Calc(select=[user_id, name])
>       :     :     +- MiniBatchAssigner(interval=[1000ms], mode=[ProcTime])
>       :     :        +- TableSourceScan(table=[[default_catalog, default_database, Users]], fields=[user_id, name, cash])
>       :     +- Exchange(distribution=[single])
>       :        +- Calc(select=[order_id, user_id])
>       :           +- MiniBatchAssigner(interval=[1000ms], mode=[ProcTime])
>       :              +- TableSourceScan(table=[[default_catalog, default_database, Orders]], fields=[order_id, user_id, product])
>       +- Exchange(distribution=[single])
>          +- MiniBatchAssigner(interval=[1000ms], mode=[ProcTime])
>             +- TableSourceScan(table=[[default_catalog, default_database, Payments]], fields=[payment_id, price, user_id]){code}
> h2. After fix:
> Joins are flattened into a single MultiJoin node:
> {code:java}
> == Optimized Execution Plan ==
> Sink(table=[default_catalog.default_database.ResultSink], fields=[user_id, name, order_id, payment_id, price])
> +- Calc(select=[user_id, name, order_id, payment_id, price])
>    +- MultiJoin(commonJoinKey=[user_id], joinTypes=[INNER, INNER], inputUniqueKeys=[noUniqueKey, noUniqueKey, noUniqueKey], joinConditions=[IS NOT DISTINCT FROM(user_id, user_id0), IS NOT DISTINCT FROM(user_id0, user_id1)], select=[user_id,name,order_id,user_id0,payment_id,price,user_id1], rowType=[RecordType(VARCHAR(2147483647) user_id, VARCHAR(2147483647) name, VARCHAR(2147483647) order_id, VARCHAR(2147483647) user_id0, VARCHAR(2147483647) payment_id, INTEGER price, VARCHAR(2147483647) user_id1)])
>       :- Exchange(distribution=[hash[user_id]])
>       :  +- Calc(select=[user_id, name])
>       :     +- MiniBatchAssigner(interval=[1000ms], mode=[ProcTime])
>       :        +- TableSourceScan(table=[[default_catalog, default_database, Users]], fields=[user_id, name, cash])
>       :- Exchange(distribution=[hash[user_id]])
>       :  +- Calc(select=[order_id, user_id])
>       :     +- MiniBatchAssigner(interval=[1000ms], mode=[ProcTime])
>       :        +- TableSourceScan(table=[[default_catalog, default_database, Orders]], fields=[order_id, user_id, product])
>       +- Exchange(distribution=[hash[user_id]])
>          +- MiniBatchAssigner(interval=[1000ms], mode=[ProcTime])
>             +- TableSourceScan(table=[[default_catalog, default_database, Payments]], fields=[payment_id, price, user_id]){code}
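> The key-extraction change can be sketched as follows. This is a simplified, hypothetical model (the {{Kind}} and {{Condition}} types below are invented for illustration and are not the actual Flink/Calcite classes): when collecting equi-join keys, conditions of kind IS_NOT_DISTINCT_FROM are accepted alongside EQUALS instead of being skipped, so both joins in the example contribute {{user_id}} as a common key.

```java
import java.util.ArrayList;
import java.util.List;

public class JoinKeyExtraction {

    // Simplified stand-ins for the planner's condition representation;
    // illustrative only, not real Flink/Calcite types.
    enum Kind { EQUALS, IS_NOT_DISTINCT_FROM, OTHER }

    record Condition(Kind kind, String leftKey, String rightKey) {}

    // Before the fix (conceptually), only EQUALS conditions produced join
    // keys, so the plan showed commonJoinKey=[noCommonJoinKey]. Treating
    // IS_NOT_DISTINCT_FROM the same way recovers the keys.
    static List<String> extractJoinKeys(List<Condition> conditions) {
        List<String> keys = new ArrayList<>();
        for (Condition c : conditions) {
            if (c.kind() == Kind.EQUALS || c.kind() == Kind.IS_NOT_DISTINCT_FROM) {
                keys.add(c.leftKey());
            }
        }
        return keys;
    }

    public static void main(String[] args) {
        List<Condition> conds = List.of(
            new Condition(Kind.IS_NOT_DISTINCT_FROM, "user_id", "user_id0"),
            new Condition(Kind.IS_NOT_DISTINCT_FROM, "user_id0", "user_id1"));
        System.out.println(extractJoinKeys(conds)); // [user_id, user_id0]
    }
}
```

> Hashing on such a key is still correct for the NULL-safe join, since all NULL keys hash to the same partition and {{IS NOT DISTINCT FROM}} treats them as equal.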



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
