[
https://issues.apache.org/jira/browse/FLINK-20374?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17537435#comment-17537435
]
lvycc commented on FLINK-20374:
-------------------------------
Hi, I have a new problem. Here is my SQL:
{code:sql}
CREATE TABLE t_order (
order_id INT,
order_name STRING,
product_id INT,
user_id INT,
PRIMARY KEY(order_id) NOT ENFORCED
) WITH (
'connector' = 'mysql-cdc',
'hostname' = 'localhost',
'port' = '3306',
'username' = 'root',
'password' = 'ycc123',
'database-name' = 'wby_test',
'table-name' = 't_order'
);
CREATE TABLE t_logistics (
logistics_id INT,
logistics_target STRING,
logistics_source STRING,
logistics_time TIMESTAMP(0),
order_id INT,
PRIMARY KEY(logistics_id) NOT ENFORCED
) WITH (
'connector' = 'mysql-cdc',
'hostname' = 'localhost',
'port' = '3306',
'username' = 'root',
'password' = 'ycc123',
'database-name' = 'wby_test',
'table-name' = 't_logistics'
);
CREATE TABLE t_join_sink (
order_id INT,
order_name STRING,
logistics_id INT,
logistics_target STRING,
logistics_source STRING,
logistics_time TIMESTAMP,
PRIMARY KEY(order_id) NOT ENFORCED
) WITH (
'connector' = 'jdbc',
'url' =
'jdbc:mysql://localhost:3306/wby_test?characterEncoding=utf8&useUnicode=true&useSSL=false&serverTimezone=Asia/Shanghai',
'table-name' = 't_join_sink',
'username' = 'root',
'password' = 'ycc123'
);
INSERT INTO t_join_sink
SELECT ord.order_id,
ord.order_name,
logistics.logistics_id,
logistics.logistics_target,
logistics.logistics_source,
now()
FROM t_order AS ord
LEFT JOIN t_logistics AS logistics ON ord.order_id = logistics.order_id;
{code}
The new data cannot be deleted because I used the now() function in the query.
I found it is due to the SinkUpsertMaterializer. What should I do to process
the data properly?
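One workaround I am considering (just a sketch, assuming the root cause is that now() makes the emitted rows non-deterministic, so the SinkUpsertMaterializer cannot match a retraction against the row it stored earlier): take the timestamp from the joined source row instead of computing it at write time, if the business logic allows it.
{code:sql}
-- Hypothetical variant of the INSERT above: logistics_time comes from the
-- source row, so every +I/-U/-D row for the same key is deterministic and the
-- materializer can cancel retractions correctly.
-- (With the LEFT JOIN, logistics_time is NULL when there is no matching
-- logistics record.)
INSERT INTO t_join_sink
SELECT ord.order_id,
       ord.order_name,
       logistics.logistics_id,
       logistics.logistics_target,
       logistics.logistics_source,
       logistics.logistics_time
FROM t_order AS ord
LEFT JOIN t_logistics AS logistics ON ord.order_id = logistics.order_id;
{code}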
> Wrong result when shuffling changelog stream on non-primary-key columns
> -----------------------------------------------------------------------
>
> Key: FLINK-20374
> URL: https://issues.apache.org/jira/browse/FLINK-20374
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / Runtime
> Reporter: Jark Wu
> Assignee: Jingsong Lee
> Priority: Critical
> Labels: auto-deprioritized-critical
> Fix For: 1.14.0, 1.13.3
>
>
> This is reported from user-zh ML:
> http://apache-flink.147419.n8.nabble.com/flink-1-11-2-cdc-cdc-sql-sink-save-point-job-sink-td8593.html
> {code:sql}
> CREATE TABLE test (
> `id` INT,
> `name` VARCHAR(255),
> `time` TIMESTAMP(3),
> `status` INT,
> PRIMARY KEY(id) NOT ENFORCED
> ) WITH (
> 'connector' = 'mysql-cdc',
> 'hostname' = 'localhost',
> 'port' = '3306',
> 'username' = 'root',
> 'password' = '1',
> 'database-name' = 'ai_audio_lyric_task',
> 'table-name' = 'test'
> )
> CREATE TABLE status (
> `id` INT,
> `name` VARCHAR(255),
> PRIMARY KEY(id) NOT ENFORCED
> ) WITH (
> 'connector' = 'mysql-cdc',
> 'hostname' = 'localhost',
> 'port' = '3306',
> 'username' = 'root',
> 'password' = '1',
> 'database-name' = 'ai_audio_lyric_task',
> 'table-name' = 'status'
> );
> -- output
> CREATE TABLE test_status (
> `id` INT,
> `name` VARCHAR(255),
> `time` TIMESTAMP(3),
> `status` INT,
> `status_name` VARCHAR(255),
> PRIMARY KEY(id) NOT ENFORCED
> ) WITH (
> 'connector' = 'elasticsearch-7',
> 'hosts' = 'xxx',
> 'index' = 'xxx',
> 'username' = 'xxx',
> 'password' = 'xxx',
> 'sink.bulk-flush.backoff.max-retries' = '100000',
> 'sink.bulk-flush.backoff.strategy' = 'CONSTANT',
> 'sink.bulk-flush.max-actions' = '5000',
> 'sink.bulk-flush.max-size' = '10mb',
> 'sink.bulk-flush.interval' = '1s'
> );
> INSERT into test_status
> SELECT t.*, s.name
> FROM test AS t
> LEFT JOIN status AS s ON t.status = s.id;
> {code}
> Data in the MySQL tables:
> {code}
> test:
> 0, name0, 2020-07-06 00:00:00 , 0
> 1, name1, 2020-07-06 00:00:00 , 1
> 2, name2, 2020-07-06 00:00:00 , 1
> .....
> status
> 0, status0
> 1, status1
> 2, status2
> .....
> {code}
> Operations:
> 1. Start the job with parallelism=40; the result in the test_status sink is correct:
> {code}
> 0, name0, 2020-07-06 00:00:00 , 0, status0
> 1, name1, 2020-07-06 00:00:00 , 1, status1
> 2, name2, 2020-07-06 00:00:00 , 1, status1
> {code}
> 2. Update the {{status}} of the {{id=2}} record in table {{test}} from {{1}} to {{2}}.
> 3. The result is not correct because the {{id=2}} record is missing from the
> result.
> The reason is that it shuffles the changelog of {{test}} on the {{status}} column,
> which is not the primary key. Therefore, the ordering can't be guaranteed,
> and the result is wrong.
> The {{-U[2, name2, 2020-07-06 00:00:00 , 1]}} and {{+U[2, name2, 2020-07-06
> 00:00:00 , 2]}} may be shuffled to different join tasks, so the
> order of the joined results is not guaranteed when they arrive at the sink task.
> It is possible that {{+U[2, name2, 2020-07-06 00:00:00 , status2]}} arrives
> first, and then {{-U[2, name2, 2020-07-06 00:00:00 , status1]}}, in which case the
> {{id=2}} record is missing in Elasticsearch.
> It seems that we need a changelog ordering mechanism in the planner.
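> For reference, a minimal sketch of the knob that ends up controlling this materialization (this assumes the {{table.exec.sink.upsert-materialize}} option shipped with the fix in 1.13.3 / 1.14.0; SQL client syntax):
> {code:sql}
> -- Assumed option from the fix: controls whether the planner inserts a
> -- SinkUpsertMaterializer before the sink to restore per-key ordering.
> -- AUTO (default) lets the planner decide, FORCE always adds it, NONE disables it.
> SET 'table.exec.sink.upsert-materialize' = 'AUTO';
> {code}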