[ 
https://issues.apache.org/jira/browse/FLINK-38579?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-38579:
-----------------------------------
    Labels: pull-request-available  (was: )

> NonEquivCond in join and Filters pushed down in source should also affect the 
> upstream changelog just like Filter in Calc
> -------------------------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-38579
>                 URL: https://issues.apache.org/jira/browse/FLINK-38579
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Planner
>    Affects Versions: 2.0.1, 2.1.1
>            Reporter: Xuyang Zhong
>            Priority: Major
>              Labels: pull-request-available
>
> Currently, we have checked the filter in calc that if the calc applies on non 
> upsert key, it requires the upstream changelog to be UA + UB. 
> However, non-equiv conditions in join and filters that are pushed down in 
> source should also check this logic to avoid generating DropUpdateBefore.
> For example:
> {code:java}
> @Test
> def test(): Unit = {
>   val testDataId = TestValuesTableFactory.registerData(
>     Seq(
>       changelogRow("+I", Int.box(1), "tom", Int.box(1)),
>       changelogRow("-U", Int.box(1), "tom", Int.box(1)),
>       changelogRow("+U", Int.box(1), "tom", Int.box(2))
>     ))
>   val ddl =
>     s"""
>        |CREATE TABLE t (
>        |  a int primary key not enforced,
>        |  b varchar,
>        |  c int
>        |) WITH (
>        |  'connector' = 'values',
>        |  'data-id' = '$testDataId',
>        |  'changelog-mode' = 'I,UA,UB,D',
>        |  'filterable-fields' = 'c'
>        |)
>        |""".stripMargin
>   tEnv.executeSql(ddl)
>   val sink =
>     s"""
>        |CREATE TABLE s (
>        |  a int primary key not enforced,
>        |  b varchar,
>        |  c int
>        |) WITH (
>        |  'connector' = 'values',
>        |  'sink-insert-only' = 'false',
>        |  'sink-changelog-mode-enforced' = 'I,UA,D'
>        |)
>        |""".stripMargin
>   tEnv.executeSql(sink)
>   val sql = "insert into s select * from t where c < 2"
>   println(tEnv.explainSql(sql, ExplainDetail.CHANGELOG_MODE))
>   tEnv.executeSql(sql).await()       
>   // the result should be 'empty' instead of '1,tom,1'
>   println(TestValuesTableFactory.getResultsAsStrings("s").sorted) 
> } {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to