[ 
https://issues.apache.org/jira/browse/FLINK-38579?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xuyang Zhong updated FLINK-38579:
---------------------------------
    Description: 
Currently, the planner already checks filters in Calc: if a filter condition references columns that are not part of the upsert key, the upstream changelog is required to contain both UPDATE_BEFORE (UB) and UPDATE_AFTER (UA) messages.

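However, non-equi conditions in joins and filters that are pushed down into the source should apply the same check to avoid generating a DropUpdateBefore. Otherwise, the -U message is dropped while the +U message is filtered out (its new value no longer satisfies the condition), so the sink never receives a retraction and keeps a stale row.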

For example:
{code:java}
@Test
def test(): Unit = {
  val testDataId = TestValuesTableFactory.registerData(
    Seq(
      changelogRow("+I", Int.box(1), "tom", Int.box(1)),
      changelogRow("-U", Int.box(1), "tom", Int.box(1)),
      changelogRow("+U", Int.box(1), "tom", Int.box(2))
    ))
  val ddl =
    s"""
       |CREATE TABLE t (
       |  a int primary key not enforced,
       |  b varchar,
       |  c int
       |) WITH (
       |  'connector' = 'values',
       |  'data-id' = '$testDataId',
       |  'changelog-mode' = 'I,UA,UB,D',
       |  'filterable-fields' = 'c'
       |)
       |""".stripMargin
  tEnv.executeSql(ddl)
  val sink =
    s"""
       |CREATE TABLE s (
       |  a int primary key not enforced,
       |  b varchar,
       |  c int
       |) WITH (
       |  'connector' = 'values',
       |  'sink-insert-only' = 'false',
       |  'sink-changelog-mode-enforced' = 'I,UA,D'
       |)
       |""".stripMargin
  tEnv.executeSql(sink)

  val sql = "insert into s select * from t where c < 2"
  println(tEnv.explainSql(sql, ExplainDetail.CHANGELOG_MODE))
  tEnv.executeSql(sql).await()       

  // the result should be 'empty' instead of '1,tom,1'
  println(TestValuesTableFactory.getResultsAsStrings("s").sorted) 
} {code}

  was:
Currently, we have checked the filter in calc that if the calc applies on non 
upsert key, it requires the upstream changelog to be UA + UB. 

However, non-equiv conditions in join and filters that are pushed down in 
source should also check this logic to avoid generating drop update before.

For example:
{code:java}
@Test
def test(): Unit = {
  val testDataId = TestValuesTableFactory.registerData(
    Seq(
      changelogRow("+I", Int.box(1), "tom", Int.box(1)),
      changelogRow("-U", Int.box(1), "tom", Int.box(1)),
      changelogRow("+U", Int.box(1), "tom", Int.box(2))
    ))
  val ddl =
    s"""
       |CREATE TABLE t (
       |  a int primary key not enforced,
       |  b varchar,
       |  c int
       |) WITH (
       |  'connector' = 'values',
       |  'data-id' = '$testDataId',
       |  'changelog-mode' = 'I,UA,UB,D',
       |  'filterable-fields' = 'c'
       |)
       |""".stripMargin
  tEnv.executeSql(ddl)
  val sink =
    s"""
       |CREATE TABLE s (
       |  a int primary key not enforced,
       |  b varchar,
       |  c int
       |) WITH (
       |  'connector' = 'values',
       |  'sink-insert-only' = 'false',
       |  'sink-changelog-mode-enforced' = 'I,UA,D'
       |)
       |""".stripMargin
  tEnv.executeSql(sink)

  val sql = "insert into s select * from t where c < 2"
  println(tEnv.explainSql(sql, ExplainDetail.CHANGELOG_MODE))
  tEnv.executeSql(sql).await()       

  // the result should be 'empty' instead of '1,tom,1'
  println(TestValuesTableFactory.getResultsAsStrings("s").sorted) 
} {code}


> NonEquivCond in join and Filters pushed down in source should also affect the 
> upstream changelog just like Filter in Calc
> -------------------------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-38579
>                 URL: https://issues.apache.org/jira/browse/FLINK-38579
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Planner
>            Reporter: Xuyang Zhong
>            Priority: Major
>
> Currently, the planner already checks filters in Calc: if a filter condition 
> references columns that are not part of the upsert key, the upstream changelog 
> is required to contain both UPDATE_BEFORE (UB) and UPDATE_AFTER (UA) messages.
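> However, non-equi conditions in joins and filters that are pushed down into the 
> source should apply the same check to avoid generating a DropUpdateBefore. 
> Otherwise, the -U message is dropped while the +U message is filtered out, so 
> the sink never receives a retraction and keeps a stale row.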
> For example:
> {code:java}
> @Test
> def test(): Unit = {
>   val testDataId = TestValuesTableFactory.registerData(
>     Seq(
>       changelogRow("+I", Int.box(1), "tom", Int.box(1)),
>       changelogRow("-U", Int.box(1), "tom", Int.box(1)),
>       changelogRow("+U", Int.box(1), "tom", Int.box(2))
>     ))
>   val ddl =
>     s"""
>        |CREATE TABLE t (
>        |  a int primary key not enforced,
>        |  b varchar,
>        |  c int
>        |) WITH (
>        |  'connector' = 'values',
>        |  'data-id' = '$testDataId',
>        |  'changelog-mode' = 'I,UA,UB,D',
>        |  'filterable-fields' = 'c'
>        |)
>        |""".stripMargin
>   tEnv.executeSql(ddl)
>   val sink =
>     s"""
>        |CREATE TABLE s (
>        |  a int primary key not enforced,
>        |  b varchar,
>        |  c int
>        |) WITH (
>        |  'connector' = 'values',
>        |  'sink-insert-only' = 'false',
>        |  'sink-changelog-mode-enforced' = 'I,UA,D'
>        |)
>        |""".stripMargin
>   tEnv.executeSql(sink)
>   val sql = "insert into s select * from t where c < 2"
>   println(tEnv.explainSql(sql, ExplainDetail.CHANGELOG_MODE))
>   tEnv.executeSql(sql).await()       
>   // the result should be 'empty' instead of '1,tom,1'
>   println(TestValuesTableFactory.getResultsAsStrings("s").sorted) 
> } {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to