[
https://issues.apache.org/jira/browse/FLINK-38579?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
ASF GitHub Bot updated FLINK-38579:
-----------------------------------
Labels: pull-request-available (was: )
> NonEquivCond in join and Filters pushed down in source should also affect the
> upstream changelog just like Filter in Calc
> -------------------------------------------------------------------------------------------------------------------------
>
> Key: FLINK-38579
> URL: https://issues.apache.org/jira/browse/FLINK-38579
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / Planner
> Affects Versions: 2.0.1, 2.1.1
> Reporter: Xuyang Zhong
> Priority: Major
> Labels: pull-request-available
>
> Currently, for a filter in a Calc, the planner checks whether the filter is
> applied to a non-upsert-key column; if it is, the upstream changelog is
> required to contain both UA and UB.
> However, non-equi conditions in joins and filters that are pushed down into
> sources should apply the same check, so that DropUpdateBefore is not generated.
> For example:
> {code:java}
> @Test
> def test(): Unit = {
>   val testDataId = TestValuesTableFactory.registerData(
>     Seq(
>       changelogRow("+I", Int.box(1), "tom", Int.box(1)),
>       changelogRow("-U", Int.box(1), "tom", Int.box(1)),
>       changelogRow("+U", Int.box(1), "tom", Int.box(2))
>     ))
>   val ddl =
>     s"""
>        |CREATE TABLE t (
>        | a int primary key not enforced,
>        | b varchar,
>        | c int
>        |) WITH (
>        | 'connector' = 'values',
>        | 'data-id' = '$testDataId',
>        | 'changelog-mode' = 'I,UA,UB,D',
>        | 'filterable-fields' = 'c'
>        |)
>        |""".stripMargin
>   tEnv.executeSql(ddl)
>   val sink =
>     s"""
>        |CREATE TABLE s (
>        | a int primary key not enforced,
>        | b varchar,
>        | c int
>        |) WITH (
>        | 'connector' = 'values',
>        | 'sink-insert-only' = 'false',
>        | 'sink-changelog-mode-enforced' = 'I,UA,D'
>        |)
>        |""".stripMargin
>   tEnv.executeSql(sink)
>   val sql = "insert into s select * from t where c < 2"
>   println(tEnv.explainSql(sql, ExplainDetail.CHANGELOG_MODE))
>   tEnv.executeSql(sql).await()
>   // the result should be 'empty' instead of '1,tom,1'
>   println(TestValuesTableFactory.getResultsAsStrings("s").sorted)
> }
> {code}
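The reasoning in the description can be illustrated with a toy model of the changelog. This is only a sketch and not Flink planner code: the names `Kind`, `Row`, and `ChangelogSketch` are hypothetical, and the sink is assumed to keep one row per primary key `a` and to treat -U and -D as a retraction of that key. It replays the three changes from the test data through the predicate `c < 2`, once with UPDATE_BEFORE kept and once with it dropped before the filter.

```scala
// Toy model of a changelog stream; hypothetical names, not Flink planner code.
sealed trait Kind
case object Insert extends Kind       // +I
case object UpdateBefore extends Kind // -U
case object UpdateAfter extends Kind  // +U
case object Delete extends Kind       // -D

final case class Row(kind: Kind, a: Int, b: String, c: Int)

object ChangelogSketch {
  // The same three changes as the test data in the issue.
  val source: Seq[Row] = Seq(
    Row(Insert, 1, "tom", 1),
    Row(UpdateBefore, 1, "tom", 1),
    Row(UpdateAfter, 1, "tom", 2)
  )

  // The predicate from the query: WHERE c < 2 (c is not part of the key).
  def filter(rows: Seq[Row]): Seq[Row] = rows.filter(_.c < 2)

  // Models an upstream that omits UPDATE_BEFORE messages.
  def dropUpdateBefore(rows: Seq[Row]): Seq[Row] =
    rows.filterNot(_.kind == UpdateBefore)

  // Assumption: the sink keeps one row per primary key `a` and treats
  // -U / -D as a retraction of that key.
  def materialize(rows: Seq[Row]): Map[Int, (String, Int)] =
    rows.foldLeft(Map.empty[Int, (String, Int)]) { (state, r) =>
      r.kind match {
        case Insert | UpdateAfter  => state.updated(r.a, (r.b, r.c))
        case UpdateBefore | Delete => state - r.a
      }
    }

  def main(args: Array[String]): Unit = {
    val withUb = materialize(filter(source))
    val withoutUb = materialize(filter(dropUpdateBefore(source)))
    println(s"UB kept:    $withUb")
    println(s"UB dropped: $withoutUb")
  }
}
```

In this model, when -U is kept it passes the filter and retracts key 1, so the result is empty. When -U is dropped, the +U row with c = 2 is filtered out and nothing retracts the earlier insert, which leaves the stale row (1, tom, 1) in the sink.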