[ https://issues.apache.org/jira/browse/FLINK-20374?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17377949#comment-17377949 ]
Timo Walther commented on FLINK-20374:
--------------------------------------
Thanks for working on this tricky issue [~lzljs3620320]. Could we start
documenting the changelog behavior of the planner in our docs as part of this
issue? Users should get more insight into what the planner is doing; right now
it is too much of a black box. It is very difficult to understand why and when
+U/-U messages or special operators are inserted into the pipeline.
> Wrong result when shuffling changelog stream on non-primary-key columns
> -----------------------------------------------------------------------
>
> Key: FLINK-20374
> URL: https://issues.apache.org/jira/browse/FLINK-20374
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / Runtime
> Reporter: Jark Wu
> Assignee: Jingsong Lee
> Priority: Critical
> Labels: auto-deprioritized-critical
> Fix For: 1.14.0
>
>
> This was reported on the user-zh mailing list:
> http://apache-flink.147419.n8.nabble.com/flink-1-11-2-cdc-cdc-sql-sink-save-point-job-sink-td8593.html
> {code:sql}
> CREATE TABLE test (
> `id` INT,
> `name` VARCHAR(255),
> `time` TIMESTAMP(3),
> `status` INT,
> PRIMARY KEY(id) NOT ENFORCED
> ) WITH (
> 'connector' = 'mysql-cdc',
> 'hostname' = 'localhost',
> 'port' = '3306',
> 'username' = 'root',
> 'password' = '1',
> 'database-name' = 'ai_audio_lyric_task',
> 'table-name' = 'test'
> );
> CREATE TABLE status (
> `id` INT,
> `name` VARCHAR(255),
> PRIMARY KEY(id) NOT ENFORCED
> ) WITH (
> 'connector' = 'mysql-cdc',
> 'hostname' = 'localhost',
> 'port' = '3306',
> 'username' = 'root',
> 'password' = '1',
> 'database-name' = 'ai_audio_lyric_task',
> 'table-name' = 'status'
> );
> -- output
> CREATE TABLE test_status (
> `id` INT,
> `name` VARCHAR(255),
> `time` TIMESTAMP(3),
> `status` INT,
> `status_name` VARCHAR(255),
> PRIMARY KEY(id) NOT ENFORCED
> ) WITH (
> 'connector' = 'elasticsearch-7',
> 'hosts' = 'xxx',
> 'index' = 'xxx',
> 'username' = 'xxx',
> 'password' = 'xxx',
> 'sink.bulk-flush.backoff.max-retries' = '100000',
> 'sink.bulk-flush.backoff.strategy' = 'CONSTANT',
> 'sink.bulk-flush.max-actions' = '5000',
> 'sink.bulk-flush.max-size' = '10mb',
> 'sink.bulk-flush.interval' = '1s'
> );
> INSERT INTO test_status
> SELECT t.*, s.name
> FROM test AS t
> LEFT JOIN status AS s ON t.status = s.id;
> {code}
> Data in mysql table:
> {code}
> test:
> 0, name0, 2020-07-06 00:00:00 , 0
> 1, name1, 2020-07-06 00:00:00 , 1
> 2, name2, 2020-07-06 00:00:00 , 1
> .....
> status:
> 0, status0
> 1, status1
> 2, status2
> .....
> {code}
> Operations:
> 1. Start the job with parallelism=40; the result in the test_status sink is correct:
> {code}
> 0, name0, 2020-07-06 00:00:00 , 0, status0
> 1, name1, 2020-07-06 00:00:00 , 1, status1
> 2, name2, 2020-07-06 00:00:00 , 1, status1
> {code}
> 2. Update the {{status}} of the {{id=2}} record in table {{test}} from {{1}} to {{2}}.
> 3. The result is incorrect: the {{id=2}} record is missing from the
> {{test_status}} sink.
> The reason is that the planner shuffles the {{test}} changelog on the {{status}}
> column, which is not the primary key. Therefore, the ordering of the changes for
> a single row can't be guaranteed, and the result is wrong.
> The {{-U[2, name2, 2020-07-06 00:00:00 , 1]}} and
> {{+U[2, name2, 2020-07-06 00:00:00 , 2]}} messages may be shuffled to different
> join tasks, so the order of the joined results is not guaranteed when they
> arrive at the sink task.
> It is possible that {{+U[2, name2, 2020-07-06 00:00:00 , status2]}} arrives
> first, followed by {{-U[2, name2, 2020-07-06 00:00:00 , status1]}}. Since the
> sink is keyed on {{id}}, the late retraction removes the row that the {{+U}} has
> just written, so the {{id=2}} record is missing in Elasticsearch.
> It seems that we need a changelog ordering mechanism in the planner.
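The race in the quoted scenario can be reproduced with a small, hypothetical in-memory model (this is not Flink code). Assumptions: a modulo partitioner stands in for Flink's hash shuffle, each join subtask simply appends the status name, the sink is simplified to upsert on {{+I}}/{{+U}} and delete on {{-U}}/{{-D}} by {{id}}, and the sink drains the join subtasks in channel index order, which is only one of the interleavings the runtime may produce. The names {{shuffle}}, {{join_task}}, {{apply_to_sink}} and {{run}} are illustrative only.

```python
from typing import Dict, List, Tuple

# Hypothetical in-memory model (not Flink code). Column order follows the
# `test` table: (id, name, time, status). Kinds mirror Flink's +I/-U/+U/-D.
Row = Tuple[int, str, str, int]
Msg = Tuple[str, Row]
ID, STATUS = 0, 3  # column indexes usable as shuffle keys

# Contents of the `status` table from the example.
STATUS_NAMES = {0: "status0", 1: "status1", 2: "status2"}


def shuffle(msgs: List[Msg], key: int, parallelism: int) -> List[List[Msg]]:
    """Route each message to a channel by its key column (modulo stands in
    for Flink's hashing). Order is preserved only within a single channel."""
    channels: List[List[Msg]] = [[] for _ in range(parallelism)]
    for kind, row in msgs:
        channels[row[key] % parallelism].append((kind, row))
    return channels


def join_task(channel: List[Msg]) -> List[Tuple[str, tuple]]:
    """One join subtask: append the status name to every message it receives."""
    return [(kind, row + (STATUS_NAMES[row[STATUS]],)) for kind, row in channel]


def apply_to_sink(sink: Dict[int, tuple], msgs: List[Tuple[str, tuple]]) -> None:
    """Simplified sink keyed by `id`: additions upsert, retractions delete."""
    for kind, row in msgs:
        if kind in ("+I", "+U"):
            sink[row[ID]] = row
        else:  # "-U" or "-D"
            sink.pop(row[ID], None)


def run(key: int, parallelism: int = 2) -> Dict[int, tuple]:
    sink: Dict[int, tuple] = {}
    t = "2020-07-06 00:00:00"
    inserts = [("+I", (2, "name2", t, 1))]
    update = [("-U", (2, "name2", t, 1)), ("+U", (2, "name2", t, 2))]
    for batch in (inserts, update):
        # The sink consumes the join subtasks one after another in channel
        # order; this is just one interleaving the runtime is allowed to produce.
        for channel in shuffle(batch, key, parallelism):
            apply_to_sink(sink, join_task(channel))
    return sink


print(2 in run(STATUS))  # False: +U went to channel 0, -U to channel 1
print(run(ID)[2][-1])    # status2: -U and +U share channel 0, order kept
```

Shuffling on the primary key keeps the {{-U}}/{{+U}} pair of a row in the same channel, so the {{id}}-keyed run ends with the expected row, while the {{status}}-keyed run loses it.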
--
This message was sent by Atlassian Jira
(v8.3.4#803005)