[
https://issues.apache.org/jira/browse/FLINK-33989?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17813995#comment-17813995
]
Jeyhun Karimov commented on FLINK-33989:
----------------------------------------
Hi [~flaviu.cicio], with the latest version of {{flink-connector-kafka}} I was
not able to reproduce the issue. In both cases, I am getting
{code:java}
{"id":1,"comment":"abc"}
{"id":1,"comment":"abcd"}
{code}
as the output. Could you please verify?
> Insert Statement With Filter Operation Generates Extra Tombstone using Upsert
> Kafka Connector
> ---------------------------------------------------------------------------------------------
>
> Key: FLINK-33989
> URL: https://issues.apache.org/jira/browse/FLINK-33989
> Project: Flink
> Issue Type: Bug
> Components: Connectors / Kafka, Table SQL / Runtime
> Affects Versions: 1.17.2
> Reporter: Flaviu Cicio
> Priority: Major
>
> Given the following Flink SQL tables:
> {code:sql}
> CREATE TABLE input (
> id STRING NOT NULL,
> current_value STRING NOT NULL,
> PRIMARY KEY (id) NOT ENFORCED
> ) WITH (
> 'connector' = 'upsert-kafka',
> 'topic' = 'input',
> 'key.format' = 'raw',
> 'properties.bootstrap.servers' = 'kafka:29092',
> 'properties.group.id' = 'your_group_id',
> 'value.format' = 'json'
> );
> CREATE TABLE output (
> id STRING NOT NULL,
> current_value STRING NOT NULL,
> PRIMARY KEY (id) NOT ENFORCED
> ) WITH (
> 'connector' = 'upsert-kafka',
> 'topic' = 'output',
> 'key.format' = 'raw',
> 'properties.bootstrap.servers' = 'kafka:29092',
> 'properties.group.id' = 'your_group_id',
> 'value.format' = 'json'
> ); {code}
> And, the following entries are present in the input Kafka topic:
> {code:json}
> [
> {
> "id": "1",
> "current_value": "abc"
> },
> {
> "id": "1",
> "current_value": "abcd"
> }
> ]{code}
> If we execute the following statement:
> {code:sql}
> INSERT INTO output SELECT id, current_value FROM input; {code}
> The following entries are published to the output Kafka topic:
> {code:json}
> [
> {
> "id": "1",
> "current_value": "abc"
> },
> {
> "id": "1",
> "current_value": "abcd"
> }
> ]{code}
> But, if we execute the following statement:
> {code:sql}
> INSERT INTO output SELECT id, current_value FROM input WHERE id IN ('1');
> {code}
> The following entries are published:
> {code:json}
> [
> {
> "id": "1",
> "current_value": "abc"
> },
> null,
> {
> "id": "1",
> "current_value": "abcd"
> }
> ]{code}
> We would expect the result to be the same for both insert statements.
> As we can see, there is an extra tombstone generated as a result of the
> second statement.
>
> Moreover, if we make a select on the input table:
> {code:sql}
> SELECT * FROM input;
> {code}
> We will get the following entries:
> ||op||id||current_value||
> |I|1|abc|
> |-U|1|abc|
> |+U|1|abcd|
> We expected to see only the insert and the update_after entries.
> The update_before is added at line 122 of DeduplicateFunctionHelper.
> This is easily reproducible with the following test, which we added to the
> UpsertKafkaTableITCase from flink-connector-kafka:
> {code:java}
> @Test
> public void testAggregateFilterOmit() throws Exception {
> String topic = COUNT_FILTER_TOPIC + "_" + format;
> createTestTopic(topic, 1, 1);
> env.setParallelism(1);
> // ------------- test ---------------
> countFilterToUpsertKafkaOmitUpdateBefore(topic);
> // ------------- clean up ---------------
> deleteTestTopic(topic);
> }
> private void countFilterToUpsertKafkaOmitUpdateBefore(String table)
> throws Exception {
> String bootstraps = getBootstrapServers();
> List<Row> data =
> Arrays.asList(
> Row.of(1, "Hi"),
> Row.of(1, "Hello"),
> Row.of(2, "Hello world"),
> Row.of(2, "Hello world, how are you?"),
> Row.of(2, "I am fine."),
> Row.of(3, "Luke Skywalker"),
> Row.of(3, "Comment#1"),
> Row.of(3, "Comment#2"),
> Row.of(4, "Comment#3"),
> Row.of(4, null));
> final String createSource =
> String.format(
> "CREATE TABLE aggfilter_%s ("
> + " `id` INT,\n"
> + " `comment` STRING\n"
> + ") WITH ("
> + " 'connector' = 'values',"
> + " 'data-id' = '%s'"
> + ")",
> format, TestValuesTableFactory.registerData(data));
> tEnv.executeSql(createSource);
> final String createSinkTable =
> String.format(
> "CREATE TABLE %s (\n"
> + " `id` INT,\n"
> + " `comment` STRING,\n"
> + " PRIMARY KEY (`id`) NOT ENFORCED\n"
> + ") WITH (\n"
> + " 'connector' = 'upsert-kafka',\n"
> + " 'topic' = '%s',\n"
> + " 'properties.bootstrap.servers' = '%s',\n"
> + " 'key.format' = '%s',\n"
> + " 'value.format' = '%s'"
> //+ " 'sink.omit-row-kind' = '-U'"
> + ")",
> table, table, bootstraps, format, format);
> tEnv.executeSql(createSinkTable);
> String initialValues =
> "INSERT INTO "
> + table
> + " "
> + "SELECT * "
> + "FROM aggfilter_"
> + format
> + " "
> + "WHERE id > 2";
> tEnv.executeSql(initialValues).await();
> // ---------- read from the upsert sink -------------------
> final List<Row> result = collectRows(tEnv.sqlQuery("SELECT * FROM " +
> table), 3);
> List<String> actual =
> TestValuesTableFactory.getResults(String.format("%s", table));
> List<Row> expected =
> Arrays.asList(
> changelogRow("+I", 1L, 1L),
> changelogRow("+I", 2L, 1L),
> changelogRow("-D", 2L, 1L),
> changelogRow("+I", 2L, 2L),
> changelogRow("+I", 3L, 1L),
> changelogRow("-D", 3L, 1L),
> changelogRow("+I", 3L, 2L),
> changelogRow("-D", 3L, 2L),
> changelogRow("+I", 3L, 3L),
> changelogRow("+I", 4L, 1L),
> changelogRow("-D", 4L, 1L),
> changelogRow("+I", 4L, 2L),
> changelogRow("-D", 4L, 2L),
> changelogRow("+I", 4L, 3L),
> changelogRow("-D", 4L, 3L));
> assertThat(result).satisfies(matching(deepEqualTo(expected, true)));
> }
> {code}
>
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)