[ https://issues.apache.org/jira/browse/FLINK-29216?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17601151#comment-17601151 ]

lincoln lee commented on FLINK-29216:
-------------------------------------

[~SpongebobZ] For the above query, the sink table declares a primary key (STUNAME) 
that does not match the key of the update stream produced by the source: the 
SOURCE table has no primary key definition, and it is a changelog source that 
emits changes rather than insert-only messages. In such a case the planner adds a 
SinkMaterializer(pk='STUNAME') operator to generate the key required by the sink. 
Since there is no key on the source side, that operator never produces update 
messages, so only Insert and Delete rowkinds appear.
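
You can check this in the plan. Below is a minimal sketch, assuming the DDL from 
the test code further down has already been executed; the exact operator name 
printed may differ slightly between versions:

{code:sql}
-- Print the optimized plan for the insert; when the sink's primary key does not
-- match the upsert key of the incoming changelog, an upsert-materialize step
-- (SinkMaterializer / SinkUpsertMaterializer) shows up in front of the sink.
EXPLAIN PLAN FOR
INSERT INTO SINK SELECT * FROM SOURCE;
{code}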

If your SOURCE table actually has the same primary key (STUNAME), the 
SinkMaterializer operator will not be added, and the changes from the source 
table will keep their original rowkind all the way to the sink.
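
For illustration only, a sketch of what that source DDL could look like with the 
key declared. This is not a verified configuration: whether a PRIMARY KEY can be 
declared at all depends on the connector/format combination, so please check it 
against the connector you actually use.

{code:sql}
-- Sketch: declare the same primary key on the source so that its upsert key
-- matches the sink's and the planner does not insert a SinkMaterializer.
-- The WITH options are copied from the issue; some connector/format
-- combinations reject a PRIMARY KEY declaration, so verify before using.
CREATE TABLE SOURCE (
  STUNAME STRING,
  SUBJECT STRING,
  SCORE INT,
  PRIMARY KEY (STUNAME) NOT ENFORCED
) WITH (
  'connector' = 'kafka',
  'properties.bootstrap.servers' = 'node01:9092',
  'topic' = 'jolinTest',
  'properties.group.id' = 'test',
  'scan.startup.mode' = 'latest-offset',
  'format' = 'debezium-json'
);
{code}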

Hope this helps.

> rows belonging to update rowkind are transferred into delete and insert
> -------------------------------------------------------------------
>
>                 Key: FLINK-29216
>                 URL: https://issues.apache.org/jira/browse/FLINK-29216
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / API
>    Affects Versions: 1.14.3
>            Reporter: Spongebob
>            Priority: Major
>
> When I declared a primary key on the sink table, all UPDATE rows were 
> transferred into DELETE and INSERT rowkind, even though the primary key values 
> were not changed at all. If this is the expected behavior, how can I change it?
>  
> following is the test code:
> {code:java}
> CREATE TABLE SOURCE (STUNAME STRING, SUBJECT STRING, SCORE INT) WITH (
>   'connector' = 'kafka',
>   'properties.bootstrap.servers' = 'node01:9092',
>   'topic' = 'jolinTest',
>   'properties.group.id' = 'test',
>   'scan.startup.mode' = 'latest-offset',
>   'format' = 'debezium-json'
> )
> CREATE TABLE SINK (STUNAME STRING, SUBJECT STRING, SCORE INT, PRIMARY KEY (STUNAME) NOT ENFORCED) WITH ('connector' = 'print')
> insert into SINK select * from SOURCE 
> when I produced these messages to kafka:
> {"op":"c","after":{"STUNAME":"BOB","SUBJECT":"MATH","SCORE":10}}
> {"op":"u","before":{"STUNAME":"BOB","SUBJECT":"MATH","SCORE":10},"after":{"STUNAME":"BOB","SUBJECT":"MATH","SCORE":16}}
> flink output these rows:
> +I[BOB, MATH, 10]
> -D[BOB, MATH, 10]
> +I[BOB, MATH, 16]{code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
