[https://issues.apache.org/jira/browse/FLINK-38753?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18048584#comment-18048584]
Anton Borisov commented on FLINK-38753:
---------------------------------------
I prepared a PR: https://github.com/apache/flink/pull/27372
Scope: binary joins only.
This PR enriches the inferred upsert keys by propagating them across the
equivalences implied by equi-join keys (as extracted from JoinInfo), with guards
for outer joins, where null-padded rows mean the equality does not hold on every
output row.
The implementation uses a bounded fixpoint/worklist algorithm to compute the
closure of upsert-key substitutions when multiple equi-join predicates are
present. This is essential for multi-join scenarios, but also applies to binary
joins with multiple equi conditions.
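To make the closure idea concrete, here is a minimal sketch, not the PR's code. It assumes a simplified model: an upsert key is a set of output field indices, and each equi-join predicate `$l = $r` is a pair `(l, r)`. Because `l = r` holds on every row of an inner join, a key `K` containing one side yields `(K - from) + to`. The names `UpsertKeyClosure`, `enrich`, `JoinKind` and `maxKeys` are hypothetical, and the guard below is deliberately conservative (it skips enrichment for every outer join); the PR's actual null-generation guard may be finer-grained.

```scala
import scala.collection.mutable

// Hypothetical model: keys are sets of output field indices,
// equi-join predicates are (left index, right index) pairs.
object UpsertKeyClosure {
  sealed trait JoinKind
  case object Inner extends JoinKind
  case object LeftOuter extends JoinKind
  case object RightOuter extends JoinKind
  case object FullOuter extends JoinKind

  /** Closes `keys` under substitution across the equi-join pairs.
    * `maxKeys` bounds the result size; 64 is an arbitrary assumed default.
    */
  def enrich(
      keys: Set[Set[Int]],
      equiPairs: Seq[(Int, Int)],
      joinKind: JoinKind,
      maxKeys: Int = 64): Set[Set[Int]] = {
    // Conservative guard: an outer join can null-pad one side, so l = r is
    // not guaranteed on every output row. This sketch simply skips those joins.
    if (joinKind != Inner || equiPairs.isEmpty) return keys

    // Each equality can be used in both directions.
    val rewrites: Seq[(Int, Int)] =
      equiPairs.flatMap { case (l, r) => Seq(l -> r, r -> l) }
    val seen = mutable.Set.empty[Set[Int]] ++= keys
    val worklist = mutable.Queue.empty[Set[Int]] ++= keys

    // Worklist fixpoint: every newly derived key is itself rewritten again,
    // so chains like $0 = $2, $2 = $5 are followed transitively.
    while (worklist.nonEmpty && seen.size < maxKeys) {
      val key = worklist.dequeue()
      for ((from, to) <- rewrites if key.contains(from)) {
        val derived = key - from + to
        // seen.add returns true only when `derived` was not present yet.
        if (seen.size < maxKeys && seen.add(derived)) worklist.enqueue(derived)
      }
    }
    seen.toSet
  }
}
```

With the issue's example, `enrich(Set(Set(2, 3), Set(0, 2, 3)), Seq(0 -> 2), Inner)` adds `{0, 3}`. With two chained predicates, `enrich(Set(Set(1, 5)), Seq(0 -> 2, 2 -> 5), Inner)` derives `{1, 2}` and then `{0, 1}`, which a single substitution pass would miss.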
> Enrich more upsert keys by equiv expressions
> --------------------------------------------
>
> Key: FLINK-38753
> URL: https://issues.apache.org/jira/browse/FLINK-38753
> Project: Flink
> Issue Type: Improvement
> Components: Table SQL / Planner
> Affects Versions: 2.2.0
> Reporter: Xuyang Zhong
> Priority: Major
> Labels: pull-request-available
>
> Take the following SQL as an example:
> {code:scala}
> @Test
> def test3(): Unit = {
>   tEnv.executeSql(s"""
>     |create temporary table src1 (
>     |  a1 int primary key not enforced,
>     |  b1 int
>     |) with (
>     |  'connector' = 'values',
>     |  'changelog-mode' = 'I,UA,UB,D'
>     |)
>     |""".stripMargin)
>   tEnv.executeSql(s"""
>     |create temporary table src2 (
>     |  a2 int,
>     |  b2 int,
>     |  c2 int,
>     |  primary key(a2, b2) not enforced
>     |) with (
>     |  'connector' = 'values',
>     |  'changelog-mode' = 'I,UA,UB,D'
>     |)
>     |""".stripMargin)
>   tEnv.executeSql(s"""
>     |create temporary table snk (
>     |  a1 int,
>     |  b1 int,
>     |  a2 int,
>     |  b2 int,
>     |  c2 int,
>     |  primary key(a1, b2) not enforced
>     |) with (
>     |  'connector' = 'values',
>     |  'sink-insert-only' = 'false',
>     |  'sink-changelog-mode-enforced' = 'I,UA,D'
>     |)
>     |""".stripMargin)
>   util.verifyExplainInsert("""
>     |insert into snk
>     |  select * from src1 join src2 on a1 = a2
>     |""".stripMargin)
> }
> {code}
> The plan is:
> {code:java}
> == Abstract Syntax Tree ==
> LogicalSink(table=[default_catalog.default_database.snk], fields=[a1, b1, a2, b2, c2])
> +- LogicalProject(a1=[$0], b1=[$1], a2=[$2], b2=[$3], c2=[$4])
>    +- LogicalJoin(condition=[=($0, $2)], joinType=[inner])
>       :- LogicalTableScan(table=[[default_catalog, default_database, src1]])
>       +- LogicalTableScan(table=[[default_catalog, default_database, src2]])
>
> == Optimized Physical Plan ==
> Sink(table=[default_catalog.default_database.snk], fields=[a1, b1, a2, b2, c2], upsertMaterialize=[true])
> +- Join(joinType=[InnerJoin], where=[=(a1, a2)], select=[a1, b1, a2, b2, c2], leftInputSpec=[JoinKeyContainsUniqueKey], rightInputSpec=[HasUniqueKey])
>    :- Exchange(distribution=[hash[a1]])
>    :  +- TableSourceScan(table=[[default_catalog, default_database, src1]], fields=[a1, b1])
>    +- Exchange(distribution=[hash[a2]])
>       +- TableSourceScan(table=[[default_catalog, default_database, src2]], fields=[a2, b2, c2])
>
> == Optimized Execution Plan ==
> Sink(table=[default_catalog.default_database.snk], fields=[a1, b1, a2, b2, c2], upsertMaterialize=[true])
> +- Join(joinType=[InnerJoin], where=[(a1 = a2)], select=[a1, b1, a2, b2, c2], leftInputSpec=[JoinKeyContainsUniqueKey], rightInputSpec=[HasUniqueKey])
>    :- Exchange(distribution=[hash[a1]])
>    :  +- TableSourceScan(table=[[default_catalog, default_database, src1]], fields=[a1, b1])
>    +- Exchange(distribution=[hash[a2]])
>       +- TableSourceScan(table=[[default_catalog, default_database, src2]], fields=[a2, b2, c2])
> {code}
> There is an `upsertMaterialize` in the sink because the sink PK is `a1, b2`
> (`\{0, 3}`) and the upstream upsert keys `[\{2, 3}, \{0, 2, 3}]` do not contain it.
> However, this is an inner join with the equivalence condition `a1 = a2`, so we
> can use this equivalence to derive more upsert keys (for example, `\{0, 3}` from
> `\{2, 3}` by replacing `a2` with `a1`) and avoid the `upsertMaterialize`.
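The arithmetic behind this can be checked with a small sketch. It assumes the join output field indices a1=0, b1=1, a2=2, b2=3, c2=4 from the plan above, and it models the sink's decision with an assumed simplification: materialization is avoidable once some upstream upsert key is contained in the sink primary key. The object and method names are hypothetical, and Flink's real check may differ in detail.

```scala
// Hypothetical check for the example above; field indices follow the
// join output order a1=0, b1=1, a2=2, b2=3, c2=4.
object UpsertMaterializeCheck {
  val sinkPk: Set[Int] = Set(0, 3) // primary key(a1, b2)
  val upstreamKeys: Seq[Set[Int]] = Seq(Set(2, 3), Set(0, 2, 3))

  // Assumed simplification: no materialization is needed when some
  // upstream upsert key is a subset of the sink primary key.
  def needsMaterialize(keys: Seq[Set[Int]]): Boolean =
    !keys.exists(_.subsetOf(sinkPk))

  // Inner join on a1 = a2: every output row has $0 == $2,
  // so field 2 may be replaced by field 0 inside a key.
  def substitute(key: Set[Int], from: Int, to: Int): Set[Int] =
    if (key.contains(from)) key - from + to else key

  // Original keys plus their rewritten forms: {2,3}, {0,2,3}, {0,3}.
  val enriched: Seq[Set[Int]] =
    (upstreamKeys ++ upstreamKeys.map(substitute(_, 2, 0))).distinct
}
```

Under these assumptions `needsMaterialize(upstreamKeys)` is `true`, while `needsMaterialize(enriched)` is `false` because the derived key `{0, 3}` matches the sink PK exactly.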
--
This message was sent by Atlassian Jira
(v8.20.10#820010)