[ 
https://issues.apache.org/jira/browse/FLINK-39810?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18085214#comment-18085214
 ] 

Francisco Morillo commented on FLINK-39810:
-------------------------------------------

I know it's a different setup from the dedicated Kafka connector, but I 
thought it makes more sense to have the feature within the same connector 
rather than as something separate, since many SQL applications create upsert 
streams. Should I create a FLIP?

> [kinesis] Support upsert changelog streams in Kinesis SQL connector
> -------------------------------------------------------------------
>
>                 Key: FLINK-39810
>                 URL: https://issues.apache.org/jira/browse/FLINK-39810
>             Project: Flink
>          Issue Type: Improvement
>          Components: Connectors / Kinesis
>    Affects Versions: aws-connector-6.0.0
>            Reporter: Francisco Morillo
>            Priority: Major
>              Labels: pull-request-available
>
>   Currently, the Flink SQL Kinesis connector only supports INSERT-only
>   (append) streams. When users attempt to write the results of a GROUP BY,
>   deduplication, or streaming join to a Kinesis table, the job fails with:
>     "Table sink doesn't support consuming update and delete changes"
>   This is because the KinesisDynamicSink delegates changelog mode entirely
>   to the encoding format, which typically only supports INSERT.
>   This improvement enables the Kinesis connector to automatically accept
>   upsert changelog streams when a PRIMARY KEY is defined on the table.
>   The behavior:
>   - DELETE and UPDATE_BEFORE rows are written as empty-payload tombstone
>     records
>   - The primary key fields are used as the Kinesis partition key for
>     consistent shard routing
>   
>   No new connector or configuration options are needed — users just add
>   PRIMARY KEY (col) NOT ENFORCED to their existing table definition.
>   
>   Example:
>   {code:sql}
>   CREATE TABLE kinesis_sink (
>     user_id STRING,
>     total_amount BIGINT,
>     PRIMARY KEY (user_id) NOT ENFORCED
>   ) WITH (
>     'connector' = 'kinesis',
>     'stream.arn' = 'arn:aws:kinesis:us-east-1:123456789:stream/myStream',
>     'aws.region' = 'us-east-1',
>     'format' = 'json'
>   );
>   -- This now works
>   INSERT INTO kinesis_sink
>   SELECT user_id, SUM(amount) FROM orders GROUP BY user_id;
>   {code}
>   Fix Version: 6.0.0
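
The behavior listed in the description (empty-payload tombstones for DELETE
and UPDATE_BEFORE rows, primary key fields as the partition key) could be
sketched roughly as below. This is only an illustration under assumptions, not
the connector's actual code: the class UpsertKinesisSketch, the OutRecord
type, the toRecord and partitionKey helpers, and the "|" delimiter used to
join key fields are all hypothetical, and the local RowKind enum is a stand-in
for Flink's RowKind so that the example compiles without Flink on the
classpath.

```java
import java.nio.charset.StandardCharsets;
import java.util.List;

/** Illustrative only: none of these names come from the Flink AWS connector. */
public class UpsertKinesisSketch {

    /** Local stand-in for Flink's RowKind so the sketch compiles on its own. */
    enum RowKind { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

    /** Hypothetical outgoing record: a partition key plus the payload bytes. */
    static final class OutRecord {
        final String partitionKey;
        final byte[] payload;

        OutRecord(String partitionKey, byte[] payload) {
            this.partitionKey = partitionKey;
            this.payload = payload;
        }
    }

    /**
     * Joins the primary key field values so that every change for the same
     * key carries the same partition key. The "|" delimiter is an assumption.
     */
    static String partitionKey(List<String> primaryKeyValues) {
        return String.join("|", primaryKeyValues);
    }

    /** Maps one changelog row to an outgoing record. */
    static OutRecord toRecord(RowKind kind, List<String> primaryKeyValues, String encodedRow) {
        String key = partitionKey(primaryKeyValues);
        switch (kind) {
            case DELETE:
            case UPDATE_BEFORE:
                // Tombstone: the key is kept, the payload is empty.
                return new OutRecord(key, new byte[0]);
            default:
                // INSERT and UPDATE_AFTER carry the encoded row as the payload.
                return new OutRecord(key, encodedRow.getBytes(StandardCharsets.UTF_8));
        }
    }

    public static void main(String[] args) {
        String row = "{\"user_id\":\"u1\",\"total_amount\":30}";
        OutRecord upsert = toRecord(RowKind.UPDATE_AFTER, List.of("u1"), row);
        OutRecord tombstone = toRecord(RowKind.DELETE, List.of("u1"), row);
        System.out.println(upsert.partitionKey + " -> " + upsert.payload.length + " bytes");
        System.out.println(tombstone.partitionKey + " -> " + tombstone.payload.length + " bytes");
    }
}
```

Because the upsert and the tombstone share a partition key, a consumer reading
a single shard would see the changes for one key in order and could treat an
empty payload as a delete for that key.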



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
