[ 
https://issues.apache.org/jira/browse/FLINK-28569?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17614484#comment-17614484
 ] 

lincoln lee commented on FLINK-28569:
-------------------------------------

[~godfreyhe] I think it's valuable to port this fix to older versions (users 
who hit this case will get wrong results). The patch does not break 
compatibility: jobs can still restore from previous state, although the 
behavior changes. What do you think?

> SinkUpsertMaterializer should be aware of the input upsertKey if it is not 
> empty, otherwise wrong results may be produced
> ----------------------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-28569
>                 URL: https://issues.apache.org/jira/browse/FLINK-28569
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Runtime
>    Affects Versions: 1.14.5, 1.15.2
>            Reporter: lincoln lee
>            Assignee: lincoln lee
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 1.16.0, 1.17.0
>
>
> Currently SinkUpsertMaterializer always updates a row by comparing the 
> complete row, but this may produce wrong results if the input has an 
> upsertKey and also non-deterministic column values. See the following case:
> {code:java}
> @Test
> public void testCdcWithNonDeterministicFuncSinkWithDifferentPk() {
>     tEnv.createTemporaryFunction(
>             "ndFunc", new JavaUserDefinedScalarFunctions.NonDeterministicUdf());
>     String cdcDdl =
>             "CREATE TABLE users (\n"
>                     + " user_id STRING,\n"
>                     + " user_name STRING,\n"
>                     + " email STRING,\n"
>                     + " balance DECIMAL(18,2),\n"
>                     + " primary key (user_id) not enforced\n"
>                     + ") WITH (\n"
>                     + " 'connector' = 'values',\n"
>                     + " 'changelog-mode' = 'I,UA,UB,D'\n"
>                     + ")";
>     String sinkTableDdl =
>             "CREATE TABLE sink (\n"
>                     + " user_id STRING,\n"
>                     + " user_name STRING,\n"
>                     + " email STRING,\n"
>                     + " balance DECIMAL(18,2),\n"
>                     + " PRIMARY KEY(email) NOT ENFORCED\n"
>                     + ") WITH(\n"
>                     + " 'connector' = 'values',\n"
>                     + " 'sink-insert-only' = 'false'\n"
>                     + ")";
>     tEnv.executeSql(cdcDdl);
>     tEnv.executeSql(sinkTableDdl);
>     util.verifyJsonPlan(
>             "insert into sink select user_id, ndFunc(user_name), email, balance from users");
> }
> {code}
> For the original CDC source records:
> {code}
> +I[user1, Tom, [email protected], 10.02],
> -D[user1, Tom, [email protected], 10.02],
> {code}
> the above query cannot correctly delete the previously inserted row because 
> of the non-deterministic column value 'ndFunc(user_name)'.
> This can be solved by letting the SinkUpsertMaterializer be aware of the 
> input upsertKey and update by it.
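
The difference between the two matching strategies described above can be sketched with a small toy model. This is only an illustration, not Flink's actual SinkUpsertMaterializer: the class name UpsertKeyMaterializerSketch, its methods, the values "Tom-17" / "Tom-42" standing in for two different ndFunc(user_name) results, and the placeholder address tom@example.com are all assumptions made for the example.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Toy model of a per-sink-key row buffer; hypothetical, not Flink code. */
public class UpsertKeyMaterializerSketch {

    /** Rows buffered for a single sink primary key (here: one email value). */
    private final List<String[]> rows = new ArrayList<>();

    /** Column indices of the input upsert key; empty means "compare the whole row". */
    private final int[] upsertKey;

    public UpsertKeyMaterializerSketch(int... upsertKey) {
        this.upsertKey = upsertKey;
    }

    /** Matches by upsert key when one is known, otherwise by the complete row. */
    private boolean matches(String[] stored, String[] incoming) {
        if (upsertKey.length == 0) {
            return Arrays.equals(stored, incoming);
        }
        for (int i : upsertKey) {
            if (!stored[i].equals(incoming[i])) {
                return false;
            }
        }
        return true;
    }

    public void insert(String... row) {
        rows.add(row);
    }

    /** Returns true if a buffered row was found and retracted. */
    public boolean delete(String... row) {
        for (int i = rows.size() - 1; i >= 0; i--) {
            if (matches(rows.get(i), row)) {
                rows.remove(i);
                return true;
            }
        }
        return false;
    }

    public int size() {
        return rows.size();
    }

    public static void main(String[] args) {
        // The non-deterministic function yields a different value for +I and -D.
        String[] insert = {"user1", "Tom-17", "tom@example.com", "10.02"};
        String[] delete = {"user1", "Tom-42", "tom@example.com", "10.02"};

        // No upsert key: the complete row is compared, so the delete finds nothing.
        UpsertKeyMaterializerSketch fullRow = new UpsertKeyMaterializerSketch();
        fullRow.insert(insert);
        System.out.println("full-row compare deleted: " + fullRow.delete(delete));

        // Upsert key = column 0 (user_id): the delete matches the earlier insert.
        UpsertKeyMaterializerSketch byKey = new UpsertKeyMaterializerSketch(0);
        byKey.insert(insert);
        System.out.println("upsert-key compare deleted: " + byKey.delete(delete));
    }
}
```

In this sketch the full-row variant leaves the stale row in the buffer, which corresponds to the wrong result reported in the issue, while the variant keyed on user_id retracts it.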



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
