[ 
https://issues.apache.org/jira/browse/FLINK-29216?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Spongebob updated FLINK-29216:
------------------------------
    Description: 
When I declare a primary key on the sink table, all UPDATE rows are converted 
into DELETE and INSERT row kinds, even though the primary key values have not 
changed at all. If this is the expected behavior, how can I change it?

 

The following is the test code:
{code:java}
CREATE TABLE SOURCE (STUNAME STRING, SUBJECT STRING, SCORE INT) WITH (
  'connector' = 'kafka',
  'properties.bootstrap.servers' = 'node01:9092',
  'topic' = 'jolinTest',
  'properties.group.id' = 'test',
  'scan.startup.mode' = 'latest-offset',
  'format' = 'debezium-json'
);

CREATE TABLE SINK (
  STUNAME STRING, SUBJECT STRING, SCORE INT,
  PRIMARY KEY (STUNAME) NOT ENFORCED
) WITH ('connector' = 'print');

INSERT INTO SINK SELECT * FROM SOURCE;



-- When I produced these messages to Kafka:
{"op":"c","after":{"STUNAME":"BOB","SUBJECT":"MATH","SCORE":10}}
{"op":"u","before":{"STUNAME":"BOB","SUBJECT":"MATH","SCORE":10},"after":{"STUNAME":"BOB","SUBJECT":"MATH","SCORE":16}}


-- Flink output these rows:
+I[BOB, MATH, 10]
-D[BOB, MATH, 10]
+I[BOB, MATH, 16]{code}

  was:When I declared primary key on the sink table, all UPDATE rows were 
transferred into DELETE and INSERT rowkind, despite the primary key values were 
not changed at all. If this is the expected logic then how could I change this 
behavior ?


> Rows of the UPDATE row kind are converted into DELETE and INSERT
> ----------------------------------------------------------------
>
>                 Key: FLINK-29216
>                 URL: https://issues.apache.org/jira/browse/FLINK-29216
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / API
>    Affects Versions: 1.14.3
>            Reporter: Spongebob
>            Priority: Major
>
> When I declare a primary key on the sink table, all UPDATE rows are 
> converted into DELETE and INSERT row kinds, even though the primary key 
> values have not changed at all. If this is the expected behavior, how can I 
> change it?
>  
> The following is the test code:
> {code:java}
> CREATE TABLE SOURCE (STUNAME STRING, SUBJECT STRING, SCORE INT) WITH (
>   'connector' = 'kafka',
>   'properties.bootstrap.servers' = 'node01:9092',
>   'topic' = 'jolinTest',
>   'properties.group.id' = 'test',
>   'scan.startup.mode' = 'latest-offset',
>   'format' = 'debezium-json'
> );
> CREATE TABLE SINK (
>   STUNAME STRING, SUBJECT STRING, SCORE INT,
>   PRIMARY KEY (STUNAME) NOT ENFORCED
> ) WITH ('connector' = 'print');
> INSERT INTO SINK SELECT * FROM SOURCE;
> -- When I produced these messages to Kafka:
> {"op":"c","after":{"STUNAME":"BOB","SUBJECT":"MATH","SCORE":10}}
> {"op":"u","before":{"STUNAME":"BOB","SUBJECT":"MATH","SCORE":10},"after":{"STUNAME":"BOB","SUBJECT":"MATH","SCORE":16}}
> -- Flink output these rows:
> +I[BOB, MATH, 10]
> -D[BOB, MATH, 10]
> +I[BOB, MATH, 16]{code}
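
For comparison, here is a minimal sketch in plain Python (not Flink code) of the rows one might expect from the two messages above if the update were kept as an update. It assumes the mapping documented for the debezium-json format: op "c" becomes an INSERT row, and op "u" becomes an UPDATE_BEFORE row followed by an UPDATE_AFTER row. The names to_changelog, render and changelog_lines are hypothetical helpers for illustration only. The row layout imitates the print connector output quoted in the report.

```python
import json

# Row-kind short strings in the style of the output quoted in the report.
INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE = "+I", "-U", "+U", "-D"

# Column order taken from the CREATE TABLE statements in the report.
COLUMNS = ("STUNAME", "SUBJECT", "SCORE")

# The two messages from the report, verbatim.
MESSAGES = [
    '{"op":"c","after":{"STUNAME":"BOB","SUBJECT":"MATH","SCORE":10}}',
    '{"op":"u","before":{"STUNAME":"BOB","SUBJECT":"MATH","SCORE":10},'
    '"after":{"STUNAME":"BOB","SUBJECT":"MATH","SCORE":16}}',
]


def to_changelog(message):
    """Map one Debezium-style message to a list of (row_kind, values) pairs.

    Assumption: "c" (and snapshot "r") -> INSERT, "u" -> UPDATE_BEFORE then
    UPDATE_AFTER, "d" -> DELETE.
    """
    record = json.loads(message)
    op = record["op"]

    def values(image):
        return tuple(image[column] for column in COLUMNS)

    if op in ("c", "r"):
        return [(INSERT, values(record["after"]))]
    if op == "u":
        return [
            (UPDATE_BEFORE, values(record["before"])),
            (UPDATE_AFTER, values(record["after"])),
        ]
    if op == "d":
        return [(DELETE, values(record["before"]))]
    raise ValueError("unsupported op: " + repr(op))


def render(kind, values):
    """Format a row like the quoted sink output, e.g. +I[BOB, MATH, 10]."""
    return kind + "[" + ", ".join(str(v) for v in values) + "]"


def changelog_lines(messages):
    """Render every changelog row produced by the given messages, in order."""
    return [
        render(kind, values)
        for message in messages
        for kind, values in to_changelog(message)
    ]


if __name__ == "__main__":
    for line in changelog_lines(MESSAGES):
        print(line)
```

Under these assumptions the sketch yields +I, -U and +U rows for BOB, whereas the output in the report shows -D and +I in place of the -U and +U pair.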



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
