[ https://issues.apache.org/jira/browse/FLINK-33608?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Jiabao Sun updated FLINK-33608:
-------------------------------
External issue ID: (was: FLINK-27527)
> UpsertTestDynamicTableSink's keySerializationSchema should extract and
> serialize the primary key fields from RowData
> --------------------------------------------------------------------------------------------------------------------
>
> Key: FLINK-33608
> URL: https://issues.apache.org/jira/browse/FLINK-33608
> Project: Flink
> Issue Type: Improvement
> Components: Connectors / Common
> Affects Versions: 1.18.0
> Reporter: Jiabao Sun
> Priority: Major
> Labels: pull-request-available
>
> UpsertTestDynamicTableSink's keySerializationSchema should extract and
> serialize only the primary key fields from RowData. Currently the key is
> serialized from the full row, so it is identical to the value.
> {code:sql}
> CREATE TABLE UpsertFileSinkTable (
> user_id INT,
> user_name STRING,
> user_count BIGINT,
> PRIMARY KEY (user_id) NOT ENFORCED
> ) WITH (
> 'connector' = 'upsert-files',
> 'key.format' = 'json',
> 'value.format' = 'json',
> 'output-filepath' = '..'
> );
> INSERT INTO UpsertFileSinkTable
> SELECT user_id, user_name, COUNT(*) AS user_count
> FROM (VALUES (1, 'Bob'), (22, 'Tom'), (42, 'Kim'), (42, 'Kim'), (42, 'Kim'),
> (1, 'Bob'))
> AS UserCountTable(user_id, user_name)
> GROUP BY user_id, user_name;
> {code}
> Results:
> || Key || Value ||
> | {"user_id":1,"user_name":"Bob","user_count":2} | {"user_id":1,"user_name":"Bob","user_count":2} |
> | {"user_id":22,"user_name":"Tom","user_count":1} | {"user_id":22,"user_name":"Tom","user_count":1} |
> | {"user_id":42,"user_name":"Kim","user_count":3} | {"user_id":42,"user_name":"Kim","user_count":3} |
> Expected:
> | {"user_id":1} | {"user_id":1,"user_name":"Bob","user_count":2} |
> | {"user_id":22} | {"user_id":22,"user_name":"Tom","user_count":1} |
> | {"user_id":42} | {"user_id":42,"user_name":"Kim","user_count":3} |
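
The expected behaviour above amounts to projecting each row onto its primary key columns before the key format serializes it. The following is a minimal sketch of that idea in plain Java, not the actual Flink change: the class `KeyProjectionSketch`, the helpers `projectKey` and `toJson`, and the use of `Object[]` in place of `RowData` are all hypothetical stand-ins chosen so the example runs without Flink on the classpath.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class KeyProjectionSketch {

    /** Copies only the primary key columns of a row, in key order (hypothetical helper). */
    static Map<String, Object> projectKey(String[] fieldNames, Object[] row, int[] keyIndexes) {
        Map<String, Object> key = new LinkedHashMap<>();
        for (int idx : keyIndexes) {
            key.put(fieldNames[idx], row[idx]);
        }
        return key;
    }

    /**
     * Minimal JSON rendering for the flat rows in this example.
     * Handles numbers and plain strings only; no escaping is performed.
     */
    static String toJson(Map<String, Object> fields) {
        StringBuilder sb = new StringBuilder("{");
        for (Map.Entry<String, Object> e : fields.entrySet()) {
            if (sb.length() > 1) {
                sb.append(',');
            }
            sb.append('"').append(e.getKey()).append("\":");
            Object v = e.getValue();
            if (v instanceof Number) {
                sb.append(v);
            } else {
                sb.append('"').append(v).append('"');
            }
        }
        return sb.append('}').toString();
    }

    public static void main(String[] args) {
        String[] fieldNames = {"user_id", "user_name", "user_count"};
        int[] keyIndexes = {0}; // PRIMARY KEY (user_id)
        Object[] row = {42, "Kim", 3L};

        // Key: only the primary key column is serialized.
        System.out.println(toJson(projectKey(fieldNames, row, keyIndexes)));
        // Value: the full row is serialized.
        System.out.println(toJson(projectKey(fieldNames, row, new int[] {0, 1, 2})));
    }
}
```

For the row `(42, 'Kim', 3)` this prints `{"user_id":42}` as the key and the full row as the value, matching the last line of the expected table. In the real sink the projection would presumably operate on `RowData` using the primary key indexes of the table schema; that wiring is not shown here.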