[ 
https://issues.apache.org/jira/browse/FLINK-20374?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jark Wu updated FLINK-20374:
----------------------------
    Description: 
This was reported on the user-zh mailing list: 
http://apache-flink.147419.n8.nabble.com/flink-1-11-2-cdc-cdc-sql-sink-save-point-job-sink-td8593.html

{code:sql}
CREATE TABLE test (
`id` INT,
`name` VARCHAR(255),
`time` TIMESTAMP(3),
`status` INT,
PRIMARY KEY(id) NOT ENFORCED
) WITH (
  'connector' = 'mysql-cdc',
  'hostname' = 'localhost',
  'port' = '3306',
  'username' = 'root',
  'password' = '1',
  'database-name' = 'ai_audio_lyric_task',
  'table-name' = 'test'
);

CREATE TABLE status (
`id` INT,
`name` VARCHAR(255),
PRIMARY KEY(id) NOT ENFORCED
) WITH (  
  'connector' = 'mysql-cdc',
  'hostname' = 'localhost',
  'port' = '3306',
  'username' = 'root',
  'password' = '1',
  'database-name' = 'ai_audio_lyric_task',
  'table-name' = 'status'
);

-- output
CREATE TABLE test_status (
`id` INT,
`name` VARCHAR(255),
`time` TIMESTAMP(3),
`status` INT,
`status_name` VARCHAR(255),
PRIMARY KEY(id) NOT ENFORCED
) WITH (
  'connector' = 'elasticsearch-7',
  'hosts' = 'xxx',
  'index' = 'xxx',
  'username' = 'xxx',
  'password' = 'xxx',
  'sink.bulk-flush.backoff.max-retries' = '100000',
  'sink.bulk-flush.backoff.strategy' = 'CONSTANT',
  'sink.bulk-flush.max-actions' = '5000',
  'sink.bulk-flush.max-size' = '10mb',
  'sink.bulk-flush.interval' = '1s'
);

INSERT into test_status
SELECT t.*, s.name
FROM test AS t
LEFT JOIN status AS s ON t.status = s.id;
{code}

Data in the MySQL tables:

{code}
test:
0, name0, 2020-07-06 00:00:00 , 0
1, name1, 2020-07-06 00:00:00 , 1
2, name2, 2020-07-06 00:00:00 , 1
.....

status
0, status0
1, status1
2, status2
.....
{code}

Operations: 
1. Start the job with parallelism=40; the result in the {{test_status}} sink is correct: 

{code}
    0, name0, 2020-07-06 00:00:00 , 0, status0
    1, name1, 2020-07-06 00:00:00 , 1, status1
    2, name2, 2020-07-06 00:00:00 , 1, status1
{code}

2. Update {{status}} of {{id=2}} record in table {{test}} from {{1}} to {{2}}.
3. The result is now incorrect: the {{id=2}} record is missing from the result. 



The cause is that the job shuffles the {{test}} changelog on the {{status}} 
column, which is not the primary key. The ordering of changes to the same row 
therefore can't be guaranteed, and the result is wrong. 
The {{-U[2, name2, 2020-07-06 00:00:00 , 1]}} and 
{{+U[2, name2, 2020-07-06 00:00:00 , 2]}} records may be shuffled to different 
join tasks, so the order of the joined results is not guaranteed when they 
arrive at the sink task. It is possible that 
{{+U[2, name2, 2020-07-06 00:00:00 , status2]}} arrives first and 
{{-U[2, name2, 2020-07-06 00:00:00 , status1]}} arrives afterwards, in which 
case the {{id=2}} record is missing in Elasticsearch. 
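
The race described above can be reproduced with a small standalone model. This 
is only an illustrative sketch, not Flink code: {{join_task_for}}, {{join}}, 
{{apply_to_sink}} and {{replay}} are hypothetical helpers, the modulo hash 
stands in for Flink's real key partitioning, the {{time}} column is omitted for 
brevity, and the sink is assumed to upsert on {{+U}} and delete by key on 
{{-U}}. 

```python
from collections import namedtuple

# A changelog record from the `test` table (the `time` column is omitted).
Change = namedtuple("Change", "kind id name status")

STATUS_NAMES = {0: "status0", 1: "status1", 2: "status2"}
PARALLELISM = 40


def join_task_for(change):
    # Assumption: a simple modulo stands in for hash partitioning on the
    # join key (`status`), which is NOT the primary key (`id`).
    return change.status % PARALLELISM


def join(change):
    # Enrich the record with the matching status name, as the LEFT JOIN does.
    return (change.kind, change.id, change.name, change.status,
            STATUS_NAMES.get(change.status))


def apply_to_sink(index, joined):
    # Assumption: the sink is keyed on `id`; +I/+U upsert, -U/-D delete by key.
    kind, row_id, *rest = joined
    if kind in ("+I", "+U"):
        index[row_id] = tuple(rest)
    else:
        index.pop(row_id, None)


def replay(arrival_order):
    # Start from the correct state after step 1, then apply the update.
    index = {2: ("name2", 1, "status1")}
    for change in arrival_order:
        apply_to_sink(index, join(change))
    return index


before = Change("-U", 2, "name2", 1)
after = Change("+U", 2, "name2", 2)

# The two halves of one update land on different join tasks.
print(join_task_for(before), join_task_for(after))

in_order = replay([before, after])      # retraction first, then the new row
out_of_order = replay([after, before])  # new row first, then the retraction
print(in_order)
print(out_of_order)
```

Under these assumptions the in-order replay keeps the {{id=2}} row with 
{{status2}}, while the out-of-order replay ends with an empty index, which 
matches the missing record reported above. 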

It seems that we need a changelog ordering mechanism in the planner. 

  was:
This was reported on the user-zh mailing list: 
http://apache-flink.147419.n8.nabble.com/flink-1-11-2-cdc-cdc-sql-sink-save-point-job-sink-td8593.html

{code:sql}
CREATE TABLE test (
`id` INT,
`name` VARCHAR(255),
`time` TIMESTAMP(3),
`status` INT,
PRIMARY KEY(id) NOT ENFORCED
) WITH (
  'connector' = 'mysql-cdc',
  'hostname' = 'localhost',
  'port' = '3306',
  'username' = 'root',
  'password' = '1',
  'database-name' = 'ai_audio_lyric_task',
  'table-name' = 'test'
);

CREATE TABLE status (
`id` INT,
`name` VARCHAR(255),
PRIMARY KEY(id) NOT ENFORCED
) WITH (  
  'connector' = 'mysql-cdc',
  'hostname' = 'localhost',
  'port' = '3306',
  'username' = 'root',
  'password' = '1',
  'database-name' = 'ai_audio_lyric_task',
  'table-name' = 'status'
);

-- output
CREATE TABLE test_status (
`id` INT,
`name` VARCHAR(255),
`time` TIMESTAMP(3),
`status` INT,
`status_name` VARCHAR(255),
PRIMARY KEY(id) NOT ENFORCED
) WITH (
  'connector' = 'elasticsearch-7',
  'hosts' = 'xxx',
  'index' = 'xxx',
  'username' = 'xxx',
  'password' = 'xxx',
  'sink.bulk-flush.backoff.max-retries' = '100000',
  'sink.bulk-flush.backoff.strategy' = 'CONSTANT',
  'sink.bulk-flush.max-actions' = '5000',
  'sink.bulk-flush.max-size' = '10mb',
  'sink.bulk-flush.interval' = '1s'
);

INSERT into test_status
SELECT t.*, s.name
FROM test AS t
LEFT JOIN status AS s ON t.status = s.id;
{code}

Data in the MySQL tables:

{code}
test:
0, name0, 2020-07-06 00:00:00 , 0
1, name1, 2020-07-06 00:00:00 , 1
2, name2, 2020-07-06 00:00:00 , 1
.....

status
0, status0
1, status1
2, status2
.....
{code}

Operations: 
1. Start the job with parallelism=40; the result in the {{test_status}} sink is correct: 

{code}
    0, name0, 2020-07-06 00:00:00 , 0, status0
    1, name1, 2020-07-06 00:00:00 , 1, status1
    2, name2, 2020-07-06 00:00:00 , 1, status1
{code}

2. Update {{status}} of {{id=2}} record in table {{test}} from {{1}} to {{2}}.
3. The result is now incorrect: the {{id=2}} record is missing from the result. 



The cause is that the job shuffles the {{test}} changelog on the {{status}} 
column, which is not the primary key. The ordering of changes to the same row 
therefore can't be guaranteed, and the result is wrong. 

It seems that we need a changelog ordering mechanism in the planner. 


> Wrong result when shuffling changelog stream on non-primary-key columns
> -----------------------------------------------------------------------
>
>                 Key: FLINK-20374
>                 URL: https://issues.apache.org/jira/browse/FLINK-20374
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Planner
>            Reporter: Jark Wu
>            Priority: Major
>
> This was reported on the user-zh mailing list: 
> http://apache-flink.147419.n8.nabble.com/flink-1-11-2-cdc-cdc-sql-sink-save-point-job-sink-td8593.html
> {code:sql}
> CREATE TABLE test (
> `id` INT,
> `name` VARCHAR(255),
> `time` TIMESTAMP(3),
> `status` INT,
> PRIMARY KEY(id) NOT ENFORCED
> ) WITH (
>   'connector' = 'mysql-cdc',
>   'hostname' = 'localhost',
>   'port' = '3306',
>   'username' = 'root',
>   'password' = '1',
>   'database-name' = 'ai_audio_lyric_task',
>   'table-name' = 'test'
> );
> CREATE TABLE status (
> `id` INT,
> `name` VARCHAR(255),
> PRIMARY KEY(id) NOT ENFORCED
> ) WITH (  
>   'connector' = 'mysql-cdc',
>   'hostname' = 'localhost',
>   'port' = '3306',
>   'username' = 'root',
>   'password' = '1',
>   'database-name' = 'ai_audio_lyric_task',
>   'table-name' = 'status'
> );
> -- output
> CREATE TABLE test_status (
> `id` INT,
> `name` VARCHAR(255),
> `time` TIMESTAMP(3),
> `status` INT,
> `status_name` VARCHAR(255),
> PRIMARY KEY(id) NOT ENFORCED
> ) WITH (
>   'connector' = 'elasticsearch-7',
>   'hosts' = 'xxx',
>   'index' = 'xxx',
>   'username' = 'xxx',
>   'password' = 'xxx',
>   'sink.bulk-flush.backoff.max-retries' = '100000',
>   'sink.bulk-flush.backoff.strategy' = 'CONSTANT',
>   'sink.bulk-flush.max-actions' = '5000',
>   'sink.bulk-flush.max-size' = '10mb',
>   'sink.bulk-flush.interval' = '1s'
> );
> INSERT into test_status
> SELECT t.*, s.name
> FROM test AS t
> LEFT JOIN status AS s ON t.status = s.id;
> {code}
> Data in the MySQL tables:
> {code}
> test:
> 0, name0, 2020-07-06 00:00:00 , 0
> 1, name1, 2020-07-06 00:00:00 , 1
> 2, name2, 2020-07-06 00:00:00 , 1
> .....
> status
> 0, status0
> 1, status1
> 2, status2
> .....
> {code}
> Operations: 
> 1. Start the job with parallelism=40; the result in the {{test_status}} sink is correct: 
> {code}
>     0, name0, 2020-07-06 00:00:00 , 0, status0
>     1, name1, 2020-07-06 00:00:00 , 1, status1
>     2, name2, 2020-07-06 00:00:00 , 1, status1
> {code}
> 2. Update {{status}} of {{id=2}} record in table {{test}} from {{1}} to {{2}}.
> 3. The result is now incorrect: the {{id=2}} record is missing from the 
> result. 
> The cause is that the job shuffles the {{test}} changelog on the {{status}} 
> column, which is not the primary key. The ordering of changes to the same 
> row therefore can't be guaranteed, and the result is wrong. 
> The {{-U[2, name2, 2020-07-06 00:00:00 , 1]}} and 
> {{+U[2, name2, 2020-07-06 00:00:00 , 2]}} records may be shuffled to 
> different join tasks, so the order of the joined results is not guaranteed 
> when they arrive at the sink task. It is possible that 
> {{+U[2, name2, 2020-07-06 00:00:00 , status2]}} arrives first and 
> {{-U[2, name2, 2020-07-06 00:00:00 , status1]}} arrives afterwards, in which 
> case the {{id=2}} record is missing in Elasticsearch. 
> It seems that we need a changelog ordering mechanism in the planner. 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
