MengHui Yu created FLINK-39179:
----------------------------------
Summary: Add REPLACE INTO filtering in handleQueryEvent to fix
pt-table-checksum DML parsing failure
Key: FLINK-39179
URL: https://issues.apache.org/jira/browse/FLINK-39179
Project: Flink
Issue Type: Bug
Components: Connectors / JDBC
Reporter: MengHui Yu
h3. Problem
When pt-table-checksum runs against a MySQL instance monitored by Flink CDC, it
sets session-level _binlog_format = STATEMENT_ and executes _REPLACE INTO_
statements. These DML statements appear as _QueryEvent_ in the binlog instead
of row-based events.
The current _handleQueryEvent_ in _MySqlStreamingChangeEventSource_ only
filters {_}INSERT{_}, {_}UPDATE{_}, and _DELETE_ statements using
{_}equals{_}() matching. _REPLACE INTO_ is not filtered, so it is
incorrectly passed to DDL parsing, which causes the connector to fail.
This issue was originally reported and fixed in upstream Debezium as
[DBZ-9428|https://issues.redhat.com/browse/DBZ-9428] via [PR
#7004|https://github.com/debezium/debezium/pull/7004]. Since Flink CDC
maintains a forked copy of {_}MySqlStreamingChangeEventSource{_}, the fix needs
to be synced.
h3. Root Cause
In {_}handleQueryEvent{_}(), the DML filtering logic:
{code:java}
String upperCasedStatementBegin = Strings.getBegin(sql, 7).toUpperCase();
if (upperCasedStatementBegin.equals("INSERT ")
        || upperCasedStatementBegin.equals("UPDATE ")
        || upperCasedStatementBegin.equals("DELETE ")) {
    // filtered
}
{code}
has two problems:
* The _REPLACE_ prefix is missing, so _REPLACE INTO_ statements pass through to
DDL parsing
* {_}equals{_}() is used instead of {_}startsWith{_}(), which only works while
every prefix is exactly as long as the extracted substring (7 characters) and
breaks when the prefix length changes
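The following minimal sketch illustrates both problems in isolation. It is not the connector code: the class _PrefixMatchDemo_ and the helper _begin_ are hypothetical stand-ins (the latter for _Strings.getBegin_, assumed here to return at most the first N characters), and the SQL is assumed to be already trimmed.

```java
import java.util.Locale;

class PrefixMatchDemo {
    // Hypothetical stand-in for Strings.getBegin: at most the first `length` characters.
    static String begin(String sql, int length) {
        return sql.length() <= length ? sql : sql.substring(0, length);
    }

    // Mirrors the current check: exact equality against 7-character prefixes.
    static boolean matchesCurrentFilter(String sql) {
        String b = begin(sql, 7).toUpperCase(Locale.ROOT);
        return b.equals("INSERT ") || b.equals("UPDATE ") || b.equals("DELETE ");
    }

    // Naively adding "REPLACE " to the equals() chain does not help:
    // a 7-character substring can never equal an 8-character literal.
    static boolean naiveReplaceCheck(String sql) {
        String b = begin(sql, 7).toUpperCase(Locale.ROOT);
        return b.equals("REPLACE ");
    }
}
```

With this sketch, _matchesCurrentFilter_ accepts an _INSERT INTO_ statement but rejects _REPLACE INTO_, and _naiveReplaceCheck_ rejects every input, which is why the extracted length has to grow together with the prefix list.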
h3. Fix
Sync the changes from Debezium PR #7004:
* Add _REPLACE_ to the DML prefix list
* Extract {_}isDmlStatement{_}() method using {_}startsWith{_}() for robust
matching
* Increase the _Strings.getBegin_ length from 7 to 8 to accommodate "REPLACE "
(the keyword plus a trailing space, 8 characters)
* Use parameterized logging instead of string concatenation
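The prefix check described in this list could look roughly like the sketch below. It is an illustration under assumptions, not the code from the Debezium PR: the class name _DmlPrefixFilter_, the constant names, and the local _begin_ helper (standing in for _Strings.getBegin_) are hypothetical, and the SQL is assumed to be already trimmed.

```java
import java.util.List;
import java.util.Locale;

class DmlPrefixFilter {
    // DML keywords followed by a space; "REPLACE " is the longest at 8 characters.
    private static final List<String> DML_PREFIXES =
            List.of("INSERT ", "UPDATE ", "DELETE ", "REPLACE ");

    // Number of leading characters to inspect; must cover the longest prefix.
    private static final int PREFIX_LENGTH = 8;

    // Hypothetical stand-in for Strings.getBegin: at most the first `length` characters.
    static String begin(String sql, int length) {
        if (sql == null) {
            return "";
        }
        return sql.length() <= length ? sql : sql.substring(0, length);
    }

    // Returns true when the statement starts with one of the DML prefixes.
    static boolean isDmlStatement(String sql) {
        String upper = begin(sql, PREFIX_LENGTH).toUpperCase(Locale.ROOT);
        for (String prefix : DML_PREFIXES) {
            if (upper.startsWith(prefix)) {
                return true;
            }
        }
        return false;
    }
}
```

Because _startsWith_ compares only as many characters as each prefix has, the 7-character prefixes keep matching after the extracted length grows to 8.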
h3. How to Reproduce
# Start a Flink CDC MySQL connector that monitors a table
# Run pt-table-checksum (or simulate it by executing {_}SET SESSION
binlog_format = 'STATEMENT'{_} followed by _REPLACE INTO ..._)
# The connector fails when it attempts to parse _REPLACE INTO_ as DDL
h3. References
* Upstream fix: [DBZ-9428|https://issues.redhat.com/browse/DBZ-9428]
* Upstream PR:
[debezium/debezium#7004|https://github.com/debezium/debezium/pull/7004]