[ https://issues.apache.org/jira/browse/FLINK-28569?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
lincoln lee updated FLINK-28569:
--------------------------------
Description:
Currently, SinkUpsertMaterializer always identifies the row to update by comparing the complete row. This can produce wrong results when the input has an upsert key and also carries non-deterministic column values, as in the following case:
{code:java}
@Test
public void testCdcWithNonDeterministicFuncSinkWithDifferentPk() {
    // Scalar UDF that may return a different value on every call.
    tEnv.createTemporaryFunction(
            "ndFunc", new JavaUserDefinedScalarFunctions.NonDeterministicUdf());
    // CDC source whose changelog is keyed by user_id (the input's upsert key).
    String cdcDdl =
            "CREATE TABLE users (\n"
                    + " user_id STRING,\n"
                    + " user_name STRING,\n"
                    + " email STRING,\n"
                    + " balance DECIMAL(18,2),\n"
                    + " primary key (user_id) not enforced\n"
                    + ") WITH (\n"
                    + " 'connector' = 'values',\n"
                    + " 'changelog-mode' = 'I,UA,UB,D'\n"
                    + ")";
    // Upsert sink keyed by email, which differs from the input's upsert key.
    String sinkTableDdl =
            "CREATE TABLE sink (\n"
                    + " user_id STRING,\n"
                    + " user_name STRING,\n"
                    + " email STRING,\n"
                    + " balance DECIMAL(18,2),\n"
                    + " PRIMARY KEY(email) NOT ENFORCED\n"
                    + ") WITH(\n"
                    + " 'connector' = 'values',\n"
                    + " 'sink-insert-only' = 'false'\n"
                    + ")";
    tEnv.executeSql(cdcDdl);
    tEnv.executeSql(sinkTableDdl);
    // user_name is recomputed by the non-deterministic UDF before reaching the sink.
    util.verifyJsonPlan(
            "insert into sink select user_id, ndFunc(user_name), email, balance from users");
}
{code}
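Why the full-row comparison goes wrong can be sketched with a simplified model. This is a hypothetical illustration, not Flink's SinkUpsertMaterializer code: it assumes the materializer keeps a list of rows per sink primary key, removes a row on UPDATE_BEFORE/DELETE only when a matching row is found, and that ndFunc returns a new value each time it is evaluated. Because the retraction for user u1 recomputes ndFunc(user_name), it never equals the row stored earlier when whole rows are compared, so the stale row stays in state; matching on the input upsert key (user_id) removes it. The class name MaterializerSketch, the Row record, and the sample values are made up for this sketch.

{code:java}
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

/**
 * Hypothetical, simplified model of the state a sink upsert materializer keeps
 * for ONE sink primary key (email). Not Flink's actual implementation.
 */
public class MaterializerSketch {

    // userId is the input's upsert key; userName stands in for ndFunc(user_name).
    record Row(String userId, String userName, String email) {}

    private final List<Row> rows = new ArrayList<>();
    private final boolean matchByUpsertKey;

    MaterializerSketch(boolean matchByUpsertKey) {
        this.matchByUpsertKey = matchByUpsertKey;
    }

    // Compare only the upsert key, or the complete row (the current behavior).
    private boolean matches(Row stored, Row incoming) {
        return matchByUpsertKey
                ? Objects.equals(stored.userId(), incoming.userId())
                : stored.equals(incoming);
    }

    private int indexOf(Row row) {
        for (int i = 0; i < rows.size(); i++) {
            if (matches(rows.get(i), row)) {
                return i;
            }
        }
        return -1;
    }

    // INSERT / UPDATE_AFTER: with an upsert key, replace the row carrying the
    // same key; otherwise just append.
    void accumulate(Row row) {
        int i = matchByUpsertKey ? indexOf(row) : -1;
        if (i >= 0) {
            rows.set(i, row);
        } else {
            rows.add(row);
        }
    }

    // UPDATE_BEFORE / DELETE: remove the matching row; a retraction that matches
    // nothing is ignored, leaving any stale row in place.
    void retract(Row row) {
        int i = indexOf(row);
        if (i >= 0) {
            rows.remove(i);
        }
    }

    // Replays one update of user u1; the ndFunc column differs on every evaluation.
    static int replay(boolean matchByUpsertKey) {
        MaterializerSketch m = new MaterializerSketch(matchByUpsertKey);
        m.accumulate(new Row("u1", "alice#1", "a@example.com")); // +I
        m.retract(new Row("u1", "alice#2", "a@example.com"));    // -U, recomputed value
        m.accumulate(new Row("u1", "alice#3", "a@example.com")); // +U
        return m.rows.size();
    }

    public static void main(String[] args) {
        System.out.println("full row:   " + replay(false) + " row(s) left");
        System.out.println("upsert key: " + replay(true) + " row(s) left");
    }
}
{code}

Running main reports 2 rows left with full-row matching (the stale alice#1 survives next to alice#3) and 1 row with upsert-key matching (only alice#3).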
was:
Currently
{code:java}
@Test
public void testCdcWithNonDeterministicFuncSinkWithDifferentPk() {
tEnv.createTemporaryFunction(
"ndFunc", new JavaUserDefinedScalarFunctions.NonDeterministicUdf());
String cdcDdl =
"CREATE TABLE users (\n"
+ " user_id STRING,\n"
+ " user_name STRING,\n"
+ " email STRING,\n"
+ " balance DECIMAL(18,2),\n"
+ " primary key (user_id) not enforced\n"
+ ") WITH (\n"
+ " 'connector' = 'values',\n"
+ " 'changelog-mode' = 'I,UA,UB,D'\n"
+ ")";
String sinkTableDdl =
"CREATE TABLE sink (\n"
+ " user_id STRING,\n"
+ " user_name STRING,\n"
+ " email STRING,\n"
+ " balance DECIMAL(18,2),\n"
+ " PRIMARY KEY(email) NOT ENFORCED\n"
+ ") WITH(\n"
+ " 'connector' = 'values',\n"
+ " 'sink-insert-only' = 'false'\n"
+ ")";
tEnv.executeSql(cdcDdl);
tEnv.executeSql(sinkTableDdl);
util.verifyJsonPlan(
"insert into sink select user_id, ndFunc(user_name), email, balance from users");
}
{code}
> SinkUpsertMaterializer should be aware of the input upsertKey if it is not
> empty otherwise wrong result maybe produced
> ----------------------------------------------------------------------------------------------------------------------
>
> Key: FLINK-28569
> URL: https://issues.apache.org/jira/browse/FLINK-28569
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / Runtime
> Reporter: lincoln lee
> Priority: Major
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)