[ https://issues.apache.org/jira/browse/FLINK-33608?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Jiabao Sun updated FLINK-33608:
-------------------------------
Description:
UpsertTestDynamicTableSink's keySerializationSchema should extract and
serialize only the primary key fields from RowData. At present the key
contains the entire row.
{code:sql}
CREATE TABLE UpsertFileSinkTable (
    user_id INT,
    user_name STRING,
    user_count BIGINT,
    PRIMARY KEY (user_id) NOT ENFORCED
) WITH (
    'connector' = 'upsert-files',
    'key.format' = 'json',
    'value.format' = 'json',
    'output-filepath' = '..'
);

INSERT INTO UpsertFileSinkTable
SELECT user_id, user_name, COUNT(*) AS user_count
FROM (VALUES (1, 'Bob'), (22, 'Tom'), (42, 'Kim'), (42, 'Kim'), (42, 'Kim'), (1, 'Bob'))
    AS UserCountTable(user_id, user_name)
GROUP BY user_id, user_name;
{code}
Results:
|| Key || Value ||
| {"user_id":1,"user_name":"Bob","user_count":2} | {"user_id":1,"user_name":"Bob","user_count":2} |
| {"user_id":22,"user_name":"Tom","user_count":1} | {"user_id":22,"user_name":"Tom","user_count":1} |
| {"user_id":42,"user_name":"Kim","user_count":3} | {"user_id":42,"user_name":"Kim","user_count":3} |
Expected:
|| Key || Value ||
| {"user_id":1} | {"user_id":1,"user_name":"Bob","user_count":2} |
| {"user_id":22} | {"user_id":22,"user_name":"Tom","user_count":1} |
| {"user_id":42} | {"user_id":42,"user_name":"Kim","user_count":3} |
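The expected behavior can be illustrated with a minimal, dependency-free sketch. This is not the Flink implementation: the class name KeyProjectionSketch, the helpers projectKey and toJson, and the use of a plain Map in place of RowData are all assumptions made for illustration only. It shows the key being built from the primary key fields alone, while the value keeps the full row.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.StringJoiner;

/** Hypothetical stand-in for the key projection; a Map replaces Flink's RowData. */
public class KeyProjectionSketch {

    /** Copies only the primary key fields out of a row, in key-field order. */
    public static Map<String, Object> projectKey(Map<String, Object> row, List<String> keyFields) {
        Map<String, Object> key = new LinkedHashMap<>();
        for (String field : keyFields) {
            if (!row.containsKey(field)) {
                throw new IllegalArgumentException("Row has no primary key field: " + field);
            }
            key.put(field, row.get(field));
        }
        return key;
    }

    /** Minimal JSON rendering for flat rows of numbers and strings (no escaping). */
    public static String toJson(Map<String, Object> row) {
        StringJoiner json = new StringJoiner(",", "{", "}");
        for (Map.Entry<String, Object> entry : row.entrySet()) {
            Object v = entry.getValue();
            String rendered = (v instanceof String) ? "\"" + v + "\"" : String.valueOf(v);
            json.add("\"" + entry.getKey() + "\":" + rendered);
        }
        return json.toString();
    }

    public static void main(String[] args) {
        Map<String, Object> row = new LinkedHashMap<>();
        row.put("user_id", 42);
        row.put("user_name", "Kim");
        row.put("user_count", 3L);

        // The key is serialized from the projected row, the value from the full row.
        String key = toJson(projectKey(row, List.of("user_id")));
        String value = toJson(row);
        System.out.println(key + " | " + value);
        // prints: {"user_id":42} | {"user_id":42,"user_name":"Kim","user_count":3}
    }
}
```

In the actual sink, the equivalent step would presumably project the RowData onto the primary key columns declared in the table schema before handing it to the key format's serializer.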
was: UpsertTestDynamicTableSink's keySerializationSchema should extract and
serialize the primary key fields from RowData.
> UpsertTestDynamicTableSink's keySerializationSchema should extract and
> serialize the primary key fields from RowData
> --------------------------------------------------------------------------------------------------------------------
>
> Key: FLINK-33608
> URL: https://issues.apache.org/jira/browse/FLINK-33608
> Project: Flink
> Issue Type: Improvement
> Components: Connectors / Common
> Affects Versions: 1.18.0
> Reporter: Jiabao Sun
> Priority: Major
> Labels: pull-request-available