[
https://issues.apache.org/jira/browse/FLINK-38531?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Yanquan Lv resolved FLINK-38531.
--------------------------------
Fix Version/s: cdc-3.6.0
Assignee: sherhomhuang
Resolution: Fixed
Fixed in master via c1a7d0b6f194ad2511712c1efb0a17053080f3d8.
> MySQL CDC BinlogOffset compareTo method cannot work correctly when gtidSet is
> same, which may cause loss data when restored from gtid .
> ---------------------------------------------------------------------------------------------------------------------------------------
>
> Key: FLINK-38531
> URL: https://issues.apache.org/jira/browse/FLINK-38531
> Project: Flink
> Issue Type: Bug
> Components: Flink CDC
> Affects Versions: cdc-3.1.1, cdc-3.3.0, cdc-3.2.1, cdc-3.4.0, cdc-3.5.0
> Reporter: sherhomhuang
> Assignee: sherhomhuang
> Priority: Critical
> Labels: pull-request-available
> Fix For: cdc-3.6.0
>
>
> When two offsets have the same gtidSet, *BinlogOffset#compareTo* returns right
> after comparing restartSkipEvents. A correct comparison should go on to
> compare restartSkipRows when restartSkipEvents are also equal.
> For example:
> A={transaction_id=d95558e1-886a-11ef-87c8-08002784c43f:170633,
> ts_sec=1760843591, file=binlog.000181, pos=34580, kind=SPECIFIC,
> gtids=d95558e1-886a-11ef-87c8-08002784c43f:1-170632, row=5, event=2,
> server_id=1, transaction_data_collection_order_sampleDB.sample_data=5}
> B={transaction_id=d95558e1-886a-11ef-87c8-08002784c43f:170633,
> ts_sec=1760843591, file=binlog.000181, pos=34580, kind=SPECIFIC,
> gtids=d95558e1-886a-11ef-87c8-08002784c43f:1-170632, row=4, event=2,
> server_id=1, transaction_data_collection_order_sampleDB.sample_data=4}
> A and B have the same GTID set and the same event count, but the row of A is
> 5 and the row of B is 4. Therefore, A.isAfter(B) should be true.
> However, *BinlogOffset#compareTo* returns 0 because it does not compare the
> row attribute, so A.isAfter(B) returns false.
> The code in
> *org.apache.flink.cdc.connectors.mysql.source.offset.BinlogOffset#compareTo*
> for offsets that carry a gtidSet:
> {code:java}
> if (StringUtils.isNotEmpty(gtidSetStr)) {
>     GtidSet gtidSet = new GtidSet(gtidSetStr);
>     GtidSet targetGtidSet = new GtidSet(targetGtidSetStr);
>     if (gtidSet.equals(targetGtidSet)) {
>         long restartSkipEvents = this.getRestartSkipEvents();
>         long targetRestartSkipEvents = that.getRestartSkipEvents();
>         // The comparison ends here, but it should go on to compare
>         // restartSkipRows when the skipped events are equal.
>         return Long.compare(restartSkipEvents, targetRestartSkipEvents);
>     }
>     return gtidSet.isContainedWithin(targetGtidSet) ? -1 : 1;
> }
> {code}
>
> In contrast, the branch of
> *org.apache.flink.cdc.connectors.mysql.source.offset.BinlogOffset#compareTo*
> that uses the file name and position correctly compares restartSkipRows:
> {code:java}
> // The positions are the same, so compare the completed events in the transaction ...
> if (this.getRestartSkipEvents() != that.getRestartSkipEvents()) {
>     return Long.compare(this.getRestartSkipEvents(), that.getRestartSkipEvents());
> }
> // The completed events are the same, so compare the row number ...
> return Long.compare(this.getRestartSkipRows(), that.getRestartSkipRows());
> {code}
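
The two branches quoted above differ only in whether rows are used as a tie-breaker. The following is a minimal, self-contained sketch of that difference, not the actual patch: `SkipCounters` is a hypothetical stand-in that carries only the two counters from the report (event and row), and both method names are illustrative.

```java
public class GtidTieBreakSketch {

    /** Hypothetical stand-in for the restart counters of a BinlogOffset. */
    public static final class SkipCounters {
        final long restartSkipEvents;
        final long restartSkipRows;

        public SkipCounters(long restartSkipEvents, long restartSkipRows) {
            this.restartSkipEvents = restartSkipEvents;
            this.restartSkipRows = restartSkipRows;
        }
    }

    /** Ordering described in the report for equal GTID sets: events only. */
    public static int compareEventsOnly(SkipCounters a, SkipCounters b) {
        return Long.compare(a.restartSkipEvents, b.restartSkipEvents);
    }

    /** Ordering used by the file/position branch: events first, then rows. */
    public static int compareEventsThenRows(SkipCounters a, SkipCounters b) {
        if (a.restartSkipEvents != b.restartSkipEvents) {
            return Long.compare(a.restartSkipEvents, b.restartSkipEvents);
        }
        return Long.compare(a.restartSkipRows, b.restartSkipRows);
    }

    public static void main(String[] args) {
        // Offsets A and B from the report: same GTID set, event=2, rows 5 and 4.
        SkipCounters a = new SkipCounters(2, 5);
        SkipCounters b = new SkipCounters(2, 4);

        // "isAfter" is modelled here as "comparison result > 0".
        System.out.println(compareEventsOnly(a, b) > 0);     // prints false
        System.out.println(compareEventsThenRows(a, b) > 0); // prints true
    }
}
```

With the events-only ordering, A and B compare as equal even though A is one row further along; the row tie-breaker restores the expected A.isAfter(B) result.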
> This bug may cause data loss when a job is restored from a GTID set and the
> checkpoint marks a position in the middle of the rows of a single event.
> For example, when the following SQL is executed:
> {code:sql}
> BEGIN;
> UPDATE customers SET address = 'Hangzhou' WHERE id = 101 OR id = 102;
> UPDATE customers SET address = 'Pittsburgh' WHERE id = 103 OR id = 109;
> COMMIT;
> {code}
> The binlog will contain:
> {code:java}
> Event 0: QUERY,BEGIN
> Event 1: TABLE_MAP
> Event 2: Update id = 101 and id = 102
>     ROW 1 : Update id=101
>     ROW 2 : Update id=102
> Event 3: TABLE_MAP
> Event 4: Update id = 103 and id = 109
>     ROW 1 : Update id=103
>     ROW 2 : Update id=109
> {code}
> When a checkpoint is triggered after id=103 is emitted and before id=109, the
> position restored from the checkpoint will be eventToSkip=4 and rowToSkip=1.
> When the job is restored from the checkpoint, it should continue by emitting
> id=109. But in *BinlogSplitReader#hasEnterPureBinlogPhase*, id=109 is filtered
> out when its position is compared with the restored position: the event
> number of id=109 is 4 and the event number of the restored position is also
> 4, and when a GTID set is used the row number is not compared. The comparison
> therefore returns 0, so *BinlogOffset#isAfter* returns false.
> {code:java}
> private boolean hasEnterPureBinlogPhase(TableId tableId, BinlogOffset position) {
>     // the existing tables that have finished snapshot reading
>     if (maxSplitHighWatermarkMap.containsKey(tableId)
>             && position.isAfter(maxSplitHighWatermarkMap.get(tableId))) {
>         // id=109 never reaches this branch: isAfter returns false, so it is filtered out.
>         pureBinlogPhaseTables.add(tableId);
>         return true;
>     }
>     // We still need to capture new sharding tables if the user disables scanning of
>     // newly added tables. The history records for all newly added tables (including
>     // sharding tables and normal tables) will be captured after restoring from a
>     // savepoint if the user enables scanning of newly added tables.
>     if (!statefulTaskContext.getSourceConfig().isScanNewlyAddedTableEnabled()) {
>         // the newly added sharding table without history records
>         return !maxSplitHighWatermarkMap.containsKey(tableId)
>                 && capturedTableFilter.test(tableId);
>     }
>     return false;
> }
> {code}
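
The restore scenario described above can be replayed with a small, self-contained sketch. It is a simplification under stated assumptions, not the reader's actual logic: `Position` is a hypothetical pair of counters, the records for ids 101, 102, 103 and 109 are assumed to carry (event, row) values of (2,1), (2,2), (4,1) and (4,2), and a record is emitted only when its position is strictly after the restored one.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

public class RestoreFilterSketch {

    /** Hypothetical position inside one transaction: events and rows completed so far. */
    public static final class Position {
        final long events;
        final long rows;

        public Position(long events, long rows) {
            this.events = events;
            this.rows = rows;
        }
    }

    /** Ordering reported for equal GTID sets: rows are ignored. */
    public static final Comparator<Position> EVENTS_ONLY =
            Comparator.comparingLong(p -> p.events);

    /** Ordering with rows as a tie-breaker. */
    public static final Comparator<Position> EVENTS_THEN_ROWS =
            Comparator.comparingLong((Position p) -> p.events)
                    .thenComparingLong(p -> p.rows);

    /** Replays the example transaction and returns the ids that pass the "is after" check. */
    public static List<Integer> emitAfter(Position restored, Comparator<Position> order) {
        int[] ids = {101, 102, 103, 109};
        // Assumed (event, row) counters for each row record of the example binlog.
        Position[] positions = {
            new Position(2, 1), new Position(2, 2), new Position(4, 1), new Position(4, 2)
        };
        List<Integer> emitted = new ArrayList<>();
        for (int i = 0; i < ids.length; i++) {
            if (order.compare(positions[i], restored) > 0) {
                emitted.add(ids[i]);
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        // Checkpoint taken after id=103 and before id=109.
        Position restored = new Position(4, 1);
        System.out.println(emitAfter(restored, EVENTS_ONLY));      // prints []
        System.out.println(emitAfter(restored, EVENTS_THEN_ROWS)); // prints [109]
    }
}
```

Under the events-only ordering nothing is emitted after the restore, which matches the reported loss of id=109; with the row tie-breaker only id=109 is emitted, and id=103 is correctly skipped as already processed.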