[ 
https://issues.apache.org/jira/browse/FLINK-39179?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-39179:
-----------------------------------
    Labels: pull-request-available  (was: )

> Add REPLACE INTO filtering in handleQueryEvent to fix pt-table-checksum DML 
> parsing failure
> -------------------------------------------------------------------------------------------
>
>                 Key: FLINK-39179
>                 URL: https://issues.apache.org/jira/browse/FLINK-39179
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / JDBC
>            Reporter: MengHui Yu
>            Priority: Minor
>              Labels: pull-request-available
>
> h3. Problem
> When pt-table-checksum runs against a MySQL instance monitored by Flink CDC, 
> it sets session-level _binlog_format = STATEMENT_ and executes _REPLACE INTO_ 
> statements. These DML statements appear as _QueryEvent_ in the binlog instead 
> of row-based events.
> The current _handleQueryEvent_ in _MySqlStreamingChangeEventSource_ only 
> filters {_}INSERT{_}, {_}UPDATE{_}, and _DELETE_ statements, using 
> {_}equals{_}() matching. _REPLACE INTO_ is not filtered, so it is 
> incorrectly passed to DDL parsing, which causes the connector to fail.
> This issue was originally reported and fixed in upstream Debezium as 
> [DBZ-9428|https://issues.redhat.com/browse/DBZ-9428] via [PR 
> #7004|https://github.com/debezium/debezium/pull/7004]. Since Flink CDC 
> maintains a forked copy of {_}MySqlStreamingChangeEventSource{_}, the fix 
> needs to be synced.
> h3. Root Cause
> In {_}handleQueryEvent{_}(), the DML filtering logic:
>  
> {code:java}
> String upperCasedStatementBegin = Strings.getBegin(sql, 7).toUpperCase();
> if (upperCasedStatementBegin.equals("INSERT ")
>         || upperCasedStatementBegin.equals("UPDATE ")
>         || upperCasedStatementBegin.equals("DELETE ")) {
>     // filtered
> } {code}
>  
> has two problems:
> * The _REPLACE_ prefix is missing, so _REPLACE INTO_ statements pass through 
> to DDL parsing
> * {_}equals{_}() is used instead of {_}startsWith{_}(), which is fragile when 
> the prefix length changes
> h3. Fix
> Sync the changes from Debezium PR #7004:
> * Add _REPLACE_ to the DML prefix list
> * Extract {_}isDmlStatement{_}() method using {_}startsWith{_}() for robust 
> matching
> * Increase _Strings.getBegin_ length from 7 to 8 (to accommodate 
> "{_}REPLACE{_} " which is 8 chars)
> * Use parameterized logging instead of string concatenation
> h3. How to Reproduce
> # Start Flink CDC MySQL connector monitoring a table
> # Run pt-table-checksum (or simulate by executing {_}SET SESSION 
> binlog_format = 'STATEMENT'{_} followed by _REPLACE INTO ..._)
> # Connector fails when attempting to parse _REPLACE INTO_ as DDL
> h3. References
> * Upstream fix: [DBZ-9428|https://issues.redhat.com/browse/DBZ-9428]
> * Upstream PR: 
> [debezium/debezium#7004|https://github.com/debezium/debezium/pull/7004]
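
The root cause and fix described in the issue can be sketched in isolation as follows. This is a minimal illustration under stated assumptions, not the actual Debezium or Flink CDC code: the class name DmlPrefixSketch, the prefix list, the oldCheck method, and the local getBegin helper (a stand-in for Strings.getBegin, assumed here to return at most the first N characters) are all hypothetical. Only the method name isDmlStatement and the 7 to 8 length change come from the issue text.

```java
import java.util.List;
import java.util.Locale;

// Hypothetical, self-contained sketch of the prefix check discussed in the issue.
final class DmlPrefixSketch {

    // Assumed prefix list: the three prefixes quoted in the issue plus REPLACE.
    private static final List<String> DML_PREFIXES =
            List.of("INSERT ", "UPDATE ", "DELETE ", "REPLACE ");

    // Stand-in for Strings.getBegin: at most the first maxLength characters.
    static String getBegin(String s, int maxLength) {
        if (s == null) {
            return "";
        }
        return s.length() <= maxLength ? s : s.substring(0, maxLength);
    }

    // Old behavior as quoted in the issue: 7 characters compared with equals().
    static boolean oldCheck(String sql) {
        String begin = getBegin(sql, 7).toUpperCase(Locale.ROOT);
        return begin.equals("INSERT ")
                || begin.equals("UPDATE ")
                || begin.equals("DELETE ");
    }

    // Sketch of the fix: 8 characters compared with startsWith(), so the
    // 7-character prefixes still match after the window is widened.
    static boolean isDmlStatement(String sql) {
        String begin = getBegin(sql, 8).toUpperCase(Locale.ROOT);
        for (String prefix : DML_PREFIXES) {
            if (begin.startsWith(prefix)) {
                return true;
            }
        }
        return false;
    }
}
```

In this sketch, oldCheck misses a REPLACE INTO statement because the 7-character window holds "REPLACE" without the trailing space, while isDmlStatement accepts it. Switching to startsWith() matters once the window is 8 characters wide: an equals() comparison against "INSERT " would no longer match "INSERT I".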



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
