[
https://issues.apache.org/jira/browse/FLINK-39178?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Xuyang Zhong updated FLINK-39178:
---------------------------------
Summary: Support cascaded delta join in planner (was: Support cascaded delta join)
> Support cascaded delta join in planner
> --------------------------------------
>
> Key: FLINK-39178
> URL: https://issues.apache.org/jira/browse/FLINK-39178
> Project: Flink
> Issue Type: Sub-task
> Components: Table SQL / Planner, Table SQL / Runtime
> Affects Versions: 2.3.0
> Reporter: Xuyang Zhong
> Assignee: Xuyang Zhong
> Priority: Major
> Fix For: 2.3.0
>
>
> Support LHS, bushy, and RHS cascaded delta joins.
> For example:
> {code:java}
> Sink(table=[default_catalog.default_database.snk_for_cdc_src], fields=[a0, a1, a2, a3, b0, b2, b1])
> +- Calc(select=[a0, a1, a2, a3, b0, b2, b1])
>    +- DeltaJoin(joinType=[InnerJoin], ...)
>       :- Exchange(distribution=[hash[a1, a2]])
>       :  +- Calc(select=[b0, b2, b1, a0, a1, a2, a3])
>       :     +- DeltaJoin(joinType=[InnerJoin], where=[AND(=(a1, b1), =(a2, b2), <>(a0, CAST(b1 AS INTEGER)))], select=[a0, a1, a2, a3, b0, b2, b1])
>       :        :- Exchange(distribution=[hash[a1, a2]])
>       :        :  +- DropUpdateBefore
>       :        :     +- TableSourceScan(table=[[default_catalog, default_database, no_delete_src1]], fields=[a0, a1, a2, a3])
>       :        +- Exchange(distribution=[hash[b1, b2]])
>       :           +- DropUpdateBefore
>       :              +- TableSourceScan(table=[[default_catalog, default_database, no_delete_src2]], fields=[b0, b2, b1])
>       +- Exchange(distribution=[hash[c1, c2]])
>          +- DropUpdateBefore
>             +- TableSourceScan(table=[[default_catalog, default_database, no_delete_src3, project=[c1, c2], metadata=[]]], fields=[c1, c2])
> {code}
>
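The three cascade shapes named in the description can be illustrated with a small, self-contained sketch. It assumes that "LHS", "RHS" and "bushy" describe which input of the upper delta join is itself a delta join (left only, right only, or both); that reading is an assumption, not something confirmed by the planner code. The classes `Source`, `Join`, `Shape` and the method `classify` are hypothetical names for illustration only and are not Flink APIs. Intermediate nodes such as Exchange, Calc and DropUpdateBefore are ignored in this simplified model, and the table name `no_delete_src4` is invented for the bushy case.

```java
import java.util.Objects;

public class CascadedJoinShapes {

    /** A node in a simplified join tree: either a source table or a join of two inputs. */
    interface Node {}

    /** Leaf node standing in for a TableSourceScan. */
    static final class Source implements Node {
        final String table;

        Source(String table) {
            this.table = Objects.requireNonNull(table);
        }
    }

    /** Inner node standing in for a DeltaJoin; Exchange/Calc nodes are omitted. */
    static final class Join implements Node {
        final Node left;
        final Node right;

        Join(Node left, Node right) {
            this.left = Objects.requireNonNull(left);
            this.right = Objects.requireNonNull(right);
        }
    }

    /** Hypothetical labels for the shape of the top-level join. */
    enum Shape { SINGLE, LHS, RHS, BUSHY }

    /** Classifies the top join by which of its inputs are themselves joins. */
    static Shape classify(Join top) {
        boolean leftIsJoin = top.left instanceof Join;
        boolean rightIsJoin = top.right instanceof Join;
        if (leftIsJoin && rightIsJoin) {
            return Shape.BUSHY;
        }
        if (leftIsJoin) {
            return Shape.LHS;
        }
        if (rightIsJoin) {
            return Shape.RHS;
        }
        return Shape.SINGLE;
    }

    public static void main(String[] args) {
        Source src1 = new Source("no_delete_src1");
        Source src2 = new Source("no_delete_src2");
        Source src3 = new Source("no_delete_src3");
        Source src4 = new Source("no_delete_src4"); // hypothetical fourth table

        // Same shape as the plan above: (src1 JOIN src2) JOIN src3
        Join lhs = new Join(new Join(src1, src2), src3);
        // src1 JOIN (src2 JOIN src3)
        Join rhs = new Join(src1, new Join(src2, src3));
        // (src1 JOIN src2) JOIN (src3 JOIN src4)
        Join bushy = new Join(new Join(src1, src2), new Join(src3, src4));

        System.out.println(classify(lhs));
        System.out.println(classify(rhs));
        System.out.println(classify(bushy));
    }
}
```

Under this reading, the example plan is an LHS cascade: the left input of the top DeltaJoin is another DeltaJoin over `no_delete_src1` and `no_delete_src2`, while the right input is a plain scan of `no_delete_src3`.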