[
https://issues.apache.org/jira/browse/FLINK-39758?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Mao Jiayi updated FLINK-39758:
------------------------------
Description:
When using the Kafka pipeline connector with Canal JSON format and the
*InferSchemaStrategy.MYSQL_TYPE* schema inference strategy, the job may fail
when MySQL columns have missing or invalid length information.
The failure happens in {*}MySqlTypeUtils{*}. The previous implementation does
not properly validate column length before type conversion. When the MySQL
binlog metadata contains columns with {*}length <= 0{*}, the code directly
passes these invalid values to Flink's *DataTypes* constructors. For *VARCHAR*
type, the code only checks {{== 0}} but not negative values. For *DECIMAL*
type, there is no validation for non-positive length. For *VARBINARY* type,
there is no length check at all.
This issue occurs when parsing Canal JSON messages where certain MySQL column
schemas have undefined lengths, causing invalid data type creation or runtime
exceptions during schema inference.
was:
When using CDC pipelines with schema merging operations that involve *complex
types (ARRAY, MAP, ROW)* being coerced to {*}STRING type{*}, the job may fail
due to missing type conversion logic in {*}SchemaMergingUtils{*}.
The failure happens in {*}SchemaMergingUtils.coerceObject(){*}. When attempting
to coerce complex types to STRING during schema merging, the utility only
handles primitive types and binary data conversions. If the original field is a
MapData, ArrayData, or RecordData, the coercion logic falls through to the
default toString() conversion, which either produces incorrect string
representations or fails entirely because these complex types don't have proper
toString() implementations for string coercion.
This issue is usually not exposed in simple type scenarios because primitive
types (INTEGER, VARCHAR, BOOLEAN, TIMESTAMP, etc.) have direct coercion paths
to STRING through existing conversion rules. The schema merging logic works
correctly for these basic type conversions, as they can be directly transformed
without special handling.
However, when CDC pipelines involve schema evolution or field type changes
where complex types need to be merged into STRING columns, the missing
conversion logic becomes critical. For example, when merging a table with
*ARRAY<INT>* column into a target table with *STRING* column at the same
position, *SchemaMergingUtils* cannot properly serialize the array data
structure into its string representation, causing the merge operation to fail
or produce incorrect results.
This issue is not specific to any particular source connector. It can affect
any CDC pipeline with schema merging scenarios involving complex-to-STRING type
coercion, regardless of whether the source is MySQL, PostgreSQL, Kafka, or any
other distributed pipeline source with schema evolution capabilities.
Summary: Fix error in canal json mysqlType when mysql type is varchar
with no length (was: Fix unable to coerce complex types (ARRAY/MAP/ROW) to
STRING)
> Fix error in canal json mysqlType when mysql type is varchar with no length
> ---------------------------------------------------------------------------
>
> Key: FLINK-39758
> URL: https://issues.apache.org/jira/browse/FLINK-39758
> Project: Flink
> Issue Type: Bug
> Components: Flink CDC
> Reporter: Mao Jiayi
> Priority: Major
> Labels: pull-request-available
>
> When using the Kafka pipeline connector with Canal JSON format and the
> *InferSchemaStrategy.MYSQL_TYPE* schema inference strategy, the job may fail
> when MySQL columns have missing or invalid length information.
> The failure happens in {*}MySqlTypeUtils{*}. The previous implementation does
> not properly validate column length before type conversion. When the MySQL
> binlog metadata contains columns with {*}length <= 0{*}, the code directly
> passes these invalid values to Flink's *DataTypes* constructors. For
> *VARCHAR* type, the code only checks {{== 0}} but not negative values. For
> *DECIMAL* type, there is no validation for non-positive length. For
> *VARBINARY* type, there is no length check at all.
> This issue occurs when parsing Canal JSON messages where certain MySQL column
> schemas have undefined lengths, causing invalid data type creation or runtime
> exceptions during schema inference.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)