[ 
https://issues.apache.org/jira/browse/FLINK-33989?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Flaviu Cicio updated FLINK-33989:
---------------------------------
    Description: 
Given the following Flink SQL tables:
{code:sql}
CREATE TABLE input (
  id STRING NOT NULL, 
  current_value STRING NOT NULL, 
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka', 'topic' = 'input', 
  'key.format' = 'raw', 'properties.bootstrap.servers' = 'sn-kafka:29092', 
  'properties.group.id' = 'your_group_id', 
  'value.format' = 'json'
);

CREATE TABLE output (
  id STRING NOT NULL, 
  current_value STRING NOT NULL, 
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka', 'topic' = 'output', 
  'key.format' = 'raw', 'properties.bootstrap.servers' = 'sn-kafka:29092', 
  'properties.group.id' = 'your_group_id', 
  'value.format' = 'json'
); {code}
And, the following entries are present in the input Kafka topic:
{code:json}
[
  {
    "id": "1",
    "current_value": "abc"
  },
  {
    "id": "1",
    "current_value": "abcd"
  }
]{code}
If we execute the following statement:
{code:sql}
INSERT INTO output SELECT id, current_value FROM input; {code}
The following entries are published to the output Kafka topic:
{code:json}
[
  {
    "id": "1",
    "current_value": "abc"
  },
  {
    "id": "1",
    "current_value": "abcd"
  }
]{code}
But, if we execute the following statement:
{code:sql}
INSERT INTO output SELECT id, current_value FROM input WHERE id IN ('1'); {code}
The following entries are published:
{code:json}
[
  {
    "id": "1",
    "current_value": "abc"
  },
  null,
  {
    "id": "1",
    "current_value": "abcd"
  }
]{code}
We would expect the result to be the same for both insert statements.

As we can see, there is an extra tombstone generated as a result of the second 
statement.

 

Moreover, if we make a select on the input table:
{code:sql}
SELECT * FROM input;
{code}
We will get the following entries:
||op||id||current_value||
|I|1|abc|
|-U|1|abc|
|+U|1|abcd|

We expected to see only the insert and the update_after entries.

The update_before is added at DeduplicateFunctionHelper#122.

This is easily reproducible with this test that we added in the 
UpsertKafkaTableITCase from flink-connector-kafka:
{code:java}
    @Test
    public void testAggregateFilterOmit() throws Exception {
        String topic = COUNT_FILTER_TOPIC + "_" + format;
        createTestTopic(topic, 1, 1);
        env.setParallelism(1);
        // -------------   test   ---------------
        countFilterToUpsertKafkaOmitUpdateBefore(topic);
        // ------------- clean up ---------------
        deleteTestTopic(topic);
    }

    private void countFilterToUpsertKafkaOmitUpdateBefore(String table) throws 
Exception {
        String bootstraps = getBootstrapServers();
        List<Row> data =
                Arrays.asList(
                        Row.of(1, "Hi"),
                        Row.of(1, "Hello"),
                        Row.of(2, "Hello world"),
                        Row.of(2, "Hello world, how are you?"),
                        Row.of(2, "I am fine."),
                        Row.of(3, "Luke Skywalker"),
                        Row.of(3, "Comment#1"),
                        Row.of(3, "Comment#2"),
                        Row.of(4, "Comment#3"),
                        Row.of(4, null));

        final String createSource =
                String.format(
                        "CREATE TABLE aggfilter_%s ("
                                + "  `id` INT,\n"
                                + "  `comment` STRING\n"
                                + ") WITH ("
                                + "  'connector' = 'values',"
                                + "  'data-id' = '%s'"
                                + ")",
                        format, TestValuesTableFactory.registerData(data));
        tEnv.executeSql(createSource);

        final String createSinkTable =
                String.format(
                        "CREATE TABLE %s (\n"
                                + "  `id` INT,\n"
                                + "  `comment` STRING,\n"
                                + "  PRIMARY KEY (`id`) NOT ENFORCED\n"
                                + ") WITH (\n"
                                + "  'connector' = 'upsert-kafka',\n"
                                + "  'topic' = '%s',\n"
                                + "  'properties.bootstrap.servers' = '%s',\n"
                                + "  'key.format' = '%s',\n"
                                + "  'value.format' = '%s'"
                                //+ "  'sink.omit-row-kind' = '-U'"
                                + ")",
                        table, table, bootstraps, format, format);

        tEnv.executeSql(createSinkTable);

        String initialValues =
                "INSERT INTO "
                        + table
                        + " "
                        + "SELECT * "
                        + "FROM aggfilter_"
                        + format
                        + " "
                        + "WHERE id > 2";
        tEnv.executeSql(initialValues).await();

        // ---------- read from the upsert sink -------------------

        final List<Row> result = collectRows(tEnv.sqlQuery("SELECT * FROM " + 
table), 3);
        List<String> actual = 
TestValuesTableFactory.getResults(String.format("%s", table));
        List<Row> expected =
                Arrays.asList(
                        changelogRow("+I", 1L, 1L),
                        changelogRow("+I", 2L, 1L),
                        changelogRow("-D", 2L, 1L),
                        changelogRow("+I", 2L, 2L),
                        changelogRow("+I", 3L, 1L),
                        changelogRow("-D", 3L, 1L),
                        changelogRow("+I", 3L, 2L),
                        changelogRow("-D", 3L, 2L),
                        changelogRow("+I", 3L, 3L),
                        changelogRow("+I", 4L, 1L),
                        changelogRow("-D", 4L, 1L),
                        changelogRow("+I", 4L, 2L),
                        changelogRow("-D", 4L, 2L),
                        changelogRow("+I", 4L, 3L),
                        changelogRow("-D", 4L, 3L));

        assertThat(result).satisfies(matching(deepEqualTo(expected, true)));
    }
{code}
 

 

  was:
Given the following Flink SQL tables:
{code:sql}
CREATE TABLE input (
  id STRING NOT NULL, 
  current_value STRING NOT NULL, 
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka', 'topic' = 'input', 
  'key.format' = 'raw', 'properties.bootstrap.servers' = 'sn-kafka:29092', 
  'properties.group.id' = 'your_group_id', 
  'value.format' = 'json'
);

CREATE TABLE output (
  id STRING NOT NULL, 
  current_value STRING NOT NULL, 
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka', 'topic' = 'output', 
  'key.format' = 'raw', 'properties.bootstrap.servers' = 'sn-kafka:29092', 
  'properties.group.id' = 'your_group_id', 
  'value.format' = 'json'
); {code}
And, the following entries are present in the input Kafka topic:
{code:json}
[
  {
    "id": "1",
    "current_value": "abc"
  },
  {
    "id": "1",
    "current_value": "abcd"
  }
]{code}
If we execute the following statement:
{code:sql}
INSERT INTO output SELECT id, current_value FROM input; {code}
The following entries are published to the output Kafka topic:
{code:json}
[
  {
    "id": "1",
    "current_value": "abc"
  },
  {
    "id": "1",
    "current_value": "abcd"
  }
]{code}
 

But, if we execute the following statement:
{code:sql}
INSERT INTO output SELECT id, current_value FROM input WHERE id IN ('1'); {code}
The following entries are published:
{code:json}
[
  {
    "id": "1",
    "current_value": "abc"
  },
  null,
  {
    "id": "1",
    "current_value": "abcd"
  }
]{code}
We would expect the result to be the same for both insert statements.

As we can see, there is an extra tombstone generated as a result of the second 
statement.

Moreover, if we make a select on the input table:
{code:sql}
SELECT * FROM input;
{code}
We will get the following entries:
||op||id||current_value||
|I|1|abc|
|-U|1|abc|
|+U|1|abcd|

We expected to see only the insert and the update_after entries.
The update_before is added at DeduplicateFunctionHelper#122.

This is easily reproducible with this test that we added in the 
UpsertKafkaTableITCase from flink-connector-kafka:


{code:java}
    @Test
    public void testAggregateFilterOmit() throws Exception {
        String topic = COUNT_FILTER_TOPIC + "_" + format;
        createTestTopic(topic, 1, 1);
        env.setParallelism(1);
        // -------------   test   ---------------
        countFilterToUpsertKafkaOmitUpdateBefore(topic);
        // ------------- clean up ---------------
        deleteTestTopic(topic);
    }

    private void countFilterToUpsertKafkaOmitUpdateBefore(String table) throws 
Exception {
        String bootstraps = getBootstrapServers();
        List<Row> data =
                Arrays.asList(
                        Row.of(1, "Hi"),
                        Row.of(1, "Hello"),
                        Row.of(2, "Hello world"),
                        Row.of(2, "Hello world, how are you?"),
                        Row.of(2, "I am fine."),
                        Row.of(3, "Luke Skywalker"),
                        Row.of(3, "Comment#1"),
                        Row.of(3, "Comment#2"),
                        Row.of(4, "Comment#3"),
                        Row.of(4, null));

        final String createSource =
                String.format(
                        "CREATE TABLE aggfilter_%s ("
                                + "  `id` INT,\n"
                                + "  `comment` STRING\n"
                                + ") WITH ("
                                + "  'connector' = 'values',"
                                + "  'data-id' = '%s'"
                                + ")",
                        format, TestValuesTableFactory.registerData(data));
        tEnv.executeSql(createSource);

        final String createSinkTable =
                String.format(
                        "CREATE TABLE %s (\n"
                                + "  `id` INT,\n"
                                + "  `comment` STRING,\n"
                                + "  PRIMARY KEY (`id`) NOT ENFORCED\n"
                                + ") WITH (\n"
                                + "  'connector' = 'upsert-kafka',\n"
                                + "  'topic' = '%s',\n"
                                + "  'properties.bootstrap.servers' = '%s',\n"
                                + "  'key.format' = '%s',\n"
                                + "  'value.format' = '%s'"
                                //+ "  'sink.omit-row-kind' = '-U'"
                                + ")",
                        table, table, bootstraps, format, format);

        tEnv.executeSql(createSinkTable);

        String initialValues =
                "INSERT INTO "
                        + table
                        + " "
                        + "SELECT * "
                        + "FROM aggfilter_"
                        + format
                        + " "
                        + "WHERE id > 2";
        tEnv.executeSql(initialValues).await();

        // ---------- read from the upsert sink -------------------

        final List<Row> result = collectRows(tEnv.sqlQuery("SELECT * FROM " + 
table), 3);
        List<String> actual = 
TestValuesTableFactory.getResults(String.format("%s", table));
        List<Row> expected =
                Arrays.asList(
                        changelogRow("+I", 1L, 1L),
                        changelogRow("+I", 2L, 1L),
                        changelogRow("-D", 2L, 1L),
                        changelogRow("+I", 2L, 2L),
                        changelogRow("+I", 3L, 1L),
                        changelogRow("-D", 3L, 1L),
                        changelogRow("+I", 3L, 2L),
                        changelogRow("-D", 3L, 2L),
                        changelogRow("+I", 3L, 3L),
                        changelogRow("+I", 4L, 1L),
                        changelogRow("-D", 4L, 1L),
                        changelogRow("+I", 4L, 2L),
                        changelogRow("-D", 4L, 2L),
                        changelogRow("+I", 4L, 3L),
                        changelogRow("-D", 4L, 3L));

        assertThat(result).satisfies(matching(deepEqualTo(expected, true)));
    }
{code}

 

 


> Insert Statement With Filter Operation Generates Extra Tombstone using Upsert 
> Kafka Connector
> ---------------------------------------------------------------------------------------------
>
>                 Key: FLINK-33989
>                 URL: https://issues.apache.org/jira/browse/FLINK-33989
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Kafka, Table SQL / Runtime
>    Affects Versions: 1.17.2
>            Reporter: Flaviu Cicio
>            Priority: Major
>
> Given the following Flink SQL tables:
> {code:sql}
> CREATE TABLE input (
>   id STRING NOT NULL, 
>   current_value STRING NOT NULL, 
>   PRIMARY KEY (id) NOT ENFORCED
> ) WITH (
>   'connector' = 'upsert-kafka', 'topic' = 'input', 
>   'key.format' = 'raw', 'properties.bootstrap.servers' = 'sn-kafka:29092', 
>   'properties.group.id' = 'your_group_id', 
>   'value.format' = 'json'
> );
> CREATE TABLE output (
>   id STRING NOT NULL, 
>   current_value STRING NOT NULL, 
>   PRIMARY KEY (id) NOT ENFORCED
> ) WITH (
>   'connector' = 'upsert-kafka', 'topic' = 'output', 
>   'key.format' = 'raw', 'properties.bootstrap.servers' = 'sn-kafka:29092', 
>   'properties.group.id' = 'your_group_id', 
>   'value.format' = 'json'
> ); {code}
> And, the following entries are present in the input Kafka topic:
> {code:json}
> [
>   {
>     "id": "1",
>     "current_value": "abc"
>   },
>   {
>     "id": "1",
>     "current_value": "abcd"
>   }
> ]{code}
> If we execute the following statement:
> {code:sql}
> INSERT INTO output SELECT id, current_value FROM input; {code}
> The following entries are published to the output Kafka topic:
> {code:json}
> [
>   {
>     "id": "1",
>     "current_value": "abc"
>   },
>   {
>     "id": "1",
>     "current_value": "abcd"
>   }
> ]{code}
> But, if we execute the following statement:
> {code:sql}
> INSERT INTO output SELECT id, current_value FROM input WHERE id IN ('1'); 
> {code}
> The following entries are published:
> {code:json}
> [
>   {
>     "id": "1",
>     "current_value": "abc"
>   },
>   null,
>   {
>     "id": "1",
>     "current_value": "abcd"
>   }
> ]{code}
> We would expect the result to be the same for both insert statements.
> As we can see, there is an extra tombstone generated as a result of the 
> second statement.
>  
> Moreover, if we make a select on the input table:
> {code:sql}
> SELECT * FROM input;
> {code}
> We will get the following entries:
> ||op||id||current_value||
> |I|1|abc|
> |-U|1|abc|
> |+U|1|abcd|
> We expected to see only the insert and the update_after entries.
> The update_before is added at DeduplicateFunctionHelper#122.
> This is easily reproducible with this test that we added in the 
> UpsertKafkaTableITCase from flink-connector-kafka:
> {code:java}
>     @Test
>     public void testAggregateFilterOmit() throws Exception {
>         String topic = COUNT_FILTER_TOPIC + "_" + format;
>         createTestTopic(topic, 1, 1);
>         env.setParallelism(1);
>         // -------------   test   ---------------
>         countFilterToUpsertKafkaOmitUpdateBefore(topic);
>         // ------------- clean up ---------------
>         deleteTestTopic(topic);
>     }
>     private void countFilterToUpsertKafkaOmitUpdateBefore(String table) 
> throws Exception {
>         String bootstraps = getBootstrapServers();
>         List<Row> data =
>                 Arrays.asList(
>                         Row.of(1, "Hi"),
>                         Row.of(1, "Hello"),
>                         Row.of(2, "Hello world"),
>                         Row.of(2, "Hello world, how are you?"),
>                         Row.of(2, "I am fine."),
>                         Row.of(3, "Luke Skywalker"),
>                         Row.of(3, "Comment#1"),
>                         Row.of(3, "Comment#2"),
>                         Row.of(4, "Comment#3"),
>                         Row.of(4, null));
>         final String createSource =
>                 String.format(
>                         "CREATE TABLE aggfilter_%s ("
>                                 + "  `id` INT,\n"
>                                 + "  `comment` STRING\n"
>                                 + ") WITH ("
>                                 + "  'connector' = 'values',"
>                                 + "  'data-id' = '%s'"
>                                 + ")",
>                         format, TestValuesTableFactory.registerData(data));
>         tEnv.executeSql(createSource);
>         final String createSinkTable =
>                 String.format(
>                         "CREATE TABLE %s (\n"
>                                 + "  `id` INT,\n"
>                                 + "  `comment` STRING,\n"
>                                 + "  PRIMARY KEY (`id`) NOT ENFORCED\n"
>                                 + ") WITH (\n"
>                                 + "  'connector' = 'upsert-kafka',\n"
>                                 + "  'topic' = '%s',\n"
>                                 + "  'properties.bootstrap.servers' = '%s',\n"
>                                 + "  'key.format' = '%s',\n"
>                                 + "  'value.format' = '%s'"
>                                 //+ "  'sink.omit-row-kind' = '-U'"
>                                 + ")",
>                         table, table, bootstraps, format, format);
>         tEnv.executeSql(createSinkTable);
>         String initialValues =
>                 "INSERT INTO "
>                         + table
>                         + " "
>                         + "SELECT * "
>                         + "FROM aggfilter_"
>                         + format
>                         + " "
>                         + "WHERE id > 2";
>         tEnv.executeSql(initialValues).await();
>         // ---------- read from the upsert sink -------------------
>         final List<Row> result = collectRows(tEnv.sqlQuery("SELECT * FROM " + 
> table), 3);
>         List<String> actual = 
> TestValuesTableFactory.getResults(String.format("%s", table));
>         List<Row> expected =
>                 Arrays.asList(
>                         changelogRow("+I", 1L, 1L),
>                         changelogRow("+I", 2L, 1L),
>                         changelogRow("-D", 2L, 1L),
>                         changelogRow("+I", 2L, 2L),
>                         changelogRow("+I", 3L, 1L),
>                         changelogRow("-D", 3L, 1L),
>                         changelogRow("+I", 3L, 2L),
>                         changelogRow("-D", 3L, 2L),
>                         changelogRow("+I", 3L, 3L),
>                         changelogRow("+I", 4L, 1L),
>                         changelogRow("-D", 4L, 1L),
>                         changelogRow("+I", 4L, 2L),
>                         changelogRow("-D", 4L, 2L),
>                         changelogRow("+I", 4L, 3L),
>                         changelogRow("-D", 4L, 3L));
>         assertThat(result).satisfies(matching(deepEqualTo(expected, true)));
>     }
> {code}
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to