[ https://issues.apache.org/jira/browse/FLINK-22281?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17321850#comment-17321850 ]
Jark Wu commented on FLINK-22281:
---------------------------------
If you don't have the full historical data, only the incremental binlog data,
then aggregating on it will produce wrong results, because the input data is
incorrect (it contains updates to rows that do not exist in the stream).
You can try turning on {{table.exec.source.cdc-events-duplicate=true}} to
convert the incomplete changelog into a normalized changelog, e.g. an update
to a non-existing row will be converted into an insert.
See
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/connectors/formats/canal.html#duplicate-change-events
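A minimal sketch of applying this to the query in the issue, assuming a SQL client that accepts the quoted {{SET 'key' = 'value';}} form of newer Flink releases. On 1.12 the option may instead have to go into the {{configuration:}} section of the SQL client environment file, or be set through {{TableConfig}} in a Table API program. It relies on the PRIMARY KEY already declared on {{kafka_mall_order_info}}, which the planner uses to deduplicate the change events and emit a normalized changelog before {{SUM}} consumes it.

{code:sql}
-- Assumption: quoted SET syntax of newer SQL clients; a 1.12 SQL client may
-- need this option in its environment file instead.
SET 'table.exec.source.cdc-events-duplicate' = 'true';

-- kafka_mall_order_info declares PRIMARY KEY (id) NOT ENFORCED. With the option
-- enabled, the planner adds a stateful operator keyed on id that normalizes the
-- changelog (e.g. an update to an id it has not seen is emitted as an insert).
INSERT INTO t2 SELECT SUM(amount) FROM kafka_mall_order_info;
{code}

Note that the extra operator keeps the latest row per id in state, so its state grows with the number of distinct ids.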
> flink sql consumer kafka canal-json message then sum(amount)
> --------------------------------------------------------------
>
> Key: FLINK-22281
> URL: https://issues.apache.org/jira/browse/FLINK-22281
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / API
> Affects Versions: 1.12.0
> Environment: flink 1.12 local
> Reporter: xx chai
> Priority: Major
> Attachments: screenshot-1.png
>
>
> I use Flink SQL to consume Kafka canal-json messages. The SQL is:
> CREATE TABLE kafka_mall_order_info (
>   id INT,
>   amount DOUBLE,
>   PRIMARY KEY (id) NOT ENFORCED
> ) WITH (
>   'connector' = 'kafka',
>   'topic' = 'topic_yx-dc-3-102_3306',
>   'properties.bootstrap.servers' = 'localhost:9092',
>   'properties.group.id' = 'kafka_to_hive',
>   'scan.startup.mode' = 'latest-offset',
>   'format' = 'canal-json'
> );
>
> CREATE TABLE t2 (amount DOUBLE) WITH ('connector' = 'print');
>
> INSERT INTO t2 SELECT SUM(amount) FROM kafka_mall_order_info;
> but the result is not what I expected;
> the result is shown in the attached image (screenshot-1.png).
--
This message was sent by Atlassian Jira
(v8.3.4#803005)