[ https://issues.apache.org/jira/browse/FLINK-39758?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Mao Jiayi updated FLINK-39758:
------------------------------
    Description: 
When merging DECIMAL types from multiple tables in a route-merge pipeline, the 
job may fail if the computed result precision exceeds the maximum allowed value.

The failure happens in *SchemaMergingUtils.mergeDecimalTypes()* and 
{*}SchemaUtils.inferWiderType(){*}. The current implementation calculates the 
merged precision as {{max(lhsIntDigits, rhsIntDigits) + max(lhsScale, rhsScale)}}, 
where each side's integer digits are its precision minus its scale. It then uses 
*Preconditions.checkArgument* to enforce that this value does not exceed 
{*}DecimalType.MAX_PRECISION (38){*}. When the sum exceeds 38, an 
*IllegalArgumentException* is thrown and the entire pipeline fails.
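To make the arithmetic concrete, the Java sketch below reproduces the widening rule as this issue describes it. {{DecimalSpec}} and {{DecimalMergeRepro}} are hypothetical stand-ins for Flink CDC's *DecimalType* and merging utilities, and the explicit {{if}} check stands in for *Preconditions.checkArgument*; this is an illustration, not the actual Flink CDC source.

```java
/**
 * Minimal reproduction of the DECIMAL merge arithmetic described in FLINK-39758.
 * DecimalSpec is a hypothetical stand-in for Flink CDC's DecimalType.
 */
public class DecimalMergeRepro {
    // Mirrors DecimalType.MAX_PRECISION (38) as stated in the issue.
    static final int MAX_PRECISION = 38;

    record DecimalSpec(int precision, int scale) {
        // Integer digits = precision minus scale.
        int intDigits() {
            return precision - scale;
        }
    }

    // Widening rule from the issue: keep the larger integer part and the larger scale.
    static DecimalSpec merge(DecimalSpec lhs, DecimalSpec rhs) {
        int intDigits = Math.max(lhs.intDigits(), rhs.intDigits());
        int scale = Math.max(lhs.scale(), rhs.scale());
        int precision = intDigits + scale;
        // Stand-in for Preconditions.checkArgument(precision <= MAX_PRECISION, ...).
        if (precision > MAX_PRECISION) {
            throw new IllegalArgumentException(
                    "Merged precision " + precision + " exceeds " + MAX_PRECISION);
        }
        return new DecimalSpec(precision, scale);
    }

    public static void main(String[] args) {
        // Compatible inputs: DECIMAL(10, 2) + DECIMAL(12, 4) -> DECIMAL(12, 4).
        System.out.println(merge(new DecimalSpec(10, 2), new DecimalSpec(12, 4)));
        // Mismatched inputs: 38 integer digits + scale 10 = 48 > 38 -> exception.
        try {
            merge(new DecimalSpec(38, 0), new DecimalSpec(38, 10));
        } catch (IllegalArgumentException e) {
            System.out.println("Failed: " + e.getMessage());
        }
    }
}
```

Each input is a valid DECIMAL on its own; only the combination of the larger integer part from one side and the larger scale from the other pushes the result past 38.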

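This issue is typically exposed in multi-table route-merge scenarios where 
tables from different data sources have DECIMAL columns with significantly 
different precisions and scales: when one column needs many integer digits and 
another needs a large scale, the larger integer part plus the larger scale can 
exceed 38 even though each input type is valid on its own.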

This issue is not specific to any particular source connector. It can affect 
any pipeline that uses route-merge to combine tables with mismatched DECIMAL 
precisions.
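One possible way to avoid the failure, offered here as an assumption rather than the fix actually adopted for this issue, is to keep every integer digit and shrink the merged scale until the precision fits within 38. {{SafeDecimalMerge}} and {{DecimalSpec}} are hypothetical names used only for this sketch.

```java
/**
 * Hypothetical overflow-safe variant of the DECIMAL merge rule.
 * Assumption: when the widened type would exceed MAX_PRECISION, keep every
 * integer digit and give up fractional digits. This is one possible policy,
 * not necessarily the one chosen for FLINK-39758.
 */
public class SafeDecimalMerge {
    static final int MAX_PRECISION = 38;

    record DecimalSpec(int precision, int scale) {
        int intDigits() {
            return precision - scale;
        }
    }

    static DecimalSpec merge(DecimalSpec lhs, DecimalSpec rhs) {
        int intDigits = Math.max(lhs.intDigits(), rhs.intDigits());
        int scale = Math.max(lhs.scale(), rhs.scale());
        if (intDigits + scale > MAX_PRECISION) {
            // Each valid input has precision <= 38 and scale >= 0, so intDigits <= 38
            // and the remaining budget for the scale is never negative.
            scale = MAX_PRECISION - intDigits;
        }
        return new DecimalSpec(intDigits + scale, scale);
    }

    public static void main(String[] args) {
        // 38 + 10 would overflow; the scale is reduced to 0: DECIMAL(38, 0).
        System.out.println(merge(new DecimalSpec(38, 0), new DecimalSpec(38, 10)));
        // 30 + 10 would overflow; the scale is reduced to 8: DECIMAL(38, 8).
        System.out.println(merge(new DecimalSpec(30, 0), new DecimalSpec(20, 10)));
    }
}
```

This policy trades fractional digits from the wider-scale side for a merge that never throws. Other policies, such as preserving a minimum scale or widening to a string type, make a different trade-off between precision loss and type fidelity.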

  was:
When using the Kafka pipeline connector with Canal JSON format and the 
*InferSchemaStrategy.MYSQL_TYPE* schema inference strategy, the job may fail 
when MySQL columns have missing or invalid length information.

The failure happens in {*}MySqlTypeUtils{*}. The previous implementation does 
not properly validate column length before type conversion. When the MySQL 
binlog metadata contains columns with {*}length <= 0{*}, the code directly 
passes these invalid values to Flink's *DataTypes* constructors. For the *VARCHAR* 
type, the code only checks {{== 0}} and not negative values. For the *DECIMAL* 
type, there is no validation of non-positive length. For the *VARBINARY* type, 
there is no length check at all.

This issue occurs when parsing Canal JSON messages where certain MySQL column 
schemas have undefined lengths, causing invalid data type creation or runtime 
exceptions during schema inference.

        Summary: Fix DECIMAL OOB in SchemaMergingUtils  (was: Fix error in 
canal json mysqlType when mysql type is varchar with no length)

> Fix DECIMAL OOB in SchemaMergingUtils
> -------------------------------------
>
>                 Key: FLINK-39758
>                 URL: https://issues.apache.org/jira/browse/FLINK-39758
>             Project: Flink
>          Issue Type: Bug
>          Components: Flink CDC
>            Reporter: Mao Jiayi
>            Priority: Major
>              Labels: pull-request-available



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
