[
https://issues.apache.org/jira/browse/FLINK-33989?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Flaviu Cicio updated FLINK-33989:
---------------------------------
Summary: Insert Statement With Filter Operation Generates Extra Tombstone
using Upsert Kafka Connector (was: Insert Statement With Filter Operation
Generates Extra Tombstone in Kafka)
> Insert Statement With Filter Operation Generates Extra Tombstone using Upsert
> Kafka Connector
> ---------------------------------------------------------------------------------------------
>
> Key: FLINK-33989
> URL: https://issues.apache.org/jira/browse/FLINK-33989
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / Runtime
> Affects Versions: 1.17.2
> Reporter: Flaviu Cicio
> Priority: Major
>
> Given the following Flink SQL tables:
> {code:sql}
> CREATE TABLE input (
>   id STRING NOT NULL,
>   current_value STRING NOT NULL,
>   PRIMARY KEY (id) NOT ENFORCED
> ) WITH (
>   'connector' = 'upsert-kafka',
>   'topic' = 'input',
>   'key.format' = 'raw',
>   'properties.bootstrap.servers' = 'sn-kafka:29092',
>   'properties.group.id' = 'your_group_id',
>   'value.format' = 'json'
> );
> CREATE TABLE output (
>   id STRING NOT NULL,
>   current_value STRING NOT NULL,
>   PRIMARY KEY (id) NOT ENFORCED
> ) WITH (
>   'connector' = 'upsert-kafka',
>   'topic' = 'output',
>   'key.format' = 'raw',
>   'properties.bootstrap.servers' = 'sn-kafka:29092',
>   'properties.group.id' = 'your_group_id',
>   'value.format' = 'json'
> ); {code}
> And the following entries are present in the input Kafka topic:
> {code:json}
> [
>   {
>     "id": "1",
>     "current_value": "abc"
>   },
>   {
>     "id": "1",
>     "current_value": "abcd"
>   }
> ]{code}
> If we execute the following statement:
> {code:sql}
> INSERT INTO output SELECT id, current_value FROM input; {code}
> The following entries are published to the output Kafka topic:
> {code:json}
> [
>   {
>     "id": "1",
>     "current_value": "abc"
>   },
>   {
>     "id": "1",
>     "current_value": "abcd"
>   }
> ]{code}
>
> But, if we execute the following statement:
> {code:sql}
> INSERT INTO output SELECT id, current_value FROM input WHERE id IN ('1');
> {code}
> The following entries are published to the output Kafka topic, with a
> null-value record (tombstone) between the two updates:
> {code:json}
> [
>   {
>     "id": "1",
>     "current_value": "abc"
>   },
>   null,
>   {
>     "id": "1",
>     "current_value": "abcd"
>   }
> ]{code}
> We would expect both insert statements to produce the same output, because
> the filter matches every record in the input topic.
> Instead, the second statement publishes an extra tombstone (the null entry)
> for key "1" between the two values.
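>
> As a possible way to narrow this down (a sketch, not a confirmed diagnosis):
> assuming the difference comes from the changelog mode the planner derives
> for each pipeline, the two plans can be compared with Flink SQL's
> EXPLAIN CHANGELOG_MODE. The statements reuse the tables defined above; the
> resulting plans are not part of this report.
> {code:sql}
> -- Plan for the statement that produces the expected output.
> EXPLAIN CHANGELOG_MODE
> INSERT INTO output SELECT id, current_value FROM input;
> -- Plan for the statement that produces the extra tombstone.
> EXPLAIN CHANGELOG_MODE
> INSERT INTO output SELECT id, current_value FROM input WHERE id IN ('1');
> {code}
> A difference in the changelogMode annotations of the two optimized plans
> (for example, an additional UB or D reaching the sink) would indicate which
> operator introduces the extra delete.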