[ 
https://issues.apache.org/jira/browse/FLINK-39178?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xuyang Zhong updated FLINK-39178:
---------------------------------
    Description: 
Support LHS、Bushy、RHS cascaded delta join.

For example:
{code:java}
Sink(table=[default_catalog.default_database.snk_for_cdc_src], fields=[a0, a1, 
a2, a3, b0, b2, b1])
+- Calc(select=[a0, a1, a2, a3, b0, b2, b1])
   +- DeltaJoin(joinType=[InnerJoin], ...)
      :- Exchange(distribution=[hash[a1, a2]])
      :  +- Calc(select=[b0, b2, b1, a0, a1, a2, a3])
      :     +- DeltaJoin(joinType=[InnerJoin], where=[AND(=(a1, b1), =(a2, b2), 
<>(a0, CAST(b1 AS INTEGER)))], select=[a0, a1, a2, a3, b0, b2, b1])
      :        :- Exchange(distribution=[hash[a1, a2]])
      :        :  +- DropUpdateBefore
      :        :     +- TableSourceScan(table=[[default_catalog, 
default_database, no_delete_src1]], fields=[a0, a1, a2, a3])
      :        +- Exchange(distribution=[hash[b1, b2]])
      :           +- DropUpdateBefore
      :              +- TableSourceScan(table=[[default_catalog, 
default_database, no_delete_src2]], fields=[b0, b2, b1])
      +- Exchange(distribution=[hash[c1, c2]])
         +- DropUpdateBefore
            +- TableSourceScan(table=[[default_catalog, default_database, 
no_delete_src3, project=[c1, c2], metadata=[]]], fields=[c1, c2])
 {code}
 

  was:
Support LHS、Bushy、RHS cascaded delta join.

For example:
{code:java}
Sink(table=[default_catalog.default_database.snk_for_cdc_src], fields=[a0, a1, 
a2, a3, b0, b2, b1])
+- Calc(select=[a0, a1, a2, a3, b0, b2, b1])
   +- DeltaJoin(joinType=[InnerJoin], where=[AND(=(a1, c1), =(a2, c2), <>(c1, 
CAST(a0 AS DOUBLE)))], leftToRight=[{lookupKeys=[c1=a1, c2=a2], remaining=[(c1 
<> CAST(a0 AS DOUBLE))]}], rightToLeft=[[{round=[1], 
sourceTables=[default_catalog.default_database.no_delete_src3], 
lookupTable=[default_catalog.default_database.no_delete_src1], 
lookupKeys=[a1=c1, a2=c2]}, {round=[2], 
sourceTables=[default_catalog.default_database.no_delete_src1], 
lookupTable=[default_catalog.default_database.no_delete_src2], 
lookupKeys=[b1=a1, b2=a2], remaining=[(a0 <> CAST(b1 AS INTEGER))]}]], 
select=[b0, b2, b1, a0, a1, a2, a3, c1, c2])
      :- Exchange(distribution=[hash[a1, a2]])
      :  +- Calc(select=[b0, b2, b1, a0, a1, a2, a3])
      :     +- DeltaJoin(joinType=[InnerJoin], where=[AND(=(a1, b1), =(a2, b2), 
<>(a0, CAST(b1 AS INTEGER)))], select=[a0, a1, a2, a3, b0, b2, b1])
      :        :- Exchange(distribution=[hash[a1, a2]])
      :        :  +- DropUpdateBefore
      :        :     +- TableSourceScan(table=[[default_catalog, 
default_database, no_delete_src1]], fields=[a0, a1, a2, a3])
      :        +- Exchange(distribution=[hash[b1, b2]])
      :           +- DropUpdateBefore
      :              +- TableSourceScan(table=[[default_catalog, 
default_database, no_delete_src2]], fields=[b0, b2, b1])
      +- Exchange(distribution=[hash[c1, c2]])
         +- DropUpdateBefore
            +- TableSourceScan(table=[[default_catalog, default_database, 
no_delete_src3, project=[c1, c2], metadata=[]]], fields=[c1, c2])
 {code}
 


> Support cascaded delta join
> ---------------------------
>
>                 Key: FLINK-39178
>                 URL: https://issues.apache.org/jira/browse/FLINK-39178
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Table SQL / Planner, Table SQL / Runtime
>    Affects Versions: 2.3.0
>            Reporter: Xuyang Zhong
>            Assignee: Xuyang Zhong
>            Priority: Major
>             Fix For: 2.3.0
>
>
> Support LHS、Bushy、RHS cascaded delta join.
> For example:
> {code:java}
> Sink(table=[default_catalog.default_database.snk_for_cdc_src], fields=[a0, 
> a1, a2, a3, b0, b2, b1])
> +- Calc(select=[a0, a1, a2, a3, b0, b2, b1])
>    +- DeltaJoin(joinType=[InnerJoin], ...)
>       :- Exchange(distribution=[hash[a1, a2]])
>       :  +- Calc(select=[b0, b2, b1, a0, a1, a2, a3])
>       :     +- DeltaJoin(joinType=[InnerJoin], where=[AND(=(a1, b1), =(a2, 
> b2), <>(a0, CAST(b1 AS INTEGER)))], select=[a0, a1, a2, a3, b0, b2, b1])
>       :        :- Exchange(distribution=[hash[a1, a2]])
>       :        :  +- DropUpdateBefore
>       :        :     +- TableSourceScan(table=[[default_catalog, 
> default_database, no_delete_src1]], fields=[a0, a1, a2, a3])
>       :        +- Exchange(distribution=[hash[b1, b2]])
>       :           +- DropUpdateBefore
>       :              +- TableSourceScan(table=[[default_catalog, 
> default_database, no_delete_src2]], fields=[b0, b2, b1])
>       +- Exchange(distribution=[hash[c1, c2]])
>          +- DropUpdateBefore
>             +- TableSourceScan(table=[[default_catalog, default_database, 
> no_delete_src3, project=[c1, c2], metadata=[]]], fields=[c1, c2])
>  {code}
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to