Ran Tao created FLINK-39613:
-------------------------------

             Summary: Flink pipeline cdc primary-keys / partition-keys cannot 
resolve backtick-quoted identifiers that are used as projection aliases
                 Key: FLINK-39613
                 URL: https://issues.apache.org/jira/browse/FLINK-39613
             Project: Flink
          Issue Type: Bug
          Components: Flink CDC
    Affects Versions: cdc-3.5.0, cdc-3.4.0
            Reporter: Ran Tao


In a Flink CDC YAML pipeline, the transform rule accepts three closely related 
fields:
 - {{projection}}: parsed by Calcite with the MySQL dialect ({{Lex.JAVA}} + 
{{MYSQL_5}}), which strips the MySQL-style backticks around identifiers.
 - {{primary-keys}} / {{partition-keys}}: currently split by a plain 
{{String#split(",")}} + {{String#trim()}} in {{SchemaMetadataTransform}}, which 
keeps the backticks verbatim as part of the column name.
Because the two sides use different parsing strategies, a backtick-quoted 
identifier used as a projection alias (for example, a SQL reserved word such as 
{{`time`}}) cannot be referenced from {{primary-keys}} / {{partition-keys}}: the 
literal string {{`time`}} never matches the projection output column {{time}}, 
and the pipeline fails at runtime with a column-not-found / schema-mismatch 
error.
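The mismatch can be reproduced with plain JDK string handling. This minimal sketch mimics the current split-and-trim key parsing and checks the result against the projection's output columns, assumed here to be {{id}} and {{time}} as Calcite would produce them after unquoting. The class name is illustrative only.

{code:java}
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

public class KeyMismatchDemo {

    // Mimics the current primary-keys parsing: split on commas, then trim.
    static List<String> splitAndTrim(String keys) {
        return Arrays.stream(keys.split(","))
                .map(String::trim)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Assumed projection output: Calcite has already removed the backticks.
        Set<String> projected = Set.of("id", "time");

        List<String> declared = splitAndTrim("id, `time`");
        System.out.println(declared);                        // [id, `time`]
        System.out.println(projected.containsAll(declared)); // false
    }
}
{code}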

This is surprising to users because:

1. The YAML is syntactically valid and passes the YAML/Jackson layer.
2. The projection clause (on the same transform rule) accepts and even requires 
the backticks when the alias is a reserved word.
3. The error surfaces only at job startup/runtime, not at validation time, and 
the error message does not hint at the quoting mismatch.
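A fail-fast check at validation time could surface the quoting mismatch with an actionable message. The following is a hypothetical sketch, not existing Flink CDC code: {{validateKeys}} and its message wording are invented for illustration, and it assumes the projected column names are already known when the transform rule is validated.

{code:java}
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

public class KeyValidationSketch {

    /**
     * Hypothetical check: every declared key must be a projected column.
     * Adds a hint when a key differs from a column only by surrounding backticks.
     */
    static void validateKeys(List<String> declaredKeys, Set<String> projectedColumns) {
        List<String> problems = new ArrayList<>();
        for (String key : declaredKeys) {
            if (projectedColumns.contains(key)) {
                continue;
            }
            String message = "column '" + key + "' not found in projection output " + projectedColumns;
            boolean quoted = key.length() >= 2 && key.startsWith("`") && key.endsWith("`");
            if (quoted && projectedColumns.contains(key.substring(1, key.length() - 1))) {
                message += " (hint: the key is backtick-quoted but the projection column is not)";
            }
            problems.add(message);
        }
        if (!problems.isEmpty()) {
            throw new IllegalArgumentException(String.join("; ", problems));
        }
    }

    public static void main(String[] args) {
        try {
            validateKeys(List.of("id", "`time`"), Set.of("id", "time"));
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
{code}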



*How to reproduce*

Minimal MySQL → any sink pipeline:
{code:yaml}
source:
  type: mysql
  # ...
sink:
  type: paimon   # or values/doris/starrocks, any sink works
  # ...
transform:
  - source-table: mydb.web_order
    projection: \*, DATE_FORMAT(create_at, 'yyyyMMdd') AS `time`
    primary-keys: id, `time`
    # a YAML plain scalar cannot start with a backtick, so this value is quoted
    partition-keys: '`time`'
{code}

Expected: the pipeline starts and routes records using {{time}} as part of the 
primary key / partition key.

Actual: the job fails because {{`time`}} (with backticks) is not found in the 
projected schema, which only contains the unquoted column {{time}}.

The same failure reproduces when the backtick-quoted name contains special 
characters (e.g. {{`order-id`}}, {{`with space`}}) or is a SQL reserved word 
({{`time`}}, {{`user`}}, {{`order`}}, …).

*Root cause*

- _org.apache.flink.cdc.runtime.operators.transform.SchemaMetadataTransform_ 
parses {{primaryKeyString}} / {{partitionKeyString}} with
{code:java}
Arrays.asList(primaryKeyString.split(",")).stream().map(String::trim)...
{code}
which preserves any surrounding backticks.

- _org.apache.flink.cdc.runtime.parser.TransformParser_ parses {{projection}} via 
Calcite ({{Lex.JAVA}}, {{MYSQL_5}}), which unquotes backtick identifiers.

The downstream matching between the declared primary/partition keys and the 
projection output is string-based, so this asymmetry is fatal: a declared key 
{{`time`}} is compared verbatim against the column name {{time}} and never 
matches.
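One way to remove the asymmetry is to parse the key lists with the same identifier rules as the projection before the string-based lookup. The sketch below is hypothetical ({{QuotedKeyParser}} and its methods are illustrative names, not Flink CDC API): it splits only on commas outside backticks, trims each entry, and strips the surrounding backticks, treating a doubled backtick as an escaped one as MySQL does. That this matches Calcite's {{Lex.JAVA}} unquoting in every case is an assumption. It also covers the special-character names listed in the reproduction section.

{code:java}
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

public class QuotedKeyParser {

    /**
     * Splits a primary-keys / partition-keys string on commas that are outside
     * backticks, trims each token, and unquotes backtick-quoted identifiers.
     */
    static List<String> parseKeys(String keys) {
        List<String> result = new ArrayList<>();
        StringBuilder token = new StringBuilder();
        boolean inQuotes = false;
        for (char c : keys.toCharArray()) {
            if (c == '`') {
                // A doubled backtick (``) toggles twice, so parsing stays inside the name.
                inQuotes = !inQuotes;
            }
            if (c == ',' && !inQuotes) {
                addToken(result, token);
            } else {
                token.append(c);
            }
        }
        if (inQuotes) {
            throw new IllegalArgumentException("Unterminated backtick in key list: " + keys);
        }
        addToken(result, token);
        return result;
    }

    // Trims and unquotes the buffered token, skips empty entries, then clears the buffer.
    private static void addToken(List<String> result, StringBuilder token) {
        String trimmed = token.toString().trim();
        token.setLength(0);
        if (!trimmed.isEmpty()) {
            result.add(unquote(trimmed));
        }
    }

    // `name` -> name; a `` inside the quotes becomes a literal backtick.
    static String unquote(String identifier) {
        if (identifier.length() >= 2 && identifier.startsWith("`") && identifier.endsWith("`")) {
            return identifier.substring(1, identifier.length() - 1).replace("``", "`");
        }
        return identifier;
    }

    public static void main(String[] args) {
        Set<String> projected = Set.of("id", "time");
        List<String> keys = parseKeys("id, `time`");
        System.out.println(keys);                        // [id, time]
        System.out.println(projected.containsAll(keys)); // true
        System.out.println(parseKeys("`order-id`, `with space`, `a,b`")); // [order-id, with space, a,b]
    }
}
{code}

Unlike {{String#split(",")}}, this parser skips empty entries and rejects an unterminated backtick, so a malformed key list fails with a clear message instead of producing a name that silently never matches.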



--
This message was sent by Atlassian Jira
(v8.20.10#820010)