Flaviu Cicio created FLINK-33989:
------------------------------------
Summary: Insert Statement With Filter Operation Generates Extra Tombstone in Kafka
Key: FLINK-33989
URL: https://issues.apache.org/jira/browse/FLINK-33989
Project: Flink
Issue Type: Bug
Components: Table SQL / Runtime
Affects Versions: 1.17.2
Reporter: Flaviu Cicio
Given the following Flink SQL tables:
{code:sql}
CREATE TABLE input (
  id STRING NOT NULL,
  current_value STRING NOT NULL,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = 'input',
  'key.format' = 'raw',
  'properties.bootstrap.servers' = 'sn-kafka:29092',
  'properties.group.id' = 'your_group_id',
  'value.format' = 'json'
);

CREATE TABLE output (
  id STRING NOT NULL,
  current_value STRING NOT NULL,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = 'output',
  'key.format' = 'raw',
  'properties.bootstrap.servers' = 'sn-kafka:29092',
  'properties.group.id' = 'your_group_id',
  'value.format' = 'json'
);
{code}
And the following entries are present in the input Kafka topic:
{code:json}
[
  {
    "id": "1",
    "current_value": "abc"
  },
  {
    "id": "1",
    "current_value": "abcd"
  }
]
{code}
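For completeness, one hypothetical way these two records could have been written (the exact producer is not relevant to the issue) is through Flink SQL upserts into the input table defined above:
{code:sql}
-- A minimal sketch, assuming the input topic is populated through Flink SQL;
-- an external Kafka producer writing raw keys and JSON values would work just as well.
INSERT INTO input VALUES ('1', 'abc');
INSERT INTO input VALUES ('1', 'abcd');
{code}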
If we execute the following statement:
{code:sql}
INSERT INTO output SELECT id, current_value FROM input; {code}
The following entries are published to the output Kafka topic:
{code:json}
[
  {
    "id": "1",
    "current_value": "abc"
  },
  {
    "id": "1",
    "current_value": "abcd"
  }
]
{code}
But if we execute the following statement:
{code:sql}
INSERT INTO output SELECT id, current_value FROM input WHERE id IN ('1'); {code}
The following entries are published:
{code:json}
[
  {
    "id": "1",
    "current_value": "abc"
  },
  null,
  {
    "id": "1",
    "current_value": "abcd"
  }
]
{code}
We would expect the result to be the same for both INSERT statements.
As we can see, the second statement generates an extra tombstone (the null record) in the output topic.
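A possible way to investigate (a diagnostic sketch, not part of the original report) is to compare the changelog modes the planner derives for the two statements, which should show whether the filtered plan introduces extra UPDATE_BEFORE/DELETE messages that the upsert-kafka sink would then encode as tombstones:
{code:sql}
-- Sketch: EXPLAIN with the CHANGELOG_MODE detail prints the changelog kinds
-- each operator emits, so the two plans can be compared side by side.
EXPLAIN CHANGELOG_MODE
INSERT INTO output SELECT id, current_value FROM input;

EXPLAIN CHANGELOG_MODE
INSERT INTO output SELECT id, current_value FROM input WHERE id IN ('1');
{code}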