[ 
https://issues.apache.org/jira/browse/FLINK-39174?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xuyang Zhong updated FLINK-39174:
---------------------------------
    Description: 
Allows a join that is followed by a lookup join to be optimized into a delta join, as in the plan below.
{code:java}
Sink(table=[default_catalog.default_database.snk], fields=[a0, a1, a2, a3, b0, b2, b1], conflictStrategy=[DEDUPLICATE])
+- Calc(select=[a0, a1, a2, a3, b0, b2, b1])
   +- LookupJoin(table=[default_catalog.default_database.dim], joinType=[InnerJoin], lookup=[f0=a0], select=[a0, a1, a2, a3, b0, b2, b1, f0], async=[ORDERED, KEY_ORDERED: false, 180000ms, 100])
      +- DeltaJoin(joinType=[InnerJoin], where=[AND(=(a1, b1), =(a2, b2))], select=[a0, a1, a2, a3, b0, b2, b1])
         :- Exchange(distribution=[hash[a1, a2]])
         :  +- TableSourceScan(table=[[default_catalog, default_database, src1]], fields=[a0, a1, a2, a3])
         +- Exchange(distribution=[hash[b1, b2]])
            +- TableSourceScan(table=[[default_catalog, default_database, src2]], fields=[b0, b2, b1])
{code}

> Support lookup join after delta join
> ------------------------------------
>
>                 Key: FLINK-39174
>                 URL: https://issues.apache.org/jira/browse/FLINK-39174
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Table SQL / Planner
>    Affects Versions: 2.3.0
>            Reporter: Xuyang Zhong
>            Assignee: Xuyang Zhong
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 2.3.0



