Francisco Morillo created FLINK-39810:
-----------------------------------------
Summary: [kinesis] Support upsert changelog streams in Kinesis SQL connector
Key: FLINK-39810
URL: https://issues.apache.org/jira/browse/FLINK-39810
Project: Flink
Issue Type: Improvement
Components: Connectors / Kinesis
Affects Versions: aws-connector-6.0.0
Reporter: Francisco Morillo
Currently, the Flink SQL Kinesis connector only supports INSERT-only (append)
streams. When users attempt to write the results of a GROUP BY, deduplication,
or streaming join to a Kinesis table, the job fails with:
"Table sink doesn't support consuming update and delete changes"
This is because the KinesisDynamicSink delegates changelog mode entirely to
the encoding format, which typically only supports INSERT.
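The failure described above can be modeled in a few lines. This is only a plain-Python sketch of the semantics, not the connector's Java code: the names `FORMAT_CHANGELOG_MODE`, `sink_changelog_mode`, and `check_query_against_sink` are hypothetical, and the assumption that the format declares INSERT only is taken from the description above.

```python
from enum import Enum


class RowKind(Enum):
    """The four change kinds a Flink changelog stream can carry."""
    INSERT = "+I"
    UPDATE_BEFORE = "-U"
    UPDATE_AFTER = "+U"
    DELETE = "-D"


# Assumption: a typical encoding format (for example 'json') declares INSERT only.
FORMAT_CHANGELOG_MODE = frozenset({RowKind.INSERT})


def sink_changelog_mode(format_mode):
    """Model of the current behavior: the sink returns whatever the format declares."""
    return format_mode


def check_query_against_sink(query_kinds, sink_mode):
    """Reject the job if the query emits a change kind the sink does not accept."""
    unsupported = set(query_kinds) - set(sink_mode)
    if unsupported:
        raise ValueError(
            "Table sink doesn't support consuming update and delete changes"
        )


# A GROUP BY aggregation emits updates in addition to inserts.
group_by_kinds = {RowKind.INSERT, RowKind.UPDATE_BEFORE, RowKind.UPDATE_AFTER}
```

With this model, an append-only query passes the check, while `group_by_kinds` raises the error quoted above because the sink mode never contains anything but INSERT.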
This improvement enables the Kinesis connector to automatically accept upsert
changelog streams when a PRIMARY KEY is defined on the table. The behavior:
- DELETE and UPDATE_BEFORE rows are written as empty-payload tombstone records
- The primary key fields are used as the Kinesis partition key for consistent
shard routing
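The two rules above can be sketched as a small mapping from a changelog row to a (partition key, payload) pair. This is a plain-Python illustration under stated assumptions, not the connector's implementation: `partition_key` and `to_kinesis_record` are hypothetical names, the `|` delimiter for composite keys is an assumption (the issue does not say how multiple key fields are combined), and JSON encoding of INSERT and UPDATE_AFTER rows simply stands in for whatever format the table configures.

```python
import json
from enum import Enum


class RowKind(Enum):
    INSERT = "+I"
    UPDATE_BEFORE = "-U"
    UPDATE_AFTER = "+U"
    DELETE = "-D"


# Kinds that the proposal writes as empty-payload tombstones.
TOMBSTONE_KINDS = {RowKind.DELETE, RowKind.UPDATE_BEFORE}


def partition_key(row, primary_key_fields, delimiter="|"):
    """Build the partition key from the primary key fields.

    Assumption: composite keys are joined with a delimiter.
    """
    return delimiter.join(str(row[field]) for field in primary_key_fields)


def to_kinesis_record(kind, row, primary_key_fields):
    """Return (partition_key, payload_bytes) for one changelog row."""
    key = partition_key(row, primary_key_fields)
    if kind in TOMBSTONE_KINDS:
        # Tombstone: same key, empty payload.
        return key, b""
    # Assumption: other kinds are serialized by the configured format (JSON here).
    return key, json.dumps(row, sort_keys=True).encode("utf-8")
```

Because every change for a given key maps to the same partition key, the upsert and its later tombstone are routed to the same shard, which is what lets a consumer see them in order.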
No new connector or configuration options are needed: users just add
PRIMARY KEY (col) NOT ENFORCED to their existing table definition.
Example:
{code:sql}
CREATE TABLE kinesis_sink (
  user_id STRING,
  total_amount BIGINT,
  PRIMARY KEY (user_id) NOT ENFORCED
) WITH (
  'connector' = 'kinesis',
  'stream.arn' = 'arn:aws:kinesis:us-east-1:123456789:stream/myStream',
  'aws.region' = 'us-east-1',
  'format' = 'json'
);

-- This now works
INSERT INTO kinesis_sink
SELECT user_id, SUM(amount) FROM orders GROUP BY user_id;
{code}
Priority: Major
Fix Version: 6.0.0