[ 
https://issues.apache.org/jira/browse/FLINK-39757?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mao Jiayi updated FLINK-39757:
------------------------------
    Description: 
When the Kafka sink uses the Debezium JSON format, serialization may fail if
columns have non-string default values defined in the schema.

The failure happens in {{*DebeziumJsonSerializationSchema*}} when building the
Debezium schema. The current implementation sets the default value expression
string on the Debezium schema field directly, without type conversion. When
*Debezium's {{JsonConverter}}* processes the schema, it validates that the
default value matches the field's schema type. On a type mismatch, for example
a string default on an integer field, it throws an error and serialization
fails.
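
The kind of check described above can be illustrated with a minimal,
self-contained sketch. This is not Debezium's or Kafka Connect's code: the
{{FieldType}} enum and {{validateDefault}} method are hypothetical stand-ins
that only mimic "the default value's Java class must match the field type".

```java
public class DefaultValueCheck {
    // Hypothetical stand-in for a schema field type; not a Debezium or Kafka Connect class.
    enum FieldType {
        INT32(Integer.class),
        INT64(Long.class),
        BOOLEAN(Boolean.class),
        STRING(String.class);

        final Class<?> javaClass;

        FieldType(Class<?> javaClass) {
            this.javaClass = javaClass;
        }
    }

    // Rejects a default value whose Java class does not match the declared field type.
    static void validateDefault(FieldType type, Object defaultValue) {
        if (defaultValue != null && !type.javaClass.isInstance(defaultValue)) {
            throw new IllegalArgumentException(
                    "Invalid default value: expected " + type.javaClass.getSimpleName()
                            + " for " + type + " but got "
                            + defaultValue.getClass().getSimpleName());
        }
    }

    public static void main(String[] args) {
        validateDefault(FieldType.STRING, "n/a"); // accepted: String on a STRING field
        validateDefault(FieldType.INT32, 0);      // accepted: boxed Integer on an INT32 field
        try {
            // The raw expression string "0" on an INT32 field is the failing case.
            validateDefault(FieldType.INT32, "0");
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

In this sketch a string default on a STRING field passes, which matches the
observation that the problem stays hidden in string-only schemas.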

This issue is usually not exposed in simple schemas where all columns use
string types or have no default values. However, source connectors may carry
default value expressions in column metadata as strings. When such a schema is
serialized to the Debezium JSON format for the Kafka sink, each string default
value must be converted to the Java type that matches the field's Debezium
schema type. The current implementation does not perform this conversion.
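
A rough sketch of the conversion described above, under stated assumptions:
the {{ColumnType}} enum and {{convert}} method are hypothetical and cover only
a few types, and returning null for an unparsable expression (so that no
default is set instead of an invalid one) is one possible policy, not
necessarily what the actual fix does.

```java
import java.math.BigDecimal;

public class DefaultValueConverter {
    // Hypothetical subset of column types, for illustration only.
    enum ColumnType { BOOLEAN, INT, BIGINT, FLOAT, DOUBLE, DECIMAL, STRING }

    // Parses a default value expression string into the Java type that matches
    // the column type. Returns null when there is no default or when the
    // expression is not a literal that can be parsed for that type.
    static Object convert(ColumnType type, String expr) {
        if (expr == null) {
            return null;
        }
        String s = expr.trim();
        try {
            switch (type) {
                case BOOLEAN:
                    if (s.equalsIgnoreCase("true") || s.equals("1")) return Boolean.TRUE;
                    if (s.equalsIgnoreCase("false") || s.equals("0")) return Boolean.FALSE;
                    return null;
                case INT:
                    return Integer.valueOf(s);
                case BIGINT:
                    return Long.valueOf(s);
                case FLOAT:
                    return Float.valueOf(s);
                case DOUBLE:
                    return Double.valueOf(s);
                case DECIMAL:
                    return new BigDecimal(s);
                default:
                    return expr; // STRING columns keep the expression unchanged
            }
        } catch (NumberFormatException e) {
            // Non-literal expressions such as function calls cannot be parsed.
            return null;
        }
    }

    public static void main(String[] args) {
        System.out.println(convert(ColumnType.INT, "42"));
        System.out.println(convert(ColumnType.BOOLEAN, "TRUE"));
        System.out.println(convert(ColumnType.DECIMAL, "3.14"));
        System.out.println(convert(ColumnType.BIGINT, "now()"));
    }
}
```

The converted value, rather than the raw string, would then be the one set as
the default on the schema field.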

This issue is not specific to any particular source connector. It affects any 
pipeline that uses *Kafka sink with Debezium JSON format* when the source 
schema contains columns with non-string default value expressions.

  was:
When using CDC pipelines with schema merging operations that involve *complex 
types (ARRAY, MAP, ROW)* being coerced to {*}STRING type{*}, the job may fail 
due to missing type conversion logic in {*}SchemaMergingUtils{*}.

The failure happens in {*}SchemaMergingUtils.coerceObject(){*}. When attempting 
to coerce complex types to STRING during schema merging, the utility only 
handles primitive types and binary data conversions. If the original field is a 
MapData, ArrayData, or RecordData, the coercion logic falls through to the 
default toString() conversion, which either produces incorrect string 
representations or fails entirely because these complex types don't have proper 
toString() implementations for string coercion.

This issue is usually not exposed in simple type scenarios because primitive 
types (INTEGER, VARCHAR, BOOLEAN, TIMESTAMP, etc.) have direct coercion paths 
to STRING through existing conversion rules. The schema merging logic works 
correctly for these basic type conversions, as they can be directly transformed 
without special handling.

However, when CDC pipelines involve schema evolution or field type changes 
where complex types need to be merged into STRING columns, the missing 
conversion logic becomes critical. For example, when merging a table with 
*ARRAY<INT>* column into a target table with *STRING* column at the same 
position, *SchemaMergingUtils* cannot properly serialize the array data 
structure into its string representation, causing the merge operation to fail 
or produce incorrect results.

This issue is not specific to any particular source connector. It can affect 
any CDC pipeline with schema merging scenarios involving complex-to-STRING type 
coercion, regardless of whether the source is MySQL, PostgreSQL, Kafka, or any 
other distributed pipeline source with schema evolution capabilities.

        Summary: Fix kafka sink could not serialize debezium json with column 
default values  (was: Fix unable to coerce complex types (ARRAY/MAP/ROW) to 
STRING)

> Fix kafka sink could not serialize debezium json with column default values
> ---------------------------------------------------------------------------
>
>                 Key: FLINK-39757
>                 URL: https://issues.apache.org/jira/browse/FLINK-39757
>             Project: Flink
>          Issue Type: Bug
>          Components: Flink CDC
>            Reporter: Mao Jiayi
>            Priority: Major
>              Labels: pull-request-available
>
> When the Kafka sink uses the Debezium JSON format, serialization may fail if
> columns have non-string default values defined in the schema.
> The failure happens in {{*DebeziumJsonSerializationSchema*}} when building
> the Debezium schema. The current implementation sets the default value
> expression string on the Debezium schema field directly, without type
> conversion. When *Debezium's {{JsonConverter}}* processes the schema, it
> validates that the default value matches the field's schema type. On a type
> mismatch, for example a string default on an integer field, it throws an
> error and serialization fails.
> This issue is usually not exposed in simple schemas where all columns use
> string types or have no default values. However, source connectors may carry
> default value expressions in column metadata as strings. When such a schema
> is serialized to the Debezium JSON format for the Kafka sink, each string
> default value must be converted to the Java type that matches the field's
> Debezium schema type. The current implementation does not perform this
> conversion.
> This issue is not specific to any particular source connector. It affects any 
> pipeline that uses *Kafka sink with Debezium JSON format* when the source 
> schema contains columns with non-string default value expressions.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
