[ https://issues.apache.org/jira/browse/FLINK-33989?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Flaviu Cicio updated FLINK-33989:
---------------------------------
Description:
Given the following Flink SQL tables:
{code:sql}
CREATE TABLE input (
    id STRING NOT NULL,
    current_value STRING NOT NULL,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'upsert-kafka',
    'topic' = 'input',
    'key.format' = 'raw',
    'properties.bootstrap.servers' = 'kafka:29092',
    'properties.group.id' = 'your_group_id',
    'value.format' = 'json'
);

CREATE TABLE output (
    id STRING NOT NULL,
    current_value STRING NOT NULL,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'upsert-kafka',
    'topic' = 'output',
    'key.format' = 'raw',
    'properties.bootstrap.servers' = 'kafka:29092',
    'properties.group.id' = 'your_group_id',
    'value.format' = 'json'
);
{code}
And the following entries are present in the input Kafka topic:
{code:json}
[
  { "id": "1", "current_value": "abc" },
  { "id": "1", "current_value": "abcd" }
]
{code}
If we execute the following statement:
{code:sql}
INSERT INTO output SELECT id, current_value FROM input;
{code}
the following entries are published to the output Kafka topic:
{code:json}
[
  { "id": "1", "current_value": "abc" },
  { "id": "1", "current_value": "abcd" }
]
{code}
But if we execute the following statement:
{code:sql}
INSERT INTO output SELECT id, current_value FROM input WHERE id IN ('1');
{code}
the following entries are published:
{code:json}
[
  { "id": "1", "current_value": "abc" },
  null,
  { "id": "1", "current_value": "abcd" }
]
{code}
We would expect the result to be the same for both INSERT statements. As we can see, the second statement generates an extra tombstone.

Moreover, if we run a SELECT on the input table:
{code:sql}
SELECT * FROM input;
{code}
we get the following entries:
||op||id||current_value||
|+I|1|abc|
|-U|1|abc|
|+U|1|abcd|

We expected to see only the insert (+I) and update_after (+U) entries. The update_before (-U) row is emitted at DeduplicateFunctionHelper#122.
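For background on why the -U row surfaces as a null record: the upsert-kafka sink encodes +I/+U changelog rows as regular key/value records and -U/-D rows as records with a null value (tombstones). The sketch below illustrates that mapping only; the class and method names are ours, not the actual connector internals.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.Map;

public class UpsertSinkSketch {
    // Changelog row kinds as used by Flink's Table runtime.
    enum RowKind { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

    /**
     * Maps a changelog row to the Kafka record an upsert-style sink would emit:
     * +I/+U keep the value, while -U/-D produce a null value (a tombstone).
     */
    static Map.Entry<String, String> toKafkaRecord(RowKind kind, String key, String value) {
        switch (kind) {
            case INSERT:
            case UPDATE_AFTER:
                return new SimpleEntry<>(key, value);
            default: // UPDATE_BEFORE, DELETE
                return new SimpleEntry<>(key, null); // tombstone
        }
    }

    public static void main(String[] args) {
        // The filtered query produces +I, -U, +U for id=1, which is why a
        // tombstone shows up between the two value records in the topic.
        System.out.println(toKafkaRecord(RowKind.INSERT, "1", "abc"));        // 1=abc
        System.out.println(toKafkaRecord(RowKind.UPDATE_BEFORE, "1", "abc")); // 1=null
        System.out.println(toKafkaRecord(RowKind.UPDATE_AFTER, "1", "abcd")); // 1=abcd
    }
}
```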
This is easily reproducible with this test that we added to the UpsertKafkaTableITCase in flink-connector-kafka:
{code:java}
@Test
public void testAggregateFilterOmit() throws Exception {
    String topic = COUNT_FILTER_TOPIC + "_" + format;
    createTestTopic(topic, 1, 1);
    env.setParallelism(1);
    // ------------- test ---------------
    countFilterToUpsertKafkaOmitUpdateBefore(topic);
    // ------------- clean up ---------------
    deleteTestTopic(topic);
}

private void countFilterToUpsertKafkaOmitUpdateBefore(String table) throws Exception {
    String bootstraps = getBootstrapServers();
    List<Row> data =
            Arrays.asList(
                    Row.of(1, "Hi"),
                    Row.of(1, "Hello"),
                    Row.of(2, "Hello world"),
                    Row.of(2, "Hello world, how are you?"),
                    Row.of(2, "I am fine."),
                    Row.of(3, "Luke Skywalker"),
                    Row.of(3, "Comment#1"),
                    Row.of(3, "Comment#2"),
                    Row.of(4, "Comment#3"),
                    Row.of(4, null));
    final String createSource =
            String.format(
                    "CREATE TABLE aggfilter_%s ("
                            + " `id` INT,\n"
                            + " `comment` STRING\n"
                            + ") WITH ("
                            + " 'connector' = 'values',"
                            + " 'data-id' = '%s'"
                            + ")",
                    format, TestValuesTableFactory.registerData(data));
    tEnv.executeSql(createSource);
    final String createSinkTable =
            String.format(
                    "CREATE TABLE %s (\n"
                            + " `id` INT,\n"
                            + " `comment` STRING,\n"
                            + " PRIMARY KEY (`id`) NOT ENFORCED\n"
                            + ") WITH (\n"
                            + " 'connector' = 'upsert-kafka',\n"
                            + " 'topic' = '%s',\n"
                            + " 'properties.bootstrap.servers' = '%s',\n"
                            + " 'key.format' = '%s',\n"
                            + " 'value.format' = '%s'"
                            //+ " 'sink.omit-row-kind' = '-U'"
                            + ")",
                    table, table, bootstraps, format, format);
    tEnv.executeSql(createSinkTable);
    String initialValues =
            "INSERT INTO " + table + " SELECT * FROM aggfilter_" + format + " WHERE id > 2";
    tEnv.executeSql(initialValues).await();
    // ---------- read from the upsert sink -------------------
    final List<Row> result = collectRows(tEnv.sqlQuery("SELECT * FROM " + table), 3);
    List<String> actual = TestValuesTableFactory.getResults(String.format("%s", table));
    List<Row> expected =
            Arrays.asList(
                    changelogRow("+I", 1L, 1L),
                    changelogRow("+I", 2L, 1L),
                    changelogRow("-D", 2L, 1L),
                    changelogRow("+I", 2L, 2L),
                    changelogRow("+I", 3L, 1L),
                    changelogRow("-D", 3L, 1L),
                    changelogRow("+I", 3L, 2L),
                    changelogRow("-D", 3L, 2L),
                    changelogRow("+I", 3L, 3L),
                    changelogRow("+I", 4L, 1L),
                    changelogRow("-D", 4L, 1L),
                    changelogRow("+I", 4L, 2L),
                    changelogRow("-D", 4L, 2L),
                    changelogRow("+I", 4L, 3L),
                    changelogRow("-D", 4L, 3L));
    assertThat(result).satisfies(matching(deepEqualTo(expected, true)));
}
{code}

> Insert Statement With Filter Operation Generates Extra Tombstone using Upsert Kafka Connector
> ---------------------------------------------------------------------------------------------
>
>                 Key: FLINK-33989
>                 URL: https://issues.apache.org/jira/browse/FLINK-33989
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Kafka, Table SQL / Runtime
>    Affects Versions: 1.17.2
>            Reporter: Flaviu Cicio
>            Priority: Major
>

--
This message was sent by Atlassian Jira
(v8.20.10#820010)