[ https://issues.apache.org/jira/browse/FLINK-39178?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Xuyang Zhong updated FLINK-39178:
---------------------------------
Description:
Support LHS, bushy, and RHS cascaded delta joins.
For example:
{code:java}
Sink(table=[default_catalog.default_database.snk_for_cdc_src], fields=[a0, a1, a2, a3, b0, b2, b1])
+- Calc(select=[a0, a1, a2, a3, b0, b2, b1])
   +- DeltaJoin(joinType=[InnerJoin], ...)
      :- Exchange(distribution=[hash[a1, a2]])
      :  +- Calc(select=[b0, b2, b1, a0, a1, a2, a3])
      :     +- DeltaJoin(joinType=[InnerJoin], where=[AND(=(a1, b1), =(a2, b2), <>(a0, CAST(b1 AS INTEGER)))], select=[a0, a1, a2, a3, b0, b2, b1])
      :        :- Exchange(distribution=[hash[a1, a2]])
      :        :  +- DropUpdateBefore
      :        :     +- TableSourceScan(table=[[default_catalog, default_database, no_delete_src1]], fields=[a0, a1, a2, a3])
      :        +- Exchange(distribution=[hash[b1, b2]])
      :           +- DropUpdateBefore
      :              +- TableSourceScan(table=[[default_catalog, default_database, no_delete_src2]], fields=[b0, b2, b1])
      +- Exchange(distribution=[hash[c1, c2]])
         +- DropUpdateBefore
            +- TableSourceScan(table=[[default_catalog, default_database, no_delete_src3, project=[c1, c2], metadata=[]]], fields=[c1, c2])
{code}
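To make the three shapes concrete, the following Java sketch classifies a delta join by which of its inputs is itself a delta join. It assumes that "LHS" means the left input is a cascaded delta join (as in the plan above), "RHS" means the right input is, and "bushy" means both are; this reading is an interpretation of the issue text, not a definition taken from Flink. The names `CascadeShapes`, `Scan`, `DeltaJoin`, and `classify` are hypothetical and are not Flink planner classes, and the intermediate `Exchange`, `Calc`, and `DropUpdateBefore` nodes are left out for brevity.

```java
import java.util.Objects;

public class CascadeShapes {

    /** A simplified plan node: either a source scan (leaf) or a binary delta join. */
    public interface Node {}

    /** Leaf node standing in for a TableSourceScan. Hypothetical, not a Flink class. */
    public static final class Scan implements Node {
        public final String table;

        public Scan(String table) {
            this.table = Objects.requireNonNull(table);
        }
    }

    /** Binary join node standing in for a DeltaJoin. Hypothetical, not a Flink class. */
    public static final class DeltaJoin implements Node {
        public final Node left;
        public final Node right;

        public DeltaJoin(Node left, Node right) {
            this.left = Objects.requireNonNull(left);
            this.right = Objects.requireNonNull(right);
        }
    }

    /**
     * Classifies a join by which inputs are themselves delta joins
     * (assumed meaning of LHS / RHS / bushy in this issue).
     */
    public static String classify(DeltaJoin join) {
        boolean leftCascaded = join.left instanceof DeltaJoin;
        boolean rightCascaded = join.right instanceof DeltaJoin;
        if (leftCascaded && rightCascaded) {
            return "BUSHY";
        }
        if (leftCascaded) {
            return "LHS";
        }
        if (rightCascaded) {
            return "RHS";
        }
        return "NONE"; // a plain, non-cascaded delta join
    }

    public static void main(String[] args) {
        // Mirrors the example plan: (src1 JOIN src2) JOIN src3.
        Node src1 = new Scan("no_delete_src1");
        Node src2 = new Scan("no_delete_src2");
        Node src3 = new Scan("no_delete_src3");
        DeltaJoin inner = new DeltaJoin(src1, src2);
        DeltaJoin top = new DeltaJoin(inner, src3);
        System.out.println(classify(top)); // prints "LHS"
    }
}
```

Under these assumptions the example plan is an LHS cascade, because only the left input of the top-level `DeltaJoin` is another `DeltaJoin`.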
was:
Support LHS, bushy, and RHS cascaded delta joins.
For example:
{code:java}
Sink(table=[default_catalog.default_database.snk_for_cdc_src], fields=[a0, a1, a2, a3, b0, b2, b1])
+- Calc(select=[a0, a1, a2, a3, b0, b2, b1])
   +- DeltaJoin(joinType=[InnerJoin], where=[AND(=(a1, c1), =(a2, c2), <>(c1, CAST(a0 AS DOUBLE)))], leftToRight=[{lookupKeys=[c1=a1, c2=a2], remaining=[(c1 <> CAST(a0 AS DOUBLE))]}], rightToLeft=[[{round=[1], sourceTables=[default_catalog.default_database.no_delete_src3], lookupTable=[default_catalog.default_database.no_delete_src1], lookupKeys=[a1=c1, a2=c2]}, {round=[2], sourceTables=[default_catalog.default_database.no_delete_src1], lookupTable=[default_catalog.default_database.no_delete_src2], lookupKeys=[b1=a1, b2=a2], remaining=[(a0 <> CAST(b1 AS INTEGER))]}]], select=[b0, b2, b1, a0, a1, a2, a3, c1, c2])
      :- Exchange(distribution=[hash[a1, a2]])
      :  +- Calc(select=[b0, b2, b1, a0, a1, a2, a3])
      :     +- DeltaJoin(joinType=[InnerJoin], where=[AND(=(a1, b1), =(a2, b2), <>(a0, CAST(b1 AS INTEGER)))], select=[a0, a1, a2, a3, b0, b2, b1])
      :        :- Exchange(distribution=[hash[a1, a2]])
      :        :  +- DropUpdateBefore
      :        :     +- TableSourceScan(table=[[default_catalog, default_database, no_delete_src1]], fields=[a0, a1, a2, a3])
      :        +- Exchange(distribution=[hash[b1, b2]])
      :           +- DropUpdateBefore
      :              +- TableSourceScan(table=[[default_catalog, default_database, no_delete_src2]], fields=[b0, b2, b1])
      +- Exchange(distribution=[hash[c1, c2]])
         +- DropUpdateBefore
            +- TableSourceScan(table=[[default_catalog, default_database, no_delete_src3, project=[c1, c2], metadata=[]]], fields=[c1, c2])
{code}
> Support cascaded delta join
> ---------------------------
>
> Key: FLINK-39178
> URL: https://issues.apache.org/jira/browse/FLINK-39178
> Project: Flink
> Issue Type: Sub-task
> Components: Table SQL / Planner, Table SQL / Runtime
> Affects Versions: 2.3.0
> Reporter: Xuyang Zhong
> Assignee: Xuyang Zhong
> Priority: Major
> Fix For: 2.3.0
>