[ 
https://issues.apache.org/jira/browse/FLINK-39758?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mao Jiayi updated FLINK-39758:
------------------------------
    Description: 
When using the Kafka pipeline connector with Canal JSON format and the 
*InferSchemaStrategy.MYSQL_TYPE* schema inference strategy, the job may fail 
when MySQL columns have missing or invalid length information.

The failure happens in {*}MySqlTypeUtils{*}. The previous implementation does 
not validate the column length before type conversion. When the MySQL 
binlog metadata contains columns with {*}length <= 0{*}, the code passes 
these invalid values directly to Flink's *DataTypes* constructors. For 
*VARCHAR*, the code checks only for {{== 0}} and misses negative values. For 
*DECIMAL*, a non-positive length is not validated. For *VARBINARY*, there is 
no length check at all.
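
The validation described above could be sketched roughly as follows. This is 
an illustration only, not the actual *MySqlTypeUtils* code or the fix in the 
linked pull request: the class {{MySqlLengthGuard}}, its methods, and the 
fallback values are hypothetical, and the sketch returns plain type strings 
instead of calling Flink's *DataTypes* so that it stays self-contained. It 
assumes that a missing length should fall back to the maximum length for 
*VARCHAR* and *VARBINARY*, and to precision 10 (MySQL's default) for *DECIMAL*.

```java
import java.util.Locale;

/** Hypothetical helper for illustration; not part of Flink CDC. */
final class MySqlLengthGuard {
    // Assumed fallbacks for columns whose length is missing or invalid.
    static final int FALLBACK_LENGTH = Integer.MAX_VALUE;
    static final int FALLBACK_DECIMAL_PRECISION = 10; // MySQL's DECIMAL default

    private MySqlLengthGuard() {}

    /** Builds a type string, replacing any length <= 0 with a fallback. */
    static String toTypeString(String mysqlType, int length, int scale) {
        switch (mysqlType.toUpperCase(Locale.ROOT)) {
            case "VARCHAR":
                // Covers both 0 and negative lengths, not only == 0.
                return "VARCHAR(" + positiveOr(length, FALLBACK_LENGTH) + ")";
            case "VARBINARY":
                return "VARBINARY(" + positiveOr(length, FALLBACK_LENGTH) + ")";
            case "DECIMAL": {
                int precision = positiveOr(length, FALLBACK_DECIMAL_PRECISION);
                // Keep the scale inside [0, precision] (an assumption of this sketch).
                int safeScale = Math.max(0, Math.min(scale, precision));
                return "DECIMAL(" + precision + ", " + safeScale + ")";
            }
            default:
                throw new IllegalArgumentException(
                        "Type not covered by this sketch: " + mysqlType);
        }
    }

    /** Returns length when it is strictly positive, otherwise the fallback. */
    static int positiveOr(int length, int fallback) {
        return length > 0 ? length : fallback;
    }
}
```

Routing all three types through one {{positiveOr}} check avoids the situation 
described above, where each type applies a different (or no) length rule.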

This issue occurs when parsing Canal JSON messages in which some MySQL column 
schemas have undefined lengths, which leads to invalid data types or runtime 
exceptions during schema inference.

  was:
When using CDC pipelines with schema merging operations that involve *complex 
types (ARRAY, MAP, ROW)* being coerced to {*}STRING type{*}, the job may fail 
due to missing type conversion logic in {*}SchemaMergingUtils{*}.

The failure happens in {*}SchemaMergingUtils.coerceObject(){*}. When attempting 
to coerce complex types to STRING during schema merging, the utility only 
handles primitive types and binary data conversions. If the original field is a 
MapData, ArrayData, or RecordData, the coercion logic falls through to the 
default toString() conversion, which either produces incorrect string 
representations or fails entirely because these complex types don't have proper 
toString() implementations for string coercion.

This issue is usually not exposed in simple type scenarios because primitive 
types (INTEGER, VARCHAR, BOOLEAN, TIMESTAMP, etc.) have direct coercion paths 
to STRING through existing conversion rules. The schema merging logic works 
correctly for these basic type conversions, as they can be directly transformed 
without special handling.

However, when CDC pipelines involve schema evolution or field type changes 
where complex types need to be merged into STRING columns, the missing 
conversion logic becomes critical. For example, when merging a table with an 
*ARRAY<INT>* column into a target table with a *STRING* column at the same 
position, *SchemaMergingUtils* cannot properly serialize the array data 
structure into its string representation, causing the merge operation to fail 
or produce incorrect results.

This issue is not specific to any particular source connector. It can affect 
any CDC pipeline with schema merging scenarios involving complex-to-STRING type 
coercion, regardless of whether the source is MySQL, PostgreSQL, Kafka, or any 
other distributed pipeline source with schema evolution capabilities.

        Summary: Fix error in canal json mysqlType when mysql type is varchar 
with no length  (was: Fix unable to coerce complex types (ARRAY/MAP/ROW) to 
STRING)

> Fix error in canal json mysqlType when mysql type is varchar with no length
> ---------------------------------------------------------------------------
>
>                 Key: FLINK-39758
>                 URL: https://issues.apache.org/jira/browse/FLINK-39758
>             Project: Flink
>          Issue Type: Bug
>          Components: Flink CDC
>            Reporter: Mao Jiayi
>            Priority: Major
>              Labels: pull-request-available
>
> When using the Kafka pipeline connector with Canal JSON format and the 
> *InferSchemaStrategy.MYSQL_TYPE* schema inference strategy, the job may fail 
> when MySQL columns have missing or invalid length information.
> The failure happens in {*}MySqlTypeUtils{*}. The previous implementation does 
> not validate the column length before type conversion. When the MySQL 
> binlog metadata contains columns with {*}length <= 0{*}, the code passes 
> these invalid values directly to Flink's *DataTypes* constructors. For 
> *VARCHAR*, the code checks only for {{== 0}} and misses negative values. For 
> *DECIMAL*, a non-positive length is not validated. For *VARBINARY*, there is 
> no length check at all.
> This issue occurs when parsing Canal JSON messages in which some MySQL column 
> schemas have undefined lengths, which leads to invalid data types or runtime 
> exceptions during schema inference.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
