[
https://issues.apache.org/jira/browse/FLINK-39757?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Mao Jiayi updated FLINK-39757:
------------------------------
Description:
When using Kafka sink with Debezium JSON format, the serialization may fail if
columns have non-string default values defined in the schema.
The failure happens in {{*DebeziumJsonSerializationSchema*}} when building the
Debezium schema. The previous implementation directly sets the default value
expression string to the Debezium schema field without type conversion. When
*Debezium's {{JsonConverter}}* processes the schema, it validates that the
default value matches the field's schema type. If there is a type mismatch, it
throws an error and the serialization fails.
This issue is usually not exposed in simple schemas where all columns use
string types or have no default values. In various source connectors, column
metadata may include default value expressions as strings. When these schemas
are serialized to Debezium JSON format for Kafka sink, the string default
values must be converted to their corresponding Java types to match the
Debezium schema type requirements. However, the current implementation does not
perform this conversion.
This issue is not specific to any particular source connector. It affects any
pipeline that uses *Kafka sink with Debezium JSON format* when the source
schema contains columns with non-string default value expressions.
was:
When using CDC pipelines with schema merging operations that involve *complex
types (ARRAY, MAP, ROW)* being coerced to {*}STRING type{*}, the job may fail
due to missing type conversion logic in {*}SchemaMergingUtils{*}.
The failure happens in {*}SchemaMergingUtils.coerceObject(){*}. When attempting
to coerce complex types to STRING during schema merging, the utility only
handles primitive types and binary data conversions. If the original field is a
MapData, ArrayData, or RecordData, the coercion logic falls through to the
default toString() conversion, which either produces incorrect string
representations or fails entirely because these complex types don't have proper
toString() implementations for string coercion.
This issue is usually not exposed in simple type scenarios because primitive
types (INTEGER, VARCHAR, BOOLEAN, TIMESTAMP, etc.) have direct coercion paths
to STRING through existing conversion rules. The schema merging logic works
correctly for these basic type conversions, as they can be directly transformed
without special handling.
However, when CDC pipelines involve schema evolution or field type changes
where complex types need to be merged into STRING columns, the missing
conversion logic becomes critical. For example, when merging a table with
*ARRAY<INT>* column into a target table with *STRING* column at the same
position, *SchemaMergingUtils* cannot properly serialize the array data
structure into its string representation, causing the merge operation to fail
or produce incorrect results.
This issue is not specific to any particular source connector. It can affect
any CDC pipeline with schema merging scenarios involving complex-to-STRING type
coercion, regardless of whether the source is MySQL, PostgreSQL, Kafka, or any
other distributed pipeline source with schema evolution capabilities.
Summary: Fix kafka sink could not serialize debezium json with column
default values (was: Fix unable to coerce complex types (ARRAY/MAP/ROW) to
STRING)
> Fix kafka sink could not serialize debezium json with column default values
> ---------------------------------------------------------------------------
>
> Key: FLINK-39757
> URL: https://issues.apache.org/jira/browse/FLINK-39757
> Project: Flink
> Issue Type: Bug
> Components: Flink CDC
> Reporter: Mao Jiayi
> Priority: Major
> Labels: pull-request-available
>
> When using Kafka sink with Debezium JSON format, the serialization may fail
> if columns have non-string default values defined in the schema.
> The failure happens in {{*DebeziumJsonSerializationSchema*}} when building
> the Debezium schema. The previous implementation directly sets the default
> value expression string to the Debezium schema field without type conversion.
> When *Debezium's {{JsonConverter}}* processes the schema, it validates that
> the default value matches the field's schema type. If there is a type
> mismatch, it throws an error and the serialization fails.
> This issue is usually not exposed in simple schemas where all columns use
> string types or have no default values. In various source connectors, column
> metadata may include default value expressions as strings. When these schemas
> are serialized to Debezium JSON format for Kafka sink, the string default
> values must be converted to their corresponding Java types to match the
> Debezium schema type requirements. However, the current implementation does
> not perform this conversion.
> This issue is not specific to any particular source connector. It affects any
> pipeline that uses *Kafka sink with Debezium JSON format* when the source
> schema contains columns with non-string default value expressions.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)