[ https://issues.apache.org/jira/browse/FLINK-28569?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17612149#comment-17612149 ]
Jun Qin commented on FLINK-28569:
---------------------------------
Can we backport this fix into 1.14 and 1.15?
> SinkUpsertMaterializer should be aware of the input upsertKey if it is not empty otherwise wrong result maybe produced
> ----------------------------------------------------------------------------------------------------------------------
>
> Key: FLINK-28569
> URL: https://issues.apache.org/jira/browse/FLINK-28569
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / Runtime
> Affects Versions: 1.14.5, 1.15.2
> Reporter: lincoln lee
> Assignee: lincoln lee
> Priority: Major
> Labels: pull-request-available
> Fix For: 1.16.0, 1.17.0
>
>
> Currently SinkUpsertMaterializer always updates rows by comparing the complete
> row, but this may produce wrong results if the input has an upsertKey and also
> contains non-deterministic column values. See the following case:
> {code:java}
> @Test
> public void testCdcWithNonDeterministicFuncSinkWithDifferentPk() {
>     // ndFunc makes the projected user_name column non-deterministic.
>     tEnv.createTemporaryFunction(
>             "ndFunc", new JavaUserDefinedScalarFunctions.NonDeterministicUdf());
>     // The source primary key is user_id ...
>     String cdcDdl =
>             "CREATE TABLE users (\n"
>                     + " user_id STRING,\n"
>                     + " user_name STRING,\n"
>                     + " email STRING,\n"
>                     + " balance DECIMAL(18,2),\n"
>                     + " primary key (user_id) not enforced\n"
>                     + ") WITH (\n"
>                     + " 'connector' = 'values',\n"
>                     + " 'changelog-mode' = 'I,UA,UB,D'\n"
>                     + ")";
>     // ... while the sink primary key is email.
>     String sinkTableDdl =
>             "CREATE TABLE sink (\n"
>                     + " user_id STRING,\n"
>                     + " user_name STRING,\n"
>                     + " email STRING,\n"
>                     + " balance DECIMAL(18,2),\n"
>                     + " PRIMARY KEY(email) NOT ENFORCED\n"
>                     + ") WITH(\n"
>                     + " 'connector' = 'values',\n"
>                     + " 'sink-insert-only' = 'false'\n"
>                     + ")";
>     tEnv.executeSql(cdcDdl);
>     tEnv.executeSql(sinkTableDdl);
>     util.verifyJsonPlan(
>             "insert into sink select user_id, ndFunc(user_name), email, balance from users");
> }
> {code}
> For the original CDC source records:
> {code}
> +I[user1, Tom, [email protected], 10.02],
> -D[user1, Tom, [email protected], 10.02],
> {code}
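> the above query cannot correctly delete the previously inserted row because of
> the non-deterministic column value 'ndFunc(user_name)': the function is
> evaluated again for the -D record, so under a full-row comparison the deletion
> row no longer equals the stored row.
> This can be solved by making SinkUpsertMaterializer aware of the input
> upsertKey and updating rows by that key.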
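To make the failure mode concrete, here is a minimal, self-contained sketch. It is a simplified model, not Flink's actual SinkUpsertMaterializer code: the names `MaterializerSketch`, `Row`, `Change`, `project`, `materialize` and the counter-based `ndFunc` are all hypothetical. The email is a placeholder because the address in the issue is redacted, and the sketch assumes the input upsert key is `user_id` (the source primary key). Per sink primary key (`email`) the model keeps a list of live rows. A `-D` record retracts the first stored row accepted by a pluggable match predicate. With full-row equality, the delete finds nothing to retract. With upsert-key equality, it does.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiPredicate;

// Simplified model of an upsert materializer (hypothetical; requires Java 16+ for records).
public class MaterializerSketch {

    // Simplified sink row (all fields as strings) and a changelog entry ("+I" or "-D").
    record Row(String userId, String userName, String email, String balance) {}
    record Change(String kind, Row row) {}

    // Hypothetical stand-in for ndFunc: returns a different value on every call.
    private static int calls = 0;

    static String ndFunc(String s) {
        return s + "#" + (++calls);
    }

    // Mimics "select user_id, ndFunc(user_name), email, balance from users" for one record.
    static Row project(Row r) {
        return new Row(r.userId(), ndFunc(r.userName()), r.email(), r.balance());
    }

    // Per sink primary key (email), keep the list of live rows.
    // 'matches' decides which stored row a -D record retracts.
    static Map<String, List<Row>> materialize(List<Change> changes, BiPredicate<Row, Row> matches) {
        Map<String, List<Row>> state = new HashMap<>();
        for (Change c : changes) {
            String pk = c.row().email();
            List<Row> rows = state.computeIfAbsent(pk, k -> new ArrayList<>());
            if (c.kind().equals("+I")) {
                rows.add(c.row());
            } else {
                // Retract the first matching row; if nothing matches, the delete is lost.
                for (int i = 0; i < rows.size(); i++) {
                    if (matches.test(rows.get(i), c.row())) {
                        rows.remove(i);
                        break;
                    }
                }
            }
            if (rows.isEmpty()) {
                state.remove(pk); // no live rows left: the sink row is deleted
            }
        }
        return state;
    }

    public static void main(String[] args) {
        // Placeholder email; the address in the issue is redacted.
        Row src = new Row("user1", "Tom", "tom@example.com", "10.02");
        // ndFunc is evaluated separately for the +I and the -D record.
        List<Change> changes = List.of(
                new Change("+I", project(src)),
                new Change("-D", project(src)));

        // Full-row compare: "Tom#1" differs from "Tom#2", so the stale row survives.
        System.out.println("full-row:   " + materialize(changes, Row::equals));
        // Upsert-key compare on user_id: the delete retracts the inserted row.
        System.out.println("upsert-key: "
                + materialize(changes, (a, b) -> a.userId().equals(b.userId())));
    }
}
```

Running `main` prints a map that still holds the stale `Tom#1` row for the full-row predicate and an empty map for the upsert-key predicate. Keeping the matching rule as a predicate isolates the one thing the proposed fix changes: which columns decide that a retraction refers to a stored row.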