[
https://issues.apache.org/jira/browse/FLINK-29216?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Spongebob updated FLINK-29216:
------------------------------
Description:
When I declared a primary key on the sink table, all UPDATE rows were
transferred into DELETE and INSERT row kinds, even though the primary key
values were not changed at all. If this is the expected behavior, how can I
change it?
Following is the test code:
{code:java}
CREATE TABLE SOURCE (STUNAME STRING, SUBJECT STRING, SCORE INT) WITH (
  'connector' = 'kafka',
  'properties.bootstrap.servers' = 'node01:9092',
  'topic' = 'jolinTest',
  'properties.group.id' = 'test',
  'scan.startup.mode' = 'latest-offset',
  'format' = 'debezium-json'
);

CREATE TABLE SINK (
  STUNAME STRING,
  SUBJECT STRING,
  SCORE INT,
  PRIMARY KEY (STUNAME) NOT ENFORCED
) WITH ('connector' = 'print');

INSERT INTO SINK SELECT * FROM SOURCE;
{code}
When I produced these messages to Kafka:
{code:java}
{"op":"c","after":{"STUNAME":"BOB","SUBJECT":"MATH","SCORE":10}}
{"op":"u","before":{"STUNAME":"BOB","SUBJECT":"MATH","SCORE":10},"after":{"STUNAME":"BOB","SUBJECT":"MATH","SCORE":16}}
{code}
Flink output these rows:
{code:java}
+I[BOB, MATH, 10]
-D[BOB, MATH, 10]
+I[BOB, MATH, 16]
{code}
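For context: per the documented semantics of the debezium-json format, each Debezium `op` code decodes to fixed Flink changelog row kinds, so the two records above should enter the pipeline as +I, -U, +U before any planner rewriting. A minimal sketch of that mapping (plain Python for illustration only; the function name is this sketch's own, not a Flink API):

```python
# Illustrative mapping of Debezium "op" codes to the Flink changelog
# row kinds that the debezium-json format emits for each record.
def debezium_op_to_row_kinds(op):
    mapping = {
        "c": ["+I"],        # create  -> INSERT
        "r": ["+I"],        # snapshot read -> INSERT
        "u": ["-U", "+U"],  # update  -> UPDATE_BEFORE, UPDATE_AFTER
        "d": ["-D"],        # delete  -> DELETE
    }
    return mapping[op]

# The two records produced in the test above ("c", then "u"):
events = ["c", "u"]
kinds = [k for op in events for k in debezium_op_to_row_kinds(op)]
print(kinds)  # ['+I', '-U', '+U'] -- yet the sink printed +I, -D, +I
```

So the question is why the -U/+U pair is rewritten into -D/+I on the way to a sink that declares a primary key.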
was: When I declared a primary key on the sink table, all UPDATE rows were
transferred into DELETE and INSERT row kinds, even though the primary key
values were not changed at all. If this is the expected behavior, how can I
change it?
> rows belonging to UPDATE row kind are transferred into DELETE and INSERT
> ------------------------------------------------------------------------
>
> Key: FLINK-29216
> URL: https://issues.apache.org/jira/browse/FLINK-29216
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / API
> Affects Versions: 1.14.3
> Reporter: Spongebob
> Priority: Major
>
> When I declared a primary key on the sink table, all UPDATE rows were
> transferred into DELETE and INSERT row kinds, even though the primary key
> values were not changed at all. If this is the expected behavior, how can I
> change it?
>
> Following is the test code:
> {code:java}
> CREATE TABLE SOURCE (STUNAME STRING, SUBJECT STRING, SCORE INT) WITH (
>   'connector' = 'kafka',
>   'properties.bootstrap.servers' = 'node01:9092',
>   'topic' = 'jolinTest',
>   'properties.group.id' = 'test',
>   'scan.startup.mode' = 'latest-offset',
>   'format' = 'debezium-json'
> );
>
> CREATE TABLE SINK (
>   STUNAME STRING,
>   SUBJECT STRING,
>   SCORE INT,
>   PRIMARY KEY (STUNAME) NOT ENFORCED
> ) WITH ('connector' = 'print');
>
> INSERT INTO SINK SELECT * FROM SOURCE;
> {code}
> When I produced these messages to Kafka:
> {code:java}
> {"op":"c","after":{"STUNAME":"BOB","SUBJECT":"MATH","SCORE":10}}
> {"op":"u","before":{"STUNAME":"BOB","SUBJECT":"MATH","SCORE":10},"after":{"STUNAME":"BOB","SUBJECT":"MATH","SCORE":16}}
> {code}
> Flink output these rows:
> {code:java}
> +I[BOB, MATH, 10]
> -D[BOB, MATH, 10]
> +I[BOB, MATH, 16]
> {code}
--
This message was sent by Atlassian Jira
(v8.20.10#820010)