[ 
https://issues.apache.org/jira/browse/FLINK-20374?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17356124#comment-17356124
 ] 

徐州州 commented on FLINK-20374:
-----------------------------

I am quite sure. I am doing a join on canal-json sources, and my job probably 
has no window or state expiration parameters configured. The first day after 
submission is normal, because the newly submitted job consumed only that day's 
data and held no state from the previous day. The problem appears when the 
program runs for two days continuously. For example, an order gets its status 
on the first day, and the program keeps running into the second day. On the 
second day, when I query the joined data in upsert-kafka, I find that the 
associated data has a -U arriving after the +U. However, in the original 
canal-json layer the second day's data consists of three +U records, yet 
somehow after the join a -U follows the +U.

> Wrong result when shuffling changelog stream on non-primary-key columns
> -----------------------------------------------------------------------
>
>                 Key: FLINK-20374
>                 URL: https://issues.apache.org/jira/browse/FLINK-20374
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Runtime
>            Reporter: Jark Wu
>            Assignee: Jingsong Lee
>            Priority: Critical
>              Labels: stale-critical
>             Fix For: 1.14.0
>
>
> This was reported on the user-zh mailing list: 
> http://apache-flink.147419.n8.nabble.com/flink-1-11-2-cdc-cdc-sql-sink-save-point-job-sink-td8593.html
> {code:sql}
> CREATE TABLE test (
> `id` INT,
> `name` VARCHAR(255),
> `time` TIMESTAMP(3),
> `status` INT,
> PRIMARY KEY(id) NOT ENFORCED
> ) WITH (
>   'connector' = 'mysql-cdc',
>   'hostname' = 'localhost',
>   'port' = '3306',
>   'username' = 'root',
>   'password' = '1',
>   'database-name' = 'ai_audio_lyric_task',
>   'table-name' = 'test'
> );
> CREATE TABLE status (
> `id` INT,
> `name` VARCHAR(255),
> PRIMARY KEY(id) NOT ENFORCED
> ) WITH (  
>   'connector' = 'mysql-cdc',
>   'hostname' = 'localhost',
>   'port' = '3306',
>   'username' = 'root',
>   'password' = '1',
>   'database-name' = 'ai_audio_lyric_task',
>   'table-name' = 'status'
> );
> -- output
> CREATE TABLE test_status (
> `id` INT,
> `name` VARCHAR(255),
> `time` TIMESTAMP(3),
> `status` INT,
> `status_name` VARCHAR(255),
> PRIMARY KEY(id) NOT ENFORCED
> ) WITH (
>   'connector' = 'elasticsearch-7',
>   'hosts' = 'xxx',
>   'index' = 'xxx',
>   'username' = 'xxx',
>   'password' = 'xxx',
>   'sink.bulk-flush.backoff.max-retries' = '100000',
>   'sink.bulk-flush.backoff.strategy' = 'CONSTANT',
>   'sink.bulk-flush.max-actions' = '5000',
>   'sink.bulk-flush.max-size' = '10mb',
>   'sink.bulk-flush.interval' = '1s'
> );
> INSERT INTO test_status
> SELECT t.*, s.name
> FROM test AS t
> LEFT JOIN status AS s ON t.status = s.id;
> {code}
> Data in mysql table:
> {code}
> test:
> 0, name0, 2020-07-06 00:00:00 , 0
> 1, name1, 2020-07-06 00:00:00 , 1
> 2, name2, 2020-07-06 00:00:00 , 1
> .....
> status
> 0, status0
> 1, status1
> 2, status2
> .....
> {code}
> Operations: 
> 1. Start the job with parallelism=40; the result in the test_status sink is correct: 
> {code}
>     0, name0, 2020-07-06 00:00:00 , 0, status0
>     1, name1, 2020-07-06 00:00:00 , 1, status1
>     2, name2, 2020-07-06 00:00:00 , 1, status1
> {code}
> 2. Update {{status}} of {{id=2}} record in table {{test}} from {{1}} to {{2}}.
> 3. The result is incorrect: the {{id=2}} record is missing from the 
> result. 
> The reason is that the changelog of {{test}} is shuffled on the {{status}} 
> column, which is not the primary key. Therefore, ordering can't be 
> guaranteed, and the result is wrong. 
> The {{-U[2, name2, 2020-07-06 00:00:00 , 1]}} and 
> {{+U[2, name2, 2020-07-06 00:00:00 , 2]}} may be shuffled to different join 
> tasks, so the order of the joined results is not guaranteed when they arrive 
> at the sink task. 
> It is possible that {{+U[2, name2, 2020-07-06 00:00:00 , 2, status2]}} 
> arrives first, followed by 
> {{-U[2, name2, 2020-07-06 00:00:00 , 1, status1]}}. The late retraction then 
> removes the row, so the {{id=2}} record is missing in Elasticsearch. 
> It seems that we need a changelog ordering mechanism in the planner. 
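
The ordering problem described in the issue can be reproduced outside Flink 
with a small model. The following is only a sketch under stated assumptions, 
not Flink's actual implementation: join_task and apply_changelog are 
hypothetical helpers, the modulo partitioner stands in for Flink's real key 
hashing, and the sink is assumed to upsert on +I/+U and delete by key on 
-U/-D.

```python
# Simplified model of the race described in FLINK-20374.
# Assumptions (not Flink APIs): `join_task` and `apply_changelog` are
# hypothetical helpers; modulo partitioning stands in for Flink's key hashing.

def join_task(status, parallelism=40):
    """Pick the join subtask for a row shuffled on the non-key `status` column."""
    return status % parallelism

def apply_changelog(events):
    """Apply (kind, row) events in arrival order to a sink keyed by `id`."""
    sink = {}
    for kind, row in events:
        key = row[0]  # `id` is the primary key of test_status
        if kind in ("+I", "+U"):
            sink[key] = row          # upsert by primary key
        elif kind in ("-U", "-D"):
            sink.pop(key, None)      # assumed: retractions delete by key
    return sink

# The update of id=2 changes status from 1 to 2, so the -U (status=1) and the
# +U (status=2) are routed to different join subtasks and may race.
tasks = (join_task(1), join_task(2))

initial = ("+I", (2, "name2", "2020-07-06 00:00:00", 1, "status1"))
retract = ("-U", (2, "name2", "2020-07-06 00:00:00", 1, "status1"))
update = ("+U", (2, "name2", "2020-07-06 00:00:00", 2, "status2"))

in_order = apply_changelog([initial, retract, update])
out_of_order = apply_changelog([initial, update, retract])

print(tasks)         # (1, 2): two different subtasks
print(in_order)      # id=2 is present with status2
print(out_of_order)  # {}: the late -U removed the row
```

In this model the row for id=2 survives only when the -U reaches the sink 
before the +U, which is the ordering guarantee that is lost once the two 
records pass through different join subtasks.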



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
