[ https://issues.apache.org/jira/browse/FLINK-19452?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17204488#comment-17204488 ]
Zhengchao Shi commented on FLINK-19452:
---------------------------------------
[~jark] This happens because I started the job from `latest-offset`, so Flink
holds no aggregation state for the CDC data (see
'org.apache.flink.table.runtime.operators.aggregate.GroupAggFunction'). When
an update event is received, the count alternates between 1 and 0, because
only -U and +U messages arrive. For example:
First update: update orders set product_id = 101 where order_number = 10001 and product_id = 100;
-U(10001,1) -> ignored: there is no state for the key and this is a retract message, so GroupAggFunction#processElement simply returns
+U(10001,2) -> +I(10001,1)
Second update: update orders set product_id = 102 where order_number = 10001 and product_id = 105;
-U(10001,1) -> -D(10001,1)
+U(10001,2) -> +I(10001,1)
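The walkthrough can be reproduced with a small model. The sketch below is a simplified, hypothetical per-key COUNT that follows the retraction rules described in this comment: a retraction is dropped when no state exists for the key, a first accumulate message emits +I, and a count that falls to 0 emits -D and clears the state. The names `CountRetractSketch`, `Kind` and `run` are illustrative only, not Flink APIs; the real GroupAggFunction also depends on settings such as whether update-before messages are generated.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified model of a per-key COUNT over a changelog stream.
// It mirrors the retraction handling described in the comment; it is NOT
// Flink's actual GroupAggFunction implementation.
public class CountRetractSketch {

    // Changelog kinds with the short prefixes used in the printed output.
    enum Kind {
        INSERT("+I"), UPDATE_BEFORE("-U"), UPDATE_AFTER("+U"), DELETE("-D");

        final String prefix;

        Kind(String prefix) {
            this.prefix = prefix;
        }

        boolean isRetract() {
            return this == UPDATE_BEFORE || this == DELETE;
        }
    }

    // Per-key count; a missing entry means "no state for this key".
    private final Map<Integer, Long> state = new HashMap<>();
    private final List<String> output = new ArrayList<>();

    private void emit(Kind kind, int key, long count) {
        output.add(kind.prefix + "(" + key + "," + count + ")");
    }

    void process(Kind kind, int key) {
        Long prev = state.get(key);
        if (prev == null) {
            if (kind.isRetract()) {
                // No state and a retract message: the message is dropped.
                return;
            }
            // First accumulate message for this key: count starts at 1, emit an insert.
            state.put(key, 1L);
            emit(Kind.INSERT, key, 1L);
            return;
        }
        long next = kind.isRetract() ? prev - 1 : prev + 1;
        if (next == 0) {
            // Count dropped to zero: delete the previous result and clear the state.
            state.remove(key);
            emit(Kind.DELETE, key, prev);
        } else {
            // Existing key: retract the old result, then emit the new one.
            state.put(key, next);
            emit(Kind.UPDATE_BEFORE, key, prev);
            emit(Kind.UPDATE_AFTER, key, next);
        }
    }

    // Feeds a sequence of changelog kinds for one key and returns everything emitted.
    static List<String> run(int key, Kind... kinds) {
        CountRetractSketch agg = new CountRetractSketch();
        for (Kind kind : kinds) {
            agg.process(kind, key);
        }
        return agg.output;
    }

    public static void main(String[] args) {
        // Started from 'latest-offset': only the -U/+U pair of each of the three updates arrives.
        System.out.println(run(10001,
                Kind.UPDATE_BEFORE, Kind.UPDATE_AFTER,
                Kind.UPDATE_BEFORE, Kind.UPDATE_AFTER,
                Kind.UPDATE_BEFORE, Kind.UPDATE_AFTER));

        // Contrast: the three original inserts are seen first, so state exists for the key.
        List<String> withState = run(10001,
                Kind.INSERT, Kind.INSERT, Kind.INSERT,
                Kind.UPDATE_BEFORE, Kind.UPDATE_AFTER,
                Kind.UPDATE_BEFORE, Kind.UPDATE_AFTER,
                Kind.UPDATE_BEFORE, Kind.UPDATE_AFTER);
        System.out.println(withState.get(withState.size() - 1));
    }
}
```

With only the update pairs, `main` prints `[+I(10001,1), -D(10001,1), +I(10001,1), -D(10001,1), +I(10001,1)]`, so the last result always carries a count of 1. When the model sees the three inserts first, the last emitted result is `+U(10001,3)`, i.e. the count of 3 the reporter expected.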
> statistics of group by CDC data is always 1
> -------------------------------------------
>
> Key: FLINK-19452
> URL: https://issues.apache.org/jira/browse/FLINK-19452
> Project: Flink
> Issue Type: Bug
> Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
> Affects Versions: 1.11.1
> Reporter: Zhengchao Shi
> Priority: Major
> Fix For: 1.12.0
>
>
> When computing counts over CDC data, if only updates are made to the source
> table (a MySQL table), the count is always 1.
> {code:sql}
> CREATE TABLE orders (
> order_number int,
> product_id int
> ) with (
> 'connector' = 'kafka-0.11',
> 'topic' = 'Topic',
> 'properties.bootstrap.servers' = 'localhost:9092',
> 'properties.group.id' = 'GroupId',
> 'scan.startup.mode' = 'latest-offset',
> 'format' = 'canal-json'
> );
> CREATE TABLE order_test (
> order_number int,
> order_cnt bigint
> ) WITH (
> 'connector' = 'print'
> );
> INSERT INTO order_test
> SELECT order_number, count(1) FROM orders GROUP BY order_number;
> {code}
> The “orders” table contains 3 records:
> ||order_number||product_id||
> |10001|1|
> |10001|2|
> |10001|3|
> Now update the orders table:
> {code:sql}
> update orders set product_id = 5 where order_number = 10001;
> {code}
> The output of the print sink is:
> -D(10001,1)
> +I(10001,1)
> -D(10001,1)
> +I(10001,1)
> -D(10001,1)
> +I(10001,1)
> I think the final result should be +I(10001, 3).
--
This message was sent by Atlassian Jira
(v8.3.4#803005)