Flaviu Cicio created FLINK-33989:
------------------------------------

             Summary: Insert Statement With Filter Operation Generates Extra Tombstone in Kafka
                 Key: FLINK-33989
                 URL: https://issues.apache.org/jira/browse/FLINK-33989
             Project: Flink
          Issue Type: Bug
          Components: Table SQL / Runtime
    Affects Versions: 1.17.2
            Reporter: Flaviu Cicio


Given the following Flink SQL tables:
{code:sql}
CREATE TABLE input (
  id STRING NOT NULL, 
  current_value STRING NOT NULL, 
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka', 'topic' = 'input', 
  'key.format' = 'raw', 'properties.bootstrap.servers' = 'sn-kafka:29092', 
  'properties.group.id' = 'your_group_id', 
  'value.format' = 'json'
);

CREATE TABLE output (
  id STRING NOT NULL, 
  current_value STRING NOT NULL, 
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka', 'topic' = 'output', 
  'key.format' = 'raw', 'properties.bootstrap.servers' = 'sn-kafka:29092', 
  'properties.group.id' = 'your_group_id', 
  'value.format' = 'json'
); {code}
And the following records are present in the input Kafka topic:
{code:json}
[
  {
    "id": "1",
    "current_value": "abc"
  },
  {
    "id": "1",
    "current_value": "abcd"
  }
]{code}
If we execute the following statement:
{code:sql}
INSERT INTO output SELECT id, current_value FROM input; {code}
The following entries are published to the output Kafka topic:
{code:json}
[
  {
    "id": "1",
    "current_value": "abc"
  },
  {
    "id": "1",
    "current_value": "abcd"
  }
]{code}

But if we execute the following statement:
{code:sql}
INSERT INTO output SELECT id, current_value FROM input WHERE id IN ('1'); {code}
The following entries are published:
{code:json}
[
  {
    "id": "1",
    "current_value": "abc"
  },
  null,
  {
    "id": "1",
    "current_value": "abcd"
  }
]{code}
Since the filter matches every input row, we would expect both insert statements to produce the same result.

As we can see, the second statement produces an extra tombstone (a record with a null value) between the two updates.
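To make the impact concrete, here is a minimal Python sketch that replays both output sequences the way a downstream upsert consumer would: a non-null value upserts the key, and a null value (tombstone) deletes it. It assumes the tombstone is written with key "1" (the JSON listing above shows only record values), and the `materialize` helper is hypothetical, not part of Flink or Kafka.

```python
from typing import Dict, List, Optional, Tuple

# One output-topic record: (key, value). A value of None models a Kafka tombstone.
# Assumption: the tombstone carries key "1", matching the surrounding records.
Record = Tuple[str, Optional[dict]]

def materialize(records: List[Record]) -> List[Dict[str, dict]]:
    """Hypothetical helper: replay upsert records into a key -> value table.

    Returns a snapshot of the table after each record is applied.
    """
    table: Dict[str, dict] = {}
    snapshots: List[Dict[str, dict]] = []
    for key, value in records:
        if value is None:
            table.pop(key, None)  # tombstone: delete the key
        else:
            table[key] = value    # non-null value: upsert the key
        snapshots.append(dict(table))
    return snapshots

# Output of the unfiltered INSERT, as reported above.
unfiltered: List[Record] = [
    ("1", {"id": "1", "current_value": "abc"}),
    ("1", {"id": "1", "current_value": "abcd"}),
]

# Output of the INSERT with WHERE id IN ('1'), including the extra tombstone.
filtered: List[Record] = [
    ("1", {"id": "1", "current_value": "abc"}),
    ("1", None),
    ("1", {"id": "1", "current_value": "abcd"}),
]

for name, records in (("unfiltered", unfiltered), ("filtered", filtered)):
    # Print the keys present after each record.
    print(name, [sorted(s) for s in materialize(records)])
# prints:
# unfiltered [['1'], ['1']]
# filtered [['1'], [], ['1']]
```

Both sequences converge on the same final state, but a consumer of the filtered output briefly sees key "1" deleted, which is the observable effect of the extra tombstone.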



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
