[ https://issues.apache.org/jira/browse/FLINK-39179?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
ASF GitHub Bot updated FLINK-39179:
-----------------------------------
Labels: pull-request-available (was: )
> Add REPLACE INTO filtering in handleQueryEvent to fix pt-table-checksum DML
> parsing failure
> -------------------------------------------------------------------------------------------
>
> Key: FLINK-39179
> URL: https://issues.apache.org/jira/browse/FLINK-39179
> Project: Flink
> Issue Type: Bug
> Components: Connectors / JDBC
> Reporter: MengHui Yu
> Priority: Minor
> Labels: pull-request-available
>
> h3. Problem
> When pt-table-checksum runs against a MySQL instance monitored by Flink CDC,
> it sets session-level _binlog_format = STATEMENT_ and executes _REPLACE INTO_
> statements. These DML statements appear as _QueryEvent_ in the binlog instead
> of row-based events.
> The current _handleQueryEvent_ in _MySqlStreamingChangeEventSource_ only
> filters {_}INSERT{_}, {_}UPDATE{_}, and _DELETE_ statements using
> {_}equals{_}() matching. _REPLACE INTO_ is not filtered, so it is
> incorrectly passed to DDL parsing, which causes the connector to fail.
> This issue was originally reported and fixed in upstream Debezium as
> [DBZ-9428|https://issues.redhat.com/browse/DBZ-9428] via [PR
> #7004|https://github.com/debezium/debezium/pull/7004]. Since Flink CDC
> maintains a forked copy of {_}MySqlStreamingChangeEventSource{_}, the fix
> needs to be synced.
> h3. Root Cause
> In {_}handleQueryEvent{_}(), the DML filtering logic:
>
> {code:java}
> String upperCasedStatementBegin = Strings.getBegin(sql, 7).toUpperCase();
> if (upperCasedStatementBegin.equals("INSERT ")
> || upperCasedStatementBegin.equals("UPDATE ")
> || upperCasedStatementBegin.equals("DELETE ")) {
> // DML: skipped here, never handed to the DDL parser
> } {code}
>
> has two problems:
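> * _REPLACE_ prefix is missing, so _REPLACE INTO_ statements pass through to DDL
> parsing
> * {_}equals{_}() is used instead of {_}startsWith{_}(), which is fragile when
> the prefix length changes: once the prefix is widened to 8 characters, an
> _INSERT INTO_ statement yields "INSERT I", which no longer equals "INSERT "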
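Both problems can be reproduced outside the connector with a minimal sketch. It assumes a local stand-in, `getBegin`, that behaves like Debezium's `Strings.getBegin` (first `length` characters, or the whole string if shorter); the class name `EqualsPrefixCheck` is hypothetical and this is not the Flink CDC source.

```java
import java.util.Locale;

/** Hypothetical sketch of the current equals()-based prefix check. */
public class EqualsPrefixCheck {

    // Assumed stand-in for Strings.getBegin: first `length` chars, or str if shorter.
    static String getBegin(String str, int length) {
        return str.length() <= length ? str : str.substring(0, length);
    }

    // Mirrors the quoted logic: exact equals() against 7-char prefixes.
    static boolean isFilteredAsDml(String sql, int prefixLength) {
        String begin = getBegin(sql, prefixLength).toUpperCase(Locale.ROOT);
        return begin.equals("INSERT ")
                || begin.equals("UPDATE ")
                || begin.equals("DELETE ");
    }

    public static void main(String[] args) {
        String insert = "INSERT INTO t VALUES (1)";
        String replace = "REPLACE INTO t VALUES (1)";

        // Problem 1: REPLACE is not in the list, so it falls through to DDL parsing.
        System.out.println(isFilteredAsDml(insert, 7));  // true
        System.out.println(isFilteredAsDml(replace, 7)); // false

        // Problem 2: widening the window to 8 breaks equals(): "INSERT I" != "INSERT ".
        System.out.println(isFilteredAsDml(insert, 8));  // false
    }
}
```

This is why the length change and the switch to prefix matching have to go together.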
> h3. Fix
> Sync the changes from Debezium PR #7004:
> * Add _REPLACE_ to the DML prefix list
> * Extract {_}isDmlStatement{_}() method using {_}startsWith{_}() for robust
> matching
> * Increase _Strings.getBegin_ length from 7 to 8 (to accommodate
> "{_}REPLACE{_} " which is 8 chars)
> * Use parameterized logging instead of string concatenation
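A rough sketch of the prefix check described in the list above, assuming the same `getBegin` stand-in as before. The names `DmlStatementFilter`, `DML_PREFIXES` and `PREFIX_LENGTH` are hypothetical, logging is omitted, and the actual method in Debezium PR #7004 may differ in detail.

```java
import java.util.List;
import java.util.Locale;

/** Hypothetical sketch of a startsWith()-based isDmlStatement() check. */
public class DmlStatementFilter {

    // "REPLACE " (with the trailing space) is the longest prefix: 8 chars.
    static final int PREFIX_LENGTH = 8;

    static final List<String> DML_PREFIXES =
            List.of("INSERT ", "UPDATE ", "DELETE ", "REPLACE ");

    // Assumed stand-in for Strings.getBegin: first `length` chars, or str if shorter.
    static String getBegin(String str, int length) {
        return str.length() <= length ? str : str.substring(0, length);
    }

    static boolean isDmlStatement(String sql) {
        String begin = getBegin(sql, PREFIX_LENGTH).toUpperCase(Locale.ROOT);
        // startsWith() matches 7-char and 8-char prefixes inside the same window.
        return DML_PREFIXES.stream().anyMatch(begin::startsWith);
    }

    public static void main(String[] args) {
        System.out.println(isDmlStatement("REPLACE INTO t VALUES (1)"));      // true
        System.out.println(isDmlStatement("insert into t values (1)"));       // true
        System.out.println(isDmlStatement("ALTER TABLE t ADD COLUMN c INT")); // false
    }
}
```

With `startsWith()`, adding another keyword later only needs a new list entry (and a wider window if it is longer than 8 characters), rather than keeping every literal the exact width of the window.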
> h3. How to Reproduce
> # Start a Flink CDC MySQL connector that monitors a table
> # Run pt-table-checksum (or simulate it by executing {_}SET SESSION
> binlog_format = 'STATEMENT'{_} followed by _REPLACE INTO ..._)
> # The connector fails when it attempts to parse _REPLACE INTO_ as DDL
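Step 2 can be simulated without pt-table-checksum using plain JDBC. This is a hypothetical sketch. It assumes a MySQL JDBC driver on the classpath, an account that is allowed to change the session `binlog_format`, and a table `test.repro_t(id INT PRIMARY KEY, v VARCHAR(32))` that is captured by the Flink CDC source. The class and table names are illustrative only.

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.List;

/** Hypothetical reproduction: STATEMENT-format REPLACE INTO on a monitored table. */
public class ReplaceIntoRepro {

    static final List<String> STATEMENTS = List.of(
            // Session-level only; other connections keep their binlog format.
            "SET SESSION binlog_format = 'STATEMENT'",
            // Under STATEMENT format this is written as a QueryEvent, not row events.
            "REPLACE INTO test.repro_t (id, v) VALUES (1, 'checksum')");

    public static void main(String[] args) throws SQLException {
        if (args.length < 3) {
            System.err.println("usage: ReplaceIntoRepro <jdbcUrl> <user> <password>");
            return;
        }
        // Autocommit is on by default, so the REPLACE is committed immediately.
        try (Connection conn = DriverManager.getConnection(args[0], args[1], args[2]);
             Statement stmt = conn.createStatement()) {
            for (String sql : STATEMENTS) {
                stmt.execute(sql);
            }
        }
    }
}
```

Without the fix, the connector should fail on the resulting QueryEvent. With it, the statement should be skipped as DML.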
> h3. References
> * Upstream fix: [DBZ-9428|https://issues.redhat.com/browse/DBZ-9428]
> * Upstream PR:
> [debezium/debezium#7004|https://github.com/debezium/debezium/pull/7004]