[
https://issues.apache.org/jira/browse/FLINK-39749?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
ASF GitHub Bot updated FLINK-39749:
-----------------------------------
Labels: mysql-cdc-connector pull-request-available (was: mysql-cdc-connector)
> [mysql-cdc] Introduce scan.incremental.snapshot.string-key.compare-mode to
> align Java and MySQL ordering for string chunk keys
> ------------------------------------------------------------------------------------------------------------------------------
>
> Key: FLINK-39749
> URL: https://issues.apache.org/jira/browse/FLINK-39749
> Project: Flink
> Issue Type: Improvement
> Components: Flink CDC
> Affects Versions: cdc-3.6.0
> Environment: MySQL 8.0.x, table with CHAR(36) PK, collation
> utf8mb4_general_ci
> Flink CDC 3.6.0
> Pipeline job: mysql-cdc -> paimon
> Reporter: Ziyan Lee
> Priority: Major
> Labels: mysql-cdc-connector, pull-request-available
> Fix For: cdc-3.7.0
>
> Original Estimate: 168h
> Remaining Estimate: 168h
>
> Problem Statement
> -----------------
> When using a string-typed column (CHAR / VARCHAR) as the chunk split key in
> MySQL CDC incremental snapshot, chunk boundaries computed by Flink CDC may
> diverge from the actual row order in MySQL if the table uses a
> case-insensitive collation (e.g., utf8mb4_general_ci, utf8mb4_unicode_ci).
> This happens because:
> 1. SQL query layer: MySQL evaluates WHERE predicates and ORDER BY using the
> column's collation.
> - utf8mb4_general_ci: 'A' == 'a' (case-insensitive)
> 2. Java logic layer: Flink CDC uses String.compareTo() to determine chunk
> boundaries and route binlog events to splits.
> - Java default: 'A' (65) < 'a' (97) (case-sensitive, Unicode code-point
> order)
> When these two ordering rules differ, the following symptoms occur:
> - Chunk splitting terminates prematurely. The last chunk becomes unbounded
> (splitEnd == null), causing a single huge chunk that reads most or all of the
> table.
> - During the backfill / binlog phase, a record may be assigned to the wrong
> split because Java's boundary check does not match MySQL's actual data range.
> - In the worst case, this leads to duplicate processing or lost events
> downstream.
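
The ordering gap described above can be reproduced in plain Java. The following is a minimal sketch, not connector code: the class name, helper names, and sample keys are hypothetical, and String.CASE_INSENSITIVE_ORDER only approximates a case-insensitive MySQL collation for ASCII keys.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical demo class; not part of Flink CDC.
final class OrderingDemo {

    // Sorts a copy of the keys with Java's natural String order,
    // i.e. String.compareTo() (comparison by UTF-16 code unit).
    static List<String> javaOrder(List<String> keys) {
        List<String> copy = new ArrayList<>(keys);
        copy.sort(null); // a null comparator means natural ordering
        return copy;
    }

    // Sorts a copy of the keys ignoring case, which approximates a
    // case-insensitive collation for pure ASCII keys only.
    static List<String> caseInsensitiveOrder(List<String> keys) {
        List<String> copy = new ArrayList<>(keys);
        copy.sort(String.CASE_INSENSITIVE_ORDER);
        return copy;
    }

    public static void main(String[] args) {
        List<String> keys = List.of("a2", "9f", "B0", "A1");
        System.out.println(javaOrder(keys));            // [9f, A1, B0, a2]
        System.out.println(caseInsensitiveOrder(keys)); // [9f, A1, a2, B0]
    }
}
```

In the case-sensitive order every uppercase key sorts before every lowercase key, so "a2" lands after "B0" even though a case-insensitive ordering places it between "A1" and "B0".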
> Concrete example
> ----------------
>
> Table: orders (id CHAR(36) PRIMARY KEY) with utf8mb4_general_ci
> Data: mixed-case UUIDs such as '9f...', 'A1...', 'a2...'
> MySQL ORDER BY id (utf8mb4_general_ci) treats 'A1...' and 'a1...' as equal.
> Java String.compareTo() orders them as: '9f...' < 'A1...' < 'a1...' <
> 'a2...'
> If chunkSize = 8096 and MySQL returns 'A1...' as a boundary, Java may later
> fail to find the next chunk maximum because its internal ordering sees
> 'A1...' < 'a1...', while MySQL sees them as the same group. queryNextChunkMax
> eventually returns null, and the remaining data falls into an unbounded
> chunk.
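
To illustrate how a record can end up outside the split that MySQL actually read it from, here is a small sketch of a range check evaluated under both orderings. It assumes half-open split ranges [start, end) with null meaning unbounded; the helper inChunk and the sample boundaries are hypothetical and do not reproduce the connector's real code.

```java
import java.util.Comparator;

// Hypothetical demo class; not part of Flink CDC.
final class SplitRangeDemo {

    // Returns true when key falls in [start, end) under the given ordering.
    // A null bound is treated as unbounded on that side.
    static boolean inChunk(String key, String start, String end, Comparator<String> order) {
        boolean afterStart = start == null || order.compare(key, start) >= 0;
        boolean beforeEnd = end == null || order.compare(key, end) < 0;
        return afterStart && beforeEnd;
    }

    public static void main(String[] args) {
        String start = "A1";
        String end = "B0";
        String key = "a2";

        // Case-sensitive check: 'a' (97) > 'B' (66), so the key looks out of range.
        System.out.println(inChunk(key, start, end, Comparator.<String>naturalOrder())); // false

        // Case-insensitive check: "a2" sorts between "A1" and "B0".
        System.out.println(inChunk(key, start, end, String.CASE_INSENSITIVE_ORDER)); // true
    }
}
```

A case-insensitive collation would return the row "a2" for the chunk [A1, B0), while the case-sensitive Java check would reject a later binlog event for the same key as not belonging to that chunk.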
> Proposed Solution
> -----------------
> Introduce a new configuration option:
> scan.incremental.snapshot.string-key.compare-mode
> with three modes:
>
> | Mode             | Java comparison       | SQL behavior                    | Recommended for                                                      |
> |------------------|-----------------------|---------------------------------|----------------------------------------------------------------------|
> | default          | String.compareTo()    | Standard predicates (no BINARY) | Binary collations (utf8mb4_bin)                                      |
> | case-insensitive | compareToIgnoreCase() | Standard predicates (no BINARY) | Case-insensitive collations (utf8mb4_general_ci, utf8mb4_unicode_ci) |
> | binary           | String.compareTo()    | BINARY col <= BINARY ?          | Force byte-level exact match                                         |
> Key implementation points:
> - Add ChunkKeyCompareMode enum (DEFAULT, CASE_INSENSITIVE, BINARY).
> - Propagate the mode through all three API layers: DataStream API
> (MySqlSourceBuilder), Flink SQL (MySqlTableSourceFactory), and Pipeline YAML
> (MySqlDataSourceFactory).
> - Update StatementUtils to generate BINARY-qualified predicates (e.g.,
> BINARY col <= BINARY ?) when mode == BINARY.
> - Update ObjectUtils, SplitKeyUtils, and RecordUtils to perform comparisons
> using the selected mode.
> - Ensure snapshot splitting (MySqlChunkSplitter), snapshot reading
> (SnapshotSplitReader / MySqlSnapshotSplitReadTask), and binlog reading
> (BinlogSplitReader) all use the SAME mode consistently.
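
One way the implementation points above could fit together is sketched below. Only the enum name and its three constants come from this issue; the methods comparator(), upperBoundPredicate(), and fromOption(), and the wrapper class, are assumptions made for illustration and may not match the actual pull request.

```java
import java.util.Comparator;
import java.util.Locale;

// Hypothetical wrapper class so the sketch is self-contained.
final class CompareModeSketch {

    enum ChunkKeyCompareMode {
        DEFAULT,
        CASE_INSENSITIVE,
        BINARY;

        // Java-side ordering used for boundary checks (assumed helper).
        Comparator<String> comparator() {
            return this == CASE_INSENSITIVE
                    ? String.CASE_INSENSITIVE_ORDER
                    : Comparator.<String>naturalOrder();
        }

        // SQL-side upper-bound predicate for a split key column (assumed helper).
        // The column name is used as given; quoting is left to the caller.
        String upperBoundPredicate(String column) {
            return this == BINARY
                    ? "BINARY " + column + " <= BINARY ?"
                    : column + " <= ?";
        }

        // Parses an option value such as "case-insensitive" (assumed helper).
        static ChunkKeyCompareMode fromOption(String value) {
            return valueOf(value.trim().toUpperCase(Locale.ROOT).replace('-', '_'));
        }
    }

    public static void main(String[] args) {
        ChunkKeyCompareMode mode = ChunkKeyCompareMode.fromOption("binary");
        System.out.println(mode.upperBoundPredicate("id")); // BINARY id <= BINARY ?
    }
}
```

Deriving both the comparator and the predicate from a single enum value is one way to keep the splitter, snapshot reader, and binlog reader on the same mode.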
> Scope & Compatibility
> ---------------------
> - Default value is "default", preserving backward compatibility.
> - Changing this option requires restarting the job from fresh state rather
> than from an existing checkpoint or savepoint, because split boundaries are
> persisted in snapshot state.
> - CASE_INSENSITIVE is safe for pure ASCII keys (e.g., UUIDs). For non-ASCII
> characters, Java's compareToIgnoreCase() and MySQL's collation folding tables
> are not strictly equivalent; BINARY mode is recommended in those cases.
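
The non-ASCII caveat can be seen with two well-known cases: utf8mb4_general_ci is accent-insensitive and compares 'é' equal to 'e', and utf8mb4_unicode_ci compares 'ß' equal to 'ss'. The sketch below only shows the Java side; the class and helper names are hypothetical.

```java
// Hypothetical demo class; not part of Flink CDC.
final class NonAsciiDemo {

    // True when Java's case-insensitive comparison considers the two keys equal.
    static boolean javaTreatsAsEqual(String a, String b) {
        return a.compareToIgnoreCase(b) == 0;
    }

    public static void main(String[] args) {
        // ASCII case difference: Java agrees with a case-insensitive collation.
        System.out.println(javaTreatsAsEqual("A1", "a1"));     // true

        // Accent difference (U+00E9 vs 'e'): Java sees two distinct keys.
        System.out.println(javaTreatsAsEqual("\u00e9", "e"));  // false

        // Sharp s (U+00DF) vs "ss": Java sees two distinct keys.
        System.out.println(javaTreatsAsEqual("\u00df", "ss")); // false
    }
}
```

Keys that differ only by accents or by such expansions would be grouped by those collations but kept apart by compareToIgnoreCase(), which is why BINARY mode is the safer choice for non-ASCII keys.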
> Documentation & Tests
> ---------------------
>
> - English and Chinese docs updated for both mysql-cdc source connector and
> pipeline connector pages.
> - MySqlTableSourceFactoryTest updated to cover the new parameter.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)