This is an automated email from the ASF dual-hosted git repository.

kunni pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git


The following commit(s) were added to refs/heads/master by this push:
     new e0c08f75d [hotfix][kafka] Fix Debezium schema generation for complex types (ARRAY, MAP, ROW) (#4240)
e0c08f75d is described below

commit e0c08f75dafcadaf0554075380a2015631076f59
Author: SkylerLin <[email protected]>
AuthorDate: Tue Feb 3 11:28:59 2026 +0800

    [hotfix][kafka] Fix Debezium schema generation for complex types (ARRAY, MAP, ROW) (#4240)
---
 .../docs/connectors/pipeline-connectors/kafka.md   |  30 ++++++
 .../debezium/DebeziumJsonSerializationSchema.java  |  57 ++++++++---
 .../DebeziumJsonSerializationSchemaTest.java       |  97 +++++++++++++++++++
 .../connectors/kafka/sink/KafkaDataSinkITCase.java | 106 ++++++++++++++++++++-
 4 files changed, 277 insertions(+), 13 deletions(-)

diff --git a/docs/content/docs/connectors/pipeline-connectors/kafka.md b/docs/content/docs/connectors/pipeline-connectors/kafka.md
index b94efc149..2a5cc4ed5 100644
--- a/docs/content/docs/connectors/pipeline-connectors/kafka.md
+++ b/docs/content/docs/connectors/pipeline-connectors/kafka.md
@@ -285,36 +285,42 @@ Data Type Mapping
       <td>TINYINT</td>
       <td>INT16</td>
       <td></td>
+      <td></td>
     </tr>
     <tr>
       <td>SMALLINT</td>
       <td>SMALLINT</td>
       <td>INT16</td>
       <td></td>
+      <td></td>
     </tr>
     <tr>
       <td>INT</td>
       <td>INT</td>
       <td>INT32</td>
       <td></td>
+      <td></td>
     </tr>
     <tr>
       <td>BIGINT</td>
       <td>BIGINT</td>
       <td>INT64</td>
       <td></td>
+      <td></td>
     </tr>
     <tr>
       <td>FLOAT</td>
       <td>FLOAT</td>
       <td>FLOAT</td>
       <td></td>
+      <td></td>
     </tr>
     <tr>
       <td>DOUBLE</td>
       <td>DOUBLE</td>
       <td>DOUBLE</td>
       <td></td>
+      <td></td>
     </tr>
     <tr>
       <td>DECIMAL(p, s)</td>
@@ -328,6 +334,7 @@ Data Type Mapping
       <td>BOOLEAN</td>
       <td>BOOLEAN</td>
       <td></td>
+      <td></td>
     </tr>
     <tr>
       <td>DATE</td>
@@ -355,12 +362,35 @@ Data Type Mapping
       <td>CHAR(n)</td>
       <td>STRING</td>
       <td></td>
+      <td></td>
     </tr>
     <tr>
       <td>VARCHAR(n)</td>
       <td>VARCHAR(n)</td>
       <td>STRING</td>
       <td></td>
+      <td></td>
+    </tr>
+    <tr>
+      <td>ARRAY</td>
+      <td>ARRAY</td>
+      <td>ARRAY</td>
+      <td></td>
+      <td>Serialized as JSON array. Element types are recursively converted according to this table.</td>
+    </tr>
+    <tr>
+      <td>MAP</td>
+      <td>MAP</td>
+      <td>MAP</td>
+      <td></td>
+      <td>Serialized as JSON object. Key and value types are recursively converted according to this table.</td>
+    </tr>
+    <tr>
+      <td>ROW</td>
+      <td>ROW</td>
+      <td>STRUCT</td>
+      <td></td>
+      <td>Serialized as JSON object. Field types are recursively converted according to this table.</td>
     </tr>
     </tbody>
 </table>
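
Side note (not part of the patch): the ARRAY/MAP/ROW rows above describe how the connector now composes Kafka Connect schemas for complex types. A minimal standalone Java sketch of that composition, using only the public org.apache.kafka.connect.data API (class and variable names here are illustrative, not from the commit):

import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;

public class ComplexTypeSchemaSketch {
    public static void main(String[] args) {
        // ARRAY -> Connect "array"; STRING_SCHEMA is non-optional, which matches
        // the "items":{"type":"string","optional":false} fragments asserted in the tests below.
        Schema arr = SchemaBuilder.array(Schema.STRING_SCHEMA).optional().build();

        // MAP -> Connect "map" with separate key and value schemas.
        Schema map = SchemaBuilder.map(Schema.STRING_SCHEMA, Schema.INT32_SCHEMA).optional().build();

        // ROW -> Connect "struct" with one named field per ROW field.
        Schema row =
                SchemaBuilder.struct()
                        .field("f1", Schema.STRING_SCHEMA)
                        .field("f2", Schema.INT32_SCHEMA)
                        .optional()
                        .build();

        System.out.println(arr.type() + " / " + map.type() + " / " + row.type()); // ARRAY / MAP / STRUCT
    }
}

In the patched converter below, only the top-level column applies optional()/required(); nested element, key, value, and struct-field schemas keep the SchemaBuilder default, which is why they appear as "optional":false in the expected test output.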
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/json/debezium/DebeziumJsonSerializationSchema.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/json/debezium/DebeziumJsonSerializationSchema.java
index b1945b5c1..c70b11744 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/json/debezium/DebeziumJsonSerializationSchema.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/json/debezium/DebeziumJsonSerializationSchema.java
@@ -25,7 +25,10 @@ import org.apache.flink.cdc.common.event.SchemaChangeEvent;
 import org.apache.flink.cdc.common.event.TableId;
 import org.apache.flink.cdc.common.schema.Column;
 import org.apache.flink.cdc.common.schema.Schema;
+import org.apache.flink.cdc.common.types.ArrayType;
+import org.apache.flink.cdc.common.types.DataField;
 import org.apache.flink.cdc.common.types.DecimalType;
+import org.apache.flink.cdc.common.types.MapType;
 import org.apache.flink.cdc.common.types.TimestampType;
 import org.apache.flink.cdc.common.types.utils.DataTypeUtils;
 import org.apache.flink.cdc.common.utils.SchemaUtils;
@@ -249,6 +252,24 @@ public class DebeziumJsonSerializationSchema implements SerializationSchema<Even
 
     private static SchemaBuilder convertCDCDataTypeToDebeziumDataType(Column column) {
         org.apache.flink.cdc.common.types.DataType columnType = column.getType();
+        SchemaBuilder field = convertCDCDataTypeToDebeziumDataType(columnType);
+
+        if (columnType.isNullable()) {
+            field.optional();
+        } else {
+            field.required();
+        }
+        if (column.getDefaultValueExpression() != null) {
+            field.defaultValue(column.getDefaultValueExpression());
+        }
+        if (column.getComment() != null) {
+            field.doc(column.getComment());
+        }
+        return field;
+    }
+
+    private static SchemaBuilder convertCDCDataTypeToDebeziumDataType(
+            org.apache.flink.cdc.common.types.DataType columnType) {
         final SchemaBuilder field;
         switch (columnType.getTypeRoot()) {
             case TINYINT:
@@ -310,23 +331,37 @@ public class DebeziumJsonSerializationSchema implements SerializationSchema<Even
                                                         .orElse(0)))
                                 .version(1);
                 break;
+            case ARRAY:
+                ArrayType arrayType = (ArrayType) columnType;
+                org.apache.kafka.connect.data.Schema elementSchema =
+                        convertCDCDataTypeToDebeziumDataType(arrayType.getElementType()).build();
+                field = SchemaBuilder.array(elementSchema);
+                break;
+            case MAP:
+                MapType mapType = (MapType) columnType;
+                org.apache.kafka.connect.data.Schema keySchema =
+                        convertCDCDataTypeToDebeziumDataType(mapType.getKeyType()).build();
+                org.apache.kafka.connect.data.Schema valueSchema =
+                        convertCDCDataTypeToDebeziumDataType(mapType.getValueType()).build();
+                field = SchemaBuilder.map(keySchema, valueSchema);
+                break;
+            case ROW:
+                org.apache.flink.cdc.common.types.RowType rowType =
+                        (org.apache.flink.cdc.common.types.RowType) columnType;
+                SchemaBuilder structBuilder = SchemaBuilder.struct();
+                for (DataField dataField : rowType.getFields()) {
+                    org.apache.kafka.connect.data.Schema fieldSchema =
+                            convertCDCDataTypeToDebeziumDataType(dataField.getType()).build();
+                    structBuilder.field(dataField.getName(), fieldSchema);
+                }
+                field = structBuilder;
+                break;
             case CHAR:
             case VARCHAR:
             default:
                 field = SchemaBuilder.string();
         }
 
-        if (columnType.isNullable()) {
-            field.optional();
-        } else {
-            field.required();
-        }
-        if (column.getDefaultValueExpression() != null) {
-            field.defaultValue(column.getDefaultValueExpression());
-        }
-        if (column.getComment() != null) {
-            field.doc(column.getComment());
-        }
         return field;
     }
 
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/test/java/org/apache/flink/cdc/connectors/kafka/json/debezium/DebeziumJsonSerializationSchemaTest.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/test/java/org/apache/flink/cdc/connectors/kafka/json/debezium/DebeziumJsonSerializationSchemaTest.java
index 35aada0c2..c2e026f77 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/test/java/org/apache/flink/cdc/connectors/kafka/json/debezium/DebeziumJsonSerializationSchemaTest.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/test/java/org/apache/flink/cdc/connectors/kafka/json/debezium/DebeziumJsonSerializationSchemaTest.java
@@ -340,4 +340,101 @@ class DebeziumJsonSerializationSchemaTest {
         assertThat(rowNode.has("f1")).isTrue();
         assertThat(rowNode.has("f2")).isTrue();
     }
+
+    @Test
+    void testSerializeWithSchemaComplexTypes() throws Exception {
+        ObjectMapper mapper =
+                JacksonMapperFactory.createObjectMapper()
+                        .configure(JsonGenerator.Feature.WRITE_BIGDECIMAL_AS_PLAIN, false);
+        Map<String, String> properties = new HashMap<>();
+        properties.put("include-schema.enabled", "true");
+        Configuration configuration = Configuration.fromMap(properties);
+        SerializationSchema<Event> serializationSchema =
+                ChangeLogJsonFormatFactory.createSerializationSchema(
+                        configuration, JsonSerializationType.DEBEZIUM_JSON, ZoneId.systemDefault());
+        serializationSchema.open(new MockInitializationContext());
+
+        // create table with complex types
+        Schema schema =
+                Schema.newBuilder()
+                        .physicalColumn("id", DataTypes.INT())
+                        .physicalColumn("arr", DataTypes.ARRAY(DataTypes.STRING()))
+                        .physicalColumn("map", DataTypes.MAP(DataTypes.STRING(), DataTypes.INT()))
+                        .physicalColumn(
+                                "row",
+                                DataTypes.ROW(
+                                        DataTypes.FIELD("f1", DataTypes.STRING()),
+                                        DataTypes.FIELD("f2", DataTypes.INT())))
+                        .primaryKey("id")
+                        .build();
+
+        RowType rowType =
+                RowType.of(
+                        DataTypes.INT(),
+                        DataTypes.ARRAY(DataTypes.STRING()),
+                        DataTypes.MAP(DataTypes.STRING(), DataTypes.INT()),
+                        DataTypes.ROW(
+                                DataTypes.FIELD("f1", DataTypes.STRING()),
+                                DataTypes.FIELD("f2", DataTypes.INT())));
+
+        CreateTableEvent createTableEvent = new CreateTableEvent(TABLE_1, schema);
+        assertThat(serializationSchema.serialize(createTableEvent)).isNull();
+
+        BinaryRecordDataGenerator generator = new BinaryRecordDataGenerator(rowType);
+
+        // Create test data with complex types
+        org.apache.flink.cdc.common.data.GenericArrayData arrayData =
+                new org.apache.flink.cdc.common.data.GenericArrayData(
+                        new Object[] {
+                            BinaryStringData.fromString("item1"),
+                            BinaryStringData.fromString("item2")
+                        });
+
+        Map<Object, Object> mapValues = new HashMap<>();
+        mapValues.put(BinaryStringData.fromString("key1"), 100);
+        mapValues.put(BinaryStringData.fromString("key2"), 200);
+        org.apache.flink.cdc.common.data.GenericMapData mapData =
+                new org.apache.flink.cdc.common.data.GenericMapData(mapValues);
+
+        BinaryRecordDataGenerator nestedRowGenerator =
+                new BinaryRecordDataGenerator(RowType.of(DataTypes.STRING(), DataTypes.INT()));
+        org.apache.flink.cdc.common.data.RecordData nestedRow =
+                nestedRowGenerator.generate(
+                        new Object[] {BinaryStringData.fromString("nested"), 42});
+
+        // insert event with complex types
+        DataChangeEvent insertEvent =
+                DataChangeEvent.insertEvent(
+                        TABLE_1,
+                        generator.generate(new Object[] {1, arrayData, mapData, nestedRow}));
+
+        byte[] serialized = serializationSchema.serialize(insertEvent);
+        JsonNode actual = mapper.readTree(serialized);
+
+        JsonNode expected =
+                mapper.readTree(
+                        "{\"schema\":{\"type\":\"struct\",\"fields\":["
+                                + "{\"type\":\"struct\",\"fields\":["
+                                + "{\"type\":\"int32\",\"optional\":true,\"field\":\"id\"},"
+                                + "{\"type\":\"array\",\"items\":{\"type\":\"string\",\"optional\":false},\"optional\":true,\"field\":\"arr\"},"
+                                + "{\"type\":\"map\",\"keys\":{\"type\":\"string\",\"optional\":false},\"values\":{\"type\":\"int32\",\"optional\":false},\"optional\":true,\"field\":\"map\"},"
+                                + "{\"type\":\"struct\",\"fields\":["
+                                + "{\"type\":\"string\",\"optional\":false,\"field\":\"f1\"},"
+                                + "{\"type\":\"int32\",\"optional\":false,\"field\":\"f2\"}"
+                                + "],\"optional\":true,\"field\":\"row\"}"
+                                + "],\"optional\":true,\"field\":\"before\"},"
+                                + "{\"type\":\"struct\",\"fields\":["
+                                + "{\"type\":\"int32\",\"optional\":true,\"field\":\"id\"},"
+                                + "{\"type\":\"array\",\"items\":{\"type\":\"string\",\"optional\":false},\"optional\":true,\"field\":\"arr\"},"
+                                + "{\"type\":\"map\",\"keys\":{\"type\":\"string\",\"optional\":false},\"values\":{\"type\":\"int32\",\"optional\":false},\"optional\":true,\"field\":\"map\"},"
+                                + "{\"type\":\"struct\",\"fields\":["
+                                + "{\"type\":\"string\",\"optional\":false,\"field\":\"f1\"},"
+                                + "{\"type\":\"int32\",\"optional\":false,\"field\":\"f2\"}"
+                                + "],\"optional\":true,\"field\":\"row\"}"
+                                + "],\"optional\":true,\"field\":\"after\"}"
+                                + "],\"optional\":false},"
+                                + "\"payload\":{\"before\":null,\"after\":{\"id\":1,\"arr\":[\"item1\",\"item2\"],\"map\":{\"key1\":100,\"key2\":200},\"row\":{\"f1\":\"nested\",\"f2\":42}},\"op\":\"c\",\"source\":{\"db\":\"default_schema\",\"table\":\"table1\"}}}");
+
+        assertThat(actual).isEqualTo(expected);
+    }
 }
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/test/java/org/apache/flink/cdc/connectors/kafka/sink/KafkaDataSinkITCase.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/test/java/org/apache/flink/cdc/connectors/kafka/sink/KafkaDataSinkITCase.java
index c3bb94fde..8d92b5678 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/test/java/org/apache/flink/cdc/connectors/kafka/sink/KafkaDataSinkITCase.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/test/java/org/apache/flink/cdc/connectors/kafka/sink/KafkaDataSinkITCase.java
@@ -85,6 +85,7 @@ import java.util.UUID;
 import java.util.concurrent.ExecutionException;
 import java.util.stream.Collectors;
 
+import static org.apache.flink.cdc.connectors.kafka.sink.KafkaDataSinkOptions.DEBEZIUM_JSON_INCLUDE_SCHEMA_ENABLED;
 import static org.apache.flink.util.DockerImageVersions.KAFKA;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.fail;
@@ -710,6 +711,16 @@ class KafkaDataSinkITCase extends TestLogger {
             List<Event> eventsToSerialize,
             List<String> expectedJson)
             throws Exception {
+        runGenericComplexTypeSerializationTest(
+                serializationType, eventsToSerialize, expectedJson, false);
+    }
+
+    void runGenericComplexTypeSerializationTest(
+            JsonSerializationType serializationType,
+            List<Event> eventsToSerialize,
+            List<String> expectedJson,
+            boolean includeSchema)
+            throws Exception {
         try (StreamExecutionEnvironment env = new LocalStreamEnvironment()) {
             env.enableCheckpointing(1000L);
             env.setRestartStrategy(RestartStrategies.noRestart());
@@ -726,6 +737,9 @@ class KafkaDataSinkITCase extends TestLogger {
                         KafkaDataSinkOptions.VALUE_FORMAT.key(),
                         JsonSerializationType.CANAL_JSON.toString());
             }
+            if (includeSchema) {
+                config.put(DEBEZIUM_JSON_INCLUDE_SCHEMA_ENABLED.key(), "true");
+            }
             source.sinkTo(
                     ((FlinkSinkProvider)
                                     (new KafkaDataSinkFactory()
@@ -757,8 +771,9 @@ class KafkaDataSinkITCase extends TestLogger {
                                     }
                                 })
                         .collect(Collectors.toList());
-        assertThat(deserializeValues(collectedRecords))
-                .containsExactlyElementsOf(expectedJsonNodes);
+        List<JsonNode> actualJsonNodes = deserializeValues(collectedRecords);
+
+        assertThat(actualJsonNodes).containsExactlyElementsOf(expectedJsonNodes);
         checkProducerLeak();
     }
 
@@ -1072,4 +1087,91 @@ class KafkaDataSinkITCase extends TestLogger {
         }
         runGenericComplexTypeSerializationTest(type, eventsToSerialize, expectedOutput);
     }
+
+    @Test
+    void testDebeziumJsonWithSchemaComplexTypes() throws Exception {
+        Schema schema =
+                Schema.newBuilder()
+                        .physicalColumn("id", DataTypes.INT())
+                        .physicalColumn("arr", DataTypes.ARRAY(DataTypes.STRING()))
+                        .physicalColumn("map", DataTypes.MAP(DataTypes.STRING(), DataTypes.INT()))
+                        .physicalColumn(
+                                "row",
+                                DataTypes.ROW(
+                                        DataTypes.FIELD("f1", DataTypes.STRING()),
+                                        DataTypes.FIELD("f2", DataTypes.INT())))
+                        .primaryKey("id")
+                        .build();
+        BinaryRecordDataGenerator generator =
+                new BinaryRecordDataGenerator(schema.getColumnDataTypes().toArray(new DataType[0]));
+        BinaryRecordDataGenerator nestedRowGenerator =
+                new BinaryRecordDataGenerator(
+                        ((RowType) (schema.getColumn("row").get().getType()))
+                                .getFieldTypes()
+                                .toArray(new DataType[0]));
+
+        BinaryRecordData recordData =
+                generator.generate(
+                        new Object[] {
+                            1,
+                            new GenericArrayData(
+                                    new Object[] {
+                                        BinaryStringData.fromString("item1"),
+                                        BinaryStringData.fromString("item2")
+                                    }),
+                            new GenericMapData(
+                                    Map.of(
+                                            BinaryStringData.fromString("key1"), 100,
+                                            BinaryStringData.fromString("key2"), 200)),
+                            nestedRowGenerator.generate(
+                                    new Object[] {BinaryStringData.fromString("nested"), 42})
+                        });
+
+        List<Event> eventsToSerialize =
+                List.of(
+                        new CreateTableEvent(table1, schema),
+                        DataChangeEvent.insertEvent(table1, recordData),
+                        DataChangeEvent.updateEvent(table1, recordData, recordData),
+                        DataChangeEvent.deleteEvent(table1, recordData));
+
+        String schemaJson =
+                "{\"type\":\"struct\",\"fields\":["
+                        + "{\"type\":\"struct\",\"fields\":["
+                        + "{\"type\":\"int32\",\"optional\":true,\"field\":\"id\"},"
+                        + "{\"type\":\"array\",\"items\":{\"type\":\"string\",\"optional\":false},\"optional\":true,\"field\":\"arr\"},"
+                        + "{\"type\":\"map\",\"keys\":{\"type\":\"string\",\"optional\":false},\"values\":{\"type\":\"int32\",\"optional\":false},\"optional\":true,\"field\":\"map\"},"
+                        + "{\"type\":\"struct\",\"fields\":["
+                        + "{\"type\":\"string\",\"optional\":false,\"field\":\"f1\"},"
+                        + "{\"type\":\"int32\",\"optional\":false,\"field\":\"f2\"}"
+                        + "],\"optional\":true,\"field\":\"row\"}"
+                        + "],\"optional\":true,\"field\":\"before\"},"
+                        + "{\"type\":\"struct\",\"fields\":["
+                        + "{\"type\":\"int32\",\"optional\":true,\"field\":\"id\"},"
+                        + "{\"type\":\"array\",\"items\":{\"type\":\"string\",\"optional\":false},\"optional\":true,\"field\":\"arr\"},"
+                        + "{\"type\":\"map\",\"keys\":{\"type\":\"string\",\"optional\":false},\"values\":{\"type\":\"int32\",\"optional\":false},\"optional\":true,\"field\":\"map\"},"
+                        + "{\"type\":\"struct\",\"fields\":["
+                        + "{\"type\":\"string\",\"optional\":false,\"field\":\"f1\"},"
+                        + "{\"type\":\"int32\",\"optional\":false,\"field\":\"f2\"}"
+                        + "],\"optional\":true,\"field\":\"row\"}"
+                        + "],\"optional\":true,\"field\":\"after\"}"
+                        + "],\"optional\":false}";
+
+        List<String> expectedJsonWithSchema =
+                List.of(
+                        "{\"schema\":"
+                                + schemaJson
+                                + ",\"payload\":{\"before\":null,\"after\":{\"id\":1,\"arr\":[\"item1\",\"item2\"],\"map\":{\"key1\":100,\"key2\":200},\"row\":{\"f1\":\"nested\",\"f2\":42}},\"op\":\"c\",\"source\":{\"db\":\"default_schema\",\"table\":\"%s\"}}}",
+                        "{\"schema\":"
+                                + schemaJson
+                                + ",\"payload\":{\"before\":{\"id\":1,\"arr\":[\"item1\",\"item2\"],\"map\":{\"key1\":100,\"key2\":200},\"row\":{\"f1\":\"nested\",\"f2\":42}},\"after\":{\"id\":1,\"arr\":[\"item1\",\"item2\"],\"map\":{\"key1\":100,\"key2\":200},\"row\":{\"f1\":\"nested\",\"f2\":42}},\"op\":\"u\",\"source\":{\"db\":\"default_schema\",\"table\":\"%s\"}}}",
+                        "{\"schema\":"
+                                + schemaJson
+                                + ",\"payload\":{\"before\":{\"id\":1,\"arr\":[\"item1\",\"item2\"],\"map\":{\"key1\":100,\"key2\":200},\"row\":{\"f1\":\"nested\",\"f2\":42}},\"after\":null,\"op\":\"d\",\"source\":{\"db\":\"default_schema\",\"table\":\"%s\"}}}");
+
+        runGenericComplexTypeSerializationTest(
+                JsonSerializationType.DEBEZIUM_JSON,
+                eventsToSerialize,
+                expectedJsonWithSchema,
+                true);
+    }
 }
