MengHui Yu created FLINK-39179:
----------------------------------

             Summary: Add REPLACE INTO filtering in handleQueryEvent to fix pt-table-checksum DML parsing failure
                 Key: FLINK-39179
                 URL: https://issues.apache.org/jira/browse/FLINK-39179
             Project: Flink
          Issue Type: Bug
          Components: Connectors / JDBC
            Reporter: MengHui Yu


h3. Problem

When pt-table-checksum runs against a MySQL instance monitored by Flink CDC, it 
sets session-level _binlog_format = STATEMENT_ and executes _REPLACE INTO_ 
statements. These DML statements appear as _QueryEvent_ in the binlog instead 
of row-based events.

The current _handleQueryEvent_ in _MySqlStreamingChangeEventSource_ only 
filters {_}INSERT{_}, {_}UPDATE{_}, and _DELETE_ statements, by comparing the 
first 7 characters of the statement with {_}equals{_}(). _REPLACE INTO_ is not 
filtered, so it is incorrectly passed to DDL parsing, leading to connector failure.

This issue was originally reported and fixed in upstream Debezium as 
[DBZ-9428|https://issues.redhat.com/browse/DBZ-9428] via [PR 
#7004|https://github.com/debezium/debezium/pull/7004]. Since Flink CDC 
maintains a forked copy of {_}MySqlStreamingChangeEventSource{_}, the fix needs 
to be synced.
h3. Root Cause

In {_}handleQueryEvent{_}(), the DML filtering logic:

 
{code:java}
String upperCasedStatementBegin = Strings.getBegin(sql, 7).toUpperCase();
if (upperCasedStatementBegin.equals("INSERT ")
        || upperCasedStatementBegin.equals("UPDATE ")
        || upperCasedStatementBegin.equals("DELETE ")) {
    // DML: a warning is logged and the event is skipped instead of being parsed as DDL
} {code}
 

has two problems:
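* The _REPLACE_ prefix is missing, so _REPLACE INTO_ statements pass through to DDL parsing
* {_}equals{_}() is used instead of {_}startsWith{_}(), which ties every check to the exact prefix length: the 7-character prefix cannot hold "REPLACE " (8 characters including the trailing space), but raising the length to 8 would break the existing matches (e.g. "INSERT I" no longer equals "INSERT ")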
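To illustrate both problems, here is a minimal, self-contained sketch of the pre-fix check (not the connector source). It assumes a local _getBegin_ helper that behaves like Debezium's _Strings.getBegin_ (first _n_ characters, or the whole string if shorter); the class name _OldDmlCheck_ and the sample statements are hypothetical.

{code:java}
import java.util.Locale;

/**
 * Illustration of the pre-fix equals()-based check.
 * getBegin is a hypothetical stand-in for Debezium's Strings.getBegin
 * (assumed: first n chars, or the whole string if shorter).
 */
class OldDmlCheck {

    static String getBegin(String sql, int length) {
        return sql.length() <= length ? sql : sql.substring(0, length);
    }

    // Mirrors the check quoted above, with the prefix length as a parameter.
    static boolean isDml(String sql, int prefixLength) {
        String begin = getBegin(sql, prefixLength).toUpperCase(Locale.ROOT);
        return begin.equals("INSERT ")
                || begin.equals("UPDATE ")
                || begin.equals("DELETE ");
    }

    public static void main(String[] args) {
        // Problem 1: the 7-char prefix of "REPLACE INTO ..." is "REPLACE",
        // which matches none of the listed keywords, so it falls through to DDL parsing.
        System.out.println(isDml("REPLACE INTO t VALUES (1)", 7)); // false
        // Problem 2: raising the length to 8 breaks existing matches,
        // because "INSERT I" no longer equals "INSERT ".
        System.out.println(isDml("INSERT INTO t VALUES (1)", 7)); // true
        System.out.println(isDml("INSERT INTO t VALUES (1)", 8)); // false
    }
}
{code}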
h3. Fix

Sync the changes from Debezium PR #7004:
* Add _REPLACE_ to the DML prefix list
* Extract {_}isDmlStatement{_}() method using {_}startsWith{_}() for robust 
matching
* Increase _Strings.getBegin_ length from 7 to 8, to accommodate "REPLACE " (7 letters plus the trailing space)
* Use parameterized logging instead of string concatenation
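A minimal sketch of the fixed check under the same assumptions: the class _DmlStatementFilter_, its constants and the local _getBegin_ helper are hypothetical, and this is not the actual code from Debezium PR #7004. It matches the four DML prefixes with _startsWith()_ against an 8-character, upper-cased statement prefix.

{code:java}
import java.util.Locale;

/**
 * Sketch of a startsWith()-based DML check as described in the Fix section.
 * All names here are hypothetical, not the Debezium / Flink CDC source.
 */
class DmlStatementFilter {

    // Each prefix keeps its trailing space, so only the whole keyword matches.
    private static final String[] DML_PREFIXES = {"INSERT ", "UPDATE ", "DELETE ", "REPLACE "};

    // 8 = length of "REPLACE " (7 letters + 1 space), the longest prefix.
    private static final int PREFIX_LENGTH = 8;

    // Hypothetical stand-in for Strings.getBegin: first n chars, or the whole string.
    static String getBegin(String sql, int length) {
        return sql.length() <= length ? sql : sql.substring(0, length);
    }

    static boolean isDmlStatement(String sql) {
        String begin = getBegin(sql, PREFIX_LENGTH).toUpperCase(Locale.ROOT);
        for (String prefix : DML_PREFIXES) {
            // startsWith() tolerates a prefix longer than the keyword ("INSERT I").
            if (begin.startsWith(prefix)) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println(isDmlStatement("REPLACE INTO t VALUES (1)"));      // true
        System.out.println(isDmlStatement("insert into t values (1)"));       // true
        System.out.println(isDmlStatement("ALTER TABLE t ADD COLUMN c INT")); // false
    }
}
{code}

Like the original check, this only inspects the first characters of the statement, so DML preceded by whitespace or a comment would not be recognized. In the connector, a statement for which the check returns true would be logged and skipped; with an SLF4J-style logger, parameterized logging means passing _sql_ as an argument for a placeholder in the message instead of concatenating it.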
h3. How to Reproduce

# Start Flink CDC MySQL connector monitoring a table
# Run pt-table-checksum (or simulate it by executing _SET SESSION binlog_format = 'STATEMENT'_ followed by _REPLACE INTO ..._ on the same connection)
# The connector fails when attempting to parse _REPLACE INTO_ as DDL
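The simulation in step 2 could be scripted roughly as follows. This is a hypothetical JDBC sketch: the table _test_db.t (id, name)_, the connection arguments and the class name are placeholders, and running it needs a MySQL JDBC driver on the classpath plus an account with sufficient privileges to change the session _binlog_format_. Both statements must run on the same connection, because the format change is session-scoped.

{code:java}
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.List;

/**
 * Hypothetical reproduction helper: issues a REPLACE INTO under a
 * session-level STATEMENT binlog format, similar to what pt-table-checksum does.
 * The table test_db.t(id, name) and the JDBC URL are placeholders.
 */
class ReplaceIntoRepro {

    static List<String> statements() {
        return List.of(
                // Session-scoped: only this connection switches to statement-based logging.
                "SET SESSION binlog_format = 'STATEMENT'",
                // Expected to appear in the binlog as a QueryEvent, not a row event.
                "REPLACE INTO test_db.t (id, name) VALUES (1, 'a')");
    }

    // Runs both statements on one connection (JDBC auto-commit is on by default).
    static void run(Connection conn) throws SQLException {
        try (Statement stmt = conn.createStatement()) {
            for (String sql : statements()) {
                stmt.execute(sql);
            }
        }
    }

    public static void main(String[] args) throws SQLException {
        // args: jdbcUrl user password, e.g. jdbc:mysql://localhost:3306/test_db
        try (Connection conn = DriverManager.getConnection(args[0], args[1], args[2])) {
            run(conn);
        }
    }
}
{code}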
h3. References

* Upstream fix: [DBZ-9428|https://issues.redhat.com/browse/DBZ-9428]
* Upstream PR: 
[debezium/debezium#7004|https://github.com/debezium/debezium/pull/7004]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)