Ziyan Lee created FLINK-39749:
---------------------------------

             Summary: [mysql-cdc] Introduce 
scan.incremental.snapshot.string-key.compare-mode to align Java and MySQL 
ordering for string chunk keys
                 Key: FLINK-39749
                 URL: https://issues.apache.org/jira/browse/FLINK-39749
             Project: Flink
          Issue Type: Improvement
          Components: Flink CDC
    Affects Versions: cdc-3.6.0
         Environment:   MySQL 8.0.x, table with CHAR(36) PK, collation 
utf8mb4_general_ci
  Flink CDC 3.6.0
  Pipeline job: mysql-cdc -> paimon
            Reporter: Ziyan Lee
             Fix For: cdc-3.7.0


  Problem Statement
  -----------------

  When a string-typed column (CHAR / VARCHAR) is used as the chunk split key in 
a MySQL CDC incremental snapshot, the chunk boundaries computed by Flink CDC may 
diverge from the actual row order in MySQL if the column uses a case-insensitive 
collation (e.g., utf8mb4_general_ci, utf8mb4_unicode_ci).

  This happens because:

  1. SQL query layer: MySQL evaluates WHERE predicates and ORDER BY using the 
column's collation.
     - utf8mb4_general_ci: 'A' == 'a' (case-insensitive)
  2. Java logic layer: Flink CDC uses String.compareTo() to determine chunk 
boundaries and route binlog events to splits.
     - Java default: 'A' (65) < 'a' (97) (case-sensitive, UTF-16 code-unit 
order)
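
  The Java side of this mismatch can be reproduced without a database. The 
sketch below is only an illustration: the class and method names are 
hypothetical, and compareToIgnoreCase() is used as a rough stand-in for a 
case-insensitive collation on ASCII input, not as an exact model of MySQL.

```java
final class OrderingMismatchDemo {
    // Java's natural String order: case-sensitive, compares char values.
    static int javaOrder(String a, String b) {
        return a.compareTo(b);
    }

    // Rough ASCII-only approximation of a case-insensitive collation (assumption).
    static int caseInsensitiveOrder(String a, String b) {
        return a.compareToIgnoreCase(b);
    }

    public static void main(String[] args) {
        // 'A' is 65 and 'a' is 97, so "A" sorts strictly before "a" in Java.
        System.out.println(javaOrder("A", "a") < 0);             // prints "true"
        // Ignoring case, the two strings compare as equal.
        System.out.println(caseInsensitiveOrder("A", "a") == 0); // prints "true"
    }
}
```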

  When these two ordering rules differ, the following symptoms occur:

  - Chunk splitting terminates prematurely. The last chunk becomes unbounded 
(splitEnd == null), causing a single huge chunk that reads most or all of the 
table.
  - During backfill / binlog phase, a record may be assigned to the wrong split 
because Java's boundary check does not match MySQL's actual data range.
  - In the worst case, this leads to duplicate processing or lost events 
downstream.
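
  The mis-routing symptom can be sketched with a simple range check. This is a 
minimal illustration, assuming a half-open split range [start, end) with null 
meaning unbounded; inSplit and the sample keys are hypothetical and are not 
the connector's actual code.

```java
import java.util.Comparator;

final class SplitRoutingDemo {
    // True when key lies in [start, end); a null bound is treated as unbounded.
    static boolean inSplit(String key, String start, String end, Comparator<String> cmp) {
        boolean afterStart = start == null || cmp.compare(key, start) >= 0;
        boolean beforeEnd = end == null || cmp.compare(key, end) < 0;
        return afterStart && beforeEnd;
    }

    public static void main(String[] args) {
        // Case-sensitive order: "A5" sorts before "a0", so the key is rejected.
        System.out.println(inSplit("A5", "a0", "b0", Comparator.naturalOrder()));     // prints "false"
        // Case-insensitive order: "A5" falls between "a0" and "b0".
        System.out.println(inSplit("A5", "a0", "b0", String.CASE_INSENSITIVE_ORDER)); // prints "true"
    }
}
```

  A row that a case-insensitive range query would return for this split is 
therefore not recognized as belonging to it by the case-sensitive check.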

  Concrete example
  ----------------
  
  Table: orders (id CHAR(36) PRIMARY KEY) with utf8mb4_general_ci
  Data: mixed-case UUIDs such as '9f...', 'A1...', 'a1...', 'a2...'

  MySQL ORDER BY id (utf8mb4_general_ci) treats 'A1...' and 'a1...' as equal.
  Java String.compareTo() orders them as: '9f...' < 'A1...' < 'a1...' < 'a2...'

  If chunkSize = 8096 and MySQL returns 'A1...' as a boundary, Java may later 
fail to find the next chunk maximum because its internal ordering sees 'A1...' 
< 'a1...', while MySQL sees them as the same group. queryNextChunkMax 
eventually returns null, and the remaining data falls into an unbounded chunk.
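
  The divergence in ordering can be seen by sorting a handful of keys both 
ways. The sketch below uses shortened, hypothetical keys (including an extra 
'B3' that is not part of the example above) and uses 
String.CASE_INSENSITIVE_ORDER as an ASCII-only approximation of the collation.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

final class ChunkKeyOrderDemo {
    // Returns a sorted copy so the input list is left untouched.
    static List<String> sorted(List<String> keys, Comparator<String> cmp) {
        List<String> copy = new ArrayList<>(keys);
        copy.sort(cmp); // List.sort is stable, so equal keys keep their input order
        return copy;
    }

    public static void main(String[] args) {
        List<String> keys = Arrays.asList("9f", "A1", "a1", "a2", "B3");
        // Case-sensitive: every uppercase letter sorts before every lowercase letter.
        System.out.println(sorted(keys, Comparator.naturalOrder()));     // prints "[9f, A1, B3, a1, a2]"
        // Case-insensitive: "A1" and "a1" tie, and "B3" moves to the end.
        System.out.println(sorted(keys, String.CASE_INSENSITIVE_ORDER)); // prints "[9f, A1, a1, a2, B3]"
    }
}
```

  A boundary such as 'B3' therefore sits in a different position relative to 
'a1' and 'a2' depending on which ordering is applied.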

  Proposed Solution
  -----------------

  Introduce a new configuration option:

    scan.incremental.snapshot.string-key.compare-mode

  with three modes:
  
  | Mode             | Java comparison       | SQL behavior                    | Recommended for                                                      |
  |------------------|-----------------------|---------------------------------|----------------------------------------------------------------------|
  | default          | String.compareTo()    | Standard predicates (no BINARY) | Binary collations (utf8mb4_bin)                                      |
  | case-insensitive | compareToIgnoreCase() | Standard predicates (no BINARY) | Case-insensitive collations (utf8mb4_general_ci, utf8mb4_unicode_ci) |
  | binary           | String.compareTo()    | BINARY col <= BINARY ?          | Force byte-level exact match                                         |
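
  The "Java comparison" column could be modeled roughly as follows. This is a 
sketch under assumptions, not the proposed implementation: the enum name 
ChunkKeyCompareModeSketch, the comparator() accessor, and the fromOption() 
parser are all hypothetical.

```java
import java.util.Comparator;
import java.util.Locale;

enum ChunkKeyCompareModeSketch {
    DEFAULT(Comparator.naturalOrder()),              // String.compareTo()
    CASE_INSENSITIVE(String.CASE_INSENSITIVE_ORDER), // compareToIgnoreCase()
    BINARY(Comparator.naturalOrder());               // String.compareTo(), paired with BINARY predicates in SQL

    private final Comparator<String> comparator;

    ChunkKeyCompareModeSketch(Comparator<String> comparator) {
        this.comparator = comparator;
    }

    Comparator<String> comparator() {
        return comparator;
    }

    // Hypothetical parser: maps an option value such as "case-insensitive" to a constant.
    static ChunkKeyCompareModeSketch fromOption(String value) {
        return valueOf(value.trim().toUpperCase(Locale.ROOT).replace('-', '_'));
    }
}
```

  For example, fromOption("case-insensitive").comparator().compare("A1", "a1") 
evaluates to 0, while the same call with "default" yields a negative value.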

  Key implementation points:

  - Add ChunkKeyCompareMode enum (DEFAULT, CASE_INSENSITIVE, BINARY).
  - Propagate the mode through all three API layers: DataStream API 
(MySqlSourceBuilder), Flink SQL (MySqlTableSourceFactory), and Pipeline YAML 
(MySqlDataSourceFactory).
  - Update StatementUtils to generate predicates of the form 
BINARY col <= BINARY ? when mode == BINARY.
  - Update ObjectUtils, SplitKeyUtils, and RecordUtils to perform comparisons 
using the selected mode.
  - Ensure snapshot splitting (MySqlChunkSplitter), snapshot reading 
(SnapshotSplitReader / MySqlSnapshotSplitReadTask), and binlog reading 
(BinlogSplitReader) all use the SAME mode consistently.
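
  The predicate change for BINARY mode might look roughly like the sketch 
below. It does not reflect the actual StatementUtils signatures; the class 
ChunkPredicateSketch, its nested Mode enum, and upperBoundPredicate() are 
hypothetical names used only to illustrate the idea.

```java
final class ChunkPredicateSketch {
    // Local stand-in for the proposed ChunkKeyCompareMode enum.
    enum Mode { DEFAULT, CASE_INSENSITIVE, BINARY }

    // Builds the upper-bound predicate for a chunk query on the given column.
    static String upperBoundPredicate(String column, Mode mode) {
        // Quote the identifier MySQL-style, doubling any embedded backticks.
        String quoted = "`" + column.replace("`", "``") + "`";
        if (mode == Mode.BINARY) {
            // Force byte-wise comparison on both the column and the parameter.
            return "BINARY " + quoted + " <= BINARY ?";
        }
        // DEFAULT and CASE_INSENSITIVE leave the comparison to the column's collation.
        return quoted + " <= ?";
    }

    public static void main(String[] args) {
        System.out.println(upperBoundPredicate("id", Mode.BINARY));  // prints "BINARY `id` <= BINARY ?"
        System.out.println(upperBoundPredicate("id", Mode.DEFAULT)); // prints "`id` <= ?"
    }
}
```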

  Scope & Compatibility
  ---------------------

  - Default value is "default", preserving backward compatibility.
  - Changing this option on an existing job requires restarting from a fresh 
state rather than restoring from an earlier checkpoint or savepoint, because 
split boundaries are persisted in snapshot state.
  - CASE_INSENSITIVE is safe for pure ASCII keys (e.g., UUIDs). For non-ASCII 
characters, Java's compareToIgnoreCase() and MySQL's collation folding tables 
are not strictly equivalent; BINARY mode is recommended in those cases.
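
  The non-ASCII caveat can be illustrated on the Java side. In the sketch 
below (hypothetical class and method names), compareToIgnoreCase() folds case 
only and does not ignore accents, whereas utf8mb4_general_ci is, as far as I 
know, also accent-insensitive and would compare 'e' and 'é' as equal.

```java
final class NonAsciiFoldingDemo {
    // True when Java's case-insensitive comparison treats the two strings as equal.
    static boolean javaTreatsAsEqual(String a, String b) {
        return a.compareToIgnoreCase(b) == 0;
    }

    public static void main(String[] args) {
        // Pure ASCII case difference: folded as expected.
        System.out.println(javaTreatsAsEqual("E", "e"));      // prints "true"
        // Accent difference ("\u00e9" is e with acute accent): not folded by Java.
        System.out.println(javaTreatsAsEqual("e", "\u00e9")); // prints "false"
    }
}
```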

  Documentation & Tests
  ---------------------
  
  - English and Chinese docs updated for both mysql-cdc source connector and 
pipeline connector pages.
  - MySqlTableSourceFactoryTest updated to cover the new parameter.




--
This message was sent by Atlassian Jira
(v8.20.10#820010)
