[ 
https://issues.apache.org/jira/browse/FLINK-28569?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

lincoln lee updated FLINK-28569:
--------------------------------
    Description: 
Currently 

{code}

@Test
public void testCdcWithNonDeterministicFuncSinkWithDifferentPk() {
tEnv.createTemporaryFunction(
"ndFunc", new JavaUserDefinedScalarFunctions.NonDeterministicUdf());

String cdcDdl =
"CREATE TABLE users (\n"
+ " user_id STRING,\n"
+ " user_name STRING,\n"
+ " email STRING,\n"
+ " balance DECIMAL(18,2),\n"
+ " primary key (user_id) not enforced\n"
+ ") WITH (\n"
+ " 'connector' = 'values',\n"
+ " 'changelog-mode' = 'I,UA,UB,D'\n"
+ ")";

String sinkTableDdl =
"CREATE TABLE sink (\n"
+ " user_id STRING,\n"
+ " user_name STRING,\n"
+ " email STRING,\n"
+ " balance DECIMAL(18,2),\n"
+ " PRIMARY KEY(email) NOT ENFORCED\n"
+ ") WITH(\n"
+ " 'connector' = 'values',\n"
+ " 'sink-insert-only' = 'false'\n"
+ ")";
tEnv.executeSql(cdcDdl);
tEnv.executeSql(sinkTableDdl);

util.verifyJsonPlan(
"insert into sink select user_id, ndFunc(user_name), email, balance from 
users");
}

{code}

> SinkUpsertMaterializer should be aware of the input upsertKey if it is not 
> empty; otherwise wrong results may be produced
> ----------------------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-28569
>                 URL: https://issues.apache.org/jira/browse/FLINK-28569
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Runtime
>            Reporter: lincoln lee
>            Priority: Major
>
> Currently 
> {code}
> @Test
> public void testCdcWithNonDeterministicFuncSinkWithDifferentPk() {
> tEnv.createTemporaryFunction(
> "ndFunc", new JavaUserDefinedScalarFunctions.NonDeterministicUdf());
> String cdcDdl =
> "CREATE TABLE users (\n"
> + " user_id STRING,\n"
> + " user_name STRING,\n"
> + " email STRING,\n"
> + " balance DECIMAL(18,2),\n"
> + " primary key (user_id) not enforced\n"
> + ") WITH (\n"
> + " 'connector' = 'values',\n"
> + " 'changelog-mode' = 'I,UA,UB,D'\n"
> + ")";
> String sinkTableDdl =
> "CREATE TABLE sink (\n"
> + " user_id STRING,\n"
> + " user_name STRING,\n"
> + " email STRING,\n"
> + " balance DECIMAL(18,2),\n"
> + " PRIMARY KEY(email) NOT ENFORCED\n"
> + ") WITH(\n"
> + " 'connector' = 'values',\n"
> + " 'sink-insert-only' = 'false'\n"
> + ")";
> tEnv.executeSql(cdcDdl);
> tEnv.executeSql(sinkTableDdl);
> util.verifyJsonPlan(
> "insert into sink select user_id, ndFunc(user_name), email, balance from users");
> }
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to