[ https://issues.apache.org/jira/browse/FLINK-38183?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18020011#comment-18020011 ]
LiuZeshan commented on FLINK-38183:
-----------------------------------

duplicated by #FLINK-37065

> Data loss when cdc reading mysql that has out of order GTID
> -----------------------------------------------------------
>
>                 Key: FLINK-38183
>                 URL: https://issues.apache.org/jira/browse/FLINK-38183
>             Project: Flink
>          Issue Type: Bug
>          Components: Flink CDC
>    Affects Versions: 3.0.0
>         Environment: Flink-CDC: 3.5-SNAPSHOT
> Flink: 1.20.1
>            Reporter: LiuZeshan
>            Assignee: LiuZeshan
>            Priority: Critical
>              Labels: pull-request-available
>
> As designed in [https://github.com/apache/flink-cdc/pull/2220|http://example.com/], CDC only
> considers the maximum GTID position and starts reading from it. For example, when
> reading from GTID offset 1-7:9-10, it automatically adjusts the start position to
> 1-10, which wrongly skips GTID 8 and thus loses data. When GTID 8 is a large
> transaction, the data loss is even more severe. We have encountered this problem
> many times in production.
>
> MySQL 5.7+ supports parallel replication based on group commit
> (LOGICAL_CLOCK). Conflict-free transactions are distributed by the SQL thread
> (coordinator) of the replica to multiple worker threads for concurrent execution.
> Although the primary generates consecutive GTIDs in commit order (such as A:1-100),
> the worker threads of the replica may commit transactions out of order (unless
> slave_preserve_commit_order is enabled). When CDC reads from a MySQL replica, it may
> therefore encounter the following GTID order. The same scenario can also be
> constructed manually by setting the GTID explicitly:
> {code:sql}
> SET @@SESSION.GTID_NEXT='XXX:1';
> INSERT ...;
> SET @@SESSION.GTID_NEXT='XXX:2';
> INSERT ...;
> ...
> SET @@SESSION.GTID_NEXT='XXX:7';
> INSERT ...;
> SET @@SESSION.GTID_NEXT='XXX:9';
> INSERT ...;
> SET @@SESSION.GTID_NEXT='XXX:10';
> INSERT ...;
> SET @@SESSION.GTID_NEXT='XXX:8';
> BEGIN;
> INSERT ...;
> ...
> INSERT ...; -- (the 1,000,000th DML statement; the checkpoint is taken at this position)
> ...
> INSERT ...; -- (the 2,000,000th DML statement)
> COMMIT;
> SET @@SESSION.GTID_NEXT='XXX:11';
> INSERT ...;
> {code}
> The transaction at GTID 8 contains 2 million DML statements. After 1 million of them
> have been read, a checkpoint is triggered and completes. The recorded GTID offset is
> 1-7:9-10, and the number of events to skip is 1 million, as shown below.
> {code:java}
> offset={transaction_id=null, ts_sec=1754145492, file=mysql-bin.000190,
> pos=1443601, kind=SPECIFIC, gtids=xxx:1-7:9-10, row=3, event=1000000,
> server_id=123}
> {code}
> The job is restarted and recovered from this checkpoint. Following the current CDC
> design, the start position is automatically adjusted to 1-10 and the reader still
> skips 1 million events. As a result, the 1 million unread rows of GTID 8 are lost,
> as well as the data contained in the first 1 million events starting from GTID 11.
>
> h3. {*}Reproduction steps:{*} mysqlcdc -> print SQL job
> 1. Run show master status to check the current GTID offset.
>
> {code:java}
> mysql> show master status;
> +------------------+----------+--------------+------------------+------------------------------------------------------------------------------------------+
> | File             | Position | Binlog_Do_DB | Binlog_Ignore_DB | Executed_Gtid_Set                                                                        |
> +------------------+----------+--------------+------------------+------------------------------------------------------------------------------------------+
> | mysql-bin.000190 |    26484 |              |                  | ad268c5d-2f18-11ef-8eac-0242ac120003:9-12,
> fc992a75-c2a9-11ee-82e7-0242ac120004:1-182540 |
> +------------------+----------+--------------+------------------+------------------------------------------------------------------------------------------+
> 1 row in set (0.00 sec)
> {code}
>
> 2. Use the WITH options to specify the GTID offset to start from:
>
> {code:sql}
> 'scan.startup.specific-offset.gtid-set' = 'ad268c5d-2f18-11ef-8eac-0242ac120003:9-12,fc992a75-c2a9-11ee-82e7-0242ac120004:1-182540',
> 'scan.startup.mode' = 'specific-offset',
> {code}
> 3. Manually set the GTID and update the data in MySQL:
>
> {code:java}
> mysql> set gtid_next='fc992a75-c2a9-11ee-82e7-0242ac120004:182545';
> Query OK, 0 rows affected (0.01 sec)
> mysql> update full_types_2col set tiny_c=30 where id = 10;
> Query OK, 1 row affected (0.02 sec)
> Rows matched: 1 Changed: 1 Warnings: 0
> mysql> set gtid_next='automatic';
> Query OK, 0 rows affected (0.01 sec)
> mysql> begin;
> Query OK, 0 rows affected (0.00 sec)
> mysql> update full_types_2col set tiny_c=31 where id = 10;
> Query OK, 1 row affected (0.01 sec)
> Rows matched: 1 Changed: 1 Warnings: 0
> mysql> update full_types_2col set tiny_c=32 where id = 10;
> Query OK, 1 row affected (0.00 sec)
> Rows matched: 1 Changed: 1 Warnings: 0
> mysql> update full_types_2col set tiny_c=33 where id = 10;
> Query OK, 1 row affected (0.00 sec)
> Rows matched: 1 Changed: 1 Warnings: 0
> mysql> commit;
> Query OK, 0 rows affected (0.02 sec)
> mysql> show master status;
> +------------------+----------+--------------+------------------+-------------------------------------------------------------------------------------------------+
> | File             | Position | Binlog_Do_DB | Binlog_Ignore_DB | Executed_Gtid_Set                                                                               |
> +------------------+----------+--------------+------------------+-------------------------------------------------------------------------------------------------+
> | mysql-bin.000190 |    27368 |              |                  | ad268c5d-2f18-11ef-8eac-0242ac120003:9-12,
> fc992a75-c2a9-11ee-82e7-0242ac120004:1-182541:182545 |
> +------------------+----------+--------------+------------------+-------------------------------------------------------------------------------------------------+
> 1 row in set (0.00 sec)
> {code}
> CDC job output:
> {code:java}
> -U[10, 29]
> +U[10, 30]
> -U[10, 30]
> +U[10, 31]
> -U[10, 31]
> +U[10, 32]
> -U[10, 32]
> +U[10, 33]
> {code}
> 4. Trigger checkpoint id=2.
>
> {code:java}
> Binlog offset for tables [test.full_types_2col] on checkpoint 2:
> {transaction_id=null, ts_sec=1754194732, file=mysql-bin.000190, pos=26881,
> kind=SPECIFIC,
> gtids=ad268c5d-2f18-11ef-8eac-0242ac120003:9-12,fc992a75-c2a9-11ee-82e7-0242ac120004:1-182540:182545-182545,
> row=1, event=6, server_id=123}
> {code}
> 5. Restart the job and restore it from checkpoint id=2. The log below shows that the
> current logic of GtidUtils#fixRestoredGtidSet skips GTID 182541: it is added to the
> merged set although the restored offset does not contain it, resulting in data loss
> when starting from the checkpoint.
> {code:java}
> GTID set from previous recorded offset:
> ad268c5d-2f18-11ef-8eac-0242ac120003:9-12,fc992a75-c2a9-11ee-82e7-0242ac120004:1-182540:182545-182545
> GTID set available on server:
> ad268c5d-2f18-11ef-8eac-0242ac120003:9-12,fc992a75-c2a9-11ee-82e7-0242ac120004:1-182541:182545-182545
> Final merged GTID set to use when connecting to MySQL:
> ad268c5d-2f18-11ef-8eac-0242ac120003:9-12,fc992a75-c2a9-11ee-82e7-0242ac120004:1-182541:182545-182545
> {code}
> 6. Continue to update the MySQL data and observe the job output: 3 updates are lost.
>
> {code:java}
> mysql> update full_types_2col set tiny_c=34 where id = 10;
> Query OK, 1 row affected (0.01 sec)
> Rows matched: 1 Changed: 1 Warnings: 0
> mysql> update full_types_2col set tiny_c=35 where id = 10;
> Query OK, 1 row affected (0.01 sec)
> Rows matched: 1 Changed: 1 Warnings: 0
> mysql> update full_types_2col set tiny_c=36 where id = 10;
> Query OK, 1 row affected (0.00 sec)
> Rows matched: 1 Changed: 1 Warnings: 0
> mysql> update full_types_2col set tiny_c=37 where id = 10;
> Query OK, 1 row affected (0.02 sec)
> Rows matched: 1 Changed: 1 Warnings: 0
> {code}
> {code:java}
> Skipping previously processed row event:
> Event{header=EventHeaderV4{timestamp=1754194959000,
> eventType=EXT_UPDATE_ROWS, serverId=123, headerLength=19, dataLength=37,
> nextPosition=27655, flags=0}, data=UpdateRowsEventData{tableId=199,
> includedColumnsBeforeUpdate={0, 1}, includedColumns={0, 1}, rows=[
>     {before=[10, 33], after=[10, 34]}
> ]}}
> Skipping previously processed row event:
> Event{header=EventHeaderV4{timestamp=1754194963000,
> eventType=EXT_UPDATE_ROWS, serverId=123, headerLength=19, dataLength=37,
> nextPosition=27973, flags=0}, data=UpdateRowsEventData{tableId=199,
> includedColumnsBeforeUpdate={0, 1}, includedColumns={0, 1}, rows=[
>     {before=[10, 34], after=[10, 35]}
> ]}}
> -U[10, 36]
> +U[10, 37]
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
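To make the failure mode concrete, here is a minimal Java sketch that models one server UUID's GTID set as sorted intervals. It is not the Flink CDC GtidUtils#fixRestoredGtidSet code. The merge rule in maxBasedMerge is an assumption inferred from the step 5 log (every GTID available on the server up to the highest restored GTID is kept), and the names GtidRestoreGapDemo, maxBasedMerge and gapsFilledByMerge are hypothetical. The sketch reports which GTIDs the merged start position treats as processed even though the restored offset did not record them as completed.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical model (not Flink CDC's GtidUtils): shows which GTIDs a
 * "max-based" restore marks as processed although the checkpoint never
 * recorded them as completed.
 */
public class GtidRestoreGapDemo {

    /** Closed interval [start, end] of transaction numbers for one server UUID. */
    static final class Interval {
        final long start;
        final long end;

        Interval(long start, long end) {
            this.start = start;
            this.end = end;
        }

        @Override
        public String toString() {
            return start == end ? Long.toString(start) : start + "-" + end;
        }
    }

    /** Parses the part after "uuid:", e.g. "1-7:9-10", into sorted intervals. */
    static List<Interval> parse(String text) {
        List<Interval> result = new ArrayList<>();
        for (String part : text.split(":")) {
            String[] bounds = part.split("-");
            long start = Long.parseLong(bounds[0]);
            long end = bounds.length > 1 ? Long.parseLong(bounds[1]) : start;
            result.add(new Interval(start, end));
        }
        return result;
    }

    /** Assumed merge rule: keep the server's intervals, clipped at the highest restored GTID. */
    static List<Interval> maxBasedMerge(List<Interval> restored, List<Interval> available) {
        long limit = 0;
        for (Interval iv : restored) {
            limit = Math.max(limit, iv.end);
        }
        List<Interval> merged = new ArrayList<>();
        for (Interval iv : available) {
            if (iv.start <= limit) {
                merged.add(new Interval(iv.start, Math.min(iv.end, limit)));
            }
        }
        return merged;
    }

    /** Returns the parts of {@code from} not covered by {@code remove} (both sorted, non-overlapping). */
    static List<Interval> subtract(List<Interval> from, List<Interval> remove) {
        List<Interval> result = new ArrayList<>();
        for (Interval iv : from) {
            long cursor = iv.start;
            for (Interval r : remove) {
                if (r.end < cursor || r.start > iv.end) {
                    continue; // r does not overlap the uncovered tail of iv
                }
                if (r.start > cursor) {
                    result.add(new Interval(cursor, r.start - 1));
                }
                cursor = Math.max(cursor, r.end + 1);
            }
            if (cursor <= iv.end) {
                result.add(new Interval(cursor, iv.end));
            }
        }
        return result;
    }

    /** GTIDs the merged set treats as processed that the restored offset did not contain. */
    static List<Interval> gapsFilledByMerge(String restored, String available) {
        List<Interval> restoredSet = parse(restored);
        return subtract(maxBasedMerge(restoredSet, parse(available)), restoredSet);
    }

    public static void main(String[] args) {
        // Description example; the server set "1-11" is an assumed value.
        System.out.println(gapsFilledByMerge("1-7:9-10", "1-11"));
        // Step 5, fc992a75-... UUID: restored offset vs. GTID set available on server.
        System.out.println(gapsFilledByMerge("1-182540:182545", "1-182541:182545"));
    }
}
```

Running main prints [8] for the description example and [182541] for the step 5 sets. In the first case the remaining rows of GTID 8 are never read; in the second, the 6 events to skip from checkpoint 2 appear to be applied to the transactions after 182541 instead, which is consistent with the skipped updates in step 6.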