[ https://issues.apache.org/jira/browse/FLINK-39810?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18085225#comment-18085225 ]
Martijn Visser commented on FLINK-39810:
----------------------------------------
Yeah, I think it should be a FLIP, because creating different experiences
across connectors is ultimately confusing for users.
> [kinesis] Support upsert changelog streams in Kinesis SQL connector
> -------------------------------------------------------------------
>
> Key: FLINK-39810
> URL: https://issues.apache.org/jira/browse/FLINK-39810
> Project: Flink
> Issue Type: Improvement
> Components: Connectors / Kinesis
> Affects Versions: aws-connector-6.0.0
> Reporter: Francisco Morillo
> Priority: Major
> Labels: pull-request-available
>
> Currently, the Flink SQL Kinesis connector supports only append
> (INSERT-only) streams. When users attempt to write the results of a
> GROUP BY, deduplication, or streaming join to a Kinesis table, the job
> fails with:
> "Table sink doesn't support consuming update and delete changes"
> This is because KinesisDynamicSink delegates its changelog mode entirely
> to the encoding format, which typically supports only INSERT.
> This improvement enables the Kinesis connector to automatically accept
> upsert changelog streams when a PRIMARY KEY is defined on the table. The
> behavior is:
> - DELETE and UPDATE_BEFORE rows are written as empty-payload tombstone
> records.
> - The primary key fields are used as the Kinesis partition key, so all
> changes for the same key are routed to the same shard.
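To make the two rules above concrete, here is a minimal, self-contained Java sketch of the per-row encoding. It does not use the connector's actual classes: `Kind` is a hypothetical mirror of Flink's `RowKind`, `KinesisRecord` is a hypothetical stand-in for a Kinesis record, the `format` function stands in for the configured encoding format, and joining multiple key columns with `|` is an assumption, not necessarily what the PR does.

```java
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

// Sketch only: models the described upsert encoding without Flink or AWS SDK classes.
final class UpsertKinesisEncoding {

    // Hypothetical mirror of Flink's RowKind (the real sink receives RowData).
    enum Kind { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

    // Hypothetical stand-in for one Kinesis record: partition key plus payload bytes.
    record KinesisRecord(String partitionKey, byte[] data) {
        boolean isTombstone() {
            return data.length == 0;
        }
    }

    // Partition key built from the primary-key columns; the '|' separator is an assumption.
    static String partitionKey(Map<String, Object> row, List<String> keyFields) {
        return keyFields.stream()
                .map(field -> String.valueOf(row.get(field)))
                .collect(Collectors.joining("|"));
    }

    // DELETE and UPDATE_BEFORE become empty-payload tombstones; other kinds go through the format.
    static KinesisRecord encode(Kind kind, Map<String, Object> row, List<String> keyFields,
                                Function<Map<String, Object>, byte[]> format) {
        String key = partitionKey(row, keyFields);
        if (kind == Kind.DELETE || kind == Kind.UPDATE_BEFORE) {
            return new KinesisRecord(key, new byte[0]);
        }
        return new KinesisRecord(key, format.apply(row));
    }

    public static void main(String[] args) {
        // Toy "format" for the demo: the row's toString() as UTF-8 bytes.
        Function<Map<String, Object>, byte[]> format =
                r -> r.toString().getBytes(StandardCharsets.UTF_8);
        Map<String, Object> row = new LinkedHashMap<>();
        row.put("user_id", "u1");
        row.put("total_amount", 42L);
        List<String> key = List.of("user_id");

        for (Kind kind : Kind.values()) {
            KinesisRecord rec = encode(kind, row, key, format);
            System.out.println(kind + " -> key=" + rec.partitionKey()
                    + ", bytes=" + rec.data().length);
        }
    }
}
```

Running `main` prints one line per row kind, and the DELETE and UPDATE_BEFORE lines report `bytes=0`. Because the partition key depends only on the key columns, a tombstone and the later upsert for the same `user_id` share a partition key, which is what keeps them on the same shard.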
>
> No new connector or configuration options are needed; users just add
> PRIMARY KEY (col) NOT ENFORCED to their existing table definition.
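The switch between append-only and upsert behavior can be sketched as follows. In Flink itself this decision would presumably live in `DynamicTableSink#getChangelogMode` and return a `ChangelogMode`; here those are replaced by a hypothetical `Kind` enum and plain sets so the sketch runs without Flink, and `canConsume` is only a simplified stand-in for the planner check behind the "doesn't support consuming update and delete changes" error.

```java
import java.util.EnumSet;
import java.util.Set;

// Sketch only: chooses the accepted row kinds from the presence of a PRIMARY KEY.
final class KinesisChangelogModeSketch {

    // Hypothetical mirror of Flink's RowKind values.
    enum Kind { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

    // With a PRIMARY KEY, every row kind is accepted (DELETE and UPDATE_BEFORE
    // become tombstones). Without one, the sink keeps delegating to the format.
    static Set<Kind> acceptedKinds(boolean hasPrimaryKey, Set<Kind> formatKinds) {
        if (hasPrimaryKey) {
            return EnumSet.allOf(Kind.class);
        }
        EnumSet<Kind> kinds = EnumSet.noneOf(Kind.class);
        kinds.addAll(formatKinds);
        return kinds;
    }

    // Simplified stand-in for the planner check: the sink must accept every
    // row kind the query can produce.
    static boolean canConsume(Set<Kind> queryKinds, Set<Kind> sinkKinds) {
        return sinkKinds.containsAll(queryKinds);
    }

    public static void main(String[] args) {
        // Assumption: an insert-only format such as 'json' in this sink.
        Set<Kind> formatKinds = EnumSet.of(Kind.INSERT);
        // A non-windowed GROUP BY can emit inserts plus update pairs.
        Set<Kind> groupByKinds = EnumSet.of(Kind.INSERT, Kind.UPDATE_BEFORE, Kind.UPDATE_AFTER);

        System.out.println("without PK: " + canConsume(groupByKinds, acceptedKinds(false, formatKinds)));
        System.out.println("with PK: " + canConsume(groupByKinds, acceptedKinds(true, formatKinds)));
    }
}
```

Running `main` prints `without PK: false` followed by `with PK: true`. The only input that changes is `hasPrimaryKey`, which is why declaring the key is enough and no extra table option is involved.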
>
> Example:
> {code:sql}
> CREATE TABLE kinesis_sink (
>   user_id STRING,
>   total_amount BIGINT,
>   PRIMARY KEY (user_id) NOT ENFORCED
> ) WITH (
>   'connector' = 'kinesis',
>   'stream.arn' = 'arn:aws:kinesis:us-east-1:123456789:stream/myStream',
>   'aws.region' = 'us-east-1',
>   'format' = 'json'
> );
> -- This now works
> INSERT INTO kinesis_sink
> SELECT user_id, SUM(amount) FROM orders GROUP BY user_id;
> {code}
> Fix Version: 6.0.0
--
This message was sent by Atlassian Jira
(v8.20.10#820010)