[ 
https://issues.apache.org/jira/browse/FLINK-38753?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-38753:
-----------------------------------
    Labels: pull-request-available  (was: )

> Enrich more upsert keys by equiv expressions
> --------------------------------------------
>
>                 Key: FLINK-38753
>                 URL: https://issues.apache.org/jira/browse/FLINK-38753
>             Project: Flink
>          Issue Type: Improvement
>          Components: Table SQL / Planner
>    Affects Versions: 2.2.0
>            Reporter: Xuyang Zhong
>            Priority: Major
>              Labels: pull-request-available
>
> Take the following SQL as an example:
> {code:java}
> @Test
> def test3(): Unit = {
>   tEnv.executeSql(s"""
>                      |create temporary table src1 (
>                      |  a1 int primary key not enforced,
>                      |  b1 int
>                      |) with (
>                      |  'connector' = 'values',
>                      |  'changelog-mode' = 'I,UA,UB,D'
>                      |)
>                      |""".stripMargin)
>   tEnv.executeSql(s"""
>                      |create temporary table src2 (
>                      |  a2 int,
>                      |  b2 int,
>                      |  c2 int,
>                      |  primary key(a2, b2) not enforced
>                      |) with (
>                      |  'connector' = 'values',
>                      |  'changelog-mode' = 'I,UA,UB,D'
>                      |)
>                      |""".stripMargin)
>   tEnv.executeSql(s"""
>                      |create temporary table snk (
>                      |  a1 int,
>                      |  b1 int,
>                      |  a2 int,
>                      |  b2 int,
>                      |  c2 int,
>                      |  primary key(a1, b2) not enforced
>                      |) with (
>                      |  'connector' = 'values',
>                      |  'sink-insert-only' = 'false',
>                      |  'sink-changelog-mode-enforced' = 'I,UA,D'
>                      |)
>                      |""".stripMargin)
>   util.verifyExplainInsert("""
>                              |insert into snk
>                              | select * from src1 join src2 on a1 = a2
>                              |""".stripMargin)
> }
> {code}
> The plan is:
> {code:java}
> == Abstract Syntax Tree ==
> LogicalSink(table=[default_catalog.default_database.snk], fields=[a1, b1, a2, b2, c2])
> +- LogicalProject(a1=[$0], b1=[$1], a2=[$2], b2=[$3], c2=[$4])
>    +- LogicalJoin(condition=[=($0, $2)], joinType=[inner])
>       :- LogicalTableScan(table=[[default_catalog, default_database, src1]])
>       +- LogicalTableScan(table=[[default_catalog, default_database, src2]])
> == Optimized Physical Plan ==
> Sink(table=[default_catalog.default_database.snk], fields=[a1, b1, a2, b2, c2], upsertMaterialize=[true])
> +- Join(joinType=[InnerJoin], where=[=(a1, a2)], select=[a1, b1, a2, b2, c2], leftInputSpec=[JoinKeyContainsUniqueKey], rightInputSpec=[HasUniqueKey])
>    :- Exchange(distribution=[hash[a1]])
>    :  +- TableSourceScan(table=[[default_catalog, default_database, src1]], fields=[a1, b1])
>    +- Exchange(distribution=[hash[a2]])
>       +- TableSourceScan(table=[[default_catalog, default_database, src2]], fields=[a2, b2, c2])
> == Optimized Execution Plan ==
> Sink(table=[default_catalog.default_database.snk], fields=[a1, b1, a2, b2, c2], upsertMaterialize=[true])
> +- Join(joinType=[InnerJoin], where=[(a1 = a2)], select=[a1, b1, a2, b2, c2], leftInputSpec=[JoinKeyContainsUniqueKey], rightInputSpec=[HasUniqueKey])
>    :- Exchange(distribution=[hash[a1]])
>    :  +- TableSourceScan(table=[[default_catalog, default_database, src1]], fields=[a1, b1])
>    +- Exchange(distribution=[hash[a2]])
>       +- TableSourceScan(table=[[default_catalog, default_database, src2]], fields=[a2, b2, c2])
>  {code}
> The sink has `upsertMaterialize=[true]` because the sink primary key is `a1, b2` (`\{0, 3}`) and the upstream upsert keys `[\{2, 3}, \{0, 2, 3}]` do not include it.
> However, this is an inner join with the equivalence condition `a1 = a2`, so we can use this equivalence to derive additional upsert keys (for example, `\{0, 3}` from `\{2, 3}`) and thereby avoid the `upsertMaterialize`.
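
To make the idea concrete, here is a minimal sketch in plain Java of how an equivalence such as `a1 = a2` could be used to derive extra upsert keys. It is only an illustration under stated assumptions: the class `UpsertKeyEnrichment` and the helpers `enrich` and `substitute` are hypothetical names, not Flink planner APIs, and column indexes follow the join output in the issue (a1=0, b1=1, a2=2, b2=3, c2=4). It handles a single equality between two columns and says nothing about how the actual fix is implemented.

```java
import java.util.LinkedHashSet;
import java.util.Set;
import java.util.TreeSet;

public class UpsertKeyEnrichment {

    /**
     * Returns the given upsert keys plus every key obtained by swapping one side of the
     * equivalence (left = right) for the other. Hypothetical helper, not a Flink API.
     */
    static Set<Set<Integer>> enrich(Set<Set<Integer>> upsertKeys, int left, int right) {
        Set<Set<Integer>> result = new LinkedHashSet<>(upsertKeys);
        for (Set<Integer> key : upsertKeys) {
            result.add(substitute(key, left, right));
            result.add(substitute(key, right, left));
        }
        return result;
    }

    /** Copies the key, replacing column `from` with column `to` if `from` is present. */
    static Set<Integer> substitute(Set<Integer> key, int from, int to) {
        Set<Integer> copy = new TreeSet<>(key);
        if (copy.remove(from)) {
            copy.add(to);
        }
        return copy;
    }

    public static void main(String[] args) {
        // Upstream upsert keys from the issue: [{2, 3}, {0, 2, 3}]
        Set<Set<Integer>> upsertKeys = new LinkedHashSet<>();
        upsertKeys.add(new TreeSet<>(Set.of(2, 3)));
        upsertKeys.add(new TreeSet<>(Set.of(0, 2, 3)));

        // Sink primary key (a1, b2) = {0, 3}
        Set<Integer> sinkPk = new TreeSet<>(Set.of(0, 3));

        // Equivalence a1 = a2 from the inner join condition: columns 0 and 2
        Set<Set<Integer>> enriched = enrich(upsertKeys, 0, 2);

        System.out.println(upsertKeys.contains(sinkPk)); // false
        System.out.println(enriched.contains(sinkPk));   // true
    }
}
```

In this sketch the sink primary key `{0, 3}` is missing from the original upsert keys but appears after substitution. The substitution is only sound because the join is an inner join, where `a1` and `a2` are equal in every output row.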



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
