[ 
https://issues.apache.org/jira/browse/FLINK-22281?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17321850#comment-17321850
 ] 

Jark Wu commented on FLINK-22281:
---------------------------------

If you don't have the full historical data, only the incremental binlog data, 
then aggregating over it will produce wrong results, because the input 
changelog is incomplete (e.g. it contains updates to rows that never existed). 

You can try turning on {{table.exec.source.cdc-events-duplicate=true}} to 
convert the incomplete changelog into a normalized changelog; e.g. an update 
to a non-existing row will be converted into an insert. 

See 
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/connectors/formats/canal.html#duplicate-change-events
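A minimal sketch of applying this to the query from the issue, assuming the Flink SQL client. The quoted `SET 'key' = 'value';` form shown here is the syntax of Flink 1.13 and later; older clients such as 1.12 may need the option set in the client's configuration file or through `TableConfig` instead. Per the linked docs, the option relies on the PRIMARY KEY that `kafka_mall_order_info` already declares, and it adds a stateful operator that normalizes the changelog before the aggregation.

```sql
-- Enable normalization of duplicate/incomplete CDC events for this session
-- (quoted SET syntax assumes Flink SQL client 1.13+).
SET 'table.exec.source.cdc-events-duplicate' = 'true';

-- Same query as in the issue; the source's PRIMARY KEY (id) is what the
-- normalization uses to turn an update to an unseen key into an insert.
INSERT INTO t2 SELECT SUM(amount) FROM kafka_mall_order_info;
```

Note that with 'scan.startup.mode' = 'latest-offset' the job still only sees rows changed after it starts, so the sum covers those rows rather than the full table.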

> flink sql consumer kakfa canal-json message then sum(amount)  
> --------------------------------------------------------------
>
>                 Key: FLINK-22281
>                 URL: https://issues.apache.org/jira/browse/FLINK-22281
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / API
>    Affects Versions: 1.12.0
>         Environment: flink 1.12 local
>            Reporter: xx chai
>            Priority: Major
>         Attachments: screenshot-1.png
>
>
> I use Flink SQL to consume Kafka canal-json messages. The SQL is:
>
> CREATE TABLE kafka_mall_order_info (
>   id INT,
>   amount DOUBLE,
>   PRIMARY KEY (id) NOT ENFORCED
> ) WITH (
>   'connector' = 'kafka',
>   'topic' = 'topic_yx-dc-3-102_3306',
>   'properties.bootstrap.servers' = 'localhost:9092',
>   'properties.group.id' = 'kafka_to_hive',
>   'scan.startup.mode' = 'latest-offset',
>   'format' = 'canal-json'
> );
>
> CREATE TABLE t2 (amount DOUBLE) WITH ('connector' = 'print');
>
> INSERT INTO t2 SELECT SUM(amount) FROM kafka_mall_order_info;
> But the result is not what I expected; it is shown in the attached image.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
