[ 
https://issues.apache.org/jira/browse/FLINK-39613?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ran Tao updated FLINK-39613:
----------------------------
    Description: 
*Description*

In a Flink CDC YAML pipeline, the transform rule has three closely related 
string fields:
 * projection — parsed by Calcite with the MySQL dialect ( Lex.JAVA + MYSQL_5 
), which strips the surrounding MySQL-style backticks and yields the unquoted 
identifier.
 * primary-keys / partition-keys — parsed in SchemaMetadataTransform by a plain 
String#split(",") + String#trim() , which keeps the backticks verbatim as part 
of the column name.

Because of the asymmetry, any column name that has to be wrapped in MySQL-style 
backticks (upstream-native column or projection-derived column — it does not 
matter which) cannot be referenced from primary-keys / partition-keys : the 
stored literal `order-id` is matched byte-for-byte against the schema column 
order-id , and the match fails.
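
The mismatch can be illustrated in isolation. The following is a minimal sketch, not the actual SchemaMetadataTransform code: the class name NaiveKeySplit and the method parseKeys are hypothetical, and only the split(",") + trim() behaviour is taken from the description above.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical stand-in for the key parsing described above; not Flink CDC code.
final class NaiveKeySplit {

    // Splits on commas and trims whitespace; backticks are left untouched.
    static List<String> parseKeys(String raw) {
        return Arrays.stream(raw.split(","))
                .map(String::trim)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Column names as carried in the schema, i.e. already unquoted.
        List<String> schemaColumns = List.of("id", "order-id", "region", "payload");

        for (String key : parseKeys("id, `order-id`")) {
            // Exact string comparison: the entry with backticks finds no match.
            System.out.println(key + " -> " + schemaColumns.contains(key));
        }
        // prints:
        // id -> true
        // `order-id` -> false
    }
}
```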

This is specifically a defect of primary-keys / partition-keys parsing; 
projection and filter on the same rule already accept backticks correctly 
through Calcite.

*How to reproduce*
To keep the repro focused on the CDC bug and avoid a well-known YAML 1.2 
constraint (a plain, unquoted YAML scalar may not start with the ` indicator), 
the example below:
 * uses a column name that contains a special character ( - ), so SQL quoting 
is unambiguously required;
 * puts a plain identifier first in each list, so the YAML scalar itself does 
not start with ` , and no YAML-side workaround (flow sequence, '...' , "..." ) 
is needed.

Upstream DDL (MySQL)

{code:sql}
CREATE TABLE mydb.web_order (
  id            BIGINT NOT NULL,
  `order-id`    VARCHAR(64) NOT NULL,
  region        VARCHAR(32) NOT NULL,
  payload       VARCHAR(128),
  PRIMARY KEY (id, `order-id`)
); {code}
 

The column name order-id is a legal MySQL identifier, but because it contains 
a hyphen ( - ), MySQL requires it to be quoted with backticks in SQL 
statements. Flink CDC carries the raw column name order-id (without backticks) 
through its internal Schema.

Pipeline YAML
{code:yaml}
source:
  type: mysql
  # ...
sink:
  type: paimon            # any sink reproduces
  # ...
transform:
  - source-table: mydb.web_order
    # No projection is needed to reproduce: the column comes straight from
    # the upstream schema.
    # The list starts with a plain identifier, so the YAML scalar does not
    # begin with '`'.
    primary-keys: id, `order-id`
    partition-keys: region, `order-id` 
{code}
 * Expected: the pipeline starts and uses (id, order-id) as primary keys, 
(region, order-id) as partition keys.
 * Actual: the pipeline aborts at startup because the literal string `order-id` 
is not a column in the (already unquoted) schema [id, order-id, region, 
payload] . Only id and region match; the backtick-quoted entries are treated as 
unknown columns.
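
For comparison, the sketch below shows one way the key lists could be parsed so that the expected behaviour holds. It is an illustration under stated assumptions, not the project's fix: the class BacktickAwareKeys and its methods are hypothetical, a doubled backtick inside a quoted identifier is assumed to stand for a literal backtick (the MySQL convention), and whitespace outside backticks is simply dropped.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical backtick-aware parser for a comma-separated key list.
final class BacktickAwareKeys {

    static List<String> parseKeys(String raw) {
        List<String> keys = new ArrayList<>();
        StringBuilder current = new StringBuilder();
        boolean quoted = false; // true while inside a backtick-quoted identifier

        for (int i = 0; i < raw.length(); i++) {
            char c = raw.charAt(i);
            if (c == '`') {
                if (quoted && i + 1 < raw.length() && raw.charAt(i + 1) == '`') {
                    current.append('`'); // doubled backtick = literal backtick
                    i++;
                } else {
                    quoted = !quoted;    // opening or closing backtick is dropped
                }
            } else if (c == ',' && !quoted) {
                addKey(keys, current);   // a comma outside quotes ends the entry
            } else if (quoted || !Character.isWhitespace(c)) {
                current.append(c);       // whitespace outside quotes is ignored
            }
        }
        if (quoted) {
            throw new IllegalArgumentException("Unterminated backtick in: " + raw);
        }
        addKey(keys, current);
        return keys;
    }

    // Adds the accumulated entry, skipping empty ones, and resets the buffer.
    private static void addKey(List<String> keys, StringBuilder current) {
        if (current.length() > 0) {
            keys.add(current.toString());
            current.setLength(0);
        }
    }

    public static void main(String[] args) {
        System.out.println(parseKeys("id, `order-id`"));     // prints [id, order-id]
        System.out.println(parseKeys("region, `order-id`")); // prints [region, order-id]
    }
}
```

With this normalization the parsed keys are the unquoted names, so they can be compared directly against the schema columns [id, order-id, region, payload].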

 

  was:
In a Flink CDC YAML pipeline, a transform rule accepts three closely related 
fields:
 * projection — parsed by Calcite with the MySQL dialect ( Lex.JAVA + MYSQL_5 
). Calcite strips the MySQL-style backticks around identifiers, so the 
parsed/propagated column name is the unquoted name.
 * primary-keys / partition-keys — currently parsed in SchemaMetadataTransform 
by a plain String#split(",") + String#trim() . The backticks are kept verbatim 
as part of the column name.

Because the two fields use different parsing strategies, any column name that 
has to be wrapped in MySQL-style backticks cannot be referenced from 
primary-keys / partition-keys . This is not specific to projection aliases — 
the same failure occurs when:

1. The column is a plain upstream column carried through the pipeline (no 
projection is defined, or projection: * is used), and the upstream column name 
itself is a SQL reserved word ( `time` , `user` , `order` , `key` …) or 
contains special characters (spaces, - , non-ASCII, …).

2. The column is produced by a projection alias that has to be backtick-quoted 
(because the alias is a reserved word or contains special characters).

In both cases the user must write the backticks on the primary-keys / 
partition-keys side to express the quoted identifier semantically, but the 
current implementation stores the literal string `time` (with backticks) and 
then tries to match it against the downstream schema column time (without 
backticks). The match fails and the pipeline aborts at startup/runtime with a 
column-not-found / schema-mismatch error.

The issue is surprising to users because:
 - The YAML is syntactically valid and passes the YAML/Jackson layer.
 - Calcite-based parts of the same rule (projection, filter) already understand 
and unquote backticks.
 - The failure only surfaces at runtime, and the error message does not hint at 
the quoting mismatch.

*How to reproduce*

*Case A* — upstream column whose name is a SQL reserved word.
Assume an upstream MySQL table whose DDL legitimately uses a reserved word as a 
column name:
db:
{code:java}
CREATE TABLE mydb.web_order (
  id       BIGINT PRIMARY KEY,
  `time`   TIMESTAMP,
  payload  VARCHAR(128)
); {code}
pipeline:

 
{code:java}
source:
  type: mysql
  # ...
sink:
  type: paimon          # or values / doris / starrocks — sink is irrelevant
  # ...
transform:
  - source-table: mydb.web_order
    projection: \*
    primary-keys: id, `time`
    partition-keys: `time` {code}
Expected: the pipeline starts and uses id + time as primary key.

Actual: startup fails because the literal string `time` is not a column in the 
upstream/projected schema (which only contains time ).

*Case B* — projection alias that has to be backtick-quoted
{code:java}
transform:
  - source-table: mydb.web_order
    projection: \*, DATE_FORMAT(create_at, 'yyyyMMdd') AS `time`
    primary-keys: id, `time`
    partition-keys: `time` {code}
 

Same root cause, same failure mode.

Both cases reproduce for any identifier that requires quoting: reserved words ( 
time , user , order , key , group , select , …), identifiers containing spaces, 
- , . , or non-ASCII characters, and identifiers that collide with Calcite 
keywords.


> Flink cdc pipeline primary-keys / partition-keys keep backticks verbatim and 
> fail to match column names that require SQL quoting
> --------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-39613
>                 URL: https://issues.apache.org/jira/browse/FLINK-39613
>             Project: Flink
>          Issue Type: Bug
>          Components: Flink CDC
>    Affects Versions: cdc-3.4.0, cdc-3.5.0
>            Reporter: Ran Tao
>            Priority: Major
>



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
