Stepan Stepanishchev created FLINK-39324:
--------------------------------------------

             Summary: Allow MultiJoin to respect IS_NOT_DISTINCT_FROM when extracting join keys
                 Key: FLINK-39324
                 URL: https://issues.apache.org/jira/browse/FLINK-39324
             Project: Flink
          Issue Type: Improvement
          Components: Table SQL / Planner
            Reporter: Stepan Stepanishchev


h2. Example

Consider the following query:
{code:java}
INSERT INTO ResultSink
SELECT u.user_id, u.name, o.order_id, p.payment_id, p.price
FROM Users u
  JOIN Orders o 
    ON u.user_id IS NOT DISTINCT FROM o.user_id
  JOIN Payments p 
    ON o.user_id IS NOT DISTINCT FROM p.user_id{code}
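For context, IS NOT DISTINCT FROM is SQL's null-safe equality: unlike {{=}}, it treats two NULLs as equal and never evaluates to UNKNOWN. A minimal Java sketch of the two semantics follows; the class and method names are illustrative only and are not part of Flink:

```java
import java.util.Objects;

// Illustrative only: models SQL comparison semantics on nullable strings.
final class NullSafeEqualitySketch {

    // SQL '=': the result is UNKNOWN (modelled here as null) if either side is NULL.
    static Boolean sqlEquals(String left, String right) {
        if (left == null || right == null) {
            return null;
        }
        return left.equals(right);
    }

    // SQL 'IS NOT DISTINCT FROM': always TRUE or FALSE, and NULL matches NULL.
    static boolean isNotDistinctFrom(String left, String right) {
        return Objects.equals(left, right);
    }
}
```

Because rows with a NULL user_id still match each other under this predicate, it can in principle serve as a distribution key in the same way an ordinary equality does.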
h2. Before fix:

MultiJoin fails to extract a common join key from the IS NOT DISTINCT FROM conditions, resulting in two nested MultiJoin nodes and singleton distribution on every input:
{code:java}
== Optimized Execution Plan ==
Sink(table=[default_catalog.default_database.ResultSink], fields=[user_id, name, order_id, payment_id, price])
+- Calc(select=[user_id, name, order_id, payment_id, price])
   +- MultiJoin(commonJoinKey=[noCommonJoinKey], joinTypes=[INNER], inputUniqueKeys=[noUniqueKey, noUniqueKey], joinConditions=[IS NOT DISTINCT FROM(user_id0, user_id1)], select=[user_id,name,order_id,user_id0,payment_id,price,user_id1], rowType=[RecordType(VARCHAR(2147483647) user_id, VARCHAR(2147483647) name, VARCHAR(2147483647) order_id, VARCHAR(2147483647) user_id0, VARCHAR(2147483647) payment_id, INTEGER price, VARCHAR(2147483647) user_id1)])
      :- Exchange(distribution=[single])
      :  +- MultiJoin(commonJoinKey=[noCommonJoinKey], joinTypes=[INNER], inputUniqueKeys=[noUniqueKey, noUniqueKey], joinConditions=[IS NOT DISTINCT FROM(user_id, user_id0)], select=[user_id,name,order_id,user_id0], rowType=[RecordType(VARCHAR(2147483647) user_id, VARCHAR(2147483647) name, VARCHAR(2147483647) order_id, VARCHAR(2147483647) user_id0)])
      :     :- Exchange(distribution=[single])
      :     :  +- Calc(select=[user_id, name])
      :     :     +- MiniBatchAssigner(interval=[1000ms], mode=[ProcTime])
      :     :        +- TableSourceScan(table=[[default_catalog, default_database, Users]], fields=[user_id, name, cash])
      :     +- Exchange(distribution=[single])
      :        +- Calc(select=[order_id, user_id])
      :           +- MiniBatchAssigner(interval=[1000ms], mode=[ProcTime])
      :              +- TableSourceScan(table=[[default_catalog, default_database, Orders]], fields=[order_id, user_id, product])
      +- Exchange(distribution=[single])
         +- MiniBatchAssigner(interval=[1000ms], mode=[ProcTime])
            +- TableSourceScan(table=[[default_catalog, default_database, Payments]], fields=[payment_id, price, user_id]){code}
h2. After fix:

Joins are flattened into a single MultiJoin node with user_id as the common join key, and every input is hash-distributed on user_id:
{code:java}
== Optimized Execution Plan ==
Sink(table=[default_catalog.default_database.ResultSink], fields=[user_id, name, order_id, payment_id, price])
+- Calc(select=[user_id, name, order_id, payment_id, price])
   +- MultiJoin(commonJoinKey=[user_id], joinTypes=[INNER, INNER], inputUniqueKeys=[noUniqueKey, noUniqueKey, noUniqueKey], joinConditions=[IS NOT DISTINCT FROM(user_id, user_id0), IS NOT DISTINCT FROM(user_id0, user_id1)], select=[user_id,name,order_id,user_id0,payment_id,price,user_id1], rowType=[RecordType(VARCHAR(2147483647) user_id, VARCHAR(2147483647) name, VARCHAR(2147483647) order_id, VARCHAR(2147483647) user_id0, VARCHAR(2147483647) payment_id, INTEGER price, VARCHAR(2147483647) user_id1)])
      :- Exchange(distribution=[hash[user_id]])
      :  +- Calc(select=[user_id, name])
      :     +- MiniBatchAssigner(interval=[1000ms], mode=[ProcTime])
      :        +- TableSourceScan(table=[[default_catalog, default_database, Users]], fields=[user_id, name, cash])
      :- Exchange(distribution=[hash[user_id]])
      :  +- Calc(select=[order_id, user_id])
      :     +- MiniBatchAssigner(interval=[1000ms], mode=[ProcTime])
      :        +- TableSourceScan(table=[[default_catalog, default_database, Orders]], fields=[order_id, user_id, product])
      +- Exchange(distribution=[hash[user_id]])
         +- MiniBatchAssigner(interval=[1000ms], mode=[ProcTime])
            +- TableSourceScan(table=[[default_catalog, default_database, Payments]], fields=[payment_id, price, user_id]){code}
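
The flattening relies on treating IS NOT DISTINCT FROM like {{=}} when searching for a key shared by all inputs. A rough sketch of that idea follows, assuming each join condition is reduced to a pair of (input, field) references and the references are grouped with union-find. All names here (CommonJoinKeySketch, Op, Ref, Cond, nullSafeAware) are hypothetical, and this is not the planner's actual code:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch of common-join-key detection; not Flink's planner code.
final class CommonJoinKeySketch {

    enum Op { EQUALS, IS_NOT_DISTINCT_FROM, OTHER }

    // A field of one join input, e.g. input 0 = Users, field "user_id".
    record Ref(int input, String field) {}

    // A binary join condition between two field references.
    record Cond(Op op, Ref left, Ref right) {}

    // Returns true if some equivalence class of fields covers every input.
    static boolean hasCommonJoinKey(List<Cond> conds, int inputCount, boolean nullSafeAware) {
        Map<Ref, Ref> parent = new HashMap<>();
        for (Cond c : conds) {
            boolean keyLike = c.op() == Op.EQUALS
                    || (nullSafeAware && c.op() == Op.IS_NOT_DISTINCT_FROM);
            if (keyLike) {
                // Union the two equivalence classes.
                Ref leftRoot = find(parent, c.left());
                Ref rightRoot = find(parent, c.right());
                parent.put(leftRoot, rightRoot);
            }
        }
        // Collect which inputs each equivalence class touches.
        Map<Ref, Set<Integer>> inputsPerClass = new HashMap<>();
        for (Ref ref : new ArrayList<>(parent.keySet())) {
            inputsPerClass
                    .computeIfAbsent(find(parent, ref), k -> new HashSet<>())
                    .add(ref.input());
        }
        return inputsPerClass.values().stream().anyMatch(s -> s.size() == inputCount);
    }

    // Follows parent links to the class representative, registering new refs.
    private static Ref find(Map<Ref, Ref> parent, Ref ref) {
        parent.putIfAbsent(ref, ref);
        Ref root = ref;
        while (!parent.get(root).equals(root)) {
            root = parent.get(root);
        }
        return root;
    }
}
```

With nullSafeAware set to false, the two conditions from the example query yield no shared key, mirroring the noCommonJoinKey plan; with it set to true, all three user_id fields fall into one class, mirroring commonJoinKey=[user_id].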


