Xuyang Zhong created FLINK-38753:
------------------------------------

             Summary: Enrich more upsert keys by equiv expressions
                 Key: FLINK-38753
                 URL: https://issues.apache.org/jira/browse/FLINK-38753
             Project: Flink
          Issue Type: Improvement
          Components: Table SQL / Planner
    Affects Versions: 2.2.0
            Reporter: Xuyang Zhong


Take the following SQL as an example:
{code:scala}
@Test
def test3(): Unit = {
  tEnv.executeSql(s"""
                     |create temporary table src1 (
                     |  a1 int primary key not enforced,
                     |  b1 int
                     |) with (
                     |  'connector' = 'values',
                     |  'changelog-mode' = 'I,UA,UB,D'
                     |)
                     |""".stripMargin)
  tEnv.executeSql(s"""
                     |create temporary table src2 (
                     |  a2 int,
                     |  b2 int,
                     |  c2 int,
                     |  primary key(a2, b2) not enforced
                     |) with (
                     |  'connector' = 'values',
                     |  'changelog-mode' = 'I,UA,UB,D'
                     |)
                     |""".stripMargin)
  tEnv.executeSql(s"""
                     |create temporary table snk (
                     |  a1 int,
                     |  b1 int,
                     |  a2 int,
                     |  b2 int,
                     |  c2 int,
                     |  primary key(a1, b2) not enforced
                     |) with (
                     |  'connector' = 'values',
                     |  'sink-insert-only' = 'false',
                     |  'sink-changelog-mode-enforced' = 'I,UA,D'
                     |)
                     |""".stripMargin)

  util.verifyExplainInsert("""
                             |insert into snk
                             | select * from src1 join src2 on a1 = a2
                             |""".stripMargin)

} {code}
The plan is:
{code:java}
== Abstract Syntax Tree ==
LogicalSink(table=[default_catalog.default_database.snk], fields=[a1, b1, a2, b2, c2])
+- LogicalProject(a1=[$0], b1=[$1], a2=[$2], b2=[$3], c2=[$4])
   +- LogicalJoin(condition=[=($0, $2)], joinType=[inner])
      :- LogicalTableScan(table=[[default_catalog, default_database, src1]])
      +- LogicalTableScan(table=[[default_catalog, default_database, src2]])

== Optimized Physical Plan ==
Sink(table=[default_catalog.default_database.snk], fields=[a1, b1, a2, b2, c2], upsertMaterialize=[true])
+- Join(joinType=[InnerJoin], where=[=(a1, a2)], select=[a1, b1, a2, b2, c2], leftInputSpec=[JoinKeyContainsUniqueKey], rightInputSpec=[HasUniqueKey])
   :- Exchange(distribution=[hash[a1]])
   :  +- TableSourceScan(table=[[default_catalog, default_database, src1]], fields=[a1, b1])
   +- Exchange(distribution=[hash[a2]])
      +- TableSourceScan(table=[[default_catalog, default_database, src2]], fields=[a2, b2, c2])

== Optimized Execution Plan ==
Sink(table=[default_catalog.default_database.snk], fields=[a1, b1, a2, b2, c2], upsertMaterialize=[true])
+- Join(joinType=[InnerJoin], where=[(a1 = a2)], select=[a1, b1, a2, b2, c2], leftInputSpec=[JoinKeyContainsUniqueKey], rightInputSpec=[HasUniqueKey])
   :- Exchange(distribution=[hash[a1]])
   :  +- TableSourceScan(table=[[default_catalog, default_database, src1]], fields=[a1, b1])
   +- Exchange(distribution=[hash[a2]])
      +- TableSourceScan(table=[[default_catalog, default_database, src2]], fields=[a2, b2, c2])
 {code}
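The two upstream upsert keys of the join output can be reproduced with a simplified model. This is a sketch under stated assumptions, not planner code: keys are plain Scala `Set[Int]`s of output field indices rather than the planner's own key representation, and `JoinUpsertKeys.innerJoinKeys` is a hypothetical helper. It encodes the reasoning behind `leftInputSpec=[JoinKeyContainsUniqueKey]`: because the left join key `a1` is unique on the left side, each right row matches at most one left row, so the right primary key (shifted past the two left fields) stays unique in the output, and the union of a left key and a right key is unique as well.

{code:scala}
object JoinUpsertKeys {
  // Simplified model (assumption): a key is a set of field indices.
  type Key = Set[Int]

  // Right-side indices move past the left fields in the joined row.
  def shift(keys: Seq[Key], offset: Int): Seq[Key] = keys.map(_.map(_ + offset))

  // Hypothetical helper: upsert keys of an inner join output under the simplified model.
  // leftJoinKey / rightJoinKey use each side's local field indices.
  def innerJoinKeys(
      leftKeys: Seq[Key],
      leftJoinKey: Key,
      leftFieldCount: Int,
      rightKeys: Seq[Key],
      rightJoinKey: Key): Seq[Key] = {
    val shiftedRight = shift(rightKeys, leftFieldCount)
    // A join key that contains a unique key means each row of the other side
    // matches at most one row of this side.
    val leftJoinKeyIsUnique = leftKeys.exists(_.subsetOf(leftJoinKey))
    val rightJoinKeyIsUnique = rightKeys.exists(_.subsetOf(rightJoinKey))
    val fromRight = if (leftJoinKeyIsUnique) shiftedRight else Seq.empty[Key]
    val fromLeft = if (rightJoinKeyIsUnique) leftKeys else Seq.empty[Key]
    // A left key plus a right key always identifies an inner join output row.
    val combined = for (l <- leftKeys; r <- shiftedRight) yield l ++ r
    (fromLeft ++ fromRight ++ combined).distinct
  }
}
{code}

For this query, `innerJoinKeys(Seq(Set(0)), Set(0), 2, Seq(Set(0, 1)), Set(0))` yields the keys `\{2, 3}` and `\{0, 2, 3}`; the left keys alone are not kept because the right join key `a2` does not contain the right primary key `(a2, b2)`.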
The sink has `upsertMaterialize=[true]` because the sink primary key is `a1, b2` (`\{0, 3}`), and none of the upstream upsert keys `[\{2, 3}, \{0, 2, 3}]` is contained in it.
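The sink-side decision can be sketched as follows, assuming (as a simplification) that materialization is skipped exactly when some upstream upsert key is a subset of the sink primary key, and is required when no upsert key is known. `SinkMaterialize.needsUpsertMaterialize` is a hypothetical name used only for illustration.

{code:scala}
object SinkMaterialize {
  // Simplified model (assumption): a key is a set of field indices.
  type Key = Set[Int]

  // Hypothetical helper: materialization can be skipped only when some upstream
  // upsert key is a subset of the sink primary key; an empty key list means
  // materialization is required.
  def needsUpsertMaterialize(sinkPk: Key, upstreamUpsertKeys: Seq[Key]): Boolean =
    !upstreamUpsertKeys.exists(_.subsetOf(sinkPk))
}

// Sink pk (a1, b2) = {0, 3}; join upsert keys {2, 3} and {0, 2, 3}:
// SinkMaterialize.needsUpsertMaterialize(Set(0, 3), Seq(Set(2, 3), Set(0, 2, 3))) == true
{code}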

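However, this is an inner join with the equality condition `a1 = a2`, so `a1` and `a2` hold the same value in every output row. We can use this equivalence to derive more upsert keys: replacing `a2` (`2`) with `a1` (`0`) in `\{2, 3}` gives `\{0, 3}`, which is contained in the sink primary key, so the `upsertMaterialize` could be removed.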
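A minimal sketch of the proposed enrichment, assuming the same set-of-indices model: equi-join pairs are merged into equivalence classes, and every upsert key gets variants in which the members of a class it touches are replaced by any single member of that class. This only holds where both sides of the condition are equal in every output row, as in this inner join (an outer join can null out one side). `UpsertKeyEnrichment` and its methods are hypothetical, not existing planner APIs.

{code:scala}
object UpsertKeyEnrichment {
  // Simplified model (assumption): a key is a set of field indices.
  type Key = Set[Int]

  // Merges equi-join index pairs into equivalence classes, joining overlapping classes.
  def equivalenceClasses(pairs: Seq[(Int, Int)]): Seq[Set[Int]] =
    pairs.foldLeft(Seq.empty[Set[Int]]) { case (classes, (a, b)) =>
      val (touching, rest) = classes.partition(c => c.contains(a) || c.contains(b))
      rest :+ touching.foldLeft(Set(a, b))(_ ++ _)
    }

  // For each key, replace the members of every class it touches by any single
  // member of that class; all members are equal, so one determines the rest.
  // Only valid when the equalities hold in every output row (e.g. inner join).
  def enrich(keys: Seq[Key], classes: Seq[Set[Int]]): Seq[Key] = {
    def variants(key: Key): Set[Key] =
      classes.foldLeft(Set(key)) { (acc, cls) =>
        acc.flatMap { k =>
          val present = k.intersect(cls)
          if (present.isEmpty) Set(k)
          else cls.map(rep => (k -- present) + rep) + k
        }
      }
    keys.flatMap(variants).distinct
  }
}
{code}

With `a1 = a2` given as the index pair `(0, 2)`, enriching `[\{2, 3}, \{0, 2, 3}]` adds the key `\{0, 3}`, which is a subset of the sink primary key `\{0, 3}`, so under the check above the sink would no longer need `upsertMaterialize`.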



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
