[ https://issues.apache.org/jira/browse/FLINK-39613?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Ran Tao updated FLINK-39613:
----------------------------
Description:
*Description*
In a Flink CDC YAML pipeline, the transform rule has three closely related
string fields:
* projection: parsed by Calcite with the MySQL dialect (Lex.JAVA + MYSQL_5),
which strips the surrounding MySQL-style backticks and yields the unquoted
identifier.
* primary-keys / partition-keys: parsed in SchemaMetadataTransform by a plain
String#split(",") + String#trim(), which keeps the backticks verbatim as part
of the column name.
Because of this asymmetry, any column name that has to be wrapped in MySQL-style
backticks (whether it is an upstream-native column or a projection-derived
column) cannot be referenced from primary-keys / partition-keys: the stored
literal `order-id` is matched byte-for-byte against the schema column order-id,
and the match fails.
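A minimal sketch of the asymmetry, assuming the keys string is split exactly as described above (String#split(",") followed by String#trim()). The class name PrimaryKeySplitDemo and its helper methods are hypothetical and only mimic the described behaviour; they are not the actual SchemaMetadataTransform code.

{code:java}
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical demo: mimics the split + trim behaviour described above,
// not the real SchemaMetadataTransform implementation.
public class PrimaryKeySplitDemo {

    // Split a primary-keys / partition-keys string the naive way.
    static List<String> naiveSplit(String keys) {
        List<String> result = new ArrayList<>();
        for (String part : keys.split(",")) {
            result.add(part.trim()); // trim() removes spaces, but the backticks survive
        }
        return result;
    }

    // Return the keys that do not match any schema column byte-for-byte.
    static List<String> unknownColumns(List<String> keys, List<String> schemaColumns) {
        List<String> unknown = new ArrayList<>();
        for (String key : keys) {
            if (!schemaColumns.contains(key)) {
                unknown.add(key);
            }
        }
        return unknown;
    }

    public static void main(String[] args) {
        // The internal schema carries the raw, unquoted column names.
        List<String> schema = Arrays.asList("id", "order-id", "region", "payload");
        List<String> keys = naiveSplit("id, `order-id`");
        System.out.println(keys);                         // [id, `order-id`]
        System.out.println(unknownColumns(keys, schema)); // [`order-id`]
    }
}
{code}

The quoted entry keeps its backticks after trim(), so it can never equal the raw schema name order-id, while the plain entry id matches.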
This is specifically a defect of primary-keys / partition-keys parsing;
projection and filter on the same rule already accept backticks correctly
through Calcite.
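One possible direction, sketched under the assumption that primary-keys / partition-keys should honour the same MySQL-style backtick quoting that projection already accepts: split only on commas outside backticks, then strip one pair of surrounding backticks and unescape doubled backticks (``). BacktickAwareKeyParser and its methods are hypothetical names; this is a hand-written parser, not Calcite and not the upstream patch.

{code:java}
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper (not the upstream fix): parses a primary-keys /
// partition-keys string whose entries may be MySQL-style backtick-quoted.
public class BacktickAwareKeyParser {

    // Split on commas that are not inside a backtick-quoted identifier.
    // A doubled backtick (``) toggles the state twice, so it is neutral.
    static List<String> splitOutsideBackticks(String keys) {
        List<String> parts = new ArrayList<>();
        StringBuilder current = new StringBuilder();
        boolean inQuotes = false;
        for (char c : keys.toCharArray()) {
            if (c == '`') {
                inQuotes = !inQuotes;
            }
            if (c == ',' && !inQuotes) {
                parts.add(current.toString());
                current.setLength(0);
            } else {
                current.append(c);
            }
        }
        if (inQuotes) {
            throw new IllegalArgumentException("Unterminated backtick in: " + keys);
        }
        parts.add(current.toString());
        return parts;
    }

    // Trim outer whitespace; if the entry is wrapped in backticks, strip them
    // and turn each escaped `` back into a single `.
    static String unquote(String part) {
        String t = part.trim();
        if (t.length() >= 2 && t.startsWith("`") && t.endsWith("`")) {
            return t.substring(1, t.length() - 1).replace("``", "`");
        }
        return t;
    }

    static List<String> parseKeys(String keys) {
        List<String> result = new ArrayList<>();
        for (String part : splitOutsideBackticks(keys)) {
            result.add(unquote(part));
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(parseKeys("id, `order-id`"));        // [id, order-id]
        System.out.println(parseKeys("region, `a,b`, `x``y`")); // [region, a,b, x`y]
    }
}
{code}

With this parsing, "id, `order-id`" yields [id, order-id], which matches the schema [id, order-id, region, payload] used in the repro. The hand-rolled splitter keeps the sketch dependency-free; reusing Calcite's identifier handling, as projection does, would be an alternative.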
*How to reproduce*
To keep the repro focused on the CDC bug and to sidestep a well-known YAML 1.2
constraint (a plain, unquoted YAML scalar may not start with the ` indicator),
the repro:
* uses a column name that contains a special character (-), so SQL quoting
is unambiguously required;
* puts a plain identifier first in each list, so the YAML scalar itself does
not start with `, and no YAML-side workaround (flow sequence, '...', "...")
is needed.
*Upstream DDL (MySQL)*
{code:sql}
CREATE TABLE mydb.web_order (
  id BIGINT NOT NULL,
  `order-id` VARCHAR(64) NOT NULL,
  region VARCHAR(32) NOT NULL,
  payload VARCHAR(128),
  PRIMARY KEY (id, `order-id`)
);
{code}
The column name order-id is a perfectly legal MySQL identifier, but because it
contains a hyphen (-), MySQL requires it to be quoted with backticks in any SQL
statement. Flink CDC carries the raw column name order-id (without backticks)
through its internal Schema.
*Pipeline YAML*
{code:yaml}
source:
  type: mysql
  # ...
sink:
  type: paimon # any sink reproduces
  # ...
transform:
  - source-table: mydb.web_order
    projection: \*
    primary-keys: id, `order-id`
    partition-keys: region, `order-id`
{code}
* Expected: the pipeline starts and uses (id, order-id) as primary keys and
(region, order-id) as partition keys.
* Actual: the pipeline aborts at startup because the literal string `order-id`
is not a column in the (already unquoted) schema [id, order-id, region,
payload]. Only id and region match; the backtick-quoted entries are treated as
unknown columns.
was:
*Description*
In a Flink CDC YAML pipeline, the transform rule has three closely related
string fields:
* projection: parsed by Calcite with the MySQL dialect (Lex.JAVA + MYSQL_5),
which strips the surrounding MySQL-style backticks and yields the unquoted
identifier.
* primary-keys / partition-keys: parsed in SchemaMetadataTransform by a plain
String#split(",") + String#trim(), which keeps the backticks verbatim as part
of the column name.
Because of this asymmetry, any column name that has to be wrapped in MySQL-style
backticks (whether it is an upstream-native column or a projection-derived
column) cannot be referenced from primary-keys / partition-keys: the stored
literal `order-id` is matched byte-for-byte against the schema column order-id,
and the match fails.
This is specifically a defect of primary-keys / partition-keys parsing;
projection and filter on the same rule already accept backticks correctly
through Calcite.
*How to reproduce*
To keep the repro focused on the CDC bug and to sidestep a well-known YAML 1.2
constraint (a plain, unquoted YAML scalar may not start with the ` indicator),
the repro:
* uses a column name that contains a special character (-), so SQL quoting
is unambiguously required;
* puts a plain identifier first in each list, so the YAML scalar itself does
not start with `, and no YAML-side workaround (flow sequence, '...', "...")
is needed.
*Upstream DDL (MySQL)*
{code:sql}
CREATE TABLE mydb.web_order (
  id BIGINT NOT NULL,
  `order-id` VARCHAR(64) NOT NULL,
  region VARCHAR(32) NOT NULL,
  payload VARCHAR(128),
  PRIMARY KEY (id, `order-id`)
);
{code}
The column name order-id is a perfectly legal MySQL identifier, but because it
contains a hyphen (-), MySQL requires it to be quoted with backticks in any SQL
statement. Flink CDC carries the raw column name order-id (without backticks)
through its internal Schema.
*Pipeline YAML*
{code:yaml}
source:
  type: mysql
  # ...
sink:
  type: paimon # any sink reproduces
  # ...
transform:
  - source-table: mydb.web_order
    # No projection needed to reproduce: the column comes straight from the upstream schema.
    # The list starts with a plain identifier, so the YAML scalar does not begin with '`'.
    primary-keys: id, `order-id`
    partition-keys: region, `order-id`
{code}
* Expected: the pipeline starts and uses (id, order-id) as primary keys and
(region, order-id) as partition keys.
* Actual: the pipeline aborts at startup because the literal string `order-id`
is not a column in the (already unquoted) schema [id, order-id, region,
payload]. Only id and region match; the backtick-quoted entries are treated as
unknown columns.
> Flink cdc pipeline primary-keys / partition-keys keep backticks verbatim and fail to match column names that require SQL quoting
> --------------------------------------------------------------------------------------------------------------------------------
>
> Key: FLINK-39613
> URL: https://issues.apache.org/jira/browse/FLINK-39613
> Project: Flink
> Issue Type: Bug
> Components: Flink CDC
> Affects Versions: cdc-3.4.0, cdc-3.5.0
> Reporter: Ran Tao
> Priority: Major