This is an automated email from the ASF dual-hosted git repository.
kunni pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git
The following commit(s) were added to refs/heads/master by this push:
new 0cfe1d339 [FLINK-38889][pipeline][kafka] Support serializing complex types(MAP, ARRAY, ROW) to JSON (Debezium / Canal) (#4221)
0cfe1d339 is described below
commit 0cfe1d339988d3cfa2f55ab08d1766310580c57d
Author: SkylerLin <[email protected]>
AuthorDate: Wed Jan 28 10:44:22 2026 +0800
[FLINK-38889][pipeline][kafka] Support serializing complex types(MAP, ARRAY, ROW) to JSON (Debezium / Canal) (#4221)
Co-authored-by: guoxuanlin <[email protected]>
Co-authored-by: yuxiqian <[email protected]>
---
 .../cdc/connectors/kafka/json/TableSchemaInfo.java | 100 +----
 .../kafka/json/utils/RecordDataConverter.java      | 414 +++++++++++++++++++
 .../connectors/kafka/json/TableSchemaInfoTest.java |  61 +++
 .../canal/CanalJsonSerializationSchemaTest.java    |  94 +++++
 .../DebeziumJsonSerializationSchemaTest.java       |  93 +++++
 .../kafka/json/utils/RecordDataConverterTest.java  | 445 +++++++++++++++++++++
 .../connectors/kafka/sink/KafkaDataSinkITCase.java | 376 +++++++++++++++++
 7 files changed, 1485 insertions(+), 98 deletions(-)
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/json/TableSchemaInfo.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/json/TableSchemaInfo.java
index 10be2ff07..3ffe5d683 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/json/TableSchemaInfo.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/json/TableSchemaInfo.java
@@ -21,22 +21,15 @@ import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.cdc.common.data.RecordData;
import org.apache.flink.cdc.common.event.TableId;
import org.apache.flink.cdc.common.schema.Schema;
-import org.apache.flink.cdc.common.types.DataType;
-import org.apache.flink.cdc.common.types.DataTypeChecks;
-import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.cdc.connectors.kafka.json.utils.RecordDataConverter;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
-import org.apache.flink.table.data.TimestampData;
-import org.apache.flink.table.data.binary.BinaryStringData;
import java.time.ZoneId;
import java.util.ArrayList;
import java.util.List;
-import static org.apache.flink.cdc.common.types.DataTypeChecks.getPrecision;
-import static org.apache.flink.cdc.common.types.DataTypeChecks.getScale;
-
/** maintain the {@link SerializationSchema} of a specific {@link TableId}. */
public class TableSchemaInfo {
@@ -96,96 +89,7 @@ public class TableSchemaInfo {
}
     private static List<RecordData.FieldGetter> createFieldGetters(Schema schema, ZoneId zoneId) {
-        List<RecordData.FieldGetter> fieldGetters = new ArrayList<>(schema.getColumns().size());
-        for (int i = 0; i < schema.getColumns().size(); i++) {
-            fieldGetters.add(createFieldGetter(schema.getColumns().get(i).getType(), i, zoneId));
-        }
-        return fieldGetters;
-    }
-
- private static RecordData.FieldGetter createFieldGetter(
- DataType fieldType, int fieldPos, ZoneId zoneId) {
- final RecordData.FieldGetter fieldGetter;
- // ordered by type root definition
- switch (fieldType.getTypeRoot()) {
- case CHAR:
- case VARCHAR:
- fieldGetter =
- record ->
-                                BinaryStringData.fromString(record.getString(fieldPos).toString());
- break;
- case BOOLEAN:
- fieldGetter = record -> record.getBoolean(fieldPos);
- break;
- case BINARY:
- case VARBINARY:
- fieldGetter = record -> record.getBinary(fieldPos);
- break;
- case DECIMAL:
- final int decimalPrecision = getPrecision(fieldType);
- final int decimalScale = getScale(fieldType);
- fieldGetter =
- record ->
- DecimalData.fromBigDecimal(
-                                        record.getDecimal(fieldPos, decimalPrecision, decimalScale)
- .toBigDecimal(),
- decimalPrecision,
- decimalScale);
- break;
- case TINYINT:
- fieldGetter = record -> record.getByte(fieldPos);
- break;
- case SMALLINT:
- fieldGetter = record -> record.getShort(fieldPos);
- break;
- case INTEGER:
- fieldGetter = record -> record.getInt(fieldPos);
- break;
- case DATE:
-                fieldGetter = record -> (int) record.getDate(fieldPos).toEpochDay();
- break;
- case TIME_WITHOUT_TIME_ZONE:
-                fieldGetter = record -> (int) record.getTime(fieldPos).toMillisOfDay();
- break;
- case BIGINT:
- fieldGetter = record -> record.getLong(fieldPos);
- break;
- case FLOAT:
- fieldGetter = record -> record.getFloat(fieldPos);
- break;
- case DOUBLE:
- fieldGetter = record -> record.getDouble(fieldPos);
- break;
- case TIMESTAMP_WITHOUT_TIME_ZONE:
- fieldGetter =
- record ->
- TimestampData.fromTimestamp(
-                                        record.getTimestamp(fieldPos, getPrecision(fieldType))
- .toTimestamp());
- break;
- case TIMESTAMP_WITH_TIME_ZONE:
- case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
- fieldGetter =
- record ->
- TimestampData.fromInstant(
- record.getLocalZonedTimestampData(
- fieldPos,
-                                                DataTypeChecks.getPrecision(fieldType))
- .toInstant());
- break;
- default:
- throw new IllegalArgumentException(
- "don't support type of " + fieldType.getTypeRoot());
- }
- if (!fieldType.isNullable()) {
- return fieldGetter;
- }
- return row -> {
- if (row.isNullAt(fieldPos)) {
- return null;
- }
- return fieldGetter.getFieldOrNull(row);
- };
+ return RecordDataConverter.createFieldGetters(schema, zoneId);
}
public Schema getSchema() {
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/json/utils/RecordDataConverter.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/json/utils/RecordDataConverter.java
new file mode 100644
index 000000000..f865a5ffd
--- /dev/null
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/json/utils/RecordDataConverter.java
@@ -0,0 +1,414 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.kafka.json.utils;
+
+import org.apache.flink.cdc.common.data.ArrayData;
+import org.apache.flink.cdc.common.data.MapData;
+import org.apache.flink.cdc.common.data.RecordData;
+import org.apache.flink.cdc.common.schema.Schema;
+import org.apache.flink.cdc.common.types.ArrayType;
+import org.apache.flink.cdc.common.types.DataField;
+import org.apache.flink.cdc.common.types.DataType;
+import org.apache.flink.cdc.common.types.DataTypeChecks;
+import org.apache.flink.cdc.common.types.MapType;
+import org.apache.flink.cdc.common.types.RowType;
+import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.data.binary.BinaryStringData;
+
+import java.time.ZoneId;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.flink.cdc.common.types.DataTypeChecks.getPrecision;
+import static org.apache.flink.cdc.common.types.DataTypeChecks.getScale;
+
+/** Utility class for converting CDC {@link RecordData} to Flink SQL {@link RowData}. */
+public class RecordDataConverter {
+
+    /** Create {@link RecordData.FieldGetter}s from given CDC {@link Schema}. */
+    public static List<RecordData.FieldGetter> createFieldGetters(Schema schema, ZoneId zoneId) {
+        List<RecordData.FieldGetter> fieldGetters = new ArrayList<>();
+        for (int i = 0; i < schema.getColumnCount(); i++) {
+            fieldGetters.add(createFieldGetter(schema.getColumns().get(i).getType(), i, zoneId));
+        }
+        return fieldGetters;
+    }
+
+ /**
+ * Converts a CDC {@link RecordData} to Flink SQL {@link RowData}.
+ *
+ * @param recordData the CDC record data to convert
+ * @param rowType the row type schema defining the structure
+ * @param zoneId the time zone for timestamp conversions
+ * @return the converted Flink SQL RowData, or null if input is null
+ */
+    private static RowData convertRowData(RecordData recordData, RowType rowType, ZoneId zoneId) {
+ if (recordData == null) {
+ return null;
+ }
+
+ List<DataField> fields = rowType.getFields();
+ GenericRowData rowData = new GenericRowData(fields.size());
+
+ for (int i = 0; i < fields.size(); i++) {
+ DataField field = fields.get(i);
+ DataType fieldType = field.getType();
+
+ Object value = convertField(recordData, i, fieldType, zoneId);
+ rowData.setField(i, value);
+ }
+
+ return rowData;
+ }
+
+ /**
+ * Converts a single field from CDC {@link RecordData} to Flink SQL format.
+ *
+     * <p>This method handles various data types including primitives, temporal types, and complex
+ * types (arrays, maps, rows).
+ *
+ * @param recordData the CDC record data containing the field
+ * @param pos the position of the field in the record
+ * @param fieldType the data type of the field
+ * @param zoneId the time zone for timestamp conversions
+     * @return the converted field value in Flink SQL format, or null if the field is null
+ * @throws IllegalArgumentException if the field type is not supported
+ */
+ private static Object convertField(
+            RecordData recordData, int pos, DataType fieldType, ZoneId zoneId) {
+ if (recordData.isNullAt(pos)) {
+ return null;
+ }
+
+ switch (fieldType.getTypeRoot()) {
+ case CHAR:
+ case VARCHAR:
+                return BinaryStringData.fromString(recordData.getString(pos).toString());
+ case BOOLEAN:
+ return recordData.getBoolean(pos);
+ case BINARY:
+ case VARBINARY:
+ return recordData.getBinary(pos);
+ case DECIMAL:
+ final int decimalPrecision = getPrecision(fieldType);
+ final int decimalScale = getScale(fieldType);
+ return DecimalData.fromBigDecimal(
+                        recordData.getDecimal(pos, decimalPrecision, decimalScale).toBigDecimal(),
+ decimalPrecision,
+ decimalScale);
+ case TINYINT:
+ return recordData.getByte(pos);
+ case SMALLINT:
+ return recordData.getShort(pos);
+ case INTEGER:
+ return recordData.getInt(pos);
+ case DATE:
+ return recordData.getDate(pos).toEpochDay();
+ case TIME_WITHOUT_TIME_ZONE:
+ return recordData.getTime(pos).toMillisOfDay();
+ case BIGINT:
+ return recordData.getLong(pos);
+ case FLOAT:
+ return recordData.getFloat(pos);
+ case DOUBLE:
+ return recordData.getDouble(pos);
+ case TIMESTAMP_WITHOUT_TIME_ZONE:
+ return TimestampData.fromTimestamp(
+                        recordData.getTimestamp(pos, getPrecision(fieldType)).toTimestamp());
+ case TIMESTAMP_WITH_TIME_ZONE:
+ case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+ return TimestampData.fromInstant(
+ recordData
+ .getLocalZonedTimestampData(
+                                        pos, DataTypeChecks.getPrecision(fieldType))
+ .toInstant());
+            case ARRAY:
+                ArrayData arrayData = recordData.getArray(pos);
+                return convertArrayData(arrayData, (ArrayType) fieldType, zoneId);
+            case MAP:
+                MapData mapData = recordData.getMap(pos);
+                return convertMapData(mapData, (MapType) fieldType, zoneId);
+            case ROW:
+                RecordData nestedRecordData =
+                        recordData.getRow(pos, DataTypeChecks.getFieldCount(fieldType));
+                return convertRowData(nestedRecordData, (RowType) fieldType, zoneId);
+            default:
+                throw new IllegalArgumentException(
+                        "Unsupported field type: " + fieldType.getTypeRoot());
+        }
+ }
+
+ /**
+     * Converts a CDC {@link ArrayData} to Flink SQL {@link org.apache.flink.table.data.ArrayData}.
+     *
+     * <p>This method recursively converts each element in the array according to the element type.
+ *
+ * @param arrayData the CDC array data to convert
+ * @param arrayType the array type schema defining the element type
+ * @param zoneId the time zone for timestamp conversions
+ * @return the converted Flink SQL ArrayData, or null if input is null
+ */
+ private static org.apache.flink.table.data.ArrayData convertArrayData(
+ ArrayData arrayData, ArrayType arrayType, ZoneId zoneId) {
+ if (arrayData == null) {
+ return null;
+ }
+
+ DataType elementType = arrayType.getElementType();
+ int size = arrayData.size();
+ Object[] result = new Object[size];
+
+ for (int i = 0; i < size; i++) {
+ result[i] = convertElement(arrayData, i, elementType, zoneId);
+ }
+
+ return new org.apache.flink.table.data.GenericArrayData(result);
+ }
+
+ /**
+     * Converts a CDC {@link MapData} to Flink SQL {@link org.apache.flink.table.data.MapData}.
+     *
+     * <p>This method converts both keys and values in the map according to their respective types.
+ *
+ * @param mapData the CDC map data to convert
+ * @param mapType the map type schema defining key and value types
+ * @param zoneId the time zone for timestamp conversions
+ * @return the converted Flink SQL MapData, or null if input is null
+ */
+ private static org.apache.flink.table.data.MapData convertMapData(
+ MapData mapData, MapType mapType, ZoneId zoneId) {
+ if (mapData == null) {
+ return null;
+ }
+
+ ArrayData keyArray = mapData.keyArray();
+ ArrayData valueArray = mapData.valueArray();
+
+ int size = keyArray.size();
+ Map<Object, Object> result = new HashMap<>();
+
+ DataType keyType = mapType.getKeyType();
+ DataType valueType = mapType.getValueType();
+
+ for (int i = 0; i < size; i++) {
+ Object key = convertElement(keyArray, i, keyType, zoneId);
+ Object value = convertElement(valueArray, i, valueType, zoneId);
+ result.put(key, value);
+ }
+
+ return new org.apache.flink.table.data.GenericMapData(result);
+ }
+
+ /**
+     * Converts a single element from CDC {@link ArrayData} to Flink SQL format.
+     *
+     * <p>This method is similar to {@link #convertField} but operates on array elements.
+     *
+     * @param arrayData the CDC array data containing the element
+     * @param pos the position of the element in the array
+     * @param elementType the data type of the element
+     * @param zoneId the time zone for timestamp conversions
+     * @return the converted element value in Flink SQL format, or null if the element is null
+     * @throws IllegalArgumentException if the element type is not supported
+     */
+    private static Object convertElement(
+            ArrayData arrayData, int pos, DataType elementType, ZoneId zoneId) {
+ if (arrayData.isNullAt(pos)) {
+ return null;
+ }
+
+ switch (elementType.getTypeRoot()) {
+ case CHAR:
+ case VARCHAR:
+                return BinaryStringData.fromString(arrayData.getString(pos).toString());
+ case BOOLEAN:
+ return arrayData.getBoolean(pos);
+ case BINARY:
+ case VARBINARY:
+ return arrayData.getBinary(pos);
+ case DECIMAL:
+ final int decimalPrecision = getPrecision(elementType);
+ final int decimalScale = getScale(elementType);
+ return DecimalData.fromBigDecimal(
+ arrayData
+                                .getDecimal(pos, getPrecision(elementType), getScale(elementType))
+ .toBigDecimal(),
+ decimalPrecision,
+ decimalScale);
+ case TINYINT:
+ return arrayData.getByte(pos);
+ case SMALLINT:
+ return arrayData.getShort(pos);
+ case INTEGER:
+ case DATE:
+ case TIME_WITHOUT_TIME_ZONE:
+ return arrayData.getInt(pos);
+ case BIGINT:
+ return arrayData.getLong(pos);
+ case FLOAT:
+ return arrayData.getFloat(pos);
+ case DOUBLE:
+ return arrayData.getDouble(pos);
+ case TIMESTAMP_WITHOUT_TIME_ZONE:
+ return TimestampData.fromTimestamp(
+                        arrayData.getTimestamp(pos, getPrecision(elementType)).toTimestamp());
+ case TIMESTAMP_WITH_TIME_ZONE:
+ case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+ return TimestampData.fromInstant(
+ arrayData
+                                .getLocalZonedTimestamp(pos, getPrecision(elementType))
+ .toInstant());
+ case ARRAY:
+ ArrayData nestedArrayData = arrayData.getArray(pos);
+                return convertArrayData(nestedArrayData, (ArrayType) elementType, zoneId);
+ case MAP:
+ MapData mapData = arrayData.getMap(pos);
+ return convertMapData(mapData, (MapType) elementType, zoneId);
+ case ROW:
+ RecordData recordData =
+                        arrayData.getRecord(pos, DataTypeChecks.getFieldCount(elementType));
+                return convertRowData(recordData, (RowType) elementType, zoneId);
+ default:
+ throw new IllegalArgumentException(
+                        "Unsupported element type: " + elementType.getTypeRoot());
+ }
+ }
+
+ /**
+     * Creates a {@link RecordData.FieldGetter} for extracting and converting a specific field.
+     *
+     * <p>This method generates a lambda function that extracts a field at the given position and
+ * converts it to Flink SQL format.
+ *
+ * @param fieldType the data type of the field
+ * @param fieldPos the position of the field in the record
+ * @param zoneId the time zone for timestamp conversions
+ * @return a FieldGetter that extracts and converts the field value
+ * @throws IllegalArgumentException if the field type is not supported
+ */
+ private static RecordData.FieldGetter createFieldGetter(
+ DataType fieldType, int fieldPos, ZoneId zoneId) {
+ final RecordData.FieldGetter fieldGetter;
+ // ordered by type root definition
+ switch (fieldType.getTypeRoot()) {
+ case CHAR:
+ case VARCHAR:
+ fieldGetter =
+ record ->
+                                BinaryStringData.fromString(record.getString(fieldPos).toString());
+ break;
+ case BOOLEAN:
+ fieldGetter = record -> record.getBoolean(fieldPos);
+ break;
+ case BINARY:
+ case VARBINARY:
+ fieldGetter = record -> record.getBinary(fieldPos);
+ break;
+ case DECIMAL:
+ final int decimalPrecision = getPrecision(fieldType);
+ final int decimalScale = getScale(fieldType);
+ fieldGetter =
+ record ->
+ DecimalData.fromBigDecimal(
+                                        record.getDecimal(fieldPos, decimalPrecision, decimalScale)
+ .toBigDecimal(),
+ decimalPrecision,
+ decimalScale);
+ break;
+ case TINYINT:
+ fieldGetter = record -> record.getByte(fieldPos);
+ break;
+ case SMALLINT:
+ fieldGetter = record -> record.getShort(fieldPos);
+ break;
+ case INTEGER:
+ fieldGetter = record -> record.getInt(fieldPos);
+ break;
+ case DATE:
+                fieldGetter = record -> (int) record.getDate(fieldPos).toEpochDay();
+ break;
+ case TIME_WITHOUT_TIME_ZONE:
+                fieldGetter = record -> (int) record.getTime(fieldPos).toMillisOfDay();
+ break;
+ case BIGINT:
+ fieldGetter = record -> record.getLong(fieldPos);
+ break;
+ case FLOAT:
+ fieldGetter = record -> record.getFloat(fieldPos);
+ break;
+ case DOUBLE:
+ fieldGetter = record -> record.getDouble(fieldPos);
+ break;
+ case TIMESTAMP_WITHOUT_TIME_ZONE:
+ fieldGetter =
+ record ->
+ TimestampData.fromTimestamp(
+                                        record.getTimestamp(fieldPos, getPrecision(fieldType))
+ .toTimestamp());
+ break;
+ case TIMESTAMP_WITH_TIME_ZONE:
+ case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+ fieldGetter =
+ record ->
+ TimestampData.fromInstant(
+ record.getLocalZonedTimestampData(
+ fieldPos,
+                                                DataTypeChecks.getPrecision(fieldType))
+ .toInstant());
+ break;
+            case ARRAY:
+                fieldGetter =
+                        record ->
+                                convertArrayData(
+                                        record.getArray(fieldPos), (ArrayType) fieldType, zoneId);
+                break;
+            case MAP:
+                fieldGetter =
+                        record ->
+                                convertMapData(
+                                        record.getMap(fieldPos), (MapType) fieldType, zoneId);
+                break;
+            case ROW:
+                fieldGetter =
+                        record ->
+                                convertRowData(
+                                        record.getRow(
+                                                fieldPos, ((RowType) fieldType).getFieldCount()),
+                                        (RowType) fieldType,
+                                        zoneId);
+                break;
+            default:
+                throw new IllegalArgumentException(
+                        "Unsupported field type: " + fieldType.getTypeRoot());
+ }
+ if (!fieldType.isNullable()) {
+ return fieldGetter;
+ }
+ return row -> {
+ if (row.isNullAt(fieldPos)) {
+ return null;
+ }
+ return fieldGetter.getFieldOrNull(row);
+ };
+ }
+}
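
For reviewers, a minimal usage sketch of the converter added above (illustrative only, not part of this patch; the class and column names are made up for the example, but every type and call it uses already appears in this diff or its tests):

import org.apache.flink.cdc.common.data.GenericArrayData;
import org.apache.flink.cdc.common.data.RecordData;
import org.apache.flink.cdc.common.data.binary.BinaryRecordData;
import org.apache.flink.cdc.common.data.binary.BinaryStringData;
import org.apache.flink.cdc.common.schema.Schema;
import org.apache.flink.cdc.common.types.DataType;
import org.apache.flink.cdc.common.types.DataTypes;
import org.apache.flink.cdc.connectors.kafka.json.utils.RecordDataConverter;
import org.apache.flink.cdc.runtime.typeutils.BinaryRecordDataGenerator;

import java.time.ZoneId;
import java.util.List;

/** Illustrative sketch only -- not part of this commit. */
public class RecordDataConverterUsageSketch {
    public static void main(String[] args) {
        // A schema with a complex-typed column; before this change such a column
        // made field-getter creation fail with "don't support type of ARRAY".
        Schema schema =
                Schema.newBuilder()
                        .physicalColumn("id", DataTypes.INT())
                        .physicalColumn("tags", DataTypes.ARRAY(DataTypes.STRING()))
                        .build();

        // Field getters are created once per schema and reused for every record.
        List<RecordData.FieldGetter> getters =
                RecordDataConverter.createFieldGetters(schema, ZoneId.of("UTC"));

        BinaryRecordData record =
                new BinaryRecordDataGenerator(schema.getColumnDataTypes().toArray(new DataType[0]))
                        .generate(
                                new Object[] {
                                    1,
                                    new GenericArrayData(
                                            new Object[] {BinaryStringData.fromString("a")})
                                });

        // The ARRAY column now comes back as a Flink SQL ArrayData instance,
        // which the Debezium/Canal JSON serializers can write out as a JSON array.
        Object tags = getters.get(1).getFieldOrNull(record);
        System.out.println(tags);
    }
}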
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/test/java/org/apache/flink/cdc/connectors/kafka/json/TableSchemaInfoTest.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/test/java/org/apache/flink/cdc/connectors/kafka/json/TableSchemaInfoTest.java
index ffae90442..3bafe8eec 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/test/java/org/apache/flink/cdc/connectors/kafka/json/TableSchemaInfoTest.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/test/java/org/apache/flink/cdc/connectors/kafka/json/TableSchemaInfoTest.java
@@ -19,7 +19,9 @@ package org.apache.flink.cdc.connectors.kafka.json;
import org.apache.flink.cdc.common.data.DateData;
import org.apache.flink.cdc.common.data.DecimalData;
+import org.apache.flink.cdc.common.data.GenericArrayData;
import org.apache.flink.cdc.common.data.LocalZonedTimestampData;
+import org.apache.flink.cdc.common.data.RecordData;
import org.apache.flink.cdc.common.data.TimeData;
import org.apache.flink.cdc.common.data.TimestampData;
import org.apache.flink.cdc.common.data.binary.BinaryRecordData;
@@ -27,6 +29,7 @@ import org.apache.flink.cdc.common.data.binary.BinaryStringData;
import org.apache.flink.cdc.common.event.TableId;
import org.apache.flink.cdc.common.schema.Schema;
import org.apache.flink.cdc.common.types.DataType;
+import org.apache.flink.cdc.common.types.DataTypes;
import org.apache.flink.cdc.runtime.typeutils.BinaryRecordDataGenerator;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.types.RowKind;
@@ -169,4 +172,62 @@ class TableSchemaInfoTest {
Instant.parse("2023-01-01T00:00:00.000Z")),
null));
}
+
+ @Test
+ void testArrayWithNestedRowType() {
+ // Create a schema with ARRAY<ROW<name STRING, age INT>>
+ DataType personRowType =
+ DataTypes.ROW(
+ DataTypes.FIELD("name", DataTypes.STRING()),
+ DataTypes.FIELD("age", DataTypes.INT()));
+
+        Schema schema =
+                Schema.newBuilder()
+                        .physicalColumn("id", DataTypes.INT().notNull())
+                        .physicalColumn("persons", DataTypes.ARRAY(personRowType))
+                        .primaryKey("id")
+                        .build();
+
+        TableSchemaInfo tableSchemaInfo =
+                new TableSchemaInfo(
+                        TableId.parse("testDatabase.testTable"), schema, null, ZoneId.of("UTC"));
+
+        // Create test data with nested ROW in ARRAY
+        BinaryRecordDataGenerator personGenerator =
+                new BinaryRecordDataGenerator(new DataType[] {DataTypes.STRING(), DataTypes.INT()});
+
+        RecordData person1 =
+                personGenerator.generate(new Object[] {BinaryStringData.fromString("Alice"), 30});
+        RecordData person2 =
+                personGenerator.generate(new Object[] {BinaryStringData.fromString("Bob"), 25});
+
+        GenericArrayData personsArray = new GenericArrayData(new RecordData[] {person1, person2});
+
+        Object[] testData = new Object[] {1, personsArray};
+
+        BinaryRecordData recordData =
+                new BinaryRecordDataGenerator(schema.getColumnDataTypes().toArray(new DataType[0]))
+                        .generate(testData);
+
+        // Convert and verify
+        org.apache.flink.table.data.RowData result =
+                tableSchemaInfo.getRowDataFromRecordData(recordData, false);
+
+        Assertions.assertThat(result).isNotNull();
+        Assertions.assertThat(result.getInt(0)).isEqualTo(1);
+
+        org.apache.flink.table.data.ArrayData resultArray = result.getArray(1);
+        Assertions.assertThat(resultArray).isNotNull();
+        Assertions.assertThat(resultArray.size()).isEqualTo(2);
+
+        // Verify first person
+        org.apache.flink.table.data.RowData resultPerson1 = resultArray.getRow(0, 2);
+        Assertions.assertThat(resultPerson1.getString(0).toString()).isEqualTo("Alice");
+        Assertions.assertThat(resultPerson1.getInt(1)).isEqualTo(30);
+
+        // Verify second person
+        org.apache.flink.table.data.RowData resultPerson2 = resultArray.getRow(1, 2);
+        Assertions.assertThat(resultPerson2.getString(0).toString()).isEqualTo("Bob");
+        Assertions.assertThat(resultPerson2.getInt(1)).isEqualTo(25);
+ }
}
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/test/java/org/apache/flink/cdc/connectors/kafka/json/canal/CanalJsonSerializationSchemaTest.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/test/java/org/apache/flink/cdc/connectors/kafka/json/canal/CanalJsonSerializationSchemaTest.java
index 9543c90d1..dc2b71d43 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/test/java/org/apache/flink/cdc/connectors/kafka/json/canal/CanalJsonSerializationSchemaTest.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/test/java/org/apache/flink/cdc/connectors/kafka/json/canal/CanalJsonSerializationSchemaTest.java
@@ -134,4 +134,98 @@ class CanalJsonSerializationSchemaTest {
actual = mapper.readTree(serializationSchema.serialize(updateEvent));
Assertions.assertThat(actual).isEqualTo(expected);
}
+
+ @Test
+ void testSerializeComplexTypes() throws Exception {
+ ObjectMapper mapper =
+ JacksonMapperFactory.createObjectMapper()
+                        .configure(JsonGenerator.Feature.WRITE_BIGDECIMAL_AS_PLAIN, false);
+ SerializationSchema<Event> serializationSchema =
+ ChangeLogJsonFormatFactory.createSerializationSchema(
+ new Configuration(),
+ JsonSerializationType.CANAL_JSON,
+ ZoneId.systemDefault());
+ serializationSchema.open(new MockInitializationContext());
+
+ // create table with complex types
+ Schema schema =
+ Schema.newBuilder()
+ .physicalColumn("id", DataTypes.INT())
+ .physicalColumn("arr",
DataTypes.ARRAY(DataTypes.STRING()))
+ .physicalColumn("map",
DataTypes.MAP(DataTypes.STRING(), DataTypes.INT()))
+ .physicalColumn(
+ "row",
+ DataTypes.ROW(
+ DataTypes.FIELD("f1",
DataTypes.STRING()),
+ DataTypes.FIELD("f2",
DataTypes.INT())))
+ .primaryKey("id")
+ .build();
+
+ RowType rowType =
+ RowType.of(
+ DataTypes.INT(),
+ DataTypes.ARRAY(DataTypes.STRING()),
+ DataTypes.MAP(DataTypes.STRING(), DataTypes.INT()),
+ DataTypes.ROW(
+ DataTypes.FIELD("f1", DataTypes.STRING()),
+ DataTypes.FIELD("f2", DataTypes.INT())));
+
+ CreateTableEvent createTableEvent = new CreateTableEvent(TABLE_1,
schema);
+        Assertions.assertThat(serializationSchema.serialize(createTableEvent)).isNull();
+
+ BinaryRecordDataGenerator generator = new
BinaryRecordDataGenerator(rowType);
+
+ // Create test data with complex types
+ org.apache.flink.cdc.common.data.GenericArrayData arrayData =
+ new org.apache.flink.cdc.common.data.GenericArrayData(
+ new Object[] {
+ BinaryStringData.fromString("item1"),
+ BinaryStringData.fromString("item2")
+ });
+
+ java.util.Map<Object, Object> mapValues = new java.util.HashMap<>();
+ mapValues.put(BinaryStringData.fromString("key1"), 100);
+ mapValues.put(BinaryStringData.fromString("key2"), 200);
+ org.apache.flink.cdc.common.data.GenericMapData mapData =
+ new org.apache.flink.cdc.common.data.GenericMapData(mapValues);
+
+ BinaryRecordDataGenerator nestedRowGenerator =
+ new BinaryRecordDataGenerator(RowType.of(DataTypes.STRING(),
DataTypes.INT()));
+ org.apache.flink.cdc.common.data.RecordData nestedRow =
+ nestedRowGenerator.generate(
+ new Object[] {BinaryStringData.fromString("nested"),
42});
+
+ // insert event with complex types
+ DataChangeEvent insertEvent =
+ DataChangeEvent.insertEvent(
+ TABLE_1,
+ generator.generate(new Object[] {1, arrayData,
mapData, nestedRow}));
+
+ byte[] serialized = serializationSchema.serialize(insertEvent);
+ JsonNode actual = mapper.readTree(serialized);
+
+ // Verify the structure contains complex types
+ Assertions.assertThat(actual.has("old")).isTrue();
+ Assertions.assertThat(actual.has("data")).isTrue();
+ Assertions.assertThat(actual.get("data").isArray()).isTrue();
+ Assertions.assertThat(actual.get("data").get(0).has("id")).isTrue();
+ Assertions.assertThat(actual.get("data").get(0).has("arr")).isTrue();
+ Assertions.assertThat(actual.get("data").get(0).has("map")).isTrue();
+ Assertions.assertThat(actual.get("data").get(0).has("row")).isTrue();
+
+ // Verify array content
+ JsonNode arrNode = actual.get("data").get(0).get("arr");
+ Assertions.assertThat(arrNode.isArray()).isTrue();
+ Assertions.assertThat(arrNode.size()).isEqualTo(2);
+
+ // Verify map content
+ JsonNode mapNode = actual.get("data").get(0).get("map");
+ Assertions.assertThat(mapNode.isObject()).isTrue();
+
+ // Verify row content
+ JsonNode rowNode = actual.get("data").get(0).get("row");
+ Assertions.assertThat(rowNode.isObject()).isTrue();
+ Assertions.assertThat(rowNode.has("f1")).isTrue();
+ Assertions.assertThat(rowNode.has("f2")).isTrue();
+ }
}
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/test/java/org/apache/flink/cdc/connectors/kafka/json/debezium/DebeziumJsonSerializationSchemaTest.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/test/java/org/apache/flink/cdc/connectors/kafka/json/debezium/DebeziumJsonSerializationSchemaTest.java
index f63677c6c..35aada0c2 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/test/java/org/apache/flink/cdc/connectors/kafka/json/debezium/DebeziumJsonSerializationSchemaTest.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/test/java/org/apache/flink/cdc/connectors/kafka/json/debezium/DebeziumJsonSerializationSchemaTest.java
@@ -247,4 +247,97 @@ class DebeziumJsonSerializationSchemaTest {
JsonNode actual =
mapper.readTree(serializationSchema.serialize(insertEvent1));
assertThat(actual).isEqualTo(expected);
}
+
+ @Test
+ void testSerializeComplexTypes() throws Exception {
+ ObjectMapper mapper =
+ JacksonMapperFactory.createObjectMapper()
+                        .configure(JsonGenerator.Feature.WRITE_BIGDECIMAL_AS_PLAIN, false);
+ SerializationSchema<Event> serializationSchema =
+ ChangeLogJsonFormatFactory.createSerializationSchema(
+ new Configuration(),
+ JsonSerializationType.DEBEZIUM_JSON,
+ ZoneId.systemDefault());
+ serializationSchema.open(new MockInitializationContext());
+
+ // create table with complex types
+ Schema schema =
+ Schema.newBuilder()
+ .physicalColumn("id", DataTypes.INT())
+ .physicalColumn("arr",
DataTypes.ARRAY(DataTypes.STRING()))
+ .physicalColumn("map",
DataTypes.MAP(DataTypes.STRING(), DataTypes.INT()))
+ .physicalColumn(
+ "row",
+ DataTypes.ROW(
+ DataTypes.FIELD("f1",
DataTypes.STRING()),
+ DataTypes.FIELD("f2",
DataTypes.INT())))
+ .primaryKey("id")
+ .build();
+
+ RowType rowType =
+ RowType.of(
+ DataTypes.INT(),
+ DataTypes.ARRAY(DataTypes.STRING()),
+ DataTypes.MAP(DataTypes.STRING(), DataTypes.INT()),
+ DataTypes.ROW(
+ DataTypes.FIELD("f1", DataTypes.STRING()),
+ DataTypes.FIELD("f2", DataTypes.INT())));
+
+ CreateTableEvent createTableEvent = new CreateTableEvent(TABLE_1,
schema);
+ assertThat(serializationSchema.serialize(createTableEvent)).isNull();
+
+ BinaryRecordDataGenerator generator = new
BinaryRecordDataGenerator(rowType);
+
+ // Create test data with complex types
+ org.apache.flink.cdc.common.data.GenericArrayData arrayData =
+ new org.apache.flink.cdc.common.data.GenericArrayData(
+ new Object[] {
+ BinaryStringData.fromString("item1"),
+ BinaryStringData.fromString("item2")
+ });
+
+ Map<Object, Object> mapValues = new HashMap<>();
+ mapValues.put(BinaryStringData.fromString("key1"), 100);
+ mapValues.put(BinaryStringData.fromString("key2"), 200);
+ org.apache.flink.cdc.common.data.GenericMapData mapData =
+ new org.apache.flink.cdc.common.data.GenericMapData(mapValues);
+
+ BinaryRecordDataGenerator nestedRowGenerator =
+ new BinaryRecordDataGenerator(RowType.of(DataTypes.STRING(),
DataTypes.INT()));
+ org.apache.flink.cdc.common.data.RecordData nestedRow =
+ nestedRowGenerator.generate(
+ new Object[] {BinaryStringData.fromString("nested"),
42});
+
+ // insert event with complex types
+ DataChangeEvent insertEvent =
+ DataChangeEvent.insertEvent(
+ TABLE_1,
+ generator.generate(new Object[] {1, arrayData,
mapData, nestedRow}));
+
+ byte[] serialized = serializationSchema.serialize(insertEvent);
+ JsonNode actual = mapper.readTree(serialized);
+
+ // Verify the structure contains complex types
+ assertThat(actual.has("before")).isTrue();
+ assertThat(actual.has("after")).isTrue();
+ assertThat(actual.get("after").has("id")).isTrue();
+ assertThat(actual.get("after").has("arr")).isTrue();
+ assertThat(actual.get("after").has("map")).isTrue();
+ assertThat(actual.get("after").has("row")).isTrue();
+
+ // Verify array content
+ JsonNode arrNode = actual.get("after").get("arr");
+ assertThat(arrNode.isArray()).isTrue();
+ assertThat(arrNode.size()).isEqualTo(2);
+
+ // Verify map content
+ JsonNode mapNode = actual.get("after").get("map");
+ assertThat(mapNode.isObject()).isTrue();
+
+ // Verify row content
+ JsonNode rowNode = actual.get("after").get("row");
+ assertThat(rowNode.isObject()).isTrue();
+ assertThat(rowNode.has("f1")).isTrue();
+ assertThat(rowNode.has("f2")).isTrue();
+ }
}
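
As a side note on what the assertions above pin down, here is a tiny standalone sketch (illustrative only, not part of the commit; it uses plain Jackson rather than Flink's shaded copy, and the payload string is hand-written to match the shape the test asserts: ARRAY becomes a JSON array, MAP a JSON object, ROW a nested JSON object):

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

/** Illustrative sketch only -- not part of this commit. */
public class ComplexTypeJsonShapeSketch {
    public static void main(String[] args) throws Exception {
        // Trimmed, hand-written sample of a Debezium-style value with complex types.
        String debeziumValue =
                "{\"before\":null,"
                        + "\"after\":{\"id\":1,\"arr\":[\"item1\",\"item2\"],"
                        + "\"map\":{\"key1\":100,\"key2\":200},"
                        + "\"row\":{\"f1\":\"nested\",\"f2\":42}},"
                        + "\"op\":\"c\"}";

        JsonNode after = new ObjectMapper().readTree(debeziumValue).get("after");
        System.out.println(after.get("arr").isArray());          // true
        System.out.println(after.get("map").isObject());         // true
        System.out.println(after.get("row").get("f2").asInt());  // 42
    }
}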
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/test/java/org/apache/flink/cdc/connectors/kafka/json/utils/RecordDataConverterTest.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/test/java/org/apache/flink/cdc/connectors/kafka/json/utils/RecordDataConverterTest.java
new file mode 100644
index 000000000..1013d210e
--- /dev/null
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/test/java/org/apache/flink/cdc/connectors/kafka/json/utils/RecordDataConverterTest.java
@@ -0,0 +1,445 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.kafka.json.utils;
+
+import org.apache.flink.cdc.common.data.DateData;
+import org.apache.flink.cdc.common.data.DecimalData;
+import org.apache.flink.cdc.common.data.GenericArrayData;
+import org.apache.flink.cdc.common.data.GenericMapData;
+import org.apache.flink.cdc.common.data.LocalZonedTimestampData;
+import org.apache.flink.cdc.common.data.RecordData;
+import org.apache.flink.cdc.common.data.TimeData;
+import org.apache.flink.cdc.common.data.TimestampData;
+import org.apache.flink.cdc.common.data.binary.BinaryRecordData;
+import org.apache.flink.cdc.common.data.binary.BinaryStringData;
+import org.apache.flink.cdc.common.schema.Schema;
+import org.apache.flink.cdc.common.types.DataType;
+import org.apache.flink.cdc.common.types.DataTypes;
+import org.apache.flink.cdc.runtime.typeutils.BinaryRecordDataGenerator;
+
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.math.BigDecimal;
+import java.time.Instant;
+import java.time.ZoneId;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/** Unit tests for {@link RecordDataConverter}. */
+class RecordDataConverterTest {
+
+ private static final ZoneId TEST_ZONE_ID = ZoneId.of("UTC");
+
+ /** Tests RecordDataConverter.createFieldGetters method for all supported
data types. */
+ @Test
+ void testCreateFieldGettersForAllTypes() {
+ // Define nested ROW types for complex type testing
+ DataType innerRowType =
+ DataTypes.ROW(
+ DataTypes.FIELD("inner_name", DataTypes.STRING()),
+ DataTypes.FIELD("inner_value", DataTypes.INT()));
+
+ DataType outerRowType =
+ DataTypes.ROW(
+ DataTypes.FIELD("row_id", DataTypes.BIGINT()),
+ DataTypes.FIELD("row_inner", innerRowType));
+
+ Schema schema =
+ Schema.newBuilder()
+ // Basic types
+ .physicalColumn("col_boolean", DataTypes.BOOLEAN())
+ .physicalColumn("col_tinyint", DataTypes.TINYINT())
+ .physicalColumn("col_smallint", DataTypes.SMALLINT())
+ .physicalColumn("col_int", DataTypes.INT())
+ .physicalColumn("col_bigint", DataTypes.BIGINT())
+ .physicalColumn("col_float", DataTypes.FLOAT())
+ .physicalColumn("col_double", DataTypes.DOUBLE())
+ // String types
+ .physicalColumn("col_char", DataTypes.CHAR(10))
+ .physicalColumn("col_varchar", DataTypes.VARCHAR(50))
+ .physicalColumn("col_string", DataTypes.STRING())
+ // Binary types
+ .physicalColumn("col_binary", DataTypes.BINARY(5))
+ .physicalColumn("col_varbinary",
DataTypes.VARBINARY(20))
+ .physicalColumn("col_bytes", DataTypes.BYTES())
+ // Decimal types
+ .physicalColumn("col_decimal_small",
DataTypes.DECIMAL(6, 3))
+ .physicalColumn("col_decimal_large",
DataTypes.DECIMAL(38, 18))
+ // Temporal types
+ .physicalColumn("col_date", DataTypes.DATE())
+ .physicalColumn("col_time", DataTypes.TIME())
+ .physicalColumn("col_time_precision",
DataTypes.TIME(6))
+ .physicalColumn("col_timestamp", DataTypes.TIMESTAMP())
+ .physicalColumn("col_timestamp_ltz",
DataTypes.TIMESTAMP_LTZ())
+ // Array types
+ .physicalColumn("col_array_int",
DataTypes.ARRAY(DataTypes.INT()))
+ .physicalColumn("col_array_string",
DataTypes.ARRAY(DataTypes.STRING()))
+ .physicalColumn("col_array_row",
DataTypes.ARRAY(innerRowType))
+ // Map types
+ .physicalColumn(
+ "col_map_string_int",
+ DataTypes.MAP(DataTypes.STRING(),
DataTypes.INT()))
+ .physicalColumn(
+ "col_map_int_row",
DataTypes.MAP(DataTypes.INT(), innerRowType))
+ // Row types
+ .physicalColumn("col_row", innerRowType)
+ .physicalColumn("col_nested_row", outerRowType)
+ // NOT NULL type
+ .physicalColumn("col_int_not_null",
DataTypes.INT().notNull())
+ .primaryKey("col_int")
+ .build();
+
+ List<RecordData.FieldGetter> fieldGetters =
+ RecordDataConverter.createFieldGetters(schema, TEST_ZONE_ID);
+
+ Assertions.assertThat(fieldGetters).hasSize(28);
+
+ // Prepare test data - create Row data generators
+ BinaryRecordDataGenerator innerRowGenerator =
+ new BinaryRecordDataGenerator(new DataType[]
{DataTypes.STRING(), DataTypes.INT()});
+ BinaryRecordDataGenerator outerRowGenerator =
+ new BinaryRecordDataGenerator(new DataType[]
{DataTypes.BIGINT(), innerRowType});
+ RecordData innerRow1 =
+ innerRowGenerator.generate(
+ new Object[] {BinaryStringData.fromString("inner1"),
100});
+ RecordData innerRow2 =
+ innerRowGenerator.generate(
+ new Object[] {BinaryStringData.fromString("inner2"),
200});
+ RecordData innerRow3 =
+ innerRowGenerator.generate(
+ new Object[] {BinaryStringData.fromString("inner3"),
300});
+
+ RecordData outerRow = outerRowGenerator.generate(new Object[] {999L,
innerRow1});
+ GenericArrayData intArray = new GenericArrayData(new Integer[] {1, 2,
3, 4, 5});
+ GenericArrayData stringArray =
+ new GenericArrayData(
+ new Object[] {
+ BinaryStringData.fromString("a"),
+ BinaryStringData.fromString("b"),
+ BinaryStringData.fromString("c")
+ });
+ GenericArrayData rowArray = new GenericArrayData(new RecordData[]
{innerRow2, innerRow3});
+
+ // Create Map data
+ Map<Object, Object> stringIntMap = new HashMap<>();
+ stringIntMap.put(BinaryStringData.fromString("key1"), 10);
+ stringIntMap.put(BinaryStringData.fromString("key2"), 20);
+ GenericMapData mapStringInt = new GenericMapData(stringIntMap);
+
+ Map<Object, Object> intRowMap = new HashMap<>();
+ intRowMap.put(1, innerRow1);
+ intRowMap.put(2, innerRow2);
+ GenericMapData mapIntRow = new GenericMapData(intRowMap);
+
+ Instant testInstant = Instant.parse("2023-06-15T10:30:00.123456Z");
+ Object[] testData =
+ new Object[] {
+ true,
+ (byte) 127,
+ (short) 32767,
+ 2147483647,
+ 9223372036854775807L,
+ 3.14f,
+ 2.718281828,
+ BinaryStringData.fromString("char_val"),
+ BinaryStringData.fromString("varchar_value"),
+ BinaryStringData.fromString("string_value"),
+ new byte[] {1, 2, 3, 4, 5},
+ new byte[] {6, 7, 8, 9, 10, 11, 12},
+ new byte[] {13, 14, 15},
+ DecimalData.fromBigDecimal(new BigDecimal("123.456"), 6,
3),
+ DecimalData.fromBigDecimal(
+ new
BigDecimal("12345678901234567890.123456789012345678"), 38, 18),
+ DateData.fromEpochDay(19523),
+ TimeData.fromMillisOfDay(37800000),
+ TimeData.fromNanoOfDay(37800123456000L),
+ TimestampData.fromTimestamp(
+ java.sql.Timestamp.valueOf("2023-06-15
10:30:00.123456")),
+ LocalZonedTimestampData.fromInstant(testInstant),
+ intArray,
+ stringArray,
+ rowArray,
+ mapStringInt,
+ mapIntRow,
+ innerRow1,
+ outerRow,
+ 42
+ };
+
+ BinaryRecordData recordData =
+ new
BinaryRecordDataGenerator(schema.getColumnDataTypes().toArray(new DataType[0]))
+ .generate(testData);
+
+ int idx = 0;
+
Assertions.assertThat(fieldGetters.get(idx++).getFieldOrNull(recordData)).isEqualTo(true);
+
Assertions.assertThat(fieldGetters.get(idx++).getFieldOrNull(recordData))
+ .isEqualTo((byte) 127);
+
Assertions.assertThat(fieldGetters.get(idx++).getFieldOrNull(recordData))
+ .isEqualTo((short) 32767);
+
Assertions.assertThat(fieldGetters.get(idx++).getFieldOrNull(recordData))
+ .isEqualTo(2147483647);
+
Assertions.assertThat(fieldGetters.get(idx++).getFieldOrNull(recordData))
+ .isEqualTo(9223372036854775807L);
+
Assertions.assertThat(fieldGetters.get(idx++).getFieldOrNull(recordData)).isEqualTo(3.14f);
+
Assertions.assertThat(fieldGetters.get(idx++).getFieldOrNull(recordData))
+ .isEqualTo(2.718281828);
+
+ // Verify string types
+
Assertions.assertThat(fieldGetters.get(idx++).getFieldOrNull(recordData).toString())
+ .isEqualTo("char_val");
+
Assertions.assertThat(fieldGetters.get(idx++).getFieldOrNull(recordData).toString())
+ .isEqualTo("varchar_value");
+
Assertions.assertThat(fieldGetters.get(idx++).getFieldOrNull(recordData).toString())
+ .isEqualTo("string_value");
+
+ // Verify binary types
+ Assertions.assertThat((byte[])
fieldGetters.get(idx++).getFieldOrNull(recordData))
+ .isEqualTo(new byte[] {1, 2, 3, 4, 5});
+ Assertions.assertThat((byte[])
fieldGetters.get(idx++).getFieldOrNull(recordData))
+ .isEqualTo(new byte[] {6, 7, 8, 9, 10, 11, 12});
+ Assertions.assertThat((byte[])
fieldGetters.get(idx++).getFieldOrNull(recordData))
+ .isEqualTo(new byte[] {13, 14, 15});
+
+ // Verify decimal types
+ org.apache.flink.table.data.DecimalData decimalSmall =
+ (org.apache.flink.table.data.DecimalData)
+ fieldGetters.get(idx++).getFieldOrNull(recordData);
+ Assertions.assertThat(decimalSmall.toBigDecimal())
+ .isEqualByComparingTo(new BigDecimal("123.456"));
+
+ org.apache.flink.table.data.DecimalData decimalLarge =
+ (org.apache.flink.table.data.DecimalData)
+ fieldGetters.get(idx++).getFieldOrNull(recordData);
+ Assertions.assertThat(decimalLarge.toBigDecimal())
+ .isEqualByComparingTo(new
BigDecimal("12345678901234567890.123456789012345678"));
+
+ // Verify temporal types
+
Assertions.assertThat(fieldGetters.get(idx++).getFieldOrNull(recordData)).isEqualTo(19523);
+
Assertions.assertThat(fieldGetters.get(idx++).getFieldOrNull(recordData))
+ .isEqualTo(37800000);
+
Assertions.assertThat(fieldGetters.get(idx++).getFieldOrNull(recordData))
+ .isEqualTo(37800123);
+
+ org.apache.flink.table.data.TimestampData timestamp =
+ (org.apache.flink.table.data.TimestampData)
+ fieldGetters.get(idx++).getFieldOrNull(recordData);
+ Assertions.assertThat(timestamp.toTimestamp())
+ .isEqualTo(java.sql.Timestamp.valueOf("2023-06-15
10:30:00.123456"));
+
+ org.apache.flink.table.data.TimestampData timestampLtz =
+ (org.apache.flink.table.data.TimestampData)
+ fieldGetters.get(idx++).getFieldOrNull(recordData);
+ Assertions.assertThat(timestampLtz.toInstant()).isEqualTo(testInstant);
+
+ // Verify array types
+ org.apache.flink.table.data.ArrayData resultIntArray =
+ (org.apache.flink.table.data.ArrayData)
+ fieldGetters.get(idx++).getFieldOrNull(recordData);
+ Assertions.assertThat(resultIntArray.size()).isEqualTo(5);
+ Assertions.assertThat(resultIntArray.getInt(0)).isEqualTo(1);
+ Assertions.assertThat(resultIntArray.getInt(4)).isEqualTo(5);
+
+ org.apache.flink.table.data.ArrayData resultStringArray =
+ (org.apache.flink.table.data.ArrayData)
+ fieldGetters.get(idx++).getFieldOrNull(recordData);
+ Assertions.assertThat(resultStringArray.size()).isEqualTo(3);
+
Assertions.assertThat(resultStringArray.getString(0).toString()).isEqualTo("a");
+
Assertions.assertThat(resultStringArray.getString(2).toString()).isEqualTo("c");
+
+ org.apache.flink.table.data.ArrayData resultRowArray =
+ (org.apache.flink.table.data.ArrayData)
+ fieldGetters.get(idx++).getFieldOrNull(recordData);
+ Assertions.assertThat(resultRowArray.size()).isEqualTo(2);
+ org.apache.flink.table.data.RowData arrayRow0 =
resultRowArray.getRow(0, 2);
+
Assertions.assertThat(arrayRow0.getString(0).toString()).isEqualTo("inner2");
+ Assertions.assertThat(arrayRow0.getInt(1)).isEqualTo(200);
+
+ // Verify map types
+ org.apache.flink.table.data.MapData resultMapStringInt =
+ (org.apache.flink.table.data.MapData)
+ fieldGetters.get(idx++).getFieldOrNull(recordData);
+ Assertions.assertThat(resultMapStringInt).isNotNull();
+
Assertions.assertThat(resultMapStringInt.keyArray().size()).isEqualTo(2);
+
+ org.apache.flink.table.data.MapData resultMapIntRow =
+ (org.apache.flink.table.data.MapData)
+ fieldGetters.get(idx++).getFieldOrNull(recordData);
+ Assertions.assertThat(resultMapIntRow).isNotNull();
+ Assertions.assertThat(resultMapIntRow.keyArray().size()).isEqualTo(2);
+
+ // Verify row types
+ org.apache.flink.table.data.RowData resultRow =
+ (org.apache.flink.table.data.RowData)
+ fieldGetters.get(idx++).getFieldOrNull(recordData);
+ Assertions.assertThat(resultRow).isNotNull();
+
Assertions.assertThat(resultRow.getString(0).toString()).isEqualTo("inner1");
+ Assertions.assertThat(resultRow.getInt(1)).isEqualTo(100);
+
+ org.apache.flink.table.data.RowData resultNestedRow =
+ (org.apache.flink.table.data.RowData)
+ fieldGetters.get(idx++).getFieldOrNull(recordData);
+ Assertions.assertThat(resultNestedRow).isNotNull();
+ Assertions.assertThat(resultNestedRow.getLong(0)).isEqualTo(999L);
+ org.apache.flink.table.data.RowData nestedInnerRow =
resultNestedRow.getRow(1, 2);
+
Assertions.assertThat(nestedInnerRow.getString(0).toString()).isEqualTo("inner1");
+ Assertions.assertThat(nestedInnerRow.getInt(1)).isEqualTo(100);
+
+ // Verify NOT NULL type
+
Assertions.assertThat(fieldGetters.get(idx).getFieldOrNull(recordData)).isEqualTo(42);
+ }
+
+ /** Tests the case when all field values are null. */
+ @Test
+ void testCreateFieldGettersWithAllNullValues() {
+ Schema schema =
+ Schema.newBuilder()
+ .physicalColumn("string_col", DataTypes.STRING())
+ .physicalColumn("int_col", DataTypes.INT())
+ .physicalColumn("decimal_col", DataTypes.DECIMAL(10,
2))
+ .physicalColumn("timestamp_col", DataTypes.TIMESTAMP())
+ .physicalColumn("array_col",
DataTypes.ARRAY(DataTypes.INT()))
+ .physicalColumn(
+ "map_col", DataTypes.MAP(DataTypes.STRING(),
DataTypes.INT()))
+ .physicalColumn(
+ "row_col", DataTypes.ROW(DataTypes.FIELD("f1",
DataTypes.STRING())))
+ .build();
+
+ List<RecordData.FieldGetter> fieldGetters =
+ RecordDataConverter.createFieldGetters(schema, TEST_ZONE_ID);
+
+ Object[] testData = new Object[] {null, null, null, null, null, null,
null};
+
+ BinaryRecordData recordData =
+ new
BinaryRecordDataGenerator(schema.getColumnDataTypes().toArray(new DataType[0]))
+ .generate(testData);
+
+ Assertions.assertThat(fieldGetters).hasSize(7);
+ for (int i = 0; i < fieldGetters.size(); i++) {
+
Assertions.assertThat(fieldGetters.get(i).getFieldOrNull(recordData)).isNull();
+ }
+ }
+
+ /** Tests the case with mixed null and non-null values. */
+ @Test
+ void testCreateFieldGettersWithMixedNullValues() {
+ Schema schema =
+ Schema.newBuilder()
+ .physicalColumn("string_col", DataTypes.STRING())
+ .physicalColumn("int_col", DataTypes.INT())
+ .physicalColumn("nullable_string", DataTypes.STRING())
+ .physicalColumn("bigint_col", DataTypes.BIGINT())
+ .physicalColumn("nullable_array",
DataTypes.ARRAY(DataTypes.INT()))
+ .build();
+
+ List<RecordData.FieldGetter> fieldGetters =
+ RecordDataConverter.createFieldGetters(schema, TEST_ZONE_ID);
+
+ Object[] testData =
+ new Object[] {BinaryStringData.fromString("test"), 42, null,
100L, null};
+
+ BinaryRecordData recordData =
+ new
BinaryRecordDataGenerator(schema.getColumnDataTypes().toArray(new DataType[0]))
+ .generate(testData);
+
+
Assertions.assertThat(fieldGetters.get(0).getFieldOrNull(recordData).toString())
+ .isEqualTo("test");
+
Assertions.assertThat(fieldGetters.get(1).getFieldOrNull(recordData)).isEqualTo(42);
+
Assertions.assertThat(fieldGetters.get(2).getFieldOrNull(recordData)).isNull();
+
Assertions.assertThat(fieldGetters.get(3).getFieldOrNull(recordData)).isEqualTo(100L);
+
Assertions.assertThat(fieldGetters.get(4).getFieldOrNull(recordData)).isNull();
+ }
+
+ /** Tests the case with an empty Schema. */
+ @Test
+ void testCreateFieldGettersForEmptySchema() {
+ Schema schema = Schema.newBuilder().build();
+
+ List<RecordData.FieldGetter> fieldGetters =
+ RecordDataConverter.createFieldGetters(schema, TEST_ZONE_ID);
+
+ Assertions.assertThat(fieldGetters).isEmpty();
+ }
+
+ /** Tests the impact of different time zones on TIMESTAMP_LTZ type. */
+ @Test
+ void testCreateFieldGettersWithDifferentTimeZones() {
+ Schema schema =
+ Schema.newBuilder()
+ .physicalColumn("timestamp_ltz",
DataTypes.TIMESTAMP_LTZ())
+ .build();
+
+ Instant testInstant = Instant.parse("2023-01-01T12:00:00.000Z");
+ Object[] testData = new Object[]
{LocalZonedTimestampData.fromInstant(testInstant)};
+
+ List<RecordData.FieldGetter> utcFieldGetters =
+ RecordDataConverter.createFieldGetters(schema,
ZoneId.of("UTC"));
+ BinaryRecordData recordData1 =
+ new
BinaryRecordDataGenerator(schema.getColumnDataTypes().toArray(new DataType[0]))
+ .generate(testData);
+ org.apache.flink.table.data.TimestampData utcResult =
+ (org.apache.flink.table.data.TimestampData)
+ utcFieldGetters.get(0).getFieldOrNull(recordData1);
+ Assertions.assertThat(utcResult.toInstant()).isEqualTo(testInstant);
+
+ // Test with Asia/Shanghai timezone
+ List<RecordData.FieldGetter> shanghaiFieldGetters =
+ RecordDataConverter.createFieldGetters(schema,
ZoneId.of("Asia/Shanghai"));
+ BinaryRecordData recordData2 =
+ new
BinaryRecordDataGenerator(schema.getColumnDataTypes().toArray(new DataType[0]))
+ .generate(testData);
+ org.apache.flink.table.data.TimestampData shanghaiResult =
+ (org.apache.flink.table.data.TimestampData)
+
shanghaiFieldGetters.get(0).getFieldOrNull(recordData2);
+
Assertions.assertThat(shanghaiResult.toInstant()).isEqualTo(testInstant);
+ }
+
+ /** Tests edge cases with empty Array and empty Map. */
+ @Test
+ void testCreateFieldGettersForEmptyCollections() {
+ Schema schema =
+ Schema.newBuilder()
+ .physicalColumn("empty_array",
DataTypes.ARRAY(DataTypes.INT()))
+ .physicalColumn(
+ "empty_map", DataTypes.MAP(DataTypes.STRING(),
DataTypes.INT()))
+ .build();
+
+ List<RecordData.FieldGetter> fieldGetters =
+ RecordDataConverter.createFieldGetters(schema, TEST_ZONE_ID);
+
+ GenericArrayData emptyArray = new GenericArrayData(new Integer[] {});
+ GenericMapData emptyMap = new GenericMapData(new HashMap<>());
+
+ Object[] testData = new Object[] {emptyArray, emptyMap};
+
+ BinaryRecordData recordData =
+ new
BinaryRecordDataGenerator(schema.getColumnDataTypes().toArray(new DataType[0]))
+ .generate(testData);
+
+ org.apache.flink.table.data.ArrayData resultArray =
+ (org.apache.flink.table.data.ArrayData)
+ fieldGetters.get(0).getFieldOrNull(recordData);
+ Assertions.assertThat(resultArray.size()).isEqualTo(0);
+
+ org.apache.flink.table.data.MapData resultMap =
+ (org.apache.flink.table.data.MapData)
+ fieldGetters.get(1).getFieldOrNull(recordData);
+ Assertions.assertThat(resultMap.keyArray().size()).isEqualTo(0);
+ }
+}
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/test/java/org/apache/flink/cdc/connectors/kafka/sink/KafkaDataSinkITCase.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/test/java/org/apache/flink/cdc/connectors/kafka/sink/KafkaDataSinkITCase.java
index 8ccf85328..c3bb94fde 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/test/java/org/apache/flink/cdc/connectors/kafka/sink/KafkaDataSinkITCase.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/test/java/org/apache/flink/cdc/connectors/kafka/sink/KafkaDataSinkITCase.java
@@ -20,6 +20,10 @@ package org.apache.flink.cdc.connectors.kafka.sink;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.cdc.common.configuration.Configuration;
+import org.apache.flink.cdc.common.data.GenericArrayData;
+import org.apache.flink.cdc.common.data.GenericMapData;
+import org.apache.flink.cdc.common.data.StringData;
+import org.apache.flink.cdc.common.data.binary.BinaryRecordData;
import org.apache.flink.cdc.common.data.binary.BinaryStringData;
import org.apache.flink.cdc.common.event.AddColumnEvent;
import org.apache.flink.cdc.common.event.CreateTableEvent;
@@ -32,6 +36,7 @@ import org.apache.flink.cdc.common.factories.FactoryHelper;
import org.apache.flink.cdc.common.schema.Column;
import org.apache.flink.cdc.common.schema.Schema;
import org.apache.flink.cdc.common.sink.FlinkSinkProvider;
+import org.apache.flink.cdc.common.types.DataType;
import org.apache.flink.cdc.common.types.DataTypes;
import org.apache.flink.cdc.common.types.RowType;
import org.apache.flink.cdc.connectors.kafka.json.JsonSerializationType;
@@ -44,6 +49,7 @@ import org.apache.flink.util.TestLogger;
import org.apache.flink.util.jackson.JacksonMapperFactory;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
@@ -58,6 +64,8 @@ import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.KafkaContainer;
@@ -696,4 +704,372 @@ class KafkaDataSinkITCase extends TestLogger {
         return threadStackTrace.getKey().getState() != Thread.State.TERMINATED
                 && threadStackTrace.getKey().getName().contains("kafka-producer-network-thread");
}
+
+ void runGenericComplexTypeSerializationTest(
+ JsonSerializationType serializationType,
+ List<Event> eventsToSerialize,
+ List<String> expectedJson)
+ throws Exception {
+ try (StreamExecutionEnvironment env = new LocalStreamEnvironment()) {
+ env.enableCheckpointing(1000L);
+ env.setRestartStrategy(RestartStrategies.noRestart());
+ final DataStream<Event> source = env.fromData(eventsToSerialize,
new EventTypeInfo());
+ Map<String, String> config = new HashMap<>();
+ Properties properties = getKafkaClientConfiguration();
+ properties.forEach(
+ (key, value) ->
+ config.put(
+ KafkaDataSinkOptions.PROPERTIES_PREFIX +
key.toString(),
+ value.toString()));
+ if (serializationType == JsonSerializationType.CANAL_JSON) {
+ config.put(
+ KafkaDataSinkOptions.VALUE_FORMAT.key(),
+ JsonSerializationType.CANAL_JSON.toString());
+ }
+ source.sinkTo(
+ ((FlinkSinkProvider)
+ (new KafkaDataSinkFactory()
+ .createDataSink(
+ new
FactoryHelper.DefaultContext(
+
Configuration.fromMap(config),
+
Configuration.fromMap(new HashMap<>()),
+
this.getClass().getClassLoader()))
+ .getEventSinkProvider()))
+ .getSink());
+ env.execute();
+ }
+
+ final List<ConsumerRecord<byte[], byte[]>> collectedRecords =
+ drainAllRecordsFromTopic(topic, false, 0);
+ assertThat(collectedRecords).hasSameSizeAs(expectedJson);
+ ObjectMapper mapper =
+ JacksonMapperFactory.createObjectMapper()
+
.configure(JsonGenerator.Feature.WRITE_BIGDECIMAL_AS_PLAIN, false);
+ List<JsonNode> expectedJsonNodes =
+ expectedJson.stream()
+ .map(
+ s -> {
+ try {
+ return mapper.readTree(
+ String.format(s,
table1.getTableName()));
+ } catch (JsonProcessingException e) {
+ throw new RuntimeException(e);
+ }
+ })
+ .collect(Collectors.toList());
+ assertThat(deserializeValues(collectedRecords))
+ .containsExactlyElementsOf(expectedJsonNodes);
+ checkProducerLeak();
+ }
+
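+ /** Verifies serialization of ARRAY, MAP, and ROW columns to Debezium and Canal JSON. */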
+ @ParameterizedTest(name = "{0}")
+ @EnumSource
+ void testComplexTypeSerialization(JsonSerializationType type) throws Exception {
+ Schema schema =
+ Schema.newBuilder()
+ .physicalColumn("id", DataTypes.INT())
+ .physicalColumn("arr_col",
DataTypes.ARRAY(DataTypes.STRING()))
+ .physicalColumn(
+ "map_col", DataTypes.MAP(DataTypes.STRING(),
DataTypes.INT()))
+ .physicalColumn(
+ "row_col",
+ DataTypes.ROW(
+ DataTypes.FIELD("name",
DataTypes.STRING()),
+ DataTypes.FIELD("age",
DataTypes.INT())))
+ .primaryKey("id")
+ .build();
+ BinaryRecordDataGenerator generator =
+ new BinaryRecordDataGenerator(schema.getColumnDataTypes().toArray(new DataType[0]));
+
+ BinaryRecordDataGenerator nestedRowGenerator =
+ new BinaryRecordDataGenerator(
+ ((RowType) (schema.getColumn("row_col").get().getType()))
+ .getFieldTypes()
+ .toArray(new DataType[0]));
+
+ BinaryRecordData recordData =
+ generator.generate(
+ new Object[] {
+ 1,
+ new GenericArrayData(
+ new Object[] {
+ BinaryStringData.fromString("Alfa"),
+ BinaryStringData.fromString("Bravo"),
+ BinaryStringData.fromString("Charlie")
+ }),
+ new GenericMapData(
+ Map.of(
+ BinaryStringData.fromString("Delta"), 5,
+ BinaryStringData.fromString("Echo"), 4,
+ BinaryStringData.fromString("Foxtrot"), 7)),
+ nestedRowGenerator.generate(
+ new Object[] {BinaryStringData.fromString("Golf"), 97})
+ });
+ List<Event> eventsToSerialize =
+ List.of(
+ new CreateTableEvent(table1, schema),
+ DataChangeEvent.insertEvent(table1, recordData),
+ DataChangeEvent.updateEvent(table1, recordData, recordData),
+ DataChangeEvent.deleteEvent(table1, recordData));
+
+ List<String> expectedOutput = null;
+ switch (type) {
+ case DEBEZIUM_JSON:
+ expectedOutput =
+ List.of(
+ "{\"before\":null,\"after\":{\"id\":1,\"arr_col\":[\"Alfa\",\"Bravo\",\"Charlie\"],\"map_col\":{\"Foxtrot\":7,\"Delta\":5,\"Echo\":4},\"row_col\":{\"name\":\"Golf\",\"age\":97}},\"op\":\"c\",\"source\":{\"db\":\"default_schema\",\"table\":\"%s\"}}",
+ "{\"before\":{\"id\":1,\"arr_col\":[\"Alfa\",\"Bravo\",\"Charlie\"],\"map_col\":{\"Foxtrot\":7,\"Delta\":5,\"Echo\":4},\"row_col\":{\"name\":\"Golf\",\"age\":97}},\"after\":{\"id\":1,\"arr_col\":[\"Alfa\",\"Bravo\",\"Charlie\"],\"map_col\":{\"Foxtrot\":7,\"Delta\":5,\"Echo\":4},\"row_col\":{\"name\":\"Golf\",\"age\":97}},\"op\":\"u\",\"source\":{\"db\":\"default_schema\",\"table\":\"%s\"}}",
+ "{\"before\":{\"id\":1,\"arr_col\":[\"Alfa\",\"Bravo\",\"Charlie\"],\"map_col\":{\"Foxtrot\":7,\"Delta\":5,\"Echo\":4},\"row_col\":{\"name\":\"Golf\",\"age\":97}},\"after\":null,\"op\":\"d\",\"source\":{\"db\":\"default_schema\",\"table\":\"%s\"}}");
+ break;
+ case CANAL_JSON:
+ expectedOutput =
+ List.of(
+ "{\"old\":null,\"data\":[{\"id\":1,\"arr_col\":[\"Alfa\",\"Bravo\",\"Charlie\"],\"map_col\":{\"Delta\":5,\"Foxtrot\":7,\"Echo\":4},\"row_col\":{\"name\":\"Golf\",\"age\":97}}],\"type\":\"INSERT\",\"database\":\"default_schema\",\"table\":\"%s\",\"pkNames\":[\"id\"]}",
+ "{\"old\":[{\"id\":1,\"arr_col\":[\"Alfa\",\"Bravo\",\"Charlie\"],\"map_col\":{\"Delta\":5,\"Foxtrot\":7,\"Echo\":4},\"row_col\":{\"name\":\"Golf\",\"age\":97}}],\"data\":[{\"id\":1,\"arr_col\":[\"Alfa\",\"Bravo\",\"Charlie\"],\"map_col\":{\"Delta\":5,\"Foxtrot\":7,\"Echo\":4},\"row_col\":{\"name\":\"Golf\",\"age\":97}}],\"type\":\"UPDATE\",\"database\":\"default_schema\",\"table\":\"%s\",\"pkNames\":[\"id\"]}",
+ "{\"old\":null,\"data\":[{\"id\":1,\"arr_col\":[\"Alfa\",\"Bravo\",\"Charlie\"],\"map_col\":{\"Delta\":5,\"Foxtrot\":7,\"Echo\":4},\"row_col\":{\"name\":\"Golf\",\"age\":97}}],\"type\":\"DELETE\",\"database\":\"default_schema\",\"table\":\"%s\",\"pkNames\":[\"id\"]}");
+ break;
+ }
+ runGenericComplexTypeSerializationTest(type, eventsToSerialize, expectedOutput);
+ }
+
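+ /** Verifies serialization of nested arrays (ARRAY&lt;ARRAY&lt;STRING&gt;&gt;). */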
+ @ParameterizedTest(name = "{0}")
+ @EnumSource
+ void testNestedArraysSerialization(JsonSerializationType type) throws Exception {
+ Schema schema =
+ Schema.newBuilder()
+ .physicalColumn("id", DataTypes.INT().notNull())
+ .physicalColumn(
+ "nested_arr",
DataTypes.ARRAY(DataTypes.ARRAY(DataTypes.STRING())))
+ .primaryKey("id")
+ .build();
+ BinaryRecordDataGenerator generator =
+ new BinaryRecordDataGenerator(schema.getColumnDataTypes().toArray(new DataType[0]));
+
+ BinaryRecordData recordData =
+ generator.generate(
+ new Object[] {
+ 1,
+ new GenericArrayData(
+ new Object[] {
+ new GenericArrayData(
+ new Object[] {new BinaryStringData("Alice")}),
+ new GenericArrayData(
+ new Object[] {new BinaryStringData("One")}),
+ new GenericArrayData(
+ new Object[] {new BinaryStringData("Alfa")})
+ })
+ });
+ List<Event> eventsToSerialize =
+ List.of(
+ new CreateTableEvent(table1, schema),
+ DataChangeEvent.insertEvent(table1, recordData),
+ DataChangeEvent.updateEvent(table1, recordData, recordData),
+ DataChangeEvent.deleteEvent(table1, recordData));
+
+ List<String> expectedOutput = null;
+ switch (type) {
+ case DEBEZIUM_JSON:
+ expectedOutput =
+ List.of(
+ "{\"before\":null,\"after\":{\"id\":1,\"nested_arr\":[[\"Alice\"],[\"One\"],[\"Alfa\"]]},\"op\":\"c\",\"source\":{\"db\":\"default_schema\",\"table\":\"%s\"}}",
+ "{\"before\":{\"id\":1,\"nested_arr\":[[\"Alice\"],[\"One\"],[\"Alfa\"]]},\"after\":{\"id\":1,\"nested_arr\":[[\"Alice\"],[\"One\"],[\"Alfa\"]]},\"op\":\"u\",\"source\":{\"db\":\"default_schema\",\"table\":\"%s\"}}",
+ "{\"before\":{\"id\":1,\"nested_arr\":[[\"Alice\"],[\"One\"],[\"Alfa\"]]},\"after\":null,\"op\":\"d\",\"source\":{\"db\":\"default_schema\",\"table\":\"%s\"}}");
+ break;
+ case CANAL_JSON:
+ expectedOutput =
+ List.of(
+ "{\"old\":null,\"data\":[{\"id\":1,\"nested_arr\":[[\"Alice\"],[\"One\"],[\"Alfa\"]]}],\"type\":\"INSERT\",\"database\":\"default_schema\",\"table\":\"%s\",\"pkNames\":[\"id\"]}",
+ "{\"old\":[{\"id\":1,\"nested_arr\":[[\"Alice\"],[\"One\"],[\"Alfa\"]]}],\"data\":[{\"id\":1,\"nested_arr\":[[\"Alice\"],[\"One\"],[\"Alfa\"]]}],\"type\":\"UPDATE\",\"database\":\"default_schema\",\"table\":\"%s\",\"pkNames\":[\"id\"]}",
+ "{\"old\":null,\"data\":[{\"id\":1,\"nested_arr\":[[\"Alice\"],[\"One\"],[\"Alfa\"]]}],\"type\":\"DELETE\",\"database\":\"default_schema\",\"table\":\"%s\",\"pkNames\":[\"id\"]}");
+ break;
+ }
+ runGenericComplexTypeSerializationTest(type, eventsToSerialize, expectedOutput);
+ }
+
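+ /** Verifies serialization of MAP columns whose values are arrays (MAP&lt;STRING, ARRAY&lt;INT&gt;&gt;). */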
+ @ParameterizedTest(name = "{0}")
+ @EnumSource
+ void testMapWithArrayValueSerialization(JsonSerializationType type) throws Exception {
+ Schema schema =
+ Schema.newBuilder()
+ .physicalColumn("id", DataTypes.INT().notNull())
+ .physicalColumn(
+ "map_arr",
+ DataTypes.MAP(DataTypes.STRING(), DataTypes.ARRAY(DataTypes.INT())))
+ .primaryKey("id")
+ .build();
+ BinaryRecordDataGenerator generator =
+ new BinaryRecordDataGenerator(schema.getColumnDataTypes().toArray(new DataType[0]));
+
+ BinaryRecordData recordData =
+ generator.generate(
+ new Object[] {
+ 1,
+ new GenericMapData(
+ Map.of(
+ new BinaryStringData("Alice"),
+ new GenericArrayData(new int[] {1, 2, 3, 4, 5}),
+ new BinaryStringData("Bob"),
+ new GenericArrayData(new int[] {1, 2, 3}),
+ new BinaryStringData("Carol"),
+ new GenericArrayData(new int[] {6, 7, 8, 9, 10})))
+ });
+ List<Event> eventsToSerialize =
+ List.of(
+ new CreateTableEvent(table1, schema),
+ DataChangeEvent.insertEvent(table1, recordData),
+ DataChangeEvent.updateEvent(table1, recordData, recordData),
+ DataChangeEvent.deleteEvent(table1, recordData));
+
+ List<String> expectedOutput = null;
+ switch (type) {
+ case DEBEZIUM_JSON:
+ expectedOutput =
+ List.of(
+ "{\"before\":null,\"after\":{\"id\":1,\"map_arr\":{\"Alice\":[1,2,3,4,5],\"Bob\":[1,2,3],\"Carol\":[6,7,8,9,10]}},\"op\":\"c\",\"source\":{\"db\":\"default_schema\",\"table\":\"%s\"}}",
+ "{\"before\":{\"id\":1,\"map_arr\":{\"Alice\":[1,2,3,4,5],\"Bob\":[1,2,3],\"Carol\":[6,7,8,9,10]}},\"after\":{\"id\":1,\"map_arr\":{\"Alice\":[1,2,3,4,5],\"Bob\":[1,2,3],\"Carol\":[6,7,8,9,10]}},\"op\":\"u\",\"source\":{\"db\":\"default_schema\",\"table\":\"%s\"}}",
+ "{\"before\":{\"id\":1,\"map_arr\":{\"Alice\":[1,2,3,4,5],\"Bob\":[1,2,3],\"Carol\":[6,7,8,9,10]}},\"after\":null,\"op\":\"d\",\"source\":{\"db\":\"default_schema\",\"table\":\"%s\"}}");
+ break;
+ case CANAL_JSON:
+ expectedOutput =
+ List.of(
+ "{\"old\":null,\"data\":[{\"id\":1,\"map_arr\":{\"Alice\":[1,2,3,4,5],\"Bob\":[1,2,3],\"Carol\":[6,7,8,9,10]}}],\"type\":\"INSERT\",\"database\":\"default_schema\",\"table\":\"%s\",\"pkNames\":[\"id\"]}",
+ "{\"old\":[{\"id\":1,\"map_arr\":{\"Alice\":[1,2,3,4,5],\"Bob\":[1,2,3],\"Carol\":[6,7,8,9,10]}}],\"data\":[{\"id\":1,\"map_arr\":{\"Alice\":[1,2,3,4,5],\"Bob\":[1,2,3],\"Carol\":[6,7,8,9,10]}}],\"type\":\"UPDATE\",\"database\":\"default_schema\",\"table\":\"%s\",\"pkNames\":[\"id\"]}",
+ "{\"old\":null,\"data\":[{\"id\":1,\"map_arr\":{\"Alice\":[1,2,3,4,5],\"Bob\":[1,2,3],\"Carol\":[6,7,8,9,10]}}],\"type\":\"DELETE\",\"database\":\"default_schema\",\"table\":\"%s\",\"pkNames\":[\"id\"]}");
+ break;
+ }
+ runGenericComplexTypeSerializationTest(type, eventsToSerialize, expectedOutput);
+ }
+
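+ /** Verifies serialization of null and empty arrays/maps, including null array elements and null map values. */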
+ @ParameterizedTest(name = "{0}")
+ @EnumSource
+ void testNullAndEmptyComplexTypesSerialization(JsonSerializationType type) throws Exception {
+ Schema schema =
+ Schema.newBuilder()
+ .physicalColumn("id", DataTypes.INT().notNull())
+ .physicalColumn("arr",
DataTypes.ARRAY(DataTypes.STRING()))
+ .physicalColumn("map",
DataTypes.MAP(DataTypes.STRING(), DataTypes.INT()))
+ .primaryKey("id")
+ .build();
+ BinaryRecordDataGenerator generator =
+ new BinaryRecordDataGenerator(schema.getColumnDataTypes().toArray(new DataType[0]));
+
+ BinaryRecordData recordData1 = generator.generate(new Object[] {1, null, null});
+ BinaryRecordData recordData2 =
+ generator.generate(
+ new Object[] {
+ 1, new GenericArrayData(new Object[] {}), new GenericMapData(Map.of())
+ });
+ Map<StringData, Integer> partialEmptyMap = new HashMap<>();
+ partialEmptyMap.put(BinaryStringData.fromString("Alice"), 1);
+ partialEmptyMap.put(BinaryStringData.fromString("Bob"), null);
+ BinaryRecordData recordData3 =
+ generator.generate(
+ new Object[] {
+ 1,
+ new GenericArrayData(
+ new Object[] {BinaryStringData.fromString("Foo"), null}),
+ new GenericMapData(partialEmptyMap)
+ });
+ List<Event> eventsToSerialize =
+ List.of(
+ new CreateTableEvent(table1, schema),
+ DataChangeEvent.insertEvent(table1, recordData1),
+ DataChangeEvent.updateEvent(table1, recordData1, recordData2),
+ DataChangeEvent.updateEvent(table1, recordData2, recordData3),
+ DataChangeEvent.deleteEvent(table1, recordData3));
+
+ List<String> expectedOutput = null;
+ switch (type) {
+ case DEBEZIUM_JSON:
+ expectedOutput =
+ List.of(
+ "{\"before\":null,\"after\":{\"id\":1,\"arr\":null,\"map\":null},\"op\":\"c\",\"source\":{\"db\":\"default_schema\",\"table\":\"%s\"}}",
+ "{\"before\":{\"id\":1,\"arr\":null,\"map\":null},\"after\":{\"id\":1,\"arr\":[],\"map\":{}},\"op\":\"u\",\"source\":{\"db\":\"default_schema\",\"table\":\"%s\"}}",
+ "{\"before\":{\"id\":1,\"arr\":[],\"map\":{}},\"after\":{\"id\":1,\"arr\":[\"Foo\",null],\"map\":{\"Alice\":1,\"Bob\":null}},\"op\":\"u\",\"source\":{\"db\":\"default_schema\",\"table\":\"%s\"}}",
+ "{\"before\":{\"id\":1,\"arr\":[\"Foo\",null],\"map\":{\"Alice\":1,\"Bob\":null}},\"after\":null,\"op\":\"d\",\"source\":{\"db\":\"default_schema\",\"table\":\"%s\"}}");
+ break;
+ case CANAL_JSON:
+ expectedOutput =
+ List.of(
+ "{\"old\":null,\"data\":[{\"id\":1,\"arr\":null,\"map\":null}],\"type\":\"INSERT\",\"database\":\"default_schema\",\"table\":\"%s\",\"pkNames\":[\"id\"]}",
+ "{\"old\":[{\"id\":1,\"arr\":null,\"map\":null}],\"data\":[{\"id\":1,\"arr\":[],\"map\":{}}],\"type\":\"UPDATE\",\"database\":\"default_schema\",\"table\":\"%s\",\"pkNames\":[\"id\"]}",
+ "{\"old\":[{\"id\":1,\"arr\":[],\"map\":{}}],\"data\":[{\"id\":1,\"arr\":[\"Foo\",null],\"map\":{\"Alice\":1,\"Bob\":null}}],\"type\":\"UPDATE\",\"database\":\"default_schema\",\"table\":\"%s\",\"pkNames\":[\"id\"]}",
+ "{\"old\":null,\"data\":[{\"id\":1,\"arr\":[\"Foo\",null],\"map\":{\"Alice\":1,\"Bob\":null}}],\"type\":\"DELETE\",\"database\":\"default_schema\",\"table\":\"%s\",\"pkNames\":[\"id\"]}");
+ break;
+ }
+ runGenericComplexTypeSerializationTest(type, eventsToSerialize, expectedOutput);
+ }
+
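+ /** Verifies serialization of a deeply nested structure: ARRAY&lt;ROW&lt;name STRING, nested ROW&lt;x INT, y INT&gt;&gt;&gt;. */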
+ @ParameterizedTest(name = "{0}")
+ @EnumSource
+ void testDeepNestedStructureSerialization(JsonSerializationType type) throws Exception {
+ RowType innerRowType =
+ DataTypes.ROW(
+ DataTypes.FIELD("x", DataTypes.INT()),
+ DataTypes.FIELD("y", DataTypes.INT()));
+ RowType outerRowType =
+ DataTypes.ROW(
+ DataTypes.FIELD("name", DataTypes.STRING()),
+ DataTypes.FIELD("nested", innerRowType));
+ Schema schema =
+ Schema.newBuilder()
+ .physicalColumn("id", DataTypes.INT().notNull())
+ .physicalColumn("deep", DataTypes.ARRAY(outerRowType))
+ .primaryKey("id")
+ .build();
+ BinaryRecordDataGenerator generator =
+ new BinaryRecordDataGenerator(schema.getColumnDataTypes().toArray(new DataType[0]));
+ BinaryRecordDataGenerator outerRowGenerator =
+ new BinaryRecordDataGenerator(
+ outerRowType.getFieldTypes().toArray(new DataType[0]));
+ BinaryRecordDataGenerator innerRowGenerator =
+ new BinaryRecordDataGenerator(
+ innerRowType.getFieldTypes().toArray(new DataType[0]));
+
+ BinaryRecordData recordData =
+ generator.generate(
+ new Object[] {
+ 1,
+ new GenericArrayData(
+ new Object[] {
+ outerRowGenerator.generate(
+ new Object[] {
+ BinaryStringData.fromString("323"),
+ innerRowGenerator.generate(
+ new Object[] {17, 19})
+ }),
+ outerRowGenerator.generate(
+ new Object[] {
+ BinaryStringData.fromString("143"),
+ innerRowGenerator.generate(
+ new Object[] {11, 13})
+ })
+ })
+ });
+ List<Event> eventsToSerialize =
+ List.of(
+ new CreateTableEvent(table1, schema),
+ DataChangeEvent.insertEvent(table1, recordData),
+ DataChangeEvent.updateEvent(table1, recordData, recordData),
+ DataChangeEvent.deleteEvent(table1, recordData));
+
+ List<String> expectedOutput = null;
+ switch (type) {
+ case DEBEZIUM_JSON:
+ expectedOutput =
+ List.of(
+ "{\"before\":null,\"after\":{\"id\":1,\"deep\":[{\"name\":\"323\",\"nested\":{\"x\":17,\"y\":19}},{\"name\":\"143\",\"nested\":{\"x\":11,\"y\":13}}]},\"op\":\"c\",\"source\":{\"db\":\"default_schema\",\"table\":\"%s\"}}",
+ "{\"before\":{\"id\":1,\"deep\":[{\"name\":\"323\",\"nested\":{\"x\":17,\"y\":19}},{\"name\":\"143\",\"nested\":{\"x\":11,\"y\":13}}]},\"after\":{\"id\":1,\"deep\":[{\"name\":\"323\",\"nested\":{\"x\":17,\"y\":19}},{\"name\":\"143\",\"nested\":{\"x\":11,\"y\":13}}]},\"op\":\"u\",\"source\":{\"db\":\"default_schema\",\"table\":\"%s\"}}",
+ "{\"before\":{\"id\":1,\"deep\":[{\"name\":\"323\",\"nested\":{\"x\":17,\"y\":19}},{\"name\":\"143\",\"nested\":{\"x\":11,\"y\":13}}]},\"after\":null,\"op\":\"d\",\"source\":{\"db\":\"default_schema\",\"table\":\"%s\"}}");
+ break;
+ case CANAL_JSON:
+ expectedOutput =
+ List.of(
+ "{\"old\":null,\"data\":[{\"id\":1,\"deep\":[{\"name\":\"323\",\"nested\":{\"x\":17,\"y\":19}},{\"name\":\"143\",\"nested\":{\"x\":11,\"y\":13}}]}],\"type\":\"INSERT\",\"database\":\"default_schema\",\"table\":\"%s\",\"pkNames\":[\"id\"]}",
+ "{\"old\":[{\"id\":1,\"deep\":[{\"name\":\"323\",\"nested\":{\"x\":17,\"y\":19}},{\"name\":\"143\",\"nested\":{\"x\":11,\"y\":13}}]}],\"data\":[{\"id\":1,\"deep\":[{\"name\":\"323\",\"nested\":{\"x\":17,\"y\":19}},{\"name\":\"143\",\"nested\":{\"x\":11,\"y\":13}}]}],\"type\":\"UPDATE\",\"database\":\"default_schema\",\"table\":\"%s\",\"pkNames\":[\"id\"]}",
+ "{\"old\":null,\"data\":[{\"id\":1,\"deep\":[{\"name\":\"323\",\"nested\":{\"x\":17,\"y\":19}},{\"name\":\"143\",\"nested\":{\"x\":11,\"y\":13}}]}],\"type\":\"DELETE\",\"database\":\"default_schema\",\"table\":\"%s\",\"pkNames\":[\"id\"]}");
+ break;
+ }
+ runGenericComplexTypeSerializationTest(type, eventsToSerialize, expectedOutput);
+ }
}