[ 
https://issues.apache.org/jira/browse/FLINK-38531?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yanquan Lv resolved FLINK-38531.
--------------------------------
    Fix Version/s: cdc-3.6.0
         Assignee: sherhomhuang
       Resolution: Fixed

Fixed in master via c1a7d0b6f194ad2511712c1efb0a17053080f3d8.

> MySQL CDC BinlogOffset compareTo method cannot work correctly when gtidSet is 
> same, which may cause loss data when restored from gtid .
> ---------------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-38531
>                 URL: https://issues.apache.org/jira/browse/FLINK-38531
>             Project: Flink
>          Issue Type: Bug
>          Components: Flink CDC
>    Affects Versions: cdc-3.1.1, cdc-3.3.0, cdc-3.2.1, cdc-3.4.0, cdc-3.5.0
>            Reporter: sherhomhuang
>            Assignee: sherhomhuang
>            Priority: Critical
>              Labels: pull-request-available
>             Fix For: cdc-3.6.0
>
>
> When the gtidSets are the same, the method *BinlogOffset#compareTo* returns 
> right after comparing restartSkipEvents. A correct comparison should go on 
> to compare restartSkipRows when the restartSkipEvents are also equal.
> For example:
> A={transaction_id=d95558e1-886a-11ef-87c8-08002784c43f:170633, 
> ts_sec=1760843591, file=binlog.000181, pos=34580, kind=SPECIFIC, 
> gtids=d95558e1-886a-11ef-87c8-08002784c43f:1-170632, row=5, event=2, 
> server_id=1, transaction_data_collection_order_sampleDB.sample_data=5}
> B={transaction_id=d95558e1-886a-11ef-87c8-08002784c43f:170633, 
> ts_sec=1760843591, file=binlog.000181, pos=34580, kind=SPECIFIC, 
> gtids=d95558e1-886a-11ef-87c8-08002784c43f:1-170632, row=4, event=2, 
> server_id=1, transaction_data_collection_order_sampleDB.sample_data=4}
> The gtid set and event of A and B are the same, but the row of A is 5 and the 
> row of B is 4. Therefore, the result of A.isAfter(B) should be true.
> But A.isAfter(B) returns false, because *BinlogOffset#compareTo* doesn't 
> compare the row attribute and returns 0.
> The code in method 
> *org.apache.flink.cdc.connectors.mysql.source.offset.BinlogOffset#compareTo* 
> for an offset that has a gtidSet:
> {code:java}
> if (StringUtils.isNotEmpty(gtidSetStr)) {
>     GtidSet gtidSet = new GtidSet(gtidSetStr);
>     GtidSet targetGtidSet = new GtidSet(targetGtidSetStr);
>     if (gtidSet.equals(targetGtidSet)) {
>         long restartSkipEvents = this.getRestartSkipEvents();
>         long targetRestartSkipEvents = that.getRestartSkipEvents();
>         // The comparison ends here, but it should go on to compare
>         // restartSkipRows when the skipped events are equal.
>         return Long.compare(restartSkipEvents, targetRestartSkipEvents);
>     }
>     return gtidSet.isContainedWithin(targetGtidSet) ? -1 : 1;
> }
> {code}
>  
> In contrast, the branch of 
> *org.apache.flink.cdc.connectors.mysql.source.offset.BinlogOffset#compareTo* 
> that uses fileName and offset correctly compares the restartSkipRows:
> {code:java}
> // The positions are the same, so compare the completed events in the
> // transaction ...
> if (this.getRestartSkipEvents() != that.getRestartSkipEvents()) {
>     return Long.compare(this.getRestartSkipEvents(), that.getRestartSkipEvents());
> }
> // The completed events are the same, so compare the row number ...
> return Long.compare(this.getRestartSkipRows(), that.getRestartSkipRows());
> {code}
> This bug may cause data loss when restoring from a gtidSet if the checkpoint 
> marks a position in the middle of the rows of one event.
> For example, when executing the SQL below:
> {code:sql}
> BEGIN;
> UPDATE customers SET address = 'Hangzhou' WHERE id = 101 OR id = 102;
> UPDATE customers SET address = 'Pittsburgh' WHERE id = 103 OR id = 109;
> COMMIT;
> {code}
> The binlog will be:
> {code:java}
> Event 0: QUERY,BEGIN
> Event 1: TABLE_MAP
> Event 2: Update id = 101 and id = 102
>        ROW 1 : Update id=101
>        ROW 2 : Update id=102
> Event 3: TABLE_MAP
> Event 4: Update id = 103 and id = 109
>        ROW 1 : Update id=103
>        ROW 2 : Update id=109
> {code}
> When a checkpoint is triggered after id=103 is emitted and before id=109, the 
> position restored from the checkpoint will be eventToSkip=4 and rowToSkip=1. 
> When the job is restored from the checkpoint, it should continue by emitting 
> id=109. But in *BinlogSplitReader#hasEnterPureBinlogPhase*, id=109 is 
> filtered out when compared with the restored position: the eventNo of id=109 
> is 4 and the eventNo of the restored position is also 4, and with a gtid set 
> the row number is not compared. The comparison therefore returns 0, so 
> *BinlogOffset#isAfter* returns false.
> {code:java}
> private boolean hasEnterPureBinlogPhase(TableId tableId, BinlogOffset position) {
>     // the existing tables that have finished snapshot reading
>     // (id=109 is filtered out here, in the isAfter check)
>     if (maxSplitHighWatermarkMap.containsKey(tableId)
>             && position.isAfter(maxSplitHighWatermarkMap.get(tableId))) {
>         pureBinlogPhaseTables.add(tableId);
>         return true;
>     }
>
>     // We still need to capture new sharding tables if the user disables
>     // scanning newly added tables.
>     // The history records for all newly added tables (including sharding
>     // tables and normal tables) will be captured after restoring from a
>     // savepoint if the user enables scanning newly added tables.
>     if (!statefulTaskContext.getSourceConfig().isScanNewlyAddedTableEnabled()) {
>         // the newly added sharding table without history records
>         return !maxSplitHighWatermarkMap.containsKey(tableId)
>                 && capturedTableFilter.test(tableId);
>     }
>     return false;
> }
> {code}
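
For readers of this thread, the comparison order described in the report can be sketched as below. This is only an illustration under stated assumptions: SimpleOffset, compareEventsOnly and the field names are hypothetical and are not the Flink CDC classes, GTID sets are treated as opaque strings, and the sketch is not taken from the commit referenced above.

```java
import java.util.Objects;

/**
 * Hypothetical, simplified stand-in for a binlog offset. This is NOT the
 * Flink CDC BinlogOffset class; GTID sets are plain strings here.
 */
final class SimpleOffset implements Comparable<SimpleOffset> {
    final String gtidSet;  // e.g. "d95558e1-886a-11ef-87c8-08002784c43f:1-170632"
    final long skipEvents; // events already completed in the transaction
    final long skipRows;   // rows already emitted within the current event

    SimpleOffset(String gtidSet, long skipEvents, long skipRows) {
        this.gtidSet = Objects.requireNonNull(gtidSet);
        this.skipEvents = skipEvents;
        this.skipRows = skipRows;
    }

    /** Events-only comparison, mirroring the behavior described in the report. */
    static int compareEventsOnly(SimpleOffset a, SimpleOffset b) {
        return Long.compare(a.skipEvents, b.skipEvents);
    }

    /** Compares events first, then rows, when the GTID sets are equal. */
    @Override
    public int compareTo(SimpleOffset that) {
        if (!gtidSet.equals(that.gtidSet)) {
            // GTID containment checks are out of scope for this sketch.
            throw new UnsupportedOperationException("different GTID sets");
        }
        if (skipEvents != that.skipEvents) {
            return Long.compare(skipEvents, that.skipEvents);
        }
        return Long.compare(skipRows, that.skipRows);
    }

    boolean isAfter(SimpleOffset that) {
        return compareTo(that) > 0;
    }

    public static void main(String[] args) {
        String gtids = "d95558e1-886a-11ef-87c8-08002784c43f:1-170632";
        SimpleOffset a = new SimpleOffset(gtids, 2, 5); // event=2, row=5
        SimpleOffset b = new SimpleOffset(gtids, 2, 4); // event=2, row=4
        System.out.println(compareEventsOnly(a, b)); // 0: rows are ignored
        System.out.println(a.isAfter(b));            // true: rows break the tie
    }
}
```

With the A and B values from the report (same gtids, event=2, row=5 versus row=4), the events-only comparison yields 0, while the two-step comparison places A after B.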



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
