[ https://issues.apache.org/jira/browse/FLINK-39810?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

ASF GitHub Bot updated FLINK-39810:
-----------------------------------
    Labels: pull-request-available  (was: )

> [kinesis] Support upsert changelog streams in Kinesis SQL connector
> -------------------------------------------------------------------
>
>                 Key: FLINK-39810
>                 URL: https://issues.apache.org/jira/browse/FLINK-39810
>             Project: Flink
>          Issue Type: Improvement
>          Components: Connectors / Kinesis
>    Affects Versions: aws-connector-6.0.0
>            Reporter: Francisco Morillo
>            Priority: Major
>              Labels: pull-request-available
>
>   Currently, the Flink SQL Kinesis connector supports only INSERT-only (append) streams.
>   When users attempt to write the results of a GROUP BY, deduplication, or streaming join
>   query to a Kinesis table, the job fails with:
>     "Table sink doesn't support consuming update and delete changes"
>   This is because KinesisDynamicSink delegates its changelog mode entirely to the encoding
>   format, which typically supports only INSERT.
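To make the failure concrete, here is a simplified Python model of the check. It is not Flink's planner code: `RowKind` mirrors Flink's four row kinds, while `sink_changelog_mode`, `check_sink`, and the set-based representation of changelog modes are hypothetical names chosen for illustration. Under the pre-change behavior the sink accepts exactly the row kinds the format accepts (INSERT only), so any query that emits updates or deletes is rejected.

```python
from enum import Enum


class RowKind(Enum):
    """Flink's four changelog row kinds, with their usual short strings."""
    INSERT = "+I"
    UPDATE_BEFORE = "-U"
    UPDATE_AFTER = "+U"
    DELETE = "-D"


# Hypothetical stand-in for a format such as 'json' that accepts only INSERT.
FORMAT_MODE = frozenset({RowKind.INSERT})

# Row kinds an updating query (e.g. GROUP BY) may emit.
GROUP_BY_OUTPUT = frozenset(RowKind)


def sink_changelog_mode(format_mode):
    """Pre-change behavior (simplified): the sink delegates entirely to the format."""
    return format_mode


def check_sink(query_kinds, sink_kinds):
    """Simplified planner check: every row kind the query emits must be accepted."""
    if not query_kinds <= sink_kinds:
        raise ValueError(
            "Table sink doesn't support consuming update and delete changes"
        )
    return True
```

An append-only query passes this check, while the GROUP BY output does not, because UPDATE_BEFORE, UPDATE_AFTER, and DELETE are missing from the format's accepted set.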
>   This improvement makes the Kinesis connector automatically accept upsert changelog
>   streams when a PRIMARY KEY is defined on the table. With this change:
>   - DELETE and UPDATE_BEFORE rows are written as empty-payload tombstone records.
>   - The primary key fields are used as the Kinesis partition key, so all changes
>     for the same key are routed consistently to the same shard.
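A minimal sketch of the write path described above, assuming rows arrive as Python dicts tagged with Flink's short row-kind strings (`+I`, `-U`, `+U`, `-D`) and that the format is JSON. `KinesisRecord` and `to_kinesis_record` are hypothetical, and joining multi-column keys with `"|"` is an assumption; the connector's actual key encoding and serialization may differ.

```python
import json
from dataclasses import dataclass
from typing import Any, Dict, List

TOMBSTONE_KINDS = {"-U", "-D"}  # UPDATE_BEFORE, DELETE
PAYLOAD_KINDS = {"+I", "+U"}    # INSERT, UPDATE_AFTER


@dataclass(frozen=True)
class KinesisRecord:
    """Hypothetical stand-in for one record sent to a Kinesis stream."""
    partition_key: str
    data: bytes


def to_kinesis_record(kind: str, row: Dict[str, Any], pk_fields: List[str]) -> KinesisRecord:
    # Derive the partition key from the primary key values only, so every
    # change for a given key maps to the same shard. The "|" separator is an
    # assumption made for this sketch.
    partition_key = "|".join(str(row[f]) for f in pk_fields)
    if kind in TOMBSTONE_KINDS:
        # DELETE and UPDATE_BEFORE become empty-payload tombstones.
        return KinesisRecord(partition_key, b"")
    if kind in PAYLOAD_KINDS:
        # INSERT and UPDATE_AFTER carry the serialized row ('format' = 'json').
        payload = json.dumps(row, sort_keys=True).encode("utf-8")
        return KinesisRecord(partition_key, payload)
    raise ValueError(f"unknown row kind: {kind}")
```

Because every change for `user_id = 'u1'` carries the partition key `u1`, a tombstone and the row that follows it target the same shard.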
>   
>   No new connector or configuration options are needed: users only add
>   PRIMARY KEY (col) NOT ENFORCED to their existing table definition.
>   
>   Example:
>   {code:sql}
>   CREATE TABLE kinesis_sink (
>     user_id STRING,
>     total_amount BIGINT,
>     PRIMARY KEY (user_id) NOT ENFORCED
>   ) WITH (
>     'connector' = 'kinesis',
>     'stream.arn' = 'arn:aws:kinesis:us-east-1:123456789:stream/myStream',
>     'aws.region' = 'us-east-1',
>     'format' = 'json'
>   );
>   -- Previously rejected by the planner; accepted now that the table declares a PRIMARY KEY
>   INSERT INTO kinesis_sink
>   SELECT user_id, SUM(amount) FROM orders GROUP BY user_id;
>   {code}
>   Priority: Major
>   Fix Version: 6.0.0
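Routing by primary key matters to whoever reads the stream. Records with the same partition key land on the same shard, and records within a shard are ordered by sequence number, so a reader can rebuild the latest value per key. The issue does not say whether the sink preserves per-key write order across batched retries, so the sketch below simply assumes the records are read back in the order they were written. It shows how a consumer might rebuild the latest `total_amount` per `user_id` from the example sink, assuming JSON payloads and treating an empty payload as a delete. `materialize` is a hypothetical helper, not part of any Kinesis or Flink API.

```python
import json
from typing import Dict, Iterable, Tuple


def materialize(records: Iterable[Tuple[str, bytes]]) -> Dict[str, dict]:
    """Rebuild the latest row per partition key from (partition_key, data)
    pairs, assumed to be read in the order they were written."""
    table: Dict[str, dict] = {}
    for key, data in records:
        if data == b"":
            # Tombstone: the key was deleted, or is about to be replaced
            # by an UPDATE_AFTER row.
            table.pop(key, None)
        else:
            table[key] = json.loads(data)
    return table
```

For an update to `u1`, the stream may contain the old row, a tombstone, and the new row; replaying them leaves only the new row for that key.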



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
