This is an automated email from the ASF dual-hosted git repository.
kunni pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git
The following commit(s) were added to refs/heads/master by this push:
     new e0c08f75d [hotfix][kafka] Fix Debezium schema generation for complex types (ARRAY, MAP, ROW) (#4240)
e0c08f75d is described below
commit e0c08f75dafcadaf0554075380a2015631076f59
Author: SkylerLin <[email protected]>
AuthorDate: Tue Feb 3 11:28:59 2026 +0800
    [hotfix][kafka] Fix Debezium schema generation for complex types (ARRAY, MAP, ROW) (#4240)
---
.../docs/connectors/pipeline-connectors/kafka.md | 30 ++++++
.../debezium/DebeziumJsonSerializationSchema.java | 57 ++++++++---
.../DebeziumJsonSerializationSchemaTest.java | 97 +++++++++++++++++++
.../connectors/kafka/sink/KafkaDataSinkITCase.java | 106 ++++++++++++++++++++-
4 files changed, 277 insertions(+), 13 deletions(-)
diff --git a/docs/content/docs/connectors/pipeline-connectors/kafka.md b/docs/content/docs/connectors/pipeline-connectors/kafka.md
index b94efc149..2a5cc4ed5 100644
--- a/docs/content/docs/connectors/pipeline-connectors/kafka.md
+++ b/docs/content/docs/connectors/pipeline-connectors/kafka.md
@@ -285,36 +285,42 @@ Data Type Mapping
<td>TINYINT</td>
<td>INT16</td>
<td></td>
+ <td></td>
</tr>
<tr>
<td>SMALLINT</td>
<td>SMALLINT</td>
<td>INT16</td>
<td></td>
+ <td></td>
</tr>
<tr>
<td>INT</td>
<td>INT</td>
<td>INT32</td>
<td></td>
+ <td></td>
</tr>
<tr>
<td>BIGINT</td>
<td>BIGINT</td>
<td>INT64</td>
<td></td>
+ <td></td>
</tr>
<tr>
<td>FLOAT</td>
<td>FLOAT</td>
<td>FLOAT</td>
<td></td>
+ <td></td>
</tr>
<tr>
<td>DOUBLE</td>
<td>DOUBLE</td>
<td>DOUBLE</td>
<td></td>
+ <td></td>
</tr>
<tr>
<td>DECIMAL(p, s)</td>
@@ -328,6 +334,7 @@ Data Type Mapping
<td>BOOLEAN</td>
<td>BOOLEAN</td>
<td></td>
+ <td></td>
</tr>
<tr>
<td>DATE</td>
@@ -355,12 +362,35 @@ Data Type Mapping
<td>CHAR(n)</td>
<td>STRING</td>
<td></td>
+ <td></td>
</tr>
<tr>
<td>VARCHAR(n)</td>
<td>VARCHAR(n)</td>
<td>STRING</td>
<td></td>
+ <td></td>
+ </tr>
+ <tr>
+ <td>ARRAY</td>
+ <td>ARRAY</td>
+ <td>ARRAY</td>
+ <td></td>
+        <td>Serialized as JSON array. Element types are recursively converted according to this table.</td>
+ </tr>
+ <tr>
+ <td>MAP</td>
+ <td>MAP</td>
+ <td>MAP</td>
+ <td></td>
+        <td>Serialized as JSON object. Key and value types are recursively converted according to this table.</td>
+ </tr>
+ <tr>
+ <td>ROW</td>
+ <td>ROW</td>
+ <td>STRUCT</td>
+ <td></td>
+        <td>Serialized as JSON object. Field types are recursively converted according to this table.</td>
</tr>
</tbody>
</table>
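
For readers skimming the doc change: the three new rows above map CDC complex types onto the Kafka Connect schema types that Debezium JSON uses. The following is a minimal sketch of the equivalent Connect schemas built with the standard org.apache.kafka.connect.data.SchemaBuilder API; the class name and column names are illustrative and not part of this commit.

import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;

// Illustrative only: what the documented mappings look like as Connect schemas.
public class ComplexTypeMappingSketch {
    public static void main(String[] args) {
        // ARRAY<STRING> -> Connect ARRAY with STRING elements
        Schema arr = SchemaBuilder.array(Schema.STRING_SCHEMA).optional().build();
        // MAP<STRING, INT> -> Connect MAP with STRING keys and INT32 values
        Schema map = SchemaBuilder.map(Schema.STRING_SCHEMA, Schema.INT32_SCHEMA).optional().build();
        // ROW<f1 STRING, f2 INT> -> Connect STRUCT with one field per ROW field
        Schema row = SchemaBuilder.struct()
                .field("f1", Schema.STRING_SCHEMA)
                .field("f2", Schema.INT32_SCHEMA)
                .optional()
                .build();
        System.out.println(arr.type() + ", " + map.type() + ", " + row.type());
    }
}
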
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/json/debezium/DebeziumJsonSerializationSchema.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/json/debezium/DebeziumJsonSerializationSchema.java
index b1945b5c1..c70b11744 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/json/debezium/DebeziumJsonSerializationSchema.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/json/debezium/DebeziumJsonSerializationSchema.java
@@ -25,7 +25,10 @@ import org.apache.flink.cdc.common.event.SchemaChangeEvent;
import org.apache.flink.cdc.common.event.TableId;
import org.apache.flink.cdc.common.schema.Column;
import org.apache.flink.cdc.common.schema.Schema;
+import org.apache.flink.cdc.common.types.ArrayType;
+import org.apache.flink.cdc.common.types.DataField;
import org.apache.flink.cdc.common.types.DecimalType;
+import org.apache.flink.cdc.common.types.MapType;
import org.apache.flink.cdc.common.types.TimestampType;
import org.apache.flink.cdc.common.types.utils.DataTypeUtils;
import org.apache.flink.cdc.common.utils.SchemaUtils;
@@ -249,6 +252,24 @@ public class DebeziumJsonSerializationSchema implements SerializationSchema<Even
     private static SchemaBuilder convertCDCDataTypeToDebeziumDataType(Column column) {
         org.apache.flink.cdc.common.types.DataType columnType = column.getType();
+ SchemaBuilder field = convertCDCDataTypeToDebeziumDataType(columnType);
+
+ if (columnType.isNullable()) {
+ field.optional();
+ } else {
+ field.required();
+ }
+ if (column.getDefaultValueExpression() != null) {
+ field.defaultValue(column.getDefaultValueExpression());
+ }
+ if (column.getComment() != null) {
+ field.doc(column.getComment());
+ }
+ return field;
+ }
+
+ private static SchemaBuilder convertCDCDataTypeToDebeziumDataType(
+ org.apache.flink.cdc.common.types.DataType columnType) {
final SchemaBuilder field;
switch (columnType.getTypeRoot()) {
case TINYINT:
@@ -310,23 +331,37 @@ public class DebeziumJsonSerializationSchema implements SerializationSchema<Even
.orElse(0)))
.version(1);
break;
+ case ARRAY:
+ ArrayType arrayType = (ArrayType) columnType;
+ org.apache.kafka.connect.data.Schema elementSchema =
+                        convertCDCDataTypeToDebeziumDataType(arrayType.getElementType()).build();
+ field = SchemaBuilder.array(elementSchema);
+ break;
+ case MAP:
+ MapType mapType = (MapType) columnType;
+ org.apache.kafka.connect.data.Schema keySchema =
+                        convertCDCDataTypeToDebeziumDataType(mapType.getKeyType()).build();
+ org.apache.kafka.connect.data.Schema valueSchema =
+                        convertCDCDataTypeToDebeziumDataType(mapType.getValueType()).build();
+ field = SchemaBuilder.map(keySchema, valueSchema);
+ break;
+ case ROW:
+ org.apache.flink.cdc.common.types.RowType rowType =
+ (org.apache.flink.cdc.common.types.RowType) columnType;
+ SchemaBuilder structBuilder = SchemaBuilder.struct();
+ for (DataField dataField : rowType.getFields()) {
+ org.apache.kafka.connect.data.Schema fieldSchema =
+                            convertCDCDataTypeToDebeziumDataType(dataField.getType()).build();
+ structBuilder.field(dataField.getName(), fieldSchema);
+ }
+ field = structBuilder;
+ break;
case CHAR:
case VARCHAR:
default:
field = SchemaBuilder.string();
}
- if (columnType.isNullable()) {
- field.optional();
- } else {
- field.required();
- }
- if (column.getDefaultValueExpression() != null) {
- field.defaultValue(column.getDefaultValueExpression());
- }
- if (column.getComment() != null) {
- field.doc(column.getComment());
- }
return field;
}
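
Aside, not part of the patch: the new overload above recurses on element, key/value, and field types, while only the column-level wrapper applies optional/required, the default value, and the doc comment. Under that assumption, a rough sketch of the Connect schema that a column typed ARRAY<ROW<f1 STRING, f2 INT>> would be equivalent to; nested leaf schemas keep Connect's default optional=false, which is what the new tests assert. The class name is hypothetical.

import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;

// Illustrative equivalent of the recursion for ARRAY<ROW<f1 STRING, f2 INT>>.
class RecursiveConversionSketch {
    static Schema arrayOfRow(boolean columnIsNullable) {
        Schema rowSchema = SchemaBuilder.struct()
                .field("f1", SchemaBuilder.string().build()) // leaf: STRING -> string, optional=false by default
                .field("f2", SchemaBuilder.int32().build())  // leaf: INT -> int32, optional=false by default
                .build();
        SchemaBuilder arrayBuilder = SchemaBuilder.array(rowSchema);
        // Nullability is applied once, at the column level, by the caller shown above.
        return columnIsNullable ? arrayBuilder.optional().build() : arrayBuilder.required().build();
    }
}
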
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/test/java/org/apache/flink/cdc/connectors/kafka/json/debezium/DebeziumJsonSerializationSchemaTest.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/test/java/org/apache/flink/cdc/connectors/kafka/json/debezium/DebeziumJsonSerializationSchemaTest.java
index 35aada0c2..c2e026f77 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/test/java/org/apache/flink/cdc/connectors/kafka/json/debezium/DebeziumJsonSerializationSchemaTest.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/test/java/org/apache/flink/cdc/connectors/kafka/json/debezium/DebeziumJsonSerializationSchemaTest.java
@@ -340,4 +340,101 @@ class DebeziumJsonSerializationSchemaTest {
assertThat(rowNode.has("f1")).isTrue();
assertThat(rowNode.has("f2")).isTrue();
}
+
+ @Test
+ void testSerializeWithSchemaComplexTypes() throws Exception {
+ ObjectMapper mapper =
+ JacksonMapperFactory.createObjectMapper()
+                        .configure(JsonGenerator.Feature.WRITE_BIGDECIMAL_AS_PLAIN, false);
+ Map<String, String> properties = new HashMap<>();
+ properties.put("include-schema.enabled", "true");
+ Configuration configuration = Configuration.fromMap(properties);
+ SerializationSchema<Event> serializationSchema =
+ ChangeLogJsonFormatFactory.createSerializationSchema(
+                        configuration, JsonSerializationType.DEBEZIUM_JSON, ZoneId.systemDefault());
+ serializationSchema.open(new MockInitializationContext());
+
+ // create table with complex types
+ Schema schema =
+ Schema.newBuilder()
+ .physicalColumn("id", DataTypes.INT())
+                        .physicalColumn("arr", DataTypes.ARRAY(DataTypes.STRING()))
+                        .physicalColumn("map", DataTypes.MAP(DataTypes.STRING(), DataTypes.INT()))
+ .physicalColumn(
+ "row",
+ DataTypes.ROW(
+                                        DataTypes.FIELD("f1", DataTypes.STRING()),
+                                        DataTypes.FIELD("f2", DataTypes.INT())))
+ .primaryKey("id")
+ .build();
+
+ RowType rowType =
+ RowType.of(
+ DataTypes.INT(),
+ DataTypes.ARRAY(DataTypes.STRING()),
+ DataTypes.MAP(DataTypes.STRING(), DataTypes.INT()),
+ DataTypes.ROW(
+ DataTypes.FIELD("f1", DataTypes.STRING()),
+ DataTypes.FIELD("f2", DataTypes.INT())));
+
+        CreateTableEvent createTableEvent = new CreateTableEvent(TABLE_1, schema);
+ assertThat(serializationSchema.serialize(createTableEvent)).isNull();
+
+        BinaryRecordDataGenerator generator = new BinaryRecordDataGenerator(rowType);
+
+ // Create test data with complex types
+ org.apache.flink.cdc.common.data.GenericArrayData arrayData =
+ new org.apache.flink.cdc.common.data.GenericArrayData(
+ new Object[] {
+ BinaryStringData.fromString("item1"),
+ BinaryStringData.fromString("item2")
+ });
+
+ Map<Object, Object> mapValues = new HashMap<>();
+ mapValues.put(BinaryStringData.fromString("key1"), 100);
+ mapValues.put(BinaryStringData.fromString("key2"), 200);
+ org.apache.flink.cdc.common.data.GenericMapData mapData =
+ new org.apache.flink.cdc.common.data.GenericMapData(mapValues);
+
+ BinaryRecordDataGenerator nestedRowGenerator =
+                new BinaryRecordDataGenerator(RowType.of(DataTypes.STRING(), DataTypes.INT()));
+ org.apache.flink.cdc.common.data.RecordData nestedRow =
+ nestedRowGenerator.generate(
+                        new Object[] {BinaryStringData.fromString("nested"), 42});
+
+ // insert event with complex types
+ DataChangeEvent insertEvent =
+ DataChangeEvent.insertEvent(
+ TABLE_1,
+                        generator.generate(new Object[] {1, arrayData, mapData, nestedRow}));
+
+ byte[] serialized = serializationSchema.serialize(insertEvent);
+ JsonNode actual = mapper.readTree(serialized);
+
+ JsonNode expected =
+ mapper.readTree(
+ "{\"schema\":{\"type\":\"struct\",\"fields\":["
+ + "{\"type\":\"struct\",\"fields\":["
+                                + "{\"type\":\"int32\",\"optional\":true,\"field\":\"id\"},"
+                                + "{\"type\":\"array\",\"items\":{\"type\":\"string\",\"optional\":false},\"optional\":true,\"field\":\"arr\"},"
+                                + "{\"type\":\"map\",\"keys\":{\"type\":\"string\",\"optional\":false},\"values\":{\"type\":\"int32\",\"optional\":false},\"optional\":true,\"field\":\"map\"},"
+                                + "{\"type\":\"struct\",\"fields\":["
+                                + "{\"type\":\"string\",\"optional\":false,\"field\":\"f1\"},"
+                                + "{\"type\":\"int32\",\"optional\":false,\"field\":\"f2\"}"
+                                + "],\"optional\":true,\"field\":\"row\"}"
+                                + "],\"optional\":true,\"field\":\"before\"},"
+                                + "{\"type\":\"struct\",\"fields\":["
+                                + "{\"type\":\"int32\",\"optional\":true,\"field\":\"id\"},"
+                                + "{\"type\":\"array\",\"items\":{\"type\":\"string\",\"optional\":false},\"optional\":true,\"field\":\"arr\"},"
+                                + "{\"type\":\"map\",\"keys\":{\"type\":\"string\",\"optional\":false},\"values\":{\"type\":\"int32\",\"optional\":false},\"optional\":true,\"field\":\"map\"},"
+                                + "{\"type\":\"struct\",\"fields\":["
+                                + "{\"type\":\"string\",\"optional\":false,\"field\":\"f1\"},"
+                                + "{\"type\":\"int32\",\"optional\":false,\"field\":\"f2\"}"
+                                + "],\"optional\":true,\"field\":\"row\"}"
+                                + "],\"optional\":true,\"field\":\"after\"}"
+                                + "],\"optional\":false},"
+                                + "\"payload\":{\"before\":null,\"after\":{\"id\":1,\"arr\":[\"item1\",\"item2\"],\"map\":{\"key1\":100,\"key2\":200},\"row\":{\"f1\":\"nested\",\"f2\":42}},\"op\":\"c\",\"source\":{\"db\":\"default_schema\",\"table\":\"table1\"}}}");
+
+ assertThat(actual).isEqualTo(expected);
+ }
}
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/test/java/org/apache/flink/cdc/connectors/kafka/sink/KafkaDataSinkITCase.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/test/java/org/apache/flink/cdc/connectors/kafka/sink/KafkaDataSinkITCase.java
index c3bb94fde..8d92b5678 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/test/java/org/apache/flink/cdc/connectors/kafka/sink/KafkaDataSinkITCase.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/test/java/org/apache/flink/cdc/connectors/kafka/sink/KafkaDataSinkITCase.java
@@ -85,6 +85,7 @@ import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
+import static org.apache.flink.cdc.connectors.kafka.sink.KafkaDataSinkOptions.DEBEZIUM_JSON_INCLUDE_SCHEMA_ENABLED;
import static org.apache.flink.util.DockerImageVersions.KAFKA;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.fail;
@@ -710,6 +711,16 @@ class KafkaDataSinkITCase extends TestLogger {
List<Event> eventsToSerialize,
List<String> expectedJson)
throws Exception {
+ runGenericComplexTypeSerializationTest(
+ serializationType, eventsToSerialize, expectedJson, false);
+ }
+
+ void runGenericComplexTypeSerializationTest(
+ JsonSerializationType serializationType,
+ List<Event> eventsToSerialize,
+ List<String> expectedJson,
+ boolean includeSchema)
+ throws Exception {
try (StreamExecutionEnvironment env = new LocalStreamEnvironment()) {
env.enableCheckpointing(1000L);
env.setRestartStrategy(RestartStrategies.noRestart());
@@ -726,6 +737,9 @@ class KafkaDataSinkITCase extends TestLogger {
KafkaDataSinkOptions.VALUE_FORMAT.key(),
JsonSerializationType.CANAL_JSON.toString());
}
+ if (includeSchema) {
+ config.put(DEBEZIUM_JSON_INCLUDE_SCHEMA_ENABLED.key(), "true");
+ }
source.sinkTo(
((FlinkSinkProvider)
(new KafkaDataSinkFactory()
@@ -757,8 +771,9 @@ class KafkaDataSinkITCase extends TestLogger {
}
})
.collect(Collectors.toList());
- assertThat(deserializeValues(collectedRecords))
- .containsExactlyElementsOf(expectedJsonNodes);
+ List<JsonNode> actualJsonNodes = deserializeValues(collectedRecords);
+
+        assertThat(actualJsonNodes).containsExactlyElementsOf(expectedJsonNodes);
checkProducerLeak();
}
@@ -1072,4 +1087,91 @@ class KafkaDataSinkITCase extends TestLogger {
}
         runGenericComplexTypeSerializationTest(type, eventsToSerialize, expectedOutput);
}
+
+ @Test
+ void testDebeziumJsonWithSchemaComplexTypes() throws Exception {
+ Schema schema =
+ Schema.newBuilder()
+ .physicalColumn("id", DataTypes.INT())
+                        .physicalColumn("arr", DataTypes.ARRAY(DataTypes.STRING()))
+                        .physicalColumn("map", DataTypes.MAP(DataTypes.STRING(), DataTypes.INT()))
+ .physicalColumn(
+ "row",
+ DataTypes.ROW(
+                                        DataTypes.FIELD("f1", DataTypes.STRING()),
+                                        DataTypes.FIELD("f2", DataTypes.INT())))
+ .primaryKey("id")
+ .build();
+ BinaryRecordDataGenerator generator =
+                new BinaryRecordDataGenerator(schema.getColumnDataTypes().toArray(new DataType[0]));
+ BinaryRecordDataGenerator nestedRowGenerator =
+ new BinaryRecordDataGenerator(
+ ((RowType) (schema.getColumn("row").get().getType()))
+ .getFieldTypes()
+ .toArray(new DataType[0]));
+
+ BinaryRecordData recordData =
+ generator.generate(
+ new Object[] {
+ 1,
+ new GenericArrayData(
+ new Object[] {
+ BinaryStringData.fromString("item1"),
+ BinaryStringData.fromString("item2")
+ }),
+ new GenericMapData(
+ Map.of(
+                                        BinaryStringData.fromString("key1"), 100,
+                                        BinaryStringData.fromString("key2"), 200)),
+ nestedRowGenerator.generate(
+                                new Object[] {BinaryStringData.fromString("nested"), 42})
+ });
+
+ List<Event> eventsToSerialize =
+ List.of(
+ new CreateTableEvent(table1, schema),
+ DataChangeEvent.insertEvent(table1, recordData),
+                        DataChangeEvent.updateEvent(table1, recordData, recordData),
+ DataChangeEvent.deleteEvent(table1, recordData));
+
+ String schemaJson =
+ "{\"type\":\"struct\",\"fields\":["
+ + "{\"type\":\"struct\",\"fields\":["
+                        + "{\"type\":\"int32\",\"optional\":true,\"field\":\"id\"},"
+                        + "{\"type\":\"array\",\"items\":{\"type\":\"string\",\"optional\":false},\"optional\":true,\"field\":\"arr\"},"
+                        + "{\"type\":\"map\",\"keys\":{\"type\":\"string\",\"optional\":false},\"values\":{\"type\":\"int32\",\"optional\":false},\"optional\":true,\"field\":\"map\"},"
+                        + "{\"type\":\"struct\",\"fields\":["
+                        + "{\"type\":\"string\",\"optional\":false,\"field\":\"f1\"},"
+                        + "{\"type\":\"int32\",\"optional\":false,\"field\":\"f2\"}"
+                        + "],\"optional\":true,\"field\":\"row\"}"
+                        + "],\"optional\":true,\"field\":\"before\"},"
+                        + "{\"type\":\"struct\",\"fields\":["
+                        + "{\"type\":\"int32\",\"optional\":true,\"field\":\"id\"},"
+                        + "{\"type\":\"array\",\"items\":{\"type\":\"string\",\"optional\":false},\"optional\":true,\"field\":\"arr\"},"
+                        + "{\"type\":\"map\",\"keys\":{\"type\":\"string\",\"optional\":false},\"values\":{\"type\":\"int32\",\"optional\":false},\"optional\":true,\"field\":\"map\"},"
+                        + "{\"type\":\"struct\",\"fields\":["
+                        + "{\"type\":\"string\",\"optional\":false,\"field\":\"f1\"},"
+                        + "{\"type\":\"int32\",\"optional\":false,\"field\":\"f2\"}"
+ + "],\"optional\":true,\"field\":\"row\"}"
+ + "],\"optional\":true,\"field\":\"after\"}"
+ + "],\"optional\":false}";
+
+ List<String> expectedJsonWithSchema =
+ List.of(
+ "{\"schema\":"
+ + schemaJson
+                                + ",\"payload\":{\"before\":null,\"after\":{\"id\":1,\"arr\":[\"item1\",\"item2\"],\"map\":{\"key1\":100,\"key2\":200},\"row\":{\"f1\":\"nested\",\"f2\":42}},\"op\":\"c\",\"source\":{\"db\":\"default_schema\",\"table\":\"%s\"}}}",
+                        "{\"schema\":"
+                                + schemaJson
+                                + ",\"payload\":{\"before\":{\"id\":1,\"arr\":[\"item1\",\"item2\"],\"map\":{\"key1\":100,\"key2\":200},\"row\":{\"f1\":\"nested\",\"f2\":42}},\"after\":{\"id\":1,\"arr\":[\"item1\",\"item2\"],\"map\":{\"key1\":100,\"key2\":200},\"row\":{\"f1\":\"nested\",\"f2\":42}},\"op\":\"u\",\"source\":{\"db\":\"default_schema\",\"table\":\"%s\"}}}",
+                        "{\"schema\":"
+                                + schemaJson
+                                + ",\"payload\":{\"before\":{\"id\":1,\"arr\":[\"item1\",\"item2\"],\"map\":{\"key1\":100,\"key2\":200},\"row\":{\"f1\":\"nested\",\"f2\":42}},\"after\":null,\"op\":\"d\",\"source\":{\"db\":\"default_schema\",\"table\":\"%s\"}}}");
+
+ runGenericComplexTypeSerializationTest(
+ JsonSerializationType.DEBEZIUM_JSON,
+ eventsToSerialize,
+ expectedJsonWithSchema,
+ true);
+ }
}