[
https://issues.apache.org/jira/browse/FLINK-39613?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Ran Tao updated FLINK-39613:
----------------------------
Description:
In a Flink CDC YAML pipeline, a transform rule accepts three closely related
fields:
* projection — parsed by Calcite with the MySQL dialect ( Lex.JAVA + MYSQL_5
). Calcite strips the MySQL-style backticks around identifiers, so the
parsed/propagated column name is the unquoted name.
* primary-keys / partition-keys — currently parsed in SchemaMetadataTransform
by a plain String#split(",") + String#trim() . The backticks are kept verbatim
as part of the column name.
Because the two fields use different parsing strategies, any column name that
has to be wrapped in MySQL-style backticks cannot be referenced from
primary-keys / partition-keys . This is not specific to projection aliases —
the same failure occurs when:
1. The column is a plain upstream column carried through the pipeline (no
projection is defined, or projection: * is used), and the upstream column name
itself is a SQL reserved word ( `time` , `user` , `order` , `key` …) or
contains special characters (spaces, - , non-ASCII, …).
2. The column is produced by a projection alias that has to be backtick-quoted
(because the alias is a reserved word or contains special characters).
In both cases the user must write the backticks on the primary-keys /
partition-keys side to express the quoted identifier semantically, but the
current implementation stores the literal string `time` (with backticks) and
then tries to match it against the downstream schema column time (without
backticks). The match fails and the pipeline aborts at startup/runtime with a
column-not-found / schema-mismatch error.
The issue is surprising to users because:
- The YAML is syntactically valid and passes the YAML/Jackson layer.
- Calcite-based parts of the same rule (projection, filter) already understand
and unquote backticks.
- The failure only surfaces at runtime, and the error message does not hint at
the quoting mismatch.
*How to reproduce*
Case A — upstream column whose name is a SQL reserved word (no projection
involved)
Assume an upstream MySQL table whose DDL legitimately uses a reserved word as a
column name:
db:
{code:java}
CREATE TABLE mydb.web_order (
id BIGINT PRIMARY KEY,
`time` TIMESTAMP,
payload VARCHAR(128)
); {code}
pipeline:
{code:java}
source:
type: mysql
# ...
sink:
type: paimon # or values / doris / starrocks — sink is irrelevant
# ...
transform:
- source-table: mydb.web_order
# no projection, or: projection: \*
primary-keys: id, `time`
partition-keys: `time` {code}
Expected: the pipeline starts and uses id + time as primary key.Actual: startup
fails because the literal string `time` is not a column in the
upstream/projected schema (which only contains time ).
Case B — projection alias that has to be backtick-quoted
{code:java}
transform:
- source-table: mydb.web_order
projection: \*, DATE_FORMAT(create_at, 'yyyyMMdd') AS `time`
primary-keys: id, `time`
partition-keys: `time` {code}
Same root cause, same failure mode.
Both cases reproduce for any identifier that requires quoting: reserved words (
time , user , order , key , group , select , …), identifiers containing spaces,
- , . , or non-ASCII characters, and identifiers that collide with Calcite
keywords.
was:
In a Flink CDC YAML pipeline, the transform rule accepts three closely related
fields:
- projection — parsed by Calcite with the MySQL dialect ( Lex.JAVA + MYSQL_5
), which strips the MySQL-style backticks around identifiers.
- primary-keys / partition-keys — currently split by a plain String#split(",")
+ String#trim() in SchemaMetadataTransform , which keeps the backticks verbatim
as part of the column name.
Because the two sides use different parsing strategies, a backtick-quoted
identifier used as a projection alias (for example a SQL reserved word such as
`time` ) can no longer be referenced from primary-keys / partition-keys : the
literal string `time` never matches the projection output column time , and the
pipeline fails at runtime with a column-not-found / schema-mismatch error.
This is surprising to users because:
1. The YAML is syntactically valid and passes the YAML/Jackson layer.
2. The projection clause (on the same transform rule) accepts and even requires
the backticks when the alias is a reserved word.
3. The error surfaces only at job startup/runtime, not at validation time, and
the error message does not hint at the quoting mismatch.
*How to reproduce*
Minimal MySQL → any sink pipeline:
{code:java}
source:
type: mysql
# ...
sink:
type: paimon # or values/doris/starrocks, any sink works
# ...
transform:
- source-table: mydb.web_order
projection: \*, DATE_FORMAT(create_at, 'yyyyMMdd') AS `time`
primary-keys: id, `time`
partition-keys: `time` {code}
Expected: the pipeline starts and routes records using time as part of the
primary key / partition key.
Actual: the job fails because `time` (with backticks) is not found in the
projected schema, which only contains the unquoted column time .
The same failure reproduces when the backtick-quoted name contains special
characters (e.g. `order-id` , `with space` ) or is a SQL reserved word ( `time`
, `user` , `order` , …).
*Root cause*
- _org.apache.flink.cdc.runtime.operators.transform.SchemaMetadataTransform_
parses primaryKeyString / partitionKeyString with:
{code:java}
Arrays.asList(primaryKeyString.split(",")).stream().map
(String::trim)... {code}
which preserves any surrounding backticks.
- _org.apache.flink.cdc.runtime.parser.TransformParser_ parses projection via
Calcite ( Lex.JAVA , MYSQL_5 ), which unquotes backtick identifiers.
The downstream matching between the declared primary/partition keys and the
projection output is string-based, so the asymmetry is fatal.
> Flink cdc pipeline primary-keys / partition-keys cannot resolve
> backtick-quoted identifiers
> -------------------------------------------------------------------------------------------
>
> Key: FLINK-39613
> URL: https://issues.apache.org/jira/browse/FLINK-39613
> Project: Flink
> Issue Type: Bug
> Components: Flink CDC
> Affects Versions: cdc-3.4.0, cdc-3.5.0
> Reporter: Ran Tao
> Priority: Major
>
> In a Flink CDC YAML pipeline, a transform rule accepts three closely related
> fields:
> * projection — parsed by Calcite with the MySQL dialect ( Lex.JAVA + MYSQL_5
> ). Calcite strips the MySQL-style backticks around identifiers, so the
> parsed/propagated column name is the unquoted name.
> * primary-keys / partition-keys — currently parsed in
> SchemaMetadataTransform by a plain String#split(",") + String#trim() . The
> backticks are kept verbatim as part of the column name.
> Because the two fields use different parsing strategies, any column name that
> has to be wrapped in MySQL-style backticks cannot be referenced from
> primary-keys / partition-keys . This is not specific to projection aliases —
> the same failure occurs when:
> 1. The column is a plain upstream column carried through the pipeline (no
> projection is defined, or projection: * is used), and the upstream column
> name itself is a SQL reserved word ( `time` , `user` , `order` , `key` …) or
> contains special characters (spaces, - , non-ASCII, …).
> 2. The column is produced by a projection alias that has to be
> backtick-quoted (because the alias is a reserved word or contains special
> characters).
> In both cases the user must write the backticks on the primary-keys /
> partition-keys side to express the quoted identifier semantically, but the
> current implementation stores the literal string `time` (with backticks) and
> then tries to match it against the downstream schema column time (without
> backticks). The match fails and the pipeline aborts at startup/runtime with a
> column-not-found / schema-mismatch error.
> The issue is surprising to users because:
> - The YAML is syntactically valid and passes the YAML/Jackson layer.
> - Calcite-based parts of the same rule (projection, filter) already
> understand and unquote backticks.
> - The failure only surfaces at runtime, and the error message does not hint
> at the quoting mismatch.
> *How to reproduce*
> Case A — upstream column whose name is a SQL reserved word (no projection
> involved)
> Assume an upstream MySQL table whose DDL legitimately uses a reserved word as
> a column name:
> db:
> {code:java}
> CREATE TABLE mydb.web_order (
> id BIGINT PRIMARY KEY,
> `time` TIMESTAMP,
> payload VARCHAR(128)
> ); {code}
> pipeline:
>
> {code:java}
> source:
> type: mysql
> # ...
> sink:
> type: paimon # or values / doris / starrocks — sink is irrelevant
> # ...
> transform:
> - source-table: mydb.web_order
> # no projection, or: projection: \*
> primary-keys: id, `time`
> partition-keys: `time` {code}
> Expected: the pipeline starts and uses id + time as primary key.Actual:
> startup fails because the literal string `time` is not a column in the
> upstream/projected schema (which only contains time ).
> Case B — projection alias that has to be backtick-quoted
> {code:java}
> transform:
> - source-table: mydb.web_order
> projection: \*, DATE_FORMAT(create_at, 'yyyyMMdd') AS `time`
> primary-keys: id, `time`
> partition-keys: `time` {code}
>
> Same root cause, same failure mode.
> Both cases reproduce for any identifier that requires quoting: reserved words
> ( time , user , order , key , group , select , …), identifiers containing
> spaces, - , . , or non-ASCII characters, and identifiers that collide with
> Calcite keywords.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)