[ https://issues.apache.org/jira/browse/FLINK-39749?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

ASF GitHub Bot updated FLINK-39749:
-----------------------------------
    Labels: mysql-cdc-connector pull-request-available  (was: mysql-cdc-connector)

> [mysql-cdc] Introduce scan.incremental.snapshot.string-key.compare-mode to align Java and MySQL ordering for string chunk keys
> ------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-39749
>                 URL: https://issues.apache.org/jira/browse/FLINK-39749
>             Project: Flink
>          Issue Type: Improvement
>          Components: Flink CDC
>    Affects Versions: cdc-3.6.0
>         Environment: MySQL 8.0.x, table with CHAR(36) PK, collation utf8mb4_general_ci
>                      Flink CDC 3.6.0
>                      Pipeline job: mysql-cdc -> paimon
>            Reporter: Ziyan Lee
>            Priority: Major
>              Labels: mysql-cdc-connector, pull-request-available
>             Fix For: cdc-3.7.0
>
>   Original Estimate: 168h
>  Remaining Estimate: 168h
>
>   Problem Statement
>   -----------------
>   When a string-typed column (CHAR / VARCHAR) is used as the chunk split key in the MySQL CDC incremental snapshot, the chunk boundaries computed by Flink CDC may diverge from the actual row order in MySQL if the table uses a case-insensitive collation (e.g., utf8mb4_general_ci, utf8mb4_unicode_ci).
>   This happens because:
>   1. SQL query layer: MySQL evaluates WHERE predicates and ORDER BY using the column's collation.
>      - utf8mb4_general_ci: 'A' == 'a' (case-insensitive)
>   2. Java logic layer: Flink CDC uses String.compareTo() to determine chunk boundaries and to route binlog events to splits.
>      - Java default: 'A' (65) < 'a' (97) (case-sensitive, UTF-16 code-unit order)
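The mismatch can be reproduced on the Java side alone. The minimal sketch below (the CompareDemo class is illustrative, not Flink CDC code) contrasts String.compareTo() with String.compareToIgnoreCase(), which treats 'A' and 'a' as equal in the way a case-insensitive collation does for ASCII letters.

```java
// Illustrative only: the same pair of keys compared with Java's default
// ordering and with a case-insensitive ordering.
class CompareDemo {
    // Case-sensitive: compares UTF-16 code units, so 'A' (65) sorts before 'a' (97).
    static int caseSensitive(String a, String b) {
        return a.compareTo(b);
    }

    // Case-insensitive: 'A' and 'a' compare as equal, closer to how a *_ci
    // collation treats ASCII letters.
    static int caseInsensitive(String a, String b) {
        return a.compareToIgnoreCase(b);
    }

    public static void main(String[] args) {
        System.out.println(caseSensitive("A", "a"));   // -32
        System.out.println(caseInsensitive("A", "a")); // 0
    }
}
```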
>   When these two ordering rules differ, the following symptoms occur:
>   - Chunk splitting terminates prematurely: the last chunk becomes unbounded (splitEnd == null), producing a single huge chunk that reads most or all of the table.
>   - During the backfill / binlog phase, a record may be assigned to the wrong split because Java's boundary check does not match MySQL's actual data range.
>   - In the worst case, this leads to duplicate processing or lost events downstream.
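The wrong-split symptom follows directly from the boundary check. The sketch below is hypothetical (ChunkRouting and contains() are not Flink CDC classes): it models a chunk as the half-open range [start, end), with null meaning unbounded, and checks membership with a pluggable comparator. Under a case-insensitive collation MySQL would return 'a150' for `WHERE id < 'A200'` (assumed here from the 'A' == 'a' rule above), but a String.compareTo()-based check places it outside that chunk.

```java
import java.util.Comparator;

// Hypothetical model of a snapshot chunk [start, end); null means unbounded.
class ChunkRouting {
    static boolean contains(String start, String end, String key, Comparator<String> cmp) {
        boolean afterStart = start == null || cmp.compare(key, start) >= 0;
        boolean beforeEnd = end == null || cmp.compare(key, end) < 0;
        return afterStart && beforeEnd;
    }

    public static void main(String[] args) {
        // First chunk, as MySQL would read it: WHERE id < 'A200'.
        String end = "A200";
        // Java default ordering: 'a' (97) > 'A' (65), so "a150" lands outside.
        System.out.println(contains(null, end, "a150", Comparator.naturalOrder()));     // false
        // Case-insensitive ordering agrees with the *_ci collation for this key.
        System.out.println(contains(null, end, "a150", String.CASE_INSENSITIVE_ORDER)); // true
    }
}
```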
>   Concrete example
>   ----------------
>   
>   Table: orders (id CHAR(36) PRIMARY KEY) with utf8mb4_general_ci
>   Data: mixed-case UUIDs such as '9f...', 'A1...', 'a1...', 'a2...'
>   MySQL ORDER BY id (utf8mb4_general_ci) compares 'A' and 'a' as equal, so it treats keys starting with 'A1...' and 'a1...' as one group and orders them by their remaining characters.
>   Java String.compareTo() orders them as: '9f...' < 'A1...' < 'a1...' < 'a2...'
>   If chunkSize = 8096 and MySQL returns 'A1...' as a boundary, Java may later fail to find the next chunk maximum, because its internal ordering sees 'A1...' < 'a1...' while MySQL sees them as the same group. queryNextChunkMax eventually returns null, and the remaining data falls into an unbounded chunk.
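The divergence is visible by sorting a handful of keys both ways. In the sketch below the keys are short hypothetical stand-ins for the mixed-case UUIDs in the example, and String.CASE_INSENSITIVE_ORDER is used as an approximation of utf8mb4_general_ci for ASCII letters and digits (an assumption, not an exact model of the collation).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

// Sorts the same hypothetical chunk keys with Java's default ordering and with
// a case-insensitive ordering to show where the two sequences diverge.
class OrderDivergence {
    // Returns a sorted copy so the input list is left untouched.
    static List<String> sortedCopy(List<String> keys, Comparator<String> cmp) {
        List<String> copy = new ArrayList<>(keys);
        copy.sort(cmp);
        return copy;
    }

    public static void main(String[] args) {
        List<String> keys = Arrays.asList("a250", "A100", "9f00", "a150", "A200");

        // Java default (String.compareTo): every 'A...' key precedes every 'a...' key.
        System.out.println(sortedCopy(keys, Comparator.naturalOrder()));
        // -> [9f00, A100, A200, a150, a250]

        // Case-insensitive: 'A...' and 'a...' keys interleave by their remaining characters.
        System.out.println(sortedCopy(keys, String.CASE_INSENSITIVE_ORDER));
        // -> [9f00, A100, a150, A200, a250]
    }
}
```

With the default ordering, a chunk ending at 'A200' would appear to contain only '9f00' and 'A100', while the case-insensitive order also places 'a150' before that boundary.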
>   Proposed Solution
>   -----------------
>   Introduce a new configuration option:
>     scan.incremental.snapshot.string-key.compare-mode
>   with three modes:
>   
>   | Mode             | Java comparison       | SQL behavior                    | Recommended for                                                      |
>   |------------------|-----------------------|---------------------------------|----------------------------------------------------------------------|
>   | default          | String.compareTo()    | Standard predicates (no BINARY) | Binary collations (utf8mb4_bin)                                      |
>   | case-insensitive | compareToIgnoreCase() | Standard predicates (no BINARY) | Case-insensitive collations (utf8mb4_general_ci, utf8mb4_unicode_ci) |
>   | binary           | String.compareTo()    | BINARY col <= BINARY ?          | Force byte-level exact match                                         |
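A minimal sketch of how the three modes might be represented on the Java side. The enum constants and option strings come from the issue; fromOption() and compare() are hypothetical helpers, not the PR's code, and the SQL side of BINARY mode is not modelled here. Per the table above, only CASE_INSENSITIVE changes the Java comparison.

```java
import java.util.Locale;

// Demo entry point; the enum below is the part that matters.
class CompareModeDemo {
    public static void main(String[] args) {
        ChunkKeyCompareMode mode = ChunkKeyCompareMode.fromOption("case-insensitive");
        System.out.println(mode);                                                // CASE_INSENSITIVE
        System.out.println(mode.compare("A1", "a1"));                            // 0
        System.out.println(ChunkKeyCompareMode.DEFAULT.compare("A1", "a1") < 0); // true
    }
}

// Sketch of the ChunkKeyCompareMode enum named in the proposal.
enum ChunkKeyCompareMode {
    DEFAULT("default"),
    CASE_INSENSITIVE("case-insensitive"),
    BINARY("binary");

    // Value as written in scan.incremental.snapshot.string-key.compare-mode.
    private final String optionValue;

    ChunkKeyCompareMode(String optionValue) {
        this.optionValue = optionValue;
    }

    // Hypothetical parser for the option value (trimmed, case-insensitive).
    static ChunkKeyCompareMode fromOption(String value) {
        String normalized = value.trim().toLowerCase(Locale.ROOT);
        for (ChunkKeyCompareMode mode : values()) {
            if (mode.optionValue.equals(normalized)) {
                return mode;
            }
        }
        throw new IllegalArgumentException("Unknown compare mode: " + value);
    }

    // Java-side key comparison: DEFAULT and BINARY use String.compareTo(),
    // CASE_INSENSITIVE uses String.compareToIgnoreCase().
    int compare(String a, String b) {
        return this == CASE_INSENSITIVE ? a.compareToIgnoreCase(b) : a.compareTo(b);
    }
}
```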
>   Key implementation points:
>   - Add a ChunkKeyCompareMode enum (DEFAULT, CASE_INSENSITIVE, BINARY).
>   - Propagate the mode through all three API layers: DataStream API (MySqlSourceBuilder), Flink SQL (MySqlTableSourceFactory), and Pipeline YAML (MySqlDataSourceFactory).
>   - Update StatementUtils to generate BINARY ... BINARY predicates (e.g., BINARY col <= BINARY ?) when mode == BINARY.
>   - Update ObjectUtils, SplitKeyUtils, and RecordUtils to perform comparisons using the selected mode.
>   - Ensure that snapshot splitting (MySqlChunkSplitter), snapshot reading (SnapshotSplitReader / MySqlSnapshotSplitReadTask), and binlog reading (BinlogSplitReader) all use the SAME mode consistently.
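The StatementUtils change can be pictured as a switch in how one boundary predicate is rendered. The helper below is hypothetical (BoundaryPredicates, quote() and predicate() are not the PR's code); it only shows the two predicate shapes from the table, with the column quoted as a MySQL identifier and a whitelist of comparison operators.

```java
import java.util.Set;

// Hypothetical helper in the spirit of the described StatementUtils change:
// renders one split-key comparison, wrapping both sides in BINARY when requested.
class BoundaryPredicates {
    // Only plain comparison operators are accepted, so no arbitrary SQL gets through.
    private static final Set<String> OPERATORS = Set.of("<", "<=", ">", ">=", "=");

    // MySQL identifier quoting: wrap in backticks and double any embedded backtick.
    static String quote(String column) {
        return "`" + column.replace("`", "``") + "`";
    }

    static String predicate(String column, String op, boolean binary) {
        if (!OPERATORS.contains(op)) {
            throw new IllegalArgumentException("Unsupported operator: " + op);
        }
        String col = quote(column);
        // BINARY on both sides makes MySQL compare bytes instead of using the collation.
        return binary
                ? "BINARY " + col + " " + op + " BINARY ?"
                : col + " " + op + " ?";
    }

    public static void main(String[] args) {
        System.out.println(predicate("id", "<=", false)); // `id` <= ?
        System.out.println(predicate("id", "<=", true));  // BINARY `id` <= BINARY ?
    }
}
```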
>   Scope & Compatibility
>   ---------------------
>   - Default value is "default", preserving backward compatibility.
>   - Changing this option requires restarting the job from scratch rather than from an existing checkpoint or savepoint, because split boundaries are persisted in the snapshot state.
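>   - CASE_INSENSITIVE is safe for ASCII alphanumeric keys (e.g., UUIDs). ASCII punctuation that sits between 'Z' and 'a' in code-point order (such as '_') can still sort differently, and for non-ASCII characters Java's compareToIgnoreCase() and MySQL's collation folding tables are not strictly equivalent; BINARY mode is recommended in those cases.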
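The non-ASCII caveat can be illustrated with the German sharp s. MySQL's character-set documentation uses it as an example where collations differ (utf8mb4_general_ci treats 'ß' as equal to 's', utf8mb4_unicode_ci as equal to 'ss'); that MySQL behavior is background here and worth re-checking against the server version in use. The Java sketch below shows the other half: compareToIgnoreCase() folds one char at a time and matches neither.

```java
import java.util.Locale;

// Java's compareToIgnoreCase() folds case per UTF-16 char, so it has no
// notion of one-to-many expansions such as 'ß' -> "ss".
class NonAsciiFolding {
    static boolean foldsEqual(String a, String b) {
        return a.compareToIgnoreCase(b) == 0;
    }

    public static void main(String[] args) {
        String sharpS = "\u00DF"; // 'ß', written as an escape to avoid source-encoding issues
        System.out.println(foldsEqual(sharpS, "s"));  // false
        System.out.println(foldsEqual(sharpS, "ss")); // false
        // String-level upper-casing does expand it, but compareToIgnoreCase() does not use this.
        System.out.println(sharpS.toUpperCase(Locale.ROOT)); // SS
    }
}
```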
>   Documentation & Tests
>   ---------------------
>   
>   - English and Chinese docs are updated for both the mysql-cdc source connector page and the pipeline connector page.
>   - MySqlTableSourceFactoryTest updated to cover the new parameter.
>   ---



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
