Francisco Morillo created FLINK-39810:
-----------------------------------------

             Summary: [kinesis] Support upsert changelog streams in Kinesis SQL 
connector
                 Key: FLINK-39810
                 URL: https://issues.apache.org/jira/browse/FLINK-39810
             Project: Flink
          Issue Type: Improvement
          Components: Connectors / Kinesis
    Affects Versions: aws-connector-6.0.0
            Reporter: Francisco Morillo


  Currently, the Flink SQL Kinesis connector supports only INSERT-only (append)
  streams. When users attempt to write the results of a GROUP BY, deduplication,
  or streaming join to a Kinesis table, the job fails with:

    "Table sink doesn't support consuming update and delete changes"

  This is because the KinesisDynamicSink delegates changelog mode entirely to
  the encoding format, which typically supports only INSERT.
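
  A minimal sketch of the failing case, assuming an orders table with user_id
  and amount columns already exists. The sink name kinesis_sink_append is
  hypothetical and the stream ARN is a placeholder:

  {code:sql}
  -- Append-only sink definition: note that no PRIMARY KEY is declared.
  CREATE TABLE kinesis_sink_append (
    user_id STRING,
    total_amount BIGINT
  ) WITH (
    'connector' = 'kinesis',
    'stream.arn' = 'arn:aws:kinesis:us-east-1:123456789012:stream/myStream',
    'aws.region' = 'us-east-1',
    'format' = 'json'
  );

  -- GROUP BY produces an updating result, so this statement is expected to
  -- be rejected with the error quoted above.
  INSERT INTO kinesis_sink_append
  SELECT user_id, SUM(amount) FROM orders GROUP BY user_id;
  {code}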

  This improvement enables the Kinesis connector to automatically accept upsert
  changelog streams when a PRIMARY KEY is defined on the table. The behavior:
  - DELETE and UPDATE_BEFORE rows are written as empty-payload tombstone records
  - The primary key fields are used as the Kinesis partition key for consistent
    shard routing
  
  No new connector or configuration options are needed. Users just add
  PRIMARY KEY (col) NOT ENFORCED to their existing table definition.
  
  Example:
  {code:sql}
  CREATE TABLE kinesis_sink (
    user_id STRING,
    total_amount BIGINT,
    PRIMARY KEY (user_id) NOT ENFORCED
  ) WITH (
    'connector' = 'kinesis',
    'stream.arn' = 'arn:aws:kinesis:us-east-1:123456789012:stream/myStream',
    'aws.region' = 'us-east-1',
    'format' = 'json'
  );

  -- This now works
  INSERT INTO kinesis_sink
  SELECT user_id, SUM(amount) FROM orders GROUP BY user_id;
  {code}
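
  The same sink should also accept the other updating queries mentioned above.
  A sketch of a deduplication query that keeps the latest row per user,
  assuming orders has a time attribute named order_time (a hypothetical
  column) and a BIGINT amount column:

  {code:sql}
  -- Keep only the most recent order per user_id. Each newer order replaces
  -- the previous result row, so the query emits an updating changelog.
  INSERT INTO kinesis_sink
  SELECT user_id, amount
  FROM (
    SELECT
      user_id,
      amount,
      ROW_NUMBER() OVER (
        PARTITION BY user_id
        ORDER BY order_time DESC
      ) AS row_num
    FROM orders
  )
  WHERE row_num = 1;
  {code}

  Because user_id is both the deduplication key and the sink's primary key,
  every update for a given user would be routed with the same partition key.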

  Priority: Major
  Fix Version: 6.0.0



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
