This is an automated email from the ASF dual-hosted git repository.

kunni pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git


The following commit(s) were added to refs/heads/master by this push:
     new 0cfe1d339 [FLINK-38889][pipeline][kafka] Support serializing complex types(MAP, ARRAY, ROW) to JSON (Debezium / Canal) (#4221)
0cfe1d339 is described below

commit 0cfe1d339988d3cfa2f55ab08d1766310580c57d
Author: SkylerLin <[email protected]>
AuthorDate: Wed Jan 28 10:44:22 2026 +0800

    [FLINK-38889][pipeline][kafka] Support serializing complex types(MAP, ARRAY, ROW) to JSON (Debezium / Canal) (#4221)
    
    Co-authored-by: guoxuanlin <[email protected]>
    Co-authored-by: yuxiqian <[email protected]>
---
 .../cdc/connectors/kafka/json/TableSchemaInfo.java | 100 +----
 .../kafka/json/utils/RecordDataConverter.java      | 414 +++++++++++++++++++
 .../connectors/kafka/json/TableSchemaInfoTest.java |  61 +++
 .../canal/CanalJsonSerializationSchemaTest.java    |  94 +++++
 .../DebeziumJsonSerializationSchemaTest.java       |  93 +++++
 .../kafka/json/utils/RecordDataConverterTest.java  | 445 +++++++++++++++++++++
 .../connectors/kafka/sink/KafkaDataSinkITCase.java | 376 +++++++++++++++++
 7 files changed, 1485 insertions(+), 98 deletions(-)
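
For readers skimming the diff: the old field-getter factory in TableSchemaInfo threw for MAP, ARRAY, and ROW columns, so the Kafka pipeline sink could not serialize them at all. The new RecordDataConverter converts these types recursively, letting both the Debezium and Canal formats render them as native JSON structures. A rough sketch of the resulting shape, using the schema and sample values from the tests added in this commit (the real envelopes carry further metadata fields, elided here):

    // Table used by the new tests:
    //   id INT, arr ARRAY<STRING>, map MAP<STRING, INT>, row ROW<f1 STRING, f2 INT>
    // Debezium JSON output (illustrative):
    //   {"before": null,
    //    "after": {"id": 1,
    //              "arr": ["item1", "item2"],
    //              "map": {"key1": 100, "key2": 200},
    //              "row": {"f1": "nested", "f2": 42}}, ...}
    // Canal JSON output (illustrative): the same object appears as an element
    //   of the "data" array, with "old" carrying the pre-update image.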

diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/json/TableSchemaInfo.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/json/TableSchemaInfo.java
index 10be2ff07..3ffe5d683 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/json/TableSchemaInfo.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/json/TableSchemaInfo.java
@@ -21,22 +21,15 @@ import org.apache.flink.api.common.serialization.SerializationSchema;
 import org.apache.flink.cdc.common.data.RecordData;
 import org.apache.flink.cdc.common.event.TableId;
 import org.apache.flink.cdc.common.schema.Schema;
-import org.apache.flink.cdc.common.types.DataType;
-import org.apache.flink.cdc.common.types.DataTypeChecks;
-import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.cdc.connectors.kafka.json.utils.RecordDataConverter;
 import org.apache.flink.table.data.GenericRowData;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.data.StringData;
-import org.apache.flink.table.data.TimestampData;
-import org.apache.flink.table.data.binary.BinaryStringData;
 
 import java.time.ZoneId;
 import java.util.ArrayList;
 import java.util.List;
 
-import static org.apache.flink.cdc.common.types.DataTypeChecks.getPrecision;
-import static org.apache.flink.cdc.common.types.DataTypeChecks.getScale;
-
 /** maintain the {@link SerializationSchema} of a specific {@link TableId}. */
 public class TableSchemaInfo {
 
@@ -96,96 +89,7 @@ public class TableSchemaInfo {
     }
 
     private static List<RecordData.FieldGetter> createFieldGetters(Schema schema, ZoneId zoneId) {
-        List<RecordData.FieldGetter> fieldGetters = new ArrayList<>(schema.getColumns().size());
-        for (int i = 0; i < schema.getColumns().size(); i++) {
-            fieldGetters.add(createFieldGetter(schema.getColumns().get(i).getType(), i, zoneId));
-        }
-        return fieldGetters;
-    }
-
-    private static RecordData.FieldGetter createFieldGetter(
-            DataType fieldType, int fieldPos, ZoneId zoneId) {
-        final RecordData.FieldGetter fieldGetter;
-        // ordered by type root definition
-        switch (fieldType.getTypeRoot()) {
-            case CHAR:
-            case VARCHAR:
-                fieldGetter =
-                        record ->
-                                BinaryStringData.fromString(record.getString(fieldPos).toString());
-                break;
-            case BOOLEAN:
-                fieldGetter = record -> record.getBoolean(fieldPos);
-                break;
-            case BINARY:
-            case VARBINARY:
-                fieldGetter = record -> record.getBinary(fieldPos);
-                break;
-            case DECIMAL:
-                final int decimalPrecision = getPrecision(fieldType);
-                final int decimalScale = getScale(fieldType);
-                fieldGetter =
-                        record ->
-                                DecimalData.fromBigDecimal(
-                                        record.getDecimal(fieldPos, decimalPrecision, decimalScale)
-                                                .toBigDecimal(),
-                                        decimalPrecision,
-                                        decimalScale);
-                break;
-            case TINYINT:
-                fieldGetter = record -> record.getByte(fieldPos);
-                break;
-            case SMALLINT:
-                fieldGetter = record -> record.getShort(fieldPos);
-                break;
-            case INTEGER:
-                fieldGetter = record -> record.getInt(fieldPos);
-                break;
-            case DATE:
-                fieldGetter = record -> (int) record.getDate(fieldPos).toEpochDay();
-                break;
-            case TIME_WITHOUT_TIME_ZONE:
-                fieldGetter = record -> (int) record.getTime(fieldPos).toMillisOfDay();
-                break;
-            case BIGINT:
-                fieldGetter = record -> record.getLong(fieldPos);
-                break;
-            case FLOAT:
-                fieldGetter = record -> record.getFloat(fieldPos);
-                break;
-            case DOUBLE:
-                fieldGetter = record -> record.getDouble(fieldPos);
-                break;
-            case TIMESTAMP_WITHOUT_TIME_ZONE:
-                fieldGetter =
-                        record ->
-                                TimestampData.fromTimestamp(
-                                        record.getTimestamp(fieldPos, getPrecision(fieldType))
-                                                .toTimestamp());
-                break;
-            case TIMESTAMP_WITH_TIME_ZONE:
-            case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
-                fieldGetter =
-                        record ->
-                                TimestampData.fromInstant(
-                                        record.getLocalZonedTimestampData(
-                                                        fieldPos,
-                                                        DataTypeChecks.getPrecision(fieldType))
-                                                .toInstant());
-                break;
-            default:
-                throw new IllegalArgumentException(
-                        "don't support type of " + fieldType.getTypeRoot());
-        }
-        if (!fieldType.isNullable()) {
-            return fieldGetter;
-        }
-        return row -> {
-            if (row.isNullAt(fieldPos)) {
-                return null;
-            }
-            return fieldGetter.getFieldOrNull(row);
-        };
+        return RecordDataConverter.createFieldGetters(schema, zoneId);
     }
 
     public Schema getSchema() {
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/json/utils/RecordDataConverter.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/json/utils/RecordDataConverter.java
new file mode 100644
index 000000000..f865a5ffd
--- /dev/null
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/json/utils/RecordDataConverter.java
@@ -0,0 +1,414 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.kafka.json.utils;
+
+import org.apache.flink.cdc.common.data.ArrayData;
+import org.apache.flink.cdc.common.data.MapData;
+import org.apache.flink.cdc.common.data.RecordData;
+import org.apache.flink.cdc.common.schema.Schema;
+import org.apache.flink.cdc.common.types.ArrayType;
+import org.apache.flink.cdc.common.types.DataField;
+import org.apache.flink.cdc.common.types.DataType;
+import org.apache.flink.cdc.common.types.DataTypeChecks;
+import org.apache.flink.cdc.common.types.MapType;
+import org.apache.flink.cdc.common.types.RowType;
+import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.data.binary.BinaryStringData;
+
+import java.time.ZoneId;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.flink.cdc.common.types.DataTypeChecks.getPrecision;
+import static org.apache.flink.cdc.common.types.DataTypeChecks.getScale;
+
+/** Utility class for converting CDC {@link RecordData} to Flink SQL {@link RowData}. */
+public class RecordDataConverter {
+
+    /** Create {@link RecordData.FieldGetter}s from given CDC {@link Schema}. */
+    public static List<RecordData.FieldGetter> createFieldGetters(Schema schema, ZoneId zoneId) {
+        List<RecordData.FieldGetter> fieldGetters = new ArrayList<>();
+        for (int i = 0; i < schema.getColumnCount(); i++) {
+            fieldGetters.add(createFieldGetter(schema.getColumns().get(i).getType(), i, zoneId));
+        }
+        return fieldGetters;
+    }
+
+    /**
+     * Converts a CDC {@link RecordData} to Flink SQL {@link RowData}.
+     *
+     * @param recordData the CDC record data to convert
+     * @param rowType the row type schema defining the structure
+     * @param zoneId the time zone for timestamp conversions
+     * @return the converted Flink SQL RowData, or null if input is null
+     */
+    private static RowData convertRowData(RecordData recordData, RowType rowType, ZoneId zoneId) {
+        if (recordData == null) {
+            return null;
+        }
+
+        List<DataField> fields = rowType.getFields();
+        GenericRowData rowData = new GenericRowData(fields.size());
+
+        for (int i = 0; i < fields.size(); i++) {
+            DataField field = fields.get(i);
+            DataType fieldType = field.getType();
+
+            Object value = convertField(recordData, i, fieldType, zoneId);
+            rowData.setField(i, value);
+        }
+
+        return rowData;
+    }
+
+    /**
+     * Converts a single field from CDC {@link RecordData} to Flink SQL format.
+     *
+     * <p>This method handles various data types including primitives, temporal types, and complex
+     * types (arrays, maps, rows).
+     *
+     * @param recordData the CDC record data containing the field
+     * @param pos the position of the field in the record
+     * @param fieldType the data type of the field
+     * @param zoneId the time zone for timestamp conversions
+     * @return the converted field value in Flink SQL format, or null if the field is null
+     * @throws IllegalArgumentException if the field type is not supported
+     */
+    private static Object convertField(
+            RecordData recordData, int pos, DataType fieldType, ZoneId zoneId) {
+        if (recordData.isNullAt(pos)) {
+            return null;
+        }
+
+        switch (fieldType.getTypeRoot()) {
+            case CHAR:
+            case VARCHAR:
+                return BinaryStringData.fromString(recordData.getString(pos).toString());
+            case BOOLEAN:
+                return recordData.getBoolean(pos);
+            case BINARY:
+            case VARBINARY:
+                return recordData.getBinary(pos);
+            case DECIMAL:
+                final int decimalPrecision = getPrecision(fieldType);
+                final int decimalScale = getScale(fieldType);
+                return DecimalData.fromBigDecimal(
+                        recordData.getDecimal(pos, decimalPrecision, decimalScale).toBigDecimal(),
+                        decimalPrecision,
+                        decimalScale);
+            case TINYINT:
+                return recordData.getByte(pos);
+            case SMALLINT:
+                return recordData.getShort(pos);
+            case INTEGER:
+                return recordData.getInt(pos);
+            case DATE:
+                return recordData.getDate(pos).toEpochDay();
+            case TIME_WITHOUT_TIME_ZONE:
+                return recordData.getTime(pos).toMillisOfDay();
+            case BIGINT:
+                return recordData.getLong(pos);
+            case FLOAT:
+                return recordData.getFloat(pos);
+            case DOUBLE:
+                return recordData.getDouble(pos);
+            case TIMESTAMP_WITHOUT_TIME_ZONE:
+                return TimestampData.fromTimestamp(
+                        recordData.getTimestamp(pos, getPrecision(fieldType)).toTimestamp());
+            case TIMESTAMP_WITH_TIME_ZONE:
+            case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+                return TimestampData.fromInstant(
+                        recordData
+                                .getLocalZonedTimestampData(
+                                        pos, DataTypeChecks.getPrecision(fieldType))
+                                .toInstant());
+            case ARRAY:
+                ArrayData arrayData = recordData.getArray(pos);
+                return convertArrayData(arrayData, (ArrayType) fieldType, zoneId);
+            case MAP:
+                MapData mapData = recordData.getMap(pos);
+                return convertMapData(mapData, (MapType) fieldType, zoneId);
+            case ROW:
+                RecordData nestedRecordData =
+                        recordData.getRow(pos, DataTypeChecks.getFieldCount(fieldType));
+                return convertRowData(nestedRecordData, (RowType) fieldType, zoneId);
+            default:
+                throw new IllegalArgumentException(
+                        "Unsupported field type: " + fieldType.getTypeRoot());
+        }
+    }
+
+    /**
+     * Converts a CDC {@link ArrayData} to Flink SQL {@link org.apache.flink.table.data.ArrayData}.
+     *
+     * <p>This method recursively converts each element in the array according to the element type.
+     *
+     * @param arrayData the CDC array data to convert
+     * @param arrayType the array type schema defining the element type
+     * @param zoneId the time zone for timestamp conversions
+     * @return the converted Flink SQL ArrayData, or null if input is null
+     */
+    private static org.apache.flink.table.data.ArrayData convertArrayData(
+            ArrayData arrayData, ArrayType arrayType, ZoneId zoneId) {
+        if (arrayData == null) {
+            return null;
+        }
+
+        DataType elementType = arrayType.getElementType();
+        int size = arrayData.size();
+        Object[] result = new Object[size];
+
+        for (int i = 0; i < size; i++) {
+            result[i] = convertElement(arrayData, i, elementType, zoneId);
+        }
+
+        return new org.apache.flink.table.data.GenericArrayData(result);
+    }
+
+    /**
+     * Converts a CDC {@link MapData} to Flink SQL {@link org.apache.flink.table.data.MapData}.
+     *
+     * <p>This method converts both keys and values in the map according to their respective types.
+     *
+     * @param mapData the CDC map data to convert
+     * @param mapType the map type schema defining key and value types
+     * @param zoneId the time zone for timestamp conversions
+     * @return the converted Flink SQL MapData, or null if input is null
+     */
+    private static org.apache.flink.table.data.MapData convertMapData(
+            MapData mapData, MapType mapType, ZoneId zoneId) {
+        if (mapData == null) {
+            return null;
+        }
+
+        ArrayData keyArray = mapData.keyArray();
+        ArrayData valueArray = mapData.valueArray();
+
+        int size = keyArray.size();
+        Map<Object, Object> result = new HashMap<>();
+
+        DataType keyType = mapType.getKeyType();
+        DataType valueType = mapType.getValueType();
+
+        for (int i = 0; i < size; i++) {
+            Object key = convertElement(keyArray, i, keyType, zoneId);
+            Object value = convertElement(valueArray, i, valueType, zoneId);
+            result.put(key, value);
+        }
+
+        return new org.apache.flink.table.data.GenericMapData(result);
+    }
+
+    /**
+     * Converts a single element from CDC {@link ArrayData} to Flink SQL format.
+     *
+     * <p>This method is similar to {@link #convertField} but operates on array elements.
+     *
+     * @param arrayData the CDC array data containing the element
+     * @param pos the position of the element in the array
+     * @param elementType the data type of the element
+     * @param zoneId the time zone for timestamp conversions
+     * @return the converted element value in Flink SQL format, or null if the element is null
+     * @throws IllegalArgumentException if the element type is not supported
+     */
+    private static Object convertElement(
+            ArrayData arrayData, int pos, DataType elementType, ZoneId zoneId) {
+        if (arrayData.isNullAt(pos)) {
+            return null;
+        }
+
+        switch (elementType.getTypeRoot()) {
+            case CHAR:
+            case VARCHAR:
+                return BinaryStringData.fromString(arrayData.getString(pos).toString());
+            case BOOLEAN:
+                return arrayData.getBoolean(pos);
+            case BINARY:
+            case VARBINARY:
+                return arrayData.getBinary(pos);
+            case DECIMAL:
+                final int decimalPrecision = getPrecision(elementType);
+                final int decimalScale = getScale(elementType);
+                return DecimalData.fromBigDecimal(
+                        arrayData
+                                .getDecimal(pos, getPrecision(elementType), getScale(elementType))
+                                .toBigDecimal(),
+                        decimalPrecision,
+                        decimalScale);
+            case TINYINT:
+                return arrayData.getByte(pos);
+            case SMALLINT:
+                return arrayData.getShort(pos);
+            case INTEGER:
+            case DATE:
+            case TIME_WITHOUT_TIME_ZONE:
+                return arrayData.getInt(pos);
+            case BIGINT:
+                return arrayData.getLong(pos);
+            case FLOAT:
+                return arrayData.getFloat(pos);
+            case DOUBLE:
+                return arrayData.getDouble(pos);
+            case TIMESTAMP_WITHOUT_TIME_ZONE:
+                return TimestampData.fromTimestamp(
+                        arrayData.getTimestamp(pos, getPrecision(elementType)).toTimestamp());
+            case TIMESTAMP_WITH_TIME_ZONE:
+            case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+                return TimestampData.fromInstant(
+                        arrayData
+                                .getLocalZonedTimestamp(pos, getPrecision(elementType))
+                                .toInstant());
+            case ARRAY:
+                ArrayData nestedArrayData = arrayData.getArray(pos);
+                return convertArrayData(nestedArrayData, (ArrayType) elementType, zoneId);
+            case MAP:
+                MapData mapData = arrayData.getMap(pos);
+                return convertMapData(mapData, (MapType) elementType, zoneId);
+            case ROW:
+                RecordData recordData =
+                        arrayData.getRecord(pos, DataTypeChecks.getFieldCount(elementType));
+                return convertRowData(recordData, (RowType) elementType, zoneId);
+            default:
+                throw new IllegalArgumentException(
+                        "Unsupported element type: " + elementType.getTypeRoot());
+        }
+    }
+
+    /**
+     * Creates a {@link RecordData.FieldGetter} for extracting and converting a specific field.
+     *
+     * <p>This method generates a lambda function that extracts a field at the given position and
+     * converts it to Flink SQL format.
+     *
+     * @param fieldType the data type of the field
+     * @param fieldPos the position of the field in the record
+     * @param zoneId the time zone for timestamp conversions
+     * @return a FieldGetter that extracts and converts the field value
+     * @throws IllegalArgumentException if the field type is not supported
+     */
+    private static RecordData.FieldGetter createFieldGetter(
+            DataType fieldType, int fieldPos, ZoneId zoneId) {
+        final RecordData.FieldGetter fieldGetter;
+        // ordered by type root definition
+        switch (fieldType.getTypeRoot()) {
+            case CHAR:
+            case VARCHAR:
+                fieldGetter =
+                        record ->
+                                BinaryStringData.fromString(record.getString(fieldPos).toString());
+                break;
+            case BOOLEAN:
+                fieldGetter = record -> record.getBoolean(fieldPos);
+                break;
+            case BINARY:
+            case VARBINARY:
+                fieldGetter = record -> record.getBinary(fieldPos);
+                break;
+            case DECIMAL:
+                final int decimalPrecision = getPrecision(fieldType);
+                final int decimalScale = getScale(fieldType);
+                fieldGetter =
+                        record ->
+                                DecimalData.fromBigDecimal(
+                                        record.getDecimal(fieldPos, decimalPrecision, decimalScale)
+                                                .toBigDecimal(),
+                                        decimalPrecision,
+                                        decimalScale);
+                break;
+            case TINYINT:
+                fieldGetter = record -> record.getByte(fieldPos);
+                break;
+            case SMALLINT:
+                fieldGetter = record -> record.getShort(fieldPos);
+                break;
+            case INTEGER:
+                fieldGetter = record -> record.getInt(fieldPos);
+                break;
+            case DATE:
+                fieldGetter = record -> (int) record.getDate(fieldPos).toEpochDay();
+                break;
+            case TIME_WITHOUT_TIME_ZONE:
+                fieldGetter = record -> (int) record.getTime(fieldPos).toMillisOfDay();
+                break;
+            case BIGINT:
+                fieldGetter = record -> record.getLong(fieldPos);
+                break;
+            case FLOAT:
+                fieldGetter = record -> record.getFloat(fieldPos);
+                break;
+            case DOUBLE:
+                fieldGetter = record -> record.getDouble(fieldPos);
+                break;
+            case TIMESTAMP_WITHOUT_TIME_ZONE:
+                fieldGetter =
+                        record ->
+                                TimestampData.fromTimestamp(
+                                        record.getTimestamp(fieldPos, getPrecision(fieldType))
+                                                .toTimestamp());
+                break;
+            case TIMESTAMP_WITH_TIME_ZONE:
+            case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+                fieldGetter =
+                        record ->
+                                TimestampData.fromInstant(
+                                        record.getLocalZonedTimestampData(
+                                                        fieldPos,
+                                                        DataTypeChecks.getPrecision(fieldType))
+                                                .toInstant());
+                break;
+            case ARRAY:
+                fieldGetter =
+                        record ->
+                                convertArrayData(
+                                        record.getArray(fieldPos), (ArrayType) fieldType, zoneId);
+                break;
+            case MAP:
+                fieldGetter =
+                        record ->
+                                convertMapData(
+                                        record.getMap(fieldPos), (MapType) fieldType, zoneId);
+                break;
+            case ROW:
+                fieldGetter =
+                        record ->
+                                convertRowData(
+                                        record.getRow(
+                                                fieldPos, ((RowType) fieldType).getFieldCount()),
+                                        (RowType) fieldType,
+                                        zoneId);
+                break;
+            default:
+                throw new IllegalArgumentException(
+                        "Unsupported field type: " + fieldType.getTypeRoot());
+        }
+        if (!fieldType.isNullable()) {
+            return fieldGetter;
+        }
+        return row -> {
+            if (row.isNullAt(fieldPos)) {
+                return null;
+            }
+            return fieldGetter.getFieldOrNull(row);
+        };
+    }
+}
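
For orientation, a minimal sketch (not part of the commit) of how this utility is consumed, mirroring TableSchemaInfo above and the unit tests below; schema and recordData stand in for any CDC schema and row:

    List<RecordData.FieldGetter> fieldGetters =
            RecordDataConverter.createFieldGetters(schema, ZoneId.of("UTC"));
    GenericRowData rowData = new GenericRowData(fieldGetters.size());
    for (int i = 0; i < fieldGetters.size(); i++) {
        // Each getter null-checks nullable fields, then converts the CDC value
        // to its Flink SQL counterpart, recursing into ARRAY/MAP/ROW elements.
        rowData.setField(i, fieldGetters.get(i).getFieldOrNull(recordData));
    }
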
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/test/java/org/apache/flink/cdc/connectors/kafka/json/TableSchemaInfoTest.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/test/java/org/apache/flink/cdc/connectors/kafka/json/TableSchemaInfoTest.java
index ffae90442..3bafe8eec 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/test/java/org/apache/flink/cdc/connectors/kafka/json/TableSchemaInfoTest.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/test/java/org/apache/flink/cdc/connectors/kafka/json/TableSchemaInfoTest.java
@@ -19,7 +19,9 @@ package org.apache.flink.cdc.connectors.kafka.json;
 
 import org.apache.flink.cdc.common.data.DateData;
 import org.apache.flink.cdc.common.data.DecimalData;
+import org.apache.flink.cdc.common.data.GenericArrayData;
 import org.apache.flink.cdc.common.data.LocalZonedTimestampData;
+import org.apache.flink.cdc.common.data.RecordData;
 import org.apache.flink.cdc.common.data.TimeData;
 import org.apache.flink.cdc.common.data.TimestampData;
 import org.apache.flink.cdc.common.data.binary.BinaryRecordData;
@@ -27,6 +29,7 @@ import org.apache.flink.cdc.common.data.binary.BinaryStringData;
 import org.apache.flink.cdc.common.event.TableId;
 import org.apache.flink.cdc.common.schema.Schema;
 import org.apache.flink.cdc.common.types.DataType;
+import org.apache.flink.cdc.common.types.DataTypes;
 import org.apache.flink.cdc.runtime.typeutils.BinaryRecordDataGenerator;
 import org.apache.flink.table.data.GenericRowData;
 import org.apache.flink.types.RowKind;
@@ -169,4 +172,62 @@ class TableSchemaInfoTest {
                                         Instant.parse("2023-01-01T00:00:00.000Z")),
                                 null));
     }
+
+    @Test
+    void testArrayWithNestedRowType() {
+        // Create a schema with ARRAY<ROW<name STRING, age INT>>
+        DataType personRowType =
+                DataTypes.ROW(
+                        DataTypes.FIELD("name", DataTypes.STRING()),
+                        DataTypes.FIELD("age", DataTypes.INT()));
+
+        Schema schema =
+                Schema.newBuilder()
+                        .physicalColumn("id", DataTypes.INT().notNull())
+                        .physicalColumn("persons", 
DataTypes.ARRAY(personRowType))
+                        .primaryKey("id")
+                        .build();
+
+        TableSchemaInfo tableSchemaInfo =
+                new TableSchemaInfo(
+                        TableId.parse("testDatabase.testTable"), schema, null, 
ZoneId.of("UTC"));
+
+        // Create test data with nested ROW in ARRAY
+        BinaryRecordDataGenerator personGenerator =
+                new BinaryRecordDataGenerator(new DataType[] 
{DataTypes.STRING(), DataTypes.INT()});
+
+        RecordData person1 =
+                personGenerator.generate(new Object[] 
{BinaryStringData.fromString("Alice"), 30});
+        RecordData person2 =
+                personGenerator.generate(new Object[] 
{BinaryStringData.fromString("Bob"), 25});
+
+        GenericArrayData personsArray = new GenericArrayData(new RecordData[] 
{person1, person2});
+
+        Object[] testData = new Object[] {1, personsArray};
+
+        BinaryRecordData recordData =
+                new 
BinaryRecordDataGenerator(schema.getColumnDataTypes().toArray(new DataType[0]))
+                        .generate(testData);
+
+        // Convert and verify
+        org.apache.flink.table.data.RowData result =
+                tableSchemaInfo.getRowDataFromRecordData(recordData, false);
+
+        Assertions.assertThat(result).isNotNull();
+        Assertions.assertThat(result.getInt(0)).isEqualTo(1);
+
+        org.apache.flink.table.data.ArrayData resultArray = result.getArray(1);
+        Assertions.assertThat(resultArray).isNotNull();
+        Assertions.assertThat(resultArray.size()).isEqualTo(2);
+
+        // Verify first person
+        org.apache.flink.table.data.RowData resultPerson1 = resultArray.getRow(0, 2);
+        Assertions.assertThat(resultPerson1.getString(0).toString()).isEqualTo("Alice");
+        Assertions.assertThat(resultPerson1.getInt(1)).isEqualTo(30);
+
+        // Verify second person
+        org.apache.flink.table.data.RowData resultPerson2 = resultArray.getRow(1, 2);
+        Assertions.assertThat(resultPerson2.getString(0).toString()).isEqualTo("Bob");
+        Assertions.assertThat(resultPerson2.getInt(1)).isEqualTo(25);
+    }
 }
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/test/java/org/apache/flink/cdc/connectors/kafka/json/canal/CanalJsonSerializationSchemaTest.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/test/java/org/apache/flink/cdc/connectors/kafka/json/canal/CanalJsonSerializationSchemaTest.java
index 9543c90d1..dc2b71d43 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/test/java/org/apache/flink/cdc/connectors/kafka/json/canal/CanalJsonSerializationSchemaTest.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/test/java/org/apache/flink/cdc/connectors/kafka/json/canal/CanalJsonSerializationSchemaTest.java
@@ -134,4 +134,98 @@ class CanalJsonSerializationSchemaTest {
         actual = mapper.readTree(serializationSchema.serialize(updateEvent));
         Assertions.assertThat(actual).isEqualTo(expected);
     }
+
+    @Test
+    void testSerializeComplexTypes() throws Exception {
+        ObjectMapper mapper =
+                JacksonMapperFactory.createObjectMapper()
+                        .configure(JsonGenerator.Feature.WRITE_BIGDECIMAL_AS_PLAIN, false);
+        SerializationSchema<Event> serializationSchema =
+                ChangeLogJsonFormatFactory.createSerializationSchema(
+                        new Configuration(),
+                        JsonSerializationType.CANAL_JSON,
+                        ZoneId.systemDefault());
+        serializationSchema.open(new MockInitializationContext());
+
+        // create table with complex types
+        Schema schema =
+                Schema.newBuilder()
+                        .physicalColumn("id", DataTypes.INT())
+                        .physicalColumn("arr", 
DataTypes.ARRAY(DataTypes.STRING()))
+                        .physicalColumn("map", 
DataTypes.MAP(DataTypes.STRING(), DataTypes.INT()))
+                        .physicalColumn(
+                                "row",
+                                DataTypes.ROW(
+                                        DataTypes.FIELD("f1", 
DataTypes.STRING()),
+                                        DataTypes.FIELD("f2", 
DataTypes.INT())))
+                        .primaryKey("id")
+                        .build();
+
+        RowType rowType =
+                RowType.of(
+                        DataTypes.INT(),
+                        DataTypes.ARRAY(DataTypes.STRING()),
+                        DataTypes.MAP(DataTypes.STRING(), DataTypes.INT()),
+                        DataTypes.ROW(
+                                DataTypes.FIELD("f1", DataTypes.STRING()),
+                                DataTypes.FIELD("f2", DataTypes.INT())));
+
+        CreateTableEvent createTableEvent = new CreateTableEvent(TABLE_1, schema);
+        Assertions.assertThat(serializationSchema.serialize(createTableEvent)).isNull();
+
+        BinaryRecordDataGenerator generator = new BinaryRecordDataGenerator(rowType);
+
+        // Create test data with complex types
+        org.apache.flink.cdc.common.data.GenericArrayData arrayData =
+                new org.apache.flink.cdc.common.data.GenericArrayData(
+                        new Object[] {
+                            BinaryStringData.fromString("item1"),
+                            BinaryStringData.fromString("item2")
+                        });
+
+        java.util.Map<Object, Object> mapValues = new java.util.HashMap<>();
+        mapValues.put(BinaryStringData.fromString("key1"), 100);
+        mapValues.put(BinaryStringData.fromString("key2"), 200);
+        org.apache.flink.cdc.common.data.GenericMapData mapData =
+                new org.apache.flink.cdc.common.data.GenericMapData(mapValues);
+
+        BinaryRecordDataGenerator nestedRowGenerator =
+                new BinaryRecordDataGenerator(RowType.of(DataTypes.STRING(), DataTypes.INT()));
+        org.apache.flink.cdc.common.data.RecordData nestedRow =
+                nestedRowGenerator.generate(
+                        new Object[] {BinaryStringData.fromString("nested"), 42});
+
+        // insert event with complex types
+        DataChangeEvent insertEvent =
+                DataChangeEvent.insertEvent(
+                        TABLE_1,
+                        generator.generate(new Object[] {1, arrayData, mapData, nestedRow}));
+
+        byte[] serialized = serializationSchema.serialize(insertEvent);
+        JsonNode actual = mapper.readTree(serialized);
+
+        // Verify the structure contains complex types
+        Assertions.assertThat(actual.has("old")).isTrue();
+        Assertions.assertThat(actual.has("data")).isTrue();
+        Assertions.assertThat(actual.get("data").isArray()).isTrue();
+        Assertions.assertThat(actual.get("data").get(0).has("id")).isTrue();
+        Assertions.assertThat(actual.get("data").get(0).has("arr")).isTrue();
+        Assertions.assertThat(actual.get("data").get(0).has("map")).isTrue();
+        Assertions.assertThat(actual.get("data").get(0).has("row")).isTrue();
+
+        // Verify array content
+        JsonNode arrNode = actual.get("data").get(0).get("arr");
+        Assertions.assertThat(arrNode.isArray()).isTrue();
+        Assertions.assertThat(arrNode.size()).isEqualTo(2);
+
+        // Verify map content
+        JsonNode mapNode = actual.get("data").get(0).get("map");
+        Assertions.assertThat(mapNode.isObject()).isTrue();
+
+        // Verify row content
+        JsonNode rowNode = actual.get("data").get(0).get("row");
+        Assertions.assertThat(rowNode.isObject()).isTrue();
+        Assertions.assertThat(rowNode.has("f1")).isTrue();
+        Assertions.assertThat(rowNode.has("f2")).isTrue();
+    }
 }
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/test/java/org/apache/flink/cdc/connectors/kafka/json/debezium/DebeziumJsonSerializationSchemaTest.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/test/java/org/apache/flink/cdc/connectors/kafka/json/debezium/DebeziumJsonSerializationSchemaTest.java
index f63677c6c..35aada0c2 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/test/java/org/apache/flink/cdc/connectors/kafka/json/debezium/DebeziumJsonSerializationSchemaTest.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/test/java/org/apache/flink/cdc/connectors/kafka/json/debezium/DebeziumJsonSerializationSchemaTest.java
@@ -247,4 +247,97 @@ class DebeziumJsonSerializationSchemaTest {
         JsonNode actual = mapper.readTree(serializationSchema.serialize(insertEvent1));
         assertThat(actual).isEqualTo(expected);
     }
+
+    @Test
+    void testSerializeComplexTypes() throws Exception {
+        ObjectMapper mapper =
+                JacksonMapperFactory.createObjectMapper()
+                        .configure(JsonGenerator.Feature.WRITE_BIGDECIMAL_AS_PLAIN, false);
+        SerializationSchema<Event> serializationSchema =
+                ChangeLogJsonFormatFactory.createSerializationSchema(
+                        new Configuration(),
+                        JsonSerializationType.DEBEZIUM_JSON,
+                        ZoneId.systemDefault());
+        serializationSchema.open(new MockInitializationContext());
+
+        // create table with complex types
+        Schema schema =
+                Schema.newBuilder()
+                        .physicalColumn("id", DataTypes.INT())
+                        .physicalColumn("arr", 
DataTypes.ARRAY(DataTypes.STRING()))
+                        .physicalColumn("map", 
DataTypes.MAP(DataTypes.STRING(), DataTypes.INT()))
+                        .physicalColumn(
+                                "row",
+                                DataTypes.ROW(
+                                        DataTypes.FIELD("f1", 
DataTypes.STRING()),
+                                        DataTypes.FIELD("f2", 
DataTypes.INT())))
+                        .primaryKey("id")
+                        .build();
+
+        RowType rowType =
+                RowType.of(
+                        DataTypes.INT(),
+                        DataTypes.ARRAY(DataTypes.STRING()),
+                        DataTypes.MAP(DataTypes.STRING(), DataTypes.INT()),
+                        DataTypes.ROW(
+                                DataTypes.FIELD("f1", DataTypes.STRING()),
+                                DataTypes.FIELD("f2", DataTypes.INT())));
+
+        CreateTableEvent createTableEvent = new CreateTableEvent(TABLE_1, schema);
+        assertThat(serializationSchema.serialize(createTableEvent)).isNull();
+
+        BinaryRecordDataGenerator generator = new BinaryRecordDataGenerator(rowType);
+
+        // Create test data with complex types
+        org.apache.flink.cdc.common.data.GenericArrayData arrayData =
+                new org.apache.flink.cdc.common.data.GenericArrayData(
+                        new Object[] {
+                            BinaryStringData.fromString("item1"),
+                            BinaryStringData.fromString("item2")
+                        });
+
+        Map<Object, Object> mapValues = new HashMap<>();
+        mapValues.put(BinaryStringData.fromString("key1"), 100);
+        mapValues.put(BinaryStringData.fromString("key2"), 200);
+        org.apache.flink.cdc.common.data.GenericMapData mapData =
+                new org.apache.flink.cdc.common.data.GenericMapData(mapValues);
+
+        BinaryRecordDataGenerator nestedRowGenerator =
+                new BinaryRecordDataGenerator(RowType.of(DataTypes.STRING(), DataTypes.INT()));
+        org.apache.flink.cdc.common.data.RecordData nestedRow =
+                nestedRowGenerator.generate(
+                        new Object[] {BinaryStringData.fromString("nested"), 42});
+
+        // insert event with complex types
+        DataChangeEvent insertEvent =
+                DataChangeEvent.insertEvent(
+                        TABLE_1,
+                        generator.generate(new Object[] {1, arrayData, mapData, nestedRow}));
+
+        byte[] serialized = serializationSchema.serialize(insertEvent);
+        JsonNode actual = mapper.readTree(serialized);
+
+        // Verify the structure contains complex types
+        assertThat(actual.has("before")).isTrue();
+        assertThat(actual.has("after")).isTrue();
+        assertThat(actual.get("after").has("id")).isTrue();
+        assertThat(actual.get("after").has("arr")).isTrue();
+        assertThat(actual.get("after").has("map")).isTrue();
+        assertThat(actual.get("after").has("row")).isTrue();
+
+        // Verify array content
+        JsonNode arrNode = actual.get("after").get("arr");
+        assertThat(arrNode.isArray()).isTrue();
+        assertThat(arrNode.size()).isEqualTo(2);
+
+        // Verify map content
+        JsonNode mapNode = actual.get("after").get("map");
+        assertThat(mapNode.isObject()).isTrue();
+
+        // Verify row content
+        JsonNode rowNode = actual.get("after").get("row");
+        assertThat(rowNode.isObject()).isTrue();
+        assertThat(rowNode.has("f1")).isTrue();
+        assertThat(rowNode.has("f2")).isTrue();
+    }
 }
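
Both serializer tests wire up the format factory the same way a sink would; a condensed sketch, with names as in the tests above:

    SerializationSchema<Event> serializer =
            ChangeLogJsonFormatFactory.createSerializationSchema(
                    new Configuration(),
                    JsonSerializationType.DEBEZIUM_JSON, // or CANAL_JSON
                    ZoneId.systemDefault());
    serializer.open(new MockInitializationContext()); // test-only context
    byte[] json = serializer.serialize(insertEvent);  // complex columns now
                                                      // arrive as native JSON
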
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/test/java/org/apache/flink/cdc/connectors/kafka/json/utils/RecordDataConverterTest.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/test/java/org/apache/flink/cdc/connectors/kafka/json/utils/RecordDataConverterTest.java
new file mode 100644
index 000000000..1013d210e
--- /dev/null
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/test/java/org/apache/flink/cdc/connectors/kafka/json/utils/RecordDataConverterTest.java
@@ -0,0 +1,445 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.kafka.json.utils;
+
+import org.apache.flink.cdc.common.data.DateData;
+import org.apache.flink.cdc.common.data.DecimalData;
+import org.apache.flink.cdc.common.data.GenericArrayData;
+import org.apache.flink.cdc.common.data.GenericMapData;
+import org.apache.flink.cdc.common.data.LocalZonedTimestampData;
+import org.apache.flink.cdc.common.data.RecordData;
+import org.apache.flink.cdc.common.data.TimeData;
+import org.apache.flink.cdc.common.data.TimestampData;
+import org.apache.flink.cdc.common.data.binary.BinaryRecordData;
+import org.apache.flink.cdc.common.data.binary.BinaryStringData;
+import org.apache.flink.cdc.common.schema.Schema;
+import org.apache.flink.cdc.common.types.DataType;
+import org.apache.flink.cdc.common.types.DataTypes;
+import org.apache.flink.cdc.runtime.typeutils.BinaryRecordDataGenerator;
+
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.math.BigDecimal;
+import java.time.Instant;
+import java.time.ZoneId;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/** Unit tests for {@link RecordDataConverter}. */
+class RecordDataConverterTest {
+
+    private static final ZoneId TEST_ZONE_ID = ZoneId.of("UTC");
+
+    /** Tests RecordDataConverter.createFieldGetters method for all supported data types. */
+    @Test
+    void testCreateFieldGettersForAllTypes() {
+        // Define nested ROW types for complex type testing
+        DataType innerRowType =
+                DataTypes.ROW(
+                        DataTypes.FIELD("inner_name", DataTypes.STRING()),
+                        DataTypes.FIELD("inner_value", DataTypes.INT()));
+
+        DataType outerRowType =
+                DataTypes.ROW(
+                        DataTypes.FIELD("row_id", DataTypes.BIGINT()),
+                        DataTypes.FIELD("row_inner", innerRowType));
+
+        Schema schema =
+                Schema.newBuilder()
+                        // Basic types
+                        .physicalColumn("col_boolean", DataTypes.BOOLEAN())
+                        .physicalColumn("col_tinyint", DataTypes.TINYINT())
+                        .physicalColumn("col_smallint", DataTypes.SMALLINT())
+                        .physicalColumn("col_int", DataTypes.INT())
+                        .physicalColumn("col_bigint", DataTypes.BIGINT())
+                        .physicalColumn("col_float", DataTypes.FLOAT())
+                        .physicalColumn("col_double", DataTypes.DOUBLE())
+                        // String types
+                        .physicalColumn("col_char", DataTypes.CHAR(10))
+                        .physicalColumn("col_varchar", DataTypes.VARCHAR(50))
+                        .physicalColumn("col_string", DataTypes.STRING())
+                        // Binary types
+                        .physicalColumn("col_binary", DataTypes.BINARY(5))
+                        .physicalColumn("col_varbinary", 
DataTypes.VARBINARY(20))
+                        .physicalColumn("col_bytes", DataTypes.BYTES())
+                        // Decimal types
+                        .physicalColumn("col_decimal_small", 
DataTypes.DECIMAL(6, 3))
+                        .physicalColumn("col_decimal_large", 
DataTypes.DECIMAL(38, 18))
+                        // Temporal types
+                        .physicalColumn("col_date", DataTypes.DATE())
+                        .physicalColumn("col_time", DataTypes.TIME())
+                        .physicalColumn("col_time_precision", 
DataTypes.TIME(6))
+                        .physicalColumn("col_timestamp", DataTypes.TIMESTAMP())
+                        .physicalColumn("col_timestamp_ltz", 
DataTypes.TIMESTAMP_LTZ())
+                        // Array types
+                        .physicalColumn("col_array_int", 
DataTypes.ARRAY(DataTypes.INT()))
+                        .physicalColumn("col_array_string", 
DataTypes.ARRAY(DataTypes.STRING()))
+                        .physicalColumn("col_array_row", 
DataTypes.ARRAY(innerRowType))
+                        // Map types
+                        .physicalColumn(
+                                "col_map_string_int",
+                                DataTypes.MAP(DataTypes.STRING(), 
DataTypes.INT()))
+                        .physicalColumn(
+                                "col_map_int_row", 
DataTypes.MAP(DataTypes.INT(), innerRowType))
+                        // Row types
+                        .physicalColumn("col_row", innerRowType)
+                        .physicalColumn("col_nested_row", outerRowType)
+                        // NOT NULL type
+                        .physicalColumn("col_int_not_null", 
DataTypes.INT().notNull())
+                        .primaryKey("col_int")
+                        .build();
+
+        List<RecordData.FieldGetter> fieldGetters =
+                RecordDataConverter.createFieldGetters(schema, TEST_ZONE_ID);
+
+        Assertions.assertThat(fieldGetters).hasSize(28);
+
+        // Prepare test data - create Row data generators
+        BinaryRecordDataGenerator innerRowGenerator =
+                new BinaryRecordDataGenerator(new DataType[] {DataTypes.STRING(), DataTypes.INT()});
+        BinaryRecordDataGenerator outerRowGenerator =
+                new BinaryRecordDataGenerator(new DataType[] {DataTypes.BIGINT(), innerRowType});
+        RecordData innerRow1 =
+                innerRowGenerator.generate(
+                        new Object[] {BinaryStringData.fromString("inner1"), 100});
+        RecordData innerRow2 =
+                innerRowGenerator.generate(
+                        new Object[] {BinaryStringData.fromString("inner2"), 200});
+        RecordData innerRow3 =
+                innerRowGenerator.generate(
+                        new Object[] {BinaryStringData.fromString("inner3"), 300});
+
+        RecordData outerRow = outerRowGenerator.generate(new Object[] {999L, innerRow1});
+        GenericArrayData intArray = new GenericArrayData(new Integer[] {1, 2, 3, 4, 5});
+        GenericArrayData stringArray =
+                new GenericArrayData(
+                        new Object[] {
+                            BinaryStringData.fromString("a"),
+                            BinaryStringData.fromString("b"),
+                            BinaryStringData.fromString("c")
+                        });
+        GenericArrayData rowArray = new GenericArrayData(new RecordData[] {innerRow2, innerRow3});
+
+        // Create Map data
+        Map<Object, Object> stringIntMap = new HashMap<>();
+        stringIntMap.put(BinaryStringData.fromString("key1"), 10);
+        stringIntMap.put(BinaryStringData.fromString("key2"), 20);
+        GenericMapData mapStringInt = new GenericMapData(stringIntMap);
+
+        Map<Object, Object> intRowMap = new HashMap<>();
+        intRowMap.put(1, innerRow1);
+        intRowMap.put(2, innerRow2);
+        GenericMapData mapIntRow = new GenericMapData(intRowMap);
+
+        Instant testInstant = Instant.parse("2023-06-15T10:30:00.123456Z");
+        Object[] testData =
+                new Object[] {
+                    true,
+                    (byte) 127,
+                    (short) 32767,
+                    2147483647,
+                    9223372036854775807L,
+                    3.14f,
+                    2.718281828,
+                    BinaryStringData.fromString("char_val"),
+                    BinaryStringData.fromString("varchar_value"),
+                    BinaryStringData.fromString("string_value"),
+                    new byte[] {1, 2, 3, 4, 5},
+                    new byte[] {6, 7, 8, 9, 10, 11, 12},
+                    new byte[] {13, 14, 15},
+                    DecimalData.fromBigDecimal(new BigDecimal("123.456"), 6, 
3),
+                    DecimalData.fromBigDecimal(
+                            new 
BigDecimal("12345678901234567890.123456789012345678"), 38, 18),
+                    DateData.fromEpochDay(19523),
+                    TimeData.fromMillisOfDay(37800000),
+                    TimeData.fromNanoOfDay(37800123456000L),
+                    TimestampData.fromTimestamp(
+                            java.sql.Timestamp.valueOf("2023-06-15 
10:30:00.123456")),
+                    LocalZonedTimestampData.fromInstant(testInstant),
+                    intArray,
+                    stringArray,
+                    rowArray,
+                    mapStringInt,
+                    mapIntRow,
+                    innerRow1,
+                    outerRow,
+                    42
+                };
+
+        BinaryRecordData recordData =
+                new BinaryRecordDataGenerator(schema.getColumnDataTypes().toArray(new DataType[0]))
+                        .generate(testData);
+
+        int idx = 0;
+        Assertions.assertThat(fieldGetters.get(idx++).getFieldOrNull(recordData)).isEqualTo(true);
+        Assertions.assertThat(fieldGetters.get(idx++).getFieldOrNull(recordData))
+                .isEqualTo((byte) 127);
+        Assertions.assertThat(fieldGetters.get(idx++).getFieldOrNull(recordData))
+                .isEqualTo((short) 32767);
+        Assertions.assertThat(fieldGetters.get(idx++).getFieldOrNull(recordData))
+                .isEqualTo(2147483647);
+        Assertions.assertThat(fieldGetters.get(idx++).getFieldOrNull(recordData))
+                .isEqualTo(9223372036854775807L);
+        Assertions.assertThat(fieldGetters.get(idx++).getFieldOrNull(recordData)).isEqualTo(3.14f);
+        Assertions.assertThat(fieldGetters.get(idx++).getFieldOrNull(recordData))
+                .isEqualTo(2.718281828);
+
+        // Verify string types
+        Assertions.assertThat(fieldGetters.get(idx++).getFieldOrNull(recordData).toString())
+                .isEqualTo("char_val");
+        Assertions.assertThat(fieldGetters.get(idx++).getFieldOrNull(recordData).toString())
+                .isEqualTo("varchar_value");
+        Assertions.assertThat(fieldGetters.get(idx++).getFieldOrNull(recordData).toString())
+                .isEqualTo("string_value");
+
+        // Verify binary types
+        Assertions.assertThat((byte[]) fieldGetters.get(idx++).getFieldOrNull(recordData))
+                .isEqualTo(new byte[] {1, 2, 3, 4, 5});
+        Assertions.assertThat((byte[]) fieldGetters.get(idx++).getFieldOrNull(recordData))
+                .isEqualTo(new byte[] {6, 7, 8, 9, 10, 11, 12});
+        Assertions.assertThat((byte[]) fieldGetters.get(idx++).getFieldOrNull(recordData))
+                .isEqualTo(new byte[] {13, 14, 15});
+
+        // Verify decimal types
+        org.apache.flink.table.data.DecimalData decimalSmall =
+                (org.apache.flink.table.data.DecimalData)
+                        fieldGetters.get(idx++).getFieldOrNull(recordData);
+        Assertions.assertThat(decimalSmall.toBigDecimal())
+                .isEqualByComparingTo(new BigDecimal("123.456"));
+
+        org.apache.flink.table.data.DecimalData decimalLarge =
+                (org.apache.flink.table.data.DecimalData)
+                        fieldGetters.get(idx++).getFieldOrNull(recordData);
+        Assertions.assertThat(decimalLarge.toBigDecimal())
+                .isEqualByComparingTo(new BigDecimal("12345678901234567890.123456789012345678"));
+
+        // Verify temporal types
+        Assertions.assertThat(fieldGetters.get(idx++).getFieldOrNull(recordData)).isEqualTo(19523);
+        Assertions.assertThat(fieldGetters.get(idx++).getFieldOrNull(recordData))
+                .isEqualTo(37800000);
+        Assertions.assertThat(fieldGetters.get(idx++).getFieldOrNull(recordData))
+                .isEqualTo(37800123);
+
+        org.apache.flink.table.data.TimestampData timestamp =
+                (org.apache.flink.table.data.TimestampData)
+                        fieldGetters.get(idx++).getFieldOrNull(recordData);
+        Assertions.assertThat(timestamp.toTimestamp())
+                .isEqualTo(java.sql.Timestamp.valueOf("2023-06-15 10:30:00.123456"));
+
+        org.apache.flink.table.data.TimestampData timestampLtz =
+                (org.apache.flink.table.data.TimestampData)
+                        fieldGetters.get(idx++).getFieldOrNull(recordData);
+        Assertions.assertThat(timestampLtz.toInstant()).isEqualTo(testInstant);
+
+        // Verify array types
+        org.apache.flink.table.data.ArrayData resultIntArray =
+                (org.apache.flink.table.data.ArrayData)
+                        fieldGetters.get(idx++).getFieldOrNull(recordData);
+        Assertions.assertThat(resultIntArray.size()).isEqualTo(5);
+        Assertions.assertThat(resultIntArray.getInt(0)).isEqualTo(1);
+        Assertions.assertThat(resultIntArray.getInt(4)).isEqualTo(5);
+
+        org.apache.flink.table.data.ArrayData resultStringArray =
+                (org.apache.flink.table.data.ArrayData)
+                        fieldGetters.get(idx++).getFieldOrNull(recordData);
+        Assertions.assertThat(resultStringArray.size()).isEqualTo(3);
+        Assertions.assertThat(resultStringArray.getString(0).toString()).isEqualTo("a");
+        Assertions.assertThat(resultStringArray.getString(2).toString()).isEqualTo("c");
+
+        org.apache.flink.table.data.ArrayData resultRowArray =
+                (org.apache.flink.table.data.ArrayData)
+                        fieldGetters.get(idx++).getFieldOrNull(recordData);
+        Assertions.assertThat(resultRowArray.size()).isEqualTo(2);
+        org.apache.flink.table.data.RowData arrayRow0 = resultRowArray.getRow(0, 2);
+        Assertions.assertThat(arrayRow0.getString(0).toString()).isEqualTo("inner2");
+        Assertions.assertThat(arrayRow0.getInt(1)).isEqualTo(200);
+
+        // Verify map types
+        org.apache.flink.table.data.MapData resultMapStringInt =
+                (org.apache.flink.table.data.MapData)
+                        fieldGetters.get(idx++).getFieldOrNull(recordData);
+        Assertions.assertThat(resultMapStringInt).isNotNull();
+        Assertions.assertThat(resultMapStringInt.keyArray().size()).isEqualTo(2);
+
+        org.apache.flink.table.data.MapData resultMapIntRow =
+                (org.apache.flink.table.data.MapData)
+                        fieldGetters.get(idx++).getFieldOrNull(recordData);
+        Assertions.assertThat(resultMapIntRow).isNotNull();
+        Assertions.assertThat(resultMapIntRow.keyArray().size()).isEqualTo(2);
+
+        // Verify row types
+        org.apache.flink.table.data.RowData resultRow =
+                (org.apache.flink.table.data.RowData)
+                        fieldGetters.get(idx++).getFieldOrNull(recordData);
+        Assertions.assertThat(resultRow).isNotNull();
+        Assertions.assertThat(resultRow.getString(0).toString()).isEqualTo("inner1");
+        Assertions.assertThat(resultRow.getInt(1)).isEqualTo(100);
+
+        org.apache.flink.table.data.RowData resultNestedRow =
+                (org.apache.flink.table.data.RowData)
+                        fieldGetters.get(idx++).getFieldOrNull(recordData);
+        Assertions.assertThat(resultNestedRow).isNotNull();
+        Assertions.assertThat(resultNestedRow.getLong(0)).isEqualTo(999L);
+        org.apache.flink.table.data.RowData nestedInnerRow = resultNestedRow.getRow(1, 2);
+        Assertions.assertThat(nestedInnerRow.getString(0).toString()).isEqualTo("inner1");
+        Assertions.assertThat(nestedInnerRow.getInt(1)).isEqualTo(100);
+
+        // Verify NOT NULL type
+        Assertions.assertThat(fieldGetters.get(idx).getFieldOrNull(recordData)).isEqualTo(42);
+    }
+
+    /** Tests the case when all field values are null. */
+    @Test
+    void testCreateFieldGettersWithAllNullValues() {
+        Schema schema =
+                Schema.newBuilder()
+                        .physicalColumn("string_col", DataTypes.STRING())
+                        .physicalColumn("int_col", DataTypes.INT())
+                        .physicalColumn("decimal_col", DataTypes.DECIMAL(10, 2))
+                        .physicalColumn("timestamp_col", DataTypes.TIMESTAMP())
+                        .physicalColumn("array_col", DataTypes.ARRAY(DataTypes.INT()))
+                        .physicalColumn(
+                                "map_col", DataTypes.MAP(DataTypes.STRING(), DataTypes.INT()))
+                        .physicalColumn(
+                                "row_col", DataTypes.ROW(DataTypes.FIELD("f1", DataTypes.STRING())))
+                        .build();
+
+        List<RecordData.FieldGetter> fieldGetters =
+                RecordDataConverter.createFieldGetters(schema, TEST_ZONE_ID);
+
+        Object[] testData = new Object[] {null, null, null, null, null, null, null};
+
+        BinaryRecordData recordData =
+                new BinaryRecordDataGenerator(schema.getColumnDataTypes().toArray(new DataType[0]))
+                        .generate(testData);
+
+        Assertions.assertThat(fieldGetters).hasSize(7);
+        for (int i = 0; i < fieldGetters.size(); i++) {
+            Assertions.assertThat(fieldGetters.get(i).getFieldOrNull(recordData)).isNull();
+        }
+    }
+
+    /** Tests the case with mixed null and non-null values. */
+    @Test
+    void testCreateFieldGettersWithMixedNullValues() {
+        Schema schema =
+                Schema.newBuilder()
+                        .physicalColumn("string_col", DataTypes.STRING())
+                        .physicalColumn("int_col", DataTypes.INT())
+                        .physicalColumn("nullable_string", DataTypes.STRING())
+                        .physicalColumn("bigint_col", DataTypes.BIGINT())
+                        .physicalColumn("nullable_array", DataTypes.ARRAY(DataTypes.INT()))
+                        .build();
+
+        List<RecordData.FieldGetter> fieldGetters =
+                RecordDataConverter.createFieldGetters(schema, TEST_ZONE_ID);
+
+        Object[] testData =
+                new Object[] {BinaryStringData.fromString("test"), 42, null, 100L, null};
+
+        BinaryRecordData recordData =
+                new BinaryRecordDataGenerator(schema.getColumnDataTypes().toArray(new DataType[0]))
+                        .generate(testData);
+
+        Assertions.assertThat(fieldGetters.get(0).getFieldOrNull(recordData).toString())
+                .isEqualTo("test");
+        Assertions.assertThat(fieldGetters.get(1).getFieldOrNull(recordData)).isEqualTo(42);
+        Assertions.assertThat(fieldGetters.get(2).getFieldOrNull(recordData)).isNull();
+        Assertions.assertThat(fieldGetters.get(3).getFieldOrNull(recordData)).isEqualTo(100L);
+        Assertions.assertThat(fieldGetters.get(4).getFieldOrNull(recordData)).isNull();
+    }
+
+    /** Tests the case with an empty Schema. */
+    @Test
+    void testCreateFieldGettersForEmptySchema() {
+        Schema schema = Schema.newBuilder().build();
+
+        List<RecordData.FieldGetter> fieldGetters =
+                RecordDataConverter.createFieldGetters(schema, TEST_ZONE_ID);
+
+        Assertions.assertThat(fieldGetters).isEmpty();
+    }
+
+    /** Tests the impact of different time zones on TIMESTAMP_LTZ type. */
+    @Test
+    void testCreateFieldGettersWithDifferentTimeZones() {
+        Schema schema =
+                Schema.newBuilder()
+                        .physicalColumn("timestamp_ltz", DataTypes.TIMESTAMP_LTZ())
+                        .build();
+
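+        // TIMESTAMP_LTZ stores an absolute instant; the session time zone affects only how
+        // it is rendered, so converting under UTC and Asia/Shanghai should yield the same Instant.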
+        Instant testInstant = Instant.parse("2023-01-01T12:00:00.000Z");
+        Object[] testData = new Object[] {LocalZonedTimestampData.fromInstant(testInstant)};
+
+        List<RecordData.FieldGetter> utcFieldGetters =
+                RecordDataConverter.createFieldGetters(schema, ZoneId.of("UTC"));
+        BinaryRecordData recordData1 =
+                new BinaryRecordDataGenerator(schema.getColumnDataTypes().toArray(new DataType[0]))
+                        .generate(testData);
+        org.apache.flink.table.data.TimestampData utcResult =
+                (org.apache.flink.table.data.TimestampData)
+                        utcFieldGetters.get(0).getFieldOrNull(recordData1);
+        Assertions.assertThat(utcResult.toInstant()).isEqualTo(testInstant);
+
+        // Test with Asia/Shanghai timezone
+        List<RecordData.FieldGetter> shanghaiFieldGetters =
+                RecordDataConverter.createFieldGetters(schema, ZoneId.of("Asia/Shanghai"));
+        BinaryRecordData recordData2 =
+                new BinaryRecordDataGenerator(schema.getColumnDataTypes().toArray(new DataType[0]))
+                        .generate(testData);
+        org.apache.flink.table.data.TimestampData shanghaiResult =
+                (org.apache.flink.table.data.TimestampData)
+                        shanghaiFieldGetters.get(0).getFieldOrNull(recordData2);
+        Assertions.assertThat(shanghaiResult.toInstant()).isEqualTo(testInstant);
+    }
+
+    /** Tests edge cases with empty Array and empty Map. */
+    @Test
+    void testCreateFieldGettersForEmptyCollections() {
+        Schema schema =
+                Schema.newBuilder()
+                        .physicalColumn("empty_array", DataTypes.ARRAY(DataTypes.INT()))
+                        .physicalColumn(
+                                "empty_map", DataTypes.MAP(DataTypes.STRING(), DataTypes.INT()))
+                        .build();
+
+        List<RecordData.FieldGetter> fieldGetters =
+                RecordDataConverter.createFieldGetters(schema, TEST_ZONE_ID);
+
+        GenericArrayData emptyArray = new GenericArrayData(new Integer[] {});
+        GenericMapData emptyMap = new GenericMapData(new HashMap<>());
+
+        Object[] testData = new Object[] {emptyArray, emptyMap};
+
+        BinaryRecordData recordData =
+                new BinaryRecordDataGenerator(schema.getColumnDataTypes().toArray(new DataType[0]))
+                        .generate(testData);
+
+        org.apache.flink.table.data.ArrayData resultArray =
+                (org.apache.flink.table.data.ArrayData)
+                        fieldGetters.get(0).getFieldOrNull(recordData);
+        Assertions.assertThat(resultArray.size()).isEqualTo(0);
+
+        org.apache.flink.table.data.MapData resultMap =
+                (org.apache.flink.table.data.MapData)
+                        fieldGetters.get(1).getFieldOrNull(recordData);
+        Assertions.assertThat(resultMap.keyArray().size()).isEqualTo(0);
+    }
+}
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/test/java/org/apache/flink/cdc/connectors/kafka/sink/KafkaDataSinkITCase.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/test/java/org/apache/flink/cdc/connectors/kafka/sink/KafkaDataSinkITCase.java
index 8ccf85328..c3bb94fde 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/test/java/org/apache/flink/cdc/connectors/kafka/sink/KafkaDataSinkITCase.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/test/java/org/apache/flink/cdc/connectors/kafka/sink/KafkaDataSinkITCase.java
@@ -20,6 +20,10 @@ package org.apache.flink.cdc.connectors.kafka.sink;
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.cdc.common.configuration.Configuration;
+import org.apache.flink.cdc.common.data.GenericArrayData;
+import org.apache.flink.cdc.common.data.GenericMapData;
+import org.apache.flink.cdc.common.data.StringData;
+import org.apache.flink.cdc.common.data.binary.BinaryRecordData;
 import org.apache.flink.cdc.common.data.binary.BinaryStringData;
 import org.apache.flink.cdc.common.event.AddColumnEvent;
 import org.apache.flink.cdc.common.event.CreateTableEvent;
@@ -32,6 +36,7 @@ import org.apache.flink.cdc.common.factories.FactoryHelper;
 import org.apache.flink.cdc.common.schema.Column;
 import org.apache.flink.cdc.common.schema.Schema;
 import org.apache.flink.cdc.common.sink.FlinkSinkProvider;
+import org.apache.flink.cdc.common.types.DataType;
 import org.apache.flink.cdc.common.types.DataTypes;
 import org.apache.flink.cdc.common.types.RowType;
 import org.apache.flink.cdc.connectors.kafka.json.JsonSerializationType;
@@ -44,6 +49,7 @@ import org.apache.flink.util.TestLogger;
 import org.apache.flink.util.jackson.JacksonMapperFactory;
 
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
 
@@ -58,6 +64,8 @@ import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.testcontainers.containers.KafkaContainer;
@@ -696,4 +704,372 @@ class KafkaDataSinkITCase extends TestLogger {
         return threadStackTrace.getKey().getState() != Thread.State.TERMINATED
                && threadStackTrace.getKey().getName().contains("kafka-producer-network-thread");
     }
+
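+    /**
+     * Shared driver for the complex-type serialization tests below: runs the given events
+     * through a local Kafka pipeline sink, drains the records written to the test topic,
+     * and compares the deserialized JSON payloads against the expected output.
+     */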
+    void runGenericComplexTypeSerializationTest(
+            JsonSerializationType serializationType,
+            List<Event> eventsToSerialize,
+            List<String> expectedJson)
+            throws Exception {
+        try (StreamExecutionEnvironment env = new LocalStreamEnvironment()) {
+            env.enableCheckpointing(1000L);
+            env.setRestartStrategy(RestartStrategies.noRestart());
+            final DataStream<Event> source = env.fromData(eventsToSerialize, new EventTypeInfo());
+            Map<String, String> config = new HashMap<>();
+            Properties properties = getKafkaClientConfiguration();
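+            // Forward the Kafka client settings to the sink, prefixed as sink properties.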
+            properties.forEach(
+                    (key, value) ->
+                            config.put(
+                                    KafkaDataSinkOptions.PROPERTIES_PREFIX + key.toString(),
+                                    value.toString()));
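+            // Only CANAL_JSON needs to be configured explicitly; DEBEZIUM_JSON is the
+            // sink's default value format.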
+            if (serializationType == JsonSerializationType.CANAL_JSON) {
+                config.put(
+                        KafkaDataSinkOptions.VALUE_FORMAT.key(),
+                        JsonSerializationType.CANAL_JSON.toString());
+            }
+            source.sinkTo(
+                    ((FlinkSinkProvider)
+                                    (new KafkaDataSinkFactory()
+                                            .createDataSink(
+                                                    new FactoryHelper.DefaultContext(
+                                                            Configuration.fromMap(config),
+                                                            Configuration.fromMap(new HashMap<>()),
+                                                            this.getClass().getClassLoader()))
+                                            .getEventSinkProvider()))
+                            .getSink());
+            env.execute();
+        }
+
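+        // Drain everything written to the topic and compare it, as parsed JSON, with the
+        // expected payloads (the %s placeholder is filled with the table name).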
+        final List<ConsumerRecord<byte[], byte[]>> collectedRecords =
+                drainAllRecordsFromTopic(topic, false, 0);
+        assertThat(collectedRecords).hasSameSizeAs(expectedJson);
+        ObjectMapper mapper =
+                JacksonMapperFactory.createObjectMapper()
+                        .configure(JsonGenerator.Feature.WRITE_BIGDECIMAL_AS_PLAIN, false);
+        List<JsonNode> expectedJsonNodes =
+                expectedJson.stream()
+                        .map(
+                                s -> {
+                                    try {
+                                        return mapper.readTree(
+                                                String.format(s, table1.getTableName()));
+                                    } catch (JsonProcessingException e) {
+                                        throw new RuntimeException(e);
+                                    }
+                                })
+                        .collect(Collectors.toList());
+        assertThat(deserializeValues(collectedRecords))
+                .containsExactlyElementsOf(expectedJsonNodes);
+        checkProducerLeak();
+    }
+
+    @ParameterizedTest(name = "{0}")
+    @EnumSource
+    void testComplexTypeSerialization(JsonSerializationType type) throws Exception {
+        Schema schema =
+                Schema.newBuilder()
+                        .physicalColumn("id", DataTypes.INT())
+                        .physicalColumn("arr_col", DataTypes.ARRAY(DataTypes.STRING()))
+                        .physicalColumn(
+                                "map_col", DataTypes.MAP(DataTypes.STRING(), DataTypes.INT()))
+                        .physicalColumn(
+                                "row_col",
+                                DataTypes.ROW(
+                                        DataTypes.FIELD("name", DataTypes.STRING()),
+                                        DataTypes.FIELD("age", DataTypes.INT())))
+                        .primaryKey("id")
+                        .build();
+        BinaryRecordDataGenerator generator =
+                new BinaryRecordDataGenerator(schema.getColumnDataTypes().toArray(new DataType[0]));
+
+        BinaryRecordDataGenerator nestedRowGenerator =
+                new BinaryRecordDataGenerator(
+                        ((RowType) (schema.getColumn("row_col").get().getType()))
+                                .getFieldTypes()
+                                .toArray(new DataType[0]));
+
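+        // One record exercising all three complex types at once:
+        // an ARRAY<STRING>, a MAP<STRING, INT>, and a ROW<name STRING, age INT>.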
+        BinaryRecordData recordData =
+                generator.generate(
+                        new Object[] {
+                            1,
+                            new GenericArrayData(
+                                    new Object[] {
+                                        BinaryStringData.fromString("Alfa"),
+                                        BinaryStringData.fromString("Bravo"),
+                                        BinaryStringData.fromString("Charlie")
+                                    }),
+                            new GenericMapData(
+                                    Map.of(
+                                            BinaryStringData.fromString("Delta"), 5,
+                                            BinaryStringData.fromString("Echo"), 4,
+                                            BinaryStringData.fromString("Foxtrot"), 7)),
+                            nestedRowGenerator.generate(
+                                    new Object[] {BinaryStringData.fromString("Golf"), 97})
+                        });
+        List<Event> eventsToSerialize =
+                List.of(
+                        new CreateTableEvent(table1, schema),
+                        DataChangeEvent.insertEvent(table1, recordData),
+                        DataChangeEvent.updateEvent(table1, recordData, recordData),
+                        DataChangeEvent.deleteEvent(table1, recordData));
+
+        List<String> expectedOutput = null;
+        switch (type) {
+            case DEBEZIUM_JSON:
+                expectedOutput =
+                        List.of(
+                                "{\"before\":null,\"after\":{\"id\":1,\"arr_col\":[\"Alfa\",\"Bravo\",\"Charlie\"],\"map_col\":{\"Foxtrot\":7,\"Delta\":5,\"Echo\":4},\"row_col\":{\"name\":\"Golf\",\"age\":97}},\"op\":\"c\",\"source\":{\"db\":\"default_schema\",\"table\":\"%s\"}}",
+                                "{\"before\":{\"id\":1,\"arr_col\":[\"Alfa\",\"Bravo\",\"Charlie\"],\"map_col\":{\"Foxtrot\":7,\"Delta\":5,\"Echo\":4},\"row_col\":{\"name\":\"Golf\",\"age\":97}},\"after\":{\"id\":1,\"arr_col\":[\"Alfa\",\"Bravo\",\"Charlie\"],\"map_col\":{\"Foxtrot\":7,\"Delta\":5,\"Echo\":4},\"row_col\":{\"name\":\"Golf\",\"age\":97}},\"op\":\"u\",\"source\":{\"db\":\"default_schema\",\"table\":\"%s\"}}",
+                                "{\"before\":{\"id\":1,\"arr_col\":[\"Alfa\",\"Bravo\",\"Charlie\"],\"map_col\":{\"Foxtrot\":7,\"Delta\":5,\"Echo\":4},\"row_col\":{\"name\":\"Golf\",\"age\":97}},\"after\":null,\"op\":\"d\",\"source\":{\"db\":\"default_schema\",\"table\":\"%s\"}}");
+                break;
+            case CANAL_JSON:
+                expectedOutput =
+                        List.of(
+                                "{\"old\":null,\"data\":[{\"id\":1,\"arr_col\":[\"Alfa\",\"Bravo\",\"Charlie\"],\"map_col\":{\"Delta\":5,\"Foxtrot\":7,\"Echo\":4},\"row_col\":{\"name\":\"Golf\",\"age\":97}}],\"type\":\"INSERT\",\"database\":\"default_schema\",\"table\":\"%s\",\"pkNames\":[\"id\"]}",
+                                "{\"old\":[{\"id\":1,\"arr_col\":[\"Alfa\",\"Bravo\",\"Charlie\"],\"map_col\":{\"Delta\":5,\"Foxtrot\":7,\"Echo\":4},\"row_col\":{\"name\":\"Golf\",\"age\":97}}],\"data\":[{\"id\":1,\"arr_col\":[\"Alfa\",\"Bravo\",\"Charlie\"],\"map_col\":{\"Delta\":5,\"Foxtrot\":7,\"Echo\":4},\"row_col\":{\"name\":\"Golf\",\"age\":97}}],\"type\":\"UPDATE\",\"database\":\"default_schema\",\"table\":\"%s\",\"pkNames\":[\"id\"]}",
+                                "{\"old\":null,\"data\":[{\"id\":1,\"arr_col\":[\"Alfa\",\"Bravo\",\"Charlie\"],\"map_col\":{\"Delta\":5,\"Foxtrot\":7,\"Echo\":4},\"row_col\":{\"name\":\"Golf\",\"age\":97}}],\"type\":\"DELETE\",\"database\":\"default_schema\",\"table\":\"%s\",\"pkNames\":[\"id\"]}");
+                break;
+        }
+        runGenericComplexTypeSerializationTest(type, eventsToSerialize, expectedOutput);
+    }
+
+    @ParameterizedTest(name = "{0}")
+    @EnumSource
+    void testNestedArraysSerialization(JsonSerializationType type) throws Exception {
+        Schema schema =
+                Schema.newBuilder()
+                        .physicalColumn("id", DataTypes.INT().notNull())
+                        .physicalColumn(
+                                "nested_arr", DataTypes.ARRAY(DataTypes.ARRAY(DataTypes.STRING())))
+                        .primaryKey("id")
+                        .build();
+        BinaryRecordDataGenerator generator =
+                new BinaryRecordDataGenerator(schema.getColumnDataTypes().toArray(new DataType[0]));
+
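+        // A two-level ARRAY<ARRAY<STRING>> with a single string element per inner array.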
+        BinaryRecordData recordData =
+                generator.generate(
+                        new Object[] {
+                            1,
+                            new GenericArrayData(
+                                    new Object[] {
+                                        new GenericArrayData(
+                                                new Object[] {new BinaryStringData("Alice")}),
+                                        new GenericArrayData(
+                                                new Object[] {new BinaryStringData("One")}),
+                                        new GenericArrayData(
+                                                new Object[] {new BinaryStringData("Alfa")})
+                                    })
+                        });
+        List<Event> eventsToSerialize =
+                List.of(
+                        new CreateTableEvent(table1, schema),
+                        DataChangeEvent.insertEvent(table1, recordData),
+                        DataChangeEvent.updateEvent(table1, recordData, recordData),
+                        DataChangeEvent.deleteEvent(table1, recordData));
+
+        List<String> expectedOutput = null;
+        switch (type) {
+            case DEBEZIUM_JSON:
+                expectedOutput =
+                        List.of(
+                                "{\"before\":null,\"after\":{\"id\":1,\"nested_arr\":[[\"Alice\"],[\"One\"],[\"Alfa\"]]},\"op\":\"c\",\"source\":{\"db\":\"default_schema\",\"table\":\"%s\"}}",
+                                "{\"before\":{\"id\":1,\"nested_arr\":[[\"Alice\"],[\"One\"],[\"Alfa\"]]},\"after\":{\"id\":1,\"nested_arr\":[[\"Alice\"],[\"One\"],[\"Alfa\"]]},\"op\":\"u\",\"source\":{\"db\":\"default_schema\",\"table\":\"%s\"}}",
+                                "{\"before\":{\"id\":1,\"nested_arr\":[[\"Alice\"],[\"One\"],[\"Alfa\"]]},\"after\":null,\"op\":\"d\",\"source\":{\"db\":\"default_schema\",\"table\":\"%s\"}}");
+                break;
+            case CANAL_JSON:
+                expectedOutput =
+                        List.of(
+                                "{\"old\":null,\"data\":[{\"id\":1,\"nested_arr\":[[\"Alice\"],[\"One\"],[\"Alfa\"]]}],\"type\":\"INSERT\",\"database\":\"default_schema\",\"table\":\"%s\",\"pkNames\":[\"id\"]}",
+                                "{\"old\":[{\"id\":1,\"nested_arr\":[[\"Alice\"],[\"One\"],[\"Alfa\"]]}],\"data\":[{\"id\":1,\"nested_arr\":[[\"Alice\"],[\"One\"],[\"Alfa\"]]}],\"type\":\"UPDATE\",\"database\":\"default_schema\",\"table\":\"%s\",\"pkNames\":[\"id\"]}",
+                                "{\"old\":null,\"data\":[{\"id\":1,\"nested_arr\":[[\"Alice\"],[\"One\"],[\"Alfa\"]]}],\"type\":\"DELETE\",\"database\":\"default_schema\",\"table\":\"%s\",\"pkNames\":[\"id\"]}");
+                break;
+        }
+        runGenericComplexTypeSerializationTest(type, eventsToSerialize, expectedOutput);
+    }
+
+    @ParameterizedTest(name = "{0}")
+    @EnumSource
+    void testMapWithArrayValueSerialization(JsonSerializationType type) throws Exception {
+        Schema schema =
+                Schema.newBuilder()
+                        .physicalColumn("id", DataTypes.INT().notNull())
+                        .physicalColumn(
+                                "map_arr",
+                                DataTypes.MAP(DataTypes.STRING(), DataTypes.ARRAY(DataTypes.INT())))
+                        .primaryKey("id")
+                        .build();
+        BinaryRecordDataGenerator generator =
+                new BinaryRecordDataGenerator(schema.getColumnDataTypes().toArray(new DataType[0]));
+
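+        // A MAP<STRING, ARRAY<INT>> with three keys, each mapped to an int array.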
+        BinaryRecordData recordData =
+                generator.generate(
+                        new Object[] {
+                            1,
+                            new GenericMapData(
+                                    Map.of(
+                                            new BinaryStringData("Alice"),
+                                            new GenericArrayData(new int[] {1, 2, 3, 4, 5}),
+                                            new BinaryStringData("Bob"),
+                                            new GenericArrayData(new int[] {1, 2, 3}),
+                                            new BinaryStringData("Carol"),
+                                            new GenericArrayData(new int[] {6, 7, 8, 9, 10})))
+                        });
+        List<Event> eventsToSerialize =
+                List.of(
+                        new CreateTableEvent(table1, schema),
+                        DataChangeEvent.insertEvent(table1, recordData),
+                        DataChangeEvent.updateEvent(table1, recordData, recordData),
+                        DataChangeEvent.deleteEvent(table1, recordData));
+
+        List<String> expectedOutput = null;
+        switch (type) {
+            case DEBEZIUM_JSON:
+                expectedOutput =
+                        List.of(
+                                "{\"before\":null,\"after\":{\"id\":1,\"map_arr\":{\"Alice\":[1,2,3,4,5],\"Bob\":[1,2,3],\"Carol\":[6,7,8,9,10]}},\"op\":\"c\",\"source\":{\"db\":\"default_schema\",\"table\":\"%s\"}}",
+                                "{\"before\":{\"id\":1,\"map_arr\":{\"Alice\":[1,2,3,4,5],\"Bob\":[1,2,3],\"Carol\":[6,7,8,9,10]}},\"after\":{\"id\":1,\"map_arr\":{\"Alice\":[1,2,3,4,5],\"Bob\":[1,2,3],\"Carol\":[6,7,8,9,10]}},\"op\":\"u\",\"source\":{\"db\":\"default_schema\",\"table\":\"%s\"}}",
+                                "{\"before\":{\"id\":1,\"map_arr\":{\"Alice\":[1,2,3,4,5],\"Bob\":[1,2,3],\"Carol\":[6,7,8,9,10]}},\"after\":null,\"op\":\"d\",\"source\":{\"db\":\"default_schema\",\"table\":\"%s\"}}");
+                break;
+            case CANAL_JSON:
+                expectedOutput =
+                        List.of(
+                                "{\"old\":null,\"data\":[{\"id\":1,\"map_arr\":{\"Alice\":[1,2,3,4,5],\"Bob\":[1,2,3],\"Carol\":[6,7,8,9,10]}}],\"type\":\"INSERT\",\"database\":\"default_schema\",\"table\":\"%s\",\"pkNames\":[\"id\"]}",
+                                "{\"old\":[{\"id\":1,\"map_arr\":{\"Alice\":[1,2,3,4,5],\"Bob\":[1,2,3],\"Carol\":[6,7,8,9,10]}}],\"data\":[{\"id\":1,\"map_arr\":{\"Alice\":[1,2,3,4,5],\"Bob\":[1,2,3],\"Carol\":[6,7,8,9,10]}}],\"type\":\"UPDATE\",\"database\":\"default_schema\",\"table\":\"%s\",\"pkNames\":[\"id\"]}",
+                                "{\"old\":null,\"data\":[{\"id\":1,\"map_arr\":{\"Alice\":[1,2,3,4,5],\"Bob\":[1,2,3],\"Carol\":[6,7,8,9,10]}}],\"type\":\"DELETE\",\"database\":\"default_schema\",\"table\":\"%s\",\"pkNames\":[\"id\"]}");
+                break;
+        }
+        runGenericComplexTypeSerializationTest(type, eventsToSerialize, expectedOutput);
+    }
+
+    @ParameterizedTest(name = "{0}")
+    @EnumSource
+    void testNullAndEmptyComplexTypesSerialization(JsonSerializationType type) throws Exception {
+        Schema schema =
+                Schema.newBuilder()
+                        .physicalColumn("id", DataTypes.INT().notNull())
+                        .physicalColumn("arr", DataTypes.ARRAY(DataTypes.STRING()))
+                        .physicalColumn("map", DataTypes.MAP(DataTypes.STRING(), DataTypes.INT()))
+                        .primaryKey("id")
+                        .build();
+        BinaryRecordDataGenerator generator =
+                new BinaryRecordDataGenerator(schema.getColumnDataTypes().toArray(new DataType[0]));
+
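+        // Three variants: all-null collections, empty collections, and collections that
+        // contain null elements or null map values.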
+        BinaryRecordData recordData1 = generator.generate(new Object[] {1, null, null});
+        BinaryRecordData recordData2 =
+                generator.generate(
+                        new Object[] {
+                            1, new GenericArrayData(new Object[] {}), new GenericMapData(Map.of())
+                        });
+        Map<StringData, Integer> partialEmptyMap = new HashMap<>();
+        partialEmptyMap.put(BinaryStringData.fromString("Alice"), 1);
+        partialEmptyMap.put(BinaryStringData.fromString("Bob"), null);
+        BinaryRecordData recordData3 =
+                generator.generate(
+                        new Object[] {
+                            1,
+                            new GenericArrayData(
+                                    new Object[] {BinaryStringData.fromString("Foo"), null}),
+                            new GenericMapData(partialEmptyMap)
+                        });
+        List<Event> eventsToSerialize =
+                List.of(
+                        new CreateTableEvent(table1, schema),
+                        DataChangeEvent.insertEvent(table1, recordData1),
+                        DataChangeEvent.updateEvent(table1, recordData1, recordData2),
+                        DataChangeEvent.updateEvent(table1, recordData2, recordData3),
+                        DataChangeEvent.deleteEvent(table1, recordData3));
+
+        List<String> expectedOutput = null;
+        switch (type) {
+            case DEBEZIUM_JSON:
+                expectedOutput =
+                        List.of(
+                                "{\"before\":null,\"after\":{\"id\":1,\"arr\":null,\"map\":null},\"op\":\"c\",\"source\":{\"db\":\"default_schema\",\"table\":\"%s\"}}",
+                                "{\"before\":{\"id\":1,\"arr\":null,\"map\":null},\"after\":{\"id\":1,\"arr\":[],\"map\":{}},\"op\":\"u\",\"source\":{\"db\":\"default_schema\",\"table\":\"%s\"}}",
+                                "{\"before\":{\"id\":1,\"arr\":[],\"map\":{}},\"after\":{\"id\":1,\"arr\":[\"Foo\",null],\"map\":{\"Alice\":1,\"Bob\":null}},\"op\":\"u\",\"source\":{\"db\":\"default_schema\",\"table\":\"%s\"}}",
+                                "{\"before\":{\"id\":1,\"arr\":[\"Foo\",null],\"map\":{\"Alice\":1,\"Bob\":null}},\"after\":null,\"op\":\"d\",\"source\":{\"db\":\"default_schema\",\"table\":\"%s\"}}");
+                break;
+            case CANAL_JSON:
+                expectedOutput =
+                        List.of(
+                                "{\"old\":null,\"data\":[{\"id\":1,\"arr\":null,\"map\":null}],\"type\":\"INSERT\",\"database\":\"default_schema\",\"table\":\"%s\",\"pkNames\":[\"id\"]}",
+                                "{\"old\":[{\"id\":1,\"arr\":null,\"map\":null}],\"data\":[{\"id\":1,\"arr\":[],\"map\":{}}],\"type\":\"UPDATE\",\"database\":\"default_schema\",\"table\":\"%s\",\"pkNames\":[\"id\"]}",
+                                "{\"old\":[{\"id\":1,\"arr\":[],\"map\":{}}],\"data\":[{\"id\":1,\"arr\":[\"Foo\",null],\"map\":{\"Alice\":1,\"Bob\":null}}],\"type\":\"UPDATE\",\"database\":\"default_schema\",\"table\":\"%s\",\"pkNames\":[\"id\"]}",
+                                "{\"old\":null,\"data\":[{\"id\":1,\"arr\":[\"Foo\",null],\"map\":{\"Alice\":1,\"Bob\":null}}],\"type\":\"DELETE\",\"database\":\"default_schema\",\"table\":\"%s\",\"pkNames\":[\"id\"]}");
+                break;
+        }
+        runGenericComplexTypeSerializationTest(type, eventsToSerialize, expectedOutput);
+    }
+
+    @ParameterizedTest(name = "{0}")
+    @EnumSource
+    void testDeepNestedStructureSerialization(JsonSerializationType type) throws Exception {
+        RowType innerRowType =
+                DataTypes.ROW(
+                        DataTypes.FIELD("x", DataTypes.INT()),
+                        DataTypes.FIELD("y", DataTypes.INT()));
+        RowType outerRowType =
+                DataTypes.ROW(
+                        DataTypes.FIELD("name", DataTypes.STRING()),
+                        DataTypes.FIELD("nested", innerRowType));
+        Schema schema =
+                Schema.newBuilder()
+                        .physicalColumn("id", DataTypes.INT().notNull())
+                        .physicalColumn("deep", DataTypes.ARRAY(outerRowType))
+                        .primaryKey("id")
+                        .build();
+        BinaryRecordDataGenerator generator =
+                new BinaryRecordDataGenerator(schema.getColumnDataTypes().toArray(new DataType[0]));
+        BinaryRecordDataGenerator outerRowGenerator =
+                new BinaryRecordDataGenerator(
+                        outerRowType.getFieldTypes().toArray(new DataType[0]));
+        BinaryRecordDataGenerator innerRowGenerator =
+                new BinaryRecordDataGenerator(
+                        innerRowType.getFieldTypes().toArray(new DataType[0]));
+
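+        // ARRAY<ROW<name STRING, nested ROW<x INT, y INT>>> with two outer elements,
+        // so rows are nested inside rows inside an array.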
+        BinaryRecordData recordData =
+                generator.generate(
+                        new Object[] {
+                            1,
+                            new GenericArrayData(
+                                    new Object[] {
+                                        outerRowGenerator.generate(
+                                                new Object[] {
+                                                    BinaryStringData.fromString("323"),
+                                                    innerRowGenerator.generate(
+                                                            new Object[] {17, 19})
+                                                }),
+                                        outerRowGenerator.generate(
+                                                new Object[] {
+                                                    BinaryStringData.fromString("143"),
+                                                    innerRowGenerator.generate(
+                                                            new Object[] {11, 13})
+                                                })
+                                    })
+                        });
+        List<Event> eventsToSerialize =
+                List.of(
+                        new CreateTableEvent(table1, schema),
+                        DataChangeEvent.insertEvent(table1, recordData),
+                        DataChangeEvent.updateEvent(table1, recordData, recordData),
+                        DataChangeEvent.deleteEvent(table1, recordData));
+
+        List<String> expectedOutput = null;
+        switch (type) {
+            case DEBEZIUM_JSON:
+                expectedOutput =
+                        List.of(
+                                "{\"before\":null,\"after\":{\"id\":1,\"deep\":[{\"name\":\"323\",\"nested\":{\"x\":17,\"y\":19}},{\"name\":\"143\",\"nested\":{\"x\":11,\"y\":13}}]},\"op\":\"c\",\"source\":{\"db\":\"default_schema\",\"table\":\"%s\"}}",
+                                "{\"before\":{\"id\":1,\"deep\":[{\"name\":\"323\",\"nested\":{\"x\":17,\"y\":19}},{\"name\":\"143\",\"nested\":{\"x\":11,\"y\":13}}]},\"after\":{\"id\":1,\"deep\":[{\"name\":\"323\",\"nested\":{\"x\":17,\"y\":19}},{\"name\":\"143\",\"nested\":{\"x\":11,\"y\":13}}]},\"op\":\"u\",\"source\":{\"db\":\"default_schema\",\"table\":\"%s\"}}",
+                                "{\"before\":{\"id\":1,\"deep\":[{\"name\":\"323\",\"nested\":{\"x\":17,\"y\":19}},{\"name\":\"143\",\"nested\":{\"x\":11,\"y\":13}}]},\"after\":null,\"op\":\"d\",\"source\":{\"db\":\"default_schema\",\"table\":\"%s\"}}");
+                break;
+            case CANAL_JSON:
+                expectedOutput =
+                        List.of(
+                                "{\"old\":null,\"data\":[{\"id\":1,\"deep\":[{\"name\":\"323\",\"nested\":{\"x\":17,\"y\":19}},{\"name\":\"143\",\"nested\":{\"x\":11,\"y\":13}}]}],\"type\":\"INSERT\",\"database\":\"default_schema\",\"table\":\"%s\",\"pkNames\":[\"id\"]}",
+                                "{\"old\":[{\"id\":1,\"deep\":[{\"name\":\"323\",\"nested\":{\"x\":17,\"y\":19}},{\"name\":\"143\",\"nested\":{\"x\":11,\"y\":13}}]}],\"data\":[{\"id\":1,\"deep\":[{\"name\":\"323\",\"nested\":{\"x\":17,\"y\":19}},{\"name\":\"143\",\"nested\":{\"x\":11,\"y\":13}}]}],\"type\":\"UPDATE\",\"database\":\"default_schema\",\"table\":\"%s\",\"pkNames\":[\"id\"]}",
+                                "{\"old\":null,\"data\":[{\"id\":1,\"deep\":[{\"name\":\"323\",\"nested\":{\"x\":17,\"y\":19}},{\"name\":\"143\",\"nested\":{\"x\":11,\"y\":13}}]}],\"type\":\"DELETE\",\"database\":\"default_schema\",\"table\":\"%s\",\"pkNames\":[\"id\"]}");
+                break;
+        }
+        runGenericComplexTypeSerializationTest(type, eventsToSerialize, expectedOutput);
+    }
 }

