[ 
https://issues.apache.org/jira/browse/FLINK-38183?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18020011#comment-18020011
 ] 

LiuZeshan commented on FLINK-38183:
-----------------------------------

duplicated by #FLINK-37065

> Data loss when cdc reading mysql that has out of order GTID
> -----------------------------------------------------------
>
>                 Key: FLINK-38183
>                 URL: https://issues.apache.org/jira/browse/FLINK-38183
>             Project: Flink
>          Issue Type: Bug
>          Components: Flink CDC
>    Affects Versions: 3.0.0
>         Environment: Flink-CDC: 3.5-SNAPSHOT
> Flink: 1.20.1
>  
>            Reporter: LiuZeshan
>            Assignee: LiuZeshan
>            Priority: Critical
>              Labels: pull-request-available
>
> By the design in 
> [https://github.com/apache/flink-cdc/pull/2220|http://example.com/], CDC only 
> considers the maximum GTID position and starts from it. For example, when 
> restoring from GTID offset 1-7:9-10, it automatically adjusts the set to 
> 1-10, which wrongly skips GTID 8 and loses its data. The loss is more 
> serious when GTID 8 is a large transaction. We have hit this problem many 
> times in production.
> MySQL 5.7+ supports parallel replication based on group commit 
> (LOGICAL_CLOCK). Conflict-free transactions are distributed by the SQL 
> thread (coordinator) of the replica to multiple worker threads for 
> concurrent execution. Although the primary generates continuous GTIDs 
> in commit order (such as A:1-100), the worker threads of the 
> replica may commit transactions out of order. When CDC reads from a MySQL 
> replica, it may therefore encounter the following GTID order. The same 
> scenario can also be constructed manually by setting the GTID.
> {code:java}
> SET @@SESSION.GTID_NEXT='XXX:1';
> INSERT ...;
> SET @@SESSION.GTID_NEXT='XXX:2';
> INSERT ...;
> ...
> SET @@SESSION.GTID_NEXT='XXX:7';
> INSERT ...;
> SET @@SESSION.GTID_NEXT='XXX:9';
> INSERT ...;
> SET @@SESSION.GTID_NEXT='XXX:10';
> INSERT ...;
> SET @@SESSION.GTID_NEXT='XXX:8';
> BEGIN;
> INSERT ...;
> ... 
> INSERT ...; -- (the 1 millionth DML; the checkpoint is taken at this position)
> ...
> INSERT ...; -- (the 2 millionth DML)
> COMMIT;
> SET @@SESSION.GTID_NEXT='XXX:11';
> INSERT ...; {code}
> The transaction at GTID 8 contains 2 million DML statements. After 1 million 
> of them have been read, a checkpoint is triggered and completes. The recorded 
> GTID offset is 1-7:9-10 and the number of events to skip is 1 million, as 
> shown below.
> {code:java}
> offset={transaction_id=null, ts_sec=1754145492, file=mysql-bin.000190, 
> pos=1443601, kind=SPECIFIC, gtids=xxx:1-7:9-10, row=3, event=1000000, 
> server_id=123} {code}
> The job is then restarted and recovers from this checkpoint. By design, CDC 
> automatically adjusts the GTID set to 1-10 and still skips 1 million events. 
> As a result, the 1 million unread events of GTID 8 are lost, and so is the 
> data in the first 1 million events starting from GTID 11.
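
The restart behaviour described above can be illustrated with a toy model. This is only a sketch under stated assumptions, not Flink CDC code: the binlog is a hypothetical in-memory list, the 2-million-event transaction is scaled down to 20 events, the checkpoint's skip count is scaled down to 10, and the names parse_intervals, collapse_to_max and replay are invented for illustration.

```python
from typing import List, Set, Tuple

# Toy binlog: (gtid_number, number_of_row_events), in the order written to the log.
# GTID 8 is written after 10 and is the large transaction (scaled down to 20 events).
BINLOG: List[Tuple[int, int]] = (
    [(n, 1) for n in (1, 2, 3, 4, 5, 6, 7, 9, 10)] + [(8, 20), (11, 12)]
)


def parse_intervals(text: str) -> Set[int]:
    """Expand an interval list such as '1-7:9-10' into a set of transaction numbers."""
    result: Set[int] = set()
    for part in text.split(":"):
        start, _, end = part.partition("-")
        result.update(range(int(start), int(end or start) + 1))
    return result


def collapse_to_max(gtids: Set[int]) -> Set[int]:
    """Model of the 'only the maximum position matters' adjustment: 1-7:9-10 -> 1-10."""
    return set(range(1, max(gtids) + 1))


def replay(restored: Set[int], events_to_skip: int) -> List[Tuple[int, int]]:
    """Return the (gtid, event_index) pairs emitted after a restart."""
    emitted = []
    for gtid, n_events in BINLOG:
        if gtid in restored:
            continue  # transactions already in the GTID set are not sent again
        for i in range(n_events):
            if events_to_skip > 0:
                events_to_skip -= 1  # treated as "previously processed"
                continue
            emitted.append((gtid, i))
    return emitted


checkpoint = parse_intervals("1-7:9-10")
correct = replay(checkpoint, events_to_skip=10)
broken = replay(collapse_to_max(checkpoint), events_to_skip=10)
lost = [e for e in correct if e not in broken]
print(len(correct), len(broken), len(lost))
```

In this toy run, replaying with the gap preserved resumes at event 10 of GTID 8, whereas the collapsed set drops GTID 8 entirely and spends the skip count on GTID 11, so 20 of the 22 remaining events are lost.
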
>  
> h3. {*}Reproduction steps{*} (mysql-cdc -> print SQL job)
> 1. Run SHOW MASTER STATUS to check the current GTID offset.
>  
> {code:java}
> mysql> show master status;
> +------------------+----------+--------------+------------------+------------------------------------------------------------------------------------------+
> | File             | Position | Binlog_Do_DB | Binlog_Ignore_DB | Executed_Gtid_Set                                                                        |
> +------------------+----------+--------------+------------------+------------------------------------------------------------------------------------------+
> | mysql-bin.000190 |    26484 |              |                  | ad268c5d-2f18-11ef-8eac-0242ac120003:9-12,
> fc992a75-c2a9-11ee-82e7-0242ac120004:1-182540 |
> +------------------+----------+--------------+------------------+------------------------------------------------------------------------------------------+
> 1 row in set (0.00 sec) {code}
>  
> 2. Use the WITH options to specify the GTID offset to start from.
>  
> {code:java}
> 'scan.startup.specific-offset.gtid-set' = 
> 'ad268c5d-2f18-11ef-8eac-0242ac120003:9-12,
> fc992a75-c2a9-11ee-82e7-0242ac120004:1-182540',
> 'scan.startup.mode' = 'specific-offset', {code}
> 3. Manually set the GTID and update data in MySQL.
>  
>  
> {code:java}
> mysql> set gtid_next='fc992a75-c2a9-11ee-82e7-0242ac120004:182545';
> Query OK, 0 rows affected (0.01 sec)
> mysql> update full_types_2col set tiny_c=30 where id = 10;
> Query OK, 1 row affected (0.02 sec)
> Rows matched: 1  Changed: 1  Warnings: 0
> mysql> set gtid_next='automatic';
> Query OK, 0 rows affected (0.01 sec)
> mysql> begin;
> Query OK, 0 rows affected (0.00 sec)
> mysql> update full_types_2col set tiny_c=31 where id = 10;
> Query OK, 1 row affected (0.01 sec)
> Rows matched: 1  Changed: 1  Warnings: 0
> mysql> update full_types_2col set tiny_c=32 where id = 10;
> Query OK, 1 row affected (0.00 sec)
> Rows matched: 1  Changed: 1  Warnings: 0
> mysql> update full_types_2col set tiny_c=33 where id = 10;
> Query OK, 1 row affected (0.00 sec)
> Rows matched: 1  Changed: 1  Warnings: 0
> mysql> commit;
> Query OK, 0 rows affected (0.02 sec)
> mysql> show master status;
> +------------------+----------+--------------+------------------+-------------------------------------------------------------------------------------------------+
> | File             | Position | Binlog_Do_DB | Binlog_Ignore_DB | Executed_Gtid_Set                                                                               |
> +------------------+----------+--------------+------------------+-------------------------------------------------------------------------------------------------+
> | mysql-bin.000190 |    27368 |              |                  | ad268c5d-2f18-11ef-8eac-0242ac120003:9-12,
> fc992a75-c2a9-11ee-82e7-0242ac120004:1-182541:182545 |
> +------------------+----------+--------------+------------------+-------------------------------------------------------------------------------------------------+
> 1 row in set (0.00 sec) {code}
> CDC job output:
> {code:java}
> -U[10, 29]
> +U[10, 30]
> -U[10, 30]
> +U[10, 31]
> -U[10, 31]
> +U[10, 32]
> -U[10, 32]
> +U[10, 33] {code}
>  4. Trigger checkpoint id=2
>  
> {code:java}
> Binlog offset for tables [test.full_types_2col] on checkpoint 2: 
> {transaction_id=null, ts_sec=1754194732, file=mysql-bin.000190, pos=26881, 
> kind=SPECIFIC, 
> gtids=ad268c5d-2f18-11ef-8eac-0242ac120003:9-12,fc992a75-c2a9-11ee-82e7-0242ac120004:1-182540:182545-182545,
>  row=1, event=6, server_id=123} {code}
> 5. Restart the job and restore it from checkpoint id=2. The logs below show 
> that the current logic of GtidUtils#fixRestoredGtidSet adds GTID 182541 to 
> the restored set, so that transaction is skipped and data is lost when 
> starting from the checkpoint.
> {code:java}
> GTID set from previous recorded offset: 
> ad268c5d-2f18-11ef-8eac-0242ac120003:9-12,fc992a75-c2a9-11ee-82e7-0242ac120004:1-182540:182545-182545
> GTID set available on server: 
> ad268c5d-2f18-11ef-8eac-0242ac120003:9-12,fc992a75-c2a9-11ee-82e7-0242ac120004:1-182541:182545-182545
> Final merged GTID set to use when connecting to MySQL: 
> ad268c5d-2f18-11ef-8eac-0242ac120003:9-12,fc992a75-c2a9-11ee-82e7-0242ac120004:1-182541:182545-182545
>  {code}
> 6. Continue to update MySQL data and observe the job output: the first 3 
> updates are lost.
> {code:java}
> mysql> update full_types_2col set tiny_c=34 where id = 10;
> Query OK, 1 row affected (0.01 sec)
> Rows matched: 1  Changed: 1  Warnings: 0
> mysql> update full_types_2col set tiny_c=35 where id = 10;
> Query OK, 1 row affected (0.01 sec)
> Rows matched: 1  Changed: 1  Warnings: 0
> mysql> update full_types_2col set tiny_c=36 where id = 10;
> Query OK, 1 row affected (0.00 sec)
> Rows matched: 1  Changed: 1  Warnings: 0
> mysql> update full_types_2col set tiny_c=37 where id = 10;
> Query OK, 1 row affected (0.02 sec)
> Rows matched: 1  Changed: 1  Warnings: 0 {code}
> {code:java}
> Skipping previously processed row event: 
> Event{header=EventHeaderV4{timestamp=1754194959000, 
> eventType=EXT_UPDATE_ROWS, serverId=123, headerLength=19, dataLength=37, 
> nextPosition=27655, flags=0}, data=UpdateRowsEventData{tableId=199, 
> includedColumnsBeforeUpdate={0, 1}, includedColumns={0, 1}, rows=[
>     {before=[10, 33], after=[10, 34]}
> ]}}
>  Skipping previously processed row event: 
> Event{header=EventHeaderV4{timestamp=1754194963000, 
> eventType=EXT_UPDATE_ROWS, serverId=123, headerLength=19, dataLength=37, 
> nextPosition=27973, flags=0}, data=UpdateRowsEventData{tableId=199, 
> includedColumnsBeforeUpdate={0, 1}, includedColumns={0, 1}, rows=[
>     {before=[10, 34], after=[10, 35]}
> ]}}
> -U[10, 36]
> +U[10, 37] {code}
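
The logged GTID sets from step 5 can be compared mechanically to see which transactions the merged set would skip although the checkpoint never recorded them. The following is a minimal sketch, not the GtidUtils implementation and not the proposed fix: parse_gtid_set, subtract and newly_skipped are hypothetical helpers, and the intervals within each set are assumed to be non-overlapping.

```python
from typing import Dict, List, Tuple

Interval = Tuple[int, int]  # inclusive [start, end]


def parse_gtid_set(text: str) -> Dict[str, List[Interval]]:
    """Parse 'uuid:1-5:7,uuid2:9-12' into {uuid: [(1, 5), (7, 7)], ...}."""
    result: Dict[str, List[Interval]] = {}
    for item in text.replace("\n", "").split(","):
        uuid, *ranges = item.strip().split(":")
        intervals = []
        for r in ranges:
            start, _, end = r.partition("-")
            intervals.append((int(start), int(end or start)))
        result[uuid] = sorted(intervals)
    return result


def subtract(a: List[Interval], b: List[Interval]) -> List[Interval]:
    """Return the parts of intervals `a` that are not covered by intervals `b`."""
    out = []
    for start, end in a:
        cur = start
        for b_start, b_end in sorted(b):
            if b_end < cur or b_start > end:
                continue  # no overlap with the remaining part of this interval
            if b_start > cur:
                out.append((cur, b_start - 1))  # uncovered gap before b
            cur = max(cur, b_end + 1)
        if cur <= end:
            out.append((cur, end))
    return out


def newly_skipped(checkpoint: str, merged: str) -> Dict[str, List[Interval]]:
    """Transactions present in the merged set but absent from the checkpoint."""
    cp, mg = parse_gtid_set(checkpoint), parse_gtid_set(merged)
    diff = {u: subtract(iv, cp.get(u, [])) for u, iv in mg.items()}
    return {u: iv for u, iv in diff.items() if iv}


checkpoint_set = (
    "ad268c5d-2f18-11ef-8eac-0242ac120003:9-12,"
    "fc992a75-c2a9-11ee-82e7-0242ac120004:1-182540:182545-182545"
)
merged_set = (
    "ad268c5d-2f18-11ef-8eac-0242ac120003:9-12,"
    "fc992a75-c2a9-11ee-82e7-0242ac120004:1-182541:182545-182545"
)
print(newly_skipped(checkpoint_set, merged_set))
```

For the two sets logged in step 5, the only difference is GTID 182541 under fc992a75-c2a9-11ee-82e7-0242ac120004, which is the transaction the restored job never re-reads.
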


