linguoxuan commented on code in PR #4240:
URL: https://github.com/apache/flink-cdc/pull/4240#discussion_r2749284314


##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/test/java/org/apache/flink/cdc/connectors/kafka/json/debezium/DebeziumJsonSerializationSchemaTest.java:
##########
@@ -340,4 +340,139 @@ void testSerializeComplexTypes() throws Exception {
         assertThat(rowNode.has("f1")).isTrue();
         assertThat(rowNode.has("f2")).isTrue();
     }
+
+    @Test
+    void testSerializeWithSchemaComplexTypes() throws Exception {
+        ObjectMapper mapper =
+                JacksonMapperFactory.createObjectMapper()
+                        
.configure(JsonGenerator.Feature.WRITE_BIGDECIMAL_AS_PLAIN, false);
+        Map<String, String> properties = new HashMap<>();
+        properties.put("include-schema.enabled", "true");
+        Configuration configuration = Configuration.fromMap(properties);
+        SerializationSchema<Event> serializationSchema =
+                ChangeLogJsonFormatFactory.createSerializationSchema(
+                        configuration, JsonSerializationType.DEBEZIUM_JSON, 
ZoneId.systemDefault());
+        serializationSchema.open(new MockInitializationContext());
+
+        // create table with complex types
+        Schema schema =
+                Schema.newBuilder()
+                        .physicalColumn("id", DataTypes.INT())
+                        .physicalColumn("arr", 
DataTypes.ARRAY(DataTypes.STRING()))
+                        .physicalColumn("map", 
DataTypes.MAP(DataTypes.STRING(), DataTypes.INT()))
+                        .physicalColumn(
+                                "row",
+                                DataTypes.ROW(
+                                        DataTypes.FIELD("f1", 
DataTypes.STRING()),
+                                        DataTypes.FIELD("f2", 
DataTypes.INT())))
+                        .primaryKey("id")
+                        .build();
+
+        RowType rowType =
+                RowType.of(
+                        DataTypes.INT(),
+                        DataTypes.ARRAY(DataTypes.STRING()),
+                        DataTypes.MAP(DataTypes.STRING(), DataTypes.INT()),
+                        DataTypes.ROW(
+                                DataTypes.FIELD("f1", DataTypes.STRING()),
+                                DataTypes.FIELD("f2", DataTypes.INT())));
+
+        CreateTableEvent createTableEvent = new CreateTableEvent(TABLE_1, 
schema);
+        assertThat(serializationSchema.serialize(createTableEvent)).isNull();
+
+        BinaryRecordDataGenerator generator = new 
BinaryRecordDataGenerator(rowType);
+
+        // Create test data with complex types
+        org.apache.flink.cdc.common.data.GenericArrayData arrayData =
+                new org.apache.flink.cdc.common.data.GenericArrayData(
+                        new Object[] {
+                            BinaryStringData.fromString("item1"),
+                            BinaryStringData.fromString("item2")
+                        });
+
+        Map<Object, Object> mapValues = new HashMap<>();
+        mapValues.put(BinaryStringData.fromString("key1"), 100);
+        mapValues.put(BinaryStringData.fromString("key2"), 200);
+        org.apache.flink.cdc.common.data.GenericMapData mapData =
+                new org.apache.flink.cdc.common.data.GenericMapData(mapValues);
+
+        BinaryRecordDataGenerator nestedRowGenerator =
+                new BinaryRecordDataGenerator(RowType.of(DataTypes.STRING(), 
DataTypes.INT()));
+        org.apache.flink.cdc.common.data.RecordData nestedRow =
+                nestedRowGenerator.generate(
+                        new Object[] {BinaryStringData.fromString("nested"), 
42});
+
+        // insert event with complex types
+        DataChangeEvent insertEvent =
+                DataChangeEvent.insertEvent(
+                        TABLE_1,
+                        generator.generate(new Object[] {1, arrayData, 
mapData, nestedRow}));
+
+        byte[] serialized = serializationSchema.serialize(insertEvent);
+        JsonNode actual = mapper.readTree(serialized);
+
+        // Verify the schema is present
+        assertThat(actual.has("schema")).isTrue();
+        assertThat(actual.has("payload")).isTrue();
+
+        JsonNode schemaNode = actual.get("schema");
+        assertThat(schemaNode.has("fields")).isTrue();
+
+        // Get before/after schema fields
+        JsonNode fields = schemaNode.get("fields");
+        JsonNode afterSchema = null;
+        for (JsonNode field : fields) {
+            if ("after".equals(field.get("field").asText())) {
+                afterSchema = field;
+                break;
+            }
+        }
+        assertThat(afterSchema).isNotNull();
+
+        // Verify schema contains complex type definitions
+        JsonNode afterFields = afterSchema.get("fields");
+        assertThat(afterFields).isNotNull();
+
+        // Find and verify array schema
+        JsonNode arrSchema = null;
+        JsonNode mapSchema = null;
+        JsonNode rowSchema = null;
+        for (JsonNode f : afterFields) {
+            String fieldName = f.get("field").asText();
+            if ("arr".equals(fieldName)) {
+                arrSchema = f;
+            } else if ("map".equals(fieldName)) {
+                mapSchema = f;
+            } else if ("row".equals(fieldName)) {
+                rowSchema = f;
+            }
+        }
+
+        // Verify array schema type
+        assertThat(arrSchema).isNotNull();
+        assertThat(arrSchema.get("type").asText()).isEqualTo("array");
+        assertThat(arrSchema.has("items")).isTrue();
+        
assertThat(arrSchema.get("items").get("type").asText()).isEqualTo("string");
+
+        // Verify map schema type
+        assertThat(mapSchema).isNotNull();
+        assertThat(mapSchema.get("type").asText()).isEqualTo("map");
+        assertThat(mapSchema.has("keys")).isTrue();
+        assertThat(mapSchema.has("values")).isTrue();
+        
assertThat(mapSchema.get("keys").get("type").asText()).isEqualTo("string");
+        
assertThat(mapSchema.get("values").get("type").asText()).isEqualTo("int32");
+
+        // Verify row schema type (struct)
+        assertThat(rowSchema).isNotNull();
+        assertThat(rowSchema.get("type").asText()).isEqualTo("struct");
+        assertThat(rowSchema.has("fields")).isTrue();
+
+        // Verify payload data
+        JsonNode payload = actual.get("payload");
+        assertThat(payload.has("after")).isTrue();
+        JsonNode afterData = payload.get("after");
+        assertThat(afterData.has("arr")).isTrue();
+        assertThat(afterData.has("map")).isTrue();
+        assertThat(afterData.has("row")).isTrue();

Review Comment:
   Thanks for the review. I have pushed a new commit.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to