[ 
https://issues.apache.org/jira/browse/FLINK-32642?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

jasonliangyc updated FLINK-32642:
---------------------------------
    Affects Version/s: 1.17.1
                       1.17.0

> The upsert mode doesn't work for the compound keys
> --------------------------------------------------
>
>                 Key: FLINK-32642
>                 URL: https://issues.apache.org/jira/browse/FLINK-32642
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Client
>    Affects Versions: 1.17.0, 1.17.1
>            Reporter: jasonliangyc
>            Priority: Major
>         Attachments: image-2023-07-21-23-55-47-399.png, 
> image-2023-07-21-23-56-56-543.png, image-2023-07-21-23-57-22-186.png, 
> image-2023-07-22-00-11-56-363.png
>
>
>  
> Hi, the issue can be produced by following below steps:
>  
> *1.* Create two tables in sqlserver, one is the sink table, the other one is 
> cdc source table, the sink table has a compound key(date_str,time_str).
> *2.* Create the corresponding flink tables in sql-client.
>  
> {code:java}
> --create sink table in sqlserver
> CREATE TABLE cumulative_cnt (
>   date_str VARCHAR(50),
>   time_str VARCHAR(50),
>   cnt INTEGER
>   CONSTRAINT PK_cumulative_cnt PRIMARY KEY (date_str,time_str)
> );
> --create source cdc table in sqlserver
> CREATE TABLE user_behavior (
>   id INTEGER NOT NULL IDENTITY(101,1) PRIMARY KEY,
>   create_date datetime NOT NULL,
>   click_event VARCHAR(255) NOT NULL
> );
> EXEC sys.sp_cdc_enable_table  
> @source_schema = N'dbo',  
> @source_name   = N'user_behavior',  
> @role_name     = NULL,  
> @supports_net_changes = 1  
> GO
> --create flink tables through sql-client
> CREATE TABLE cumulative_cnt (
>     date_str STRING,
>     time_str STRING,
>     cnt BIGINT,
>     PRIMARY KEY (date_str, time_str)NOT ENFORCED
>   )  WITH (
>     'connector' = 'jdbc',
>     'url' = 'jdbc:sqlserver://xxxx:1433;databaseName=xxxx',
>     'username' = 'xxxx',
>     'password' = 'xxxx',
>     'table-name' = 'cumulative_cnt'
> );
> --create flink cdc table through sql-client
> CREATE TABLE user_behavior (
>    id int,
>    create_date TIMESTAMP(0),
>    click_event STRING
>  ) WITH (
>     'connector' = 'sqlserver-cdc',
>     'hostname' = 'xxxx',
>     'port' = '1433',
>     'username' = 'xxxx',
>     'password' = 'xxxx',
>     'database-name' = 'xxxx',
>     'schema-name' = 'dbo',
>     'table-name' = 'user_behavior'
>  ); {code}
> *3.* Run below sql through sql-client to start the job for capturing the cdc 
> data and do the aggregation and finally insert the result into target table.
> {code:java}
> insert into cumulative_cnt
> select
> date_str,
> max(time_str) as time_str,
> count(*) as cnt
> from
> (
>   select
>   DATE_FORMAT(create_date, 'yyyy-MM-dd') as date_str,
>   SUBSTR(DATE_FORMAT(create_date, 'HH:mm'),1,4) || '0' as time_str
>   from user_behavior
> ) group by date_str; 
> {code}
> *4.* Insert two records for testing.
> {code:java}
> INSERT INTO user_behavior(create_date, click_event)VALUES ('2023-06-01 
> 01:01:00','click1');
> INSERT INTO user_behavior(create_date, click_event)VALUES ('2023-06-01 
> 02:20:00','click1');{code}
> *5.* Checked the result in db( pls see the screen 1) and found that the 
> target table only have one record, but it is not the expectation cause the 
> two source records have different time, thus the compound key(date_str,  
> time_str) shoud be different( pls see the screen 2 )
>  
> There should be two records in the target table:
> 2023-06-01    01:00    1
> 2023-06-01    02:20    2
>  
> screen 1
> !image-2023-07-22-00-11-56-363.png|width=274,height=199!
>  
> screen 2
> !image-2023-07-21-23-57-22-186.png|width=572,height=120!
>  
>  
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to