This is an automated email from the ASF dual-hosted git repository. leonard pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink-cdc.git
The following commit(s) were added to refs/heads/master by this push: new 28985db06 [FLINK-36877][pipeline-connector/kafka] Correct canal-json output for delete record 28985db06 is described below commit 28985db062dbeccf5b71f4165bbd580ac3fd0749 Author: Kunni <lvyanquan....@alibaba-inc.com> AuthorDate: Thu Jan 9 12:04:57 2025 +0800 [FLINK-36877][pipeline-connector/kafka] Correct canal-json output for delete record This closes #3788 --- .../cdc/connectors/kafka/json/canal/CanalJsonSerializationSchema.java | 4 ++-- .../connectors/kafka/json/canal/CanalJsonSerializationSchemaTest.java | 2 +- .../apache/flink/cdc/connectors/kafka/sink/KafkaDataSinkITCase.java | 4 ++-- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/json/canal/CanalJsonSerializationSchema.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/json/canal/CanalJsonSerializationSchema.java index d0c617975..a55bcf802 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/json/canal/CanalJsonSerializationSchema.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/json/canal/CanalJsonSerializationSchema.java @@ -163,8 +163,9 @@ public class CanalJsonSerializationSchema implements SerializationSchema<Event> .getSerializationSchema() .serialize(reuseGenericRowData); case DELETE: + reuseGenericRowData.setField(0, null); reuseGenericRowData.setField( - 0, + 1, new GenericArrayData( new RowData[] { jsonSerializers @@ -172,7 +173,6 @@ public class CanalJsonSerializationSchema implements SerializationSchema<Event> .getRowDataFromRecordData( dataChangeEvent.before(), false) })); - reuseGenericRowData.setField(1, null); reuseGenericRowData.setField(2, OP_DELETE); return jsonSerializers .get(dataChangeEvent.tableId()) diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/test/java/org/apache/flink/cdc/connectors/kafka/json/canal/CanalJsonSerializationSchemaTest.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/test/java/org/apache/flink/cdc/connectors/kafka/json/canal/CanalJsonSerializationSchemaTest.java index 362354c6e..136762982 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/test/java/org/apache/flink/cdc/connectors/kafka/json/canal/CanalJsonSerializationSchemaTest.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/test/java/org/apache/flink/cdc/connectors/kafka/json/canal/CanalJsonSerializationSchemaTest.java @@ -111,7 +111,7 @@ public class CanalJsonSerializationSchemaTest { })); expected = mapper.readTree( - "{\"old\":[{\"col1\":\"2\",\"col2\":\"2\"}],\"data\":null,\"type\":\"DELETE\",\"database\":\"default_schema\",\"table\":\"table1\",\"pkNames\":[\"col1\"]}"); + "{\"old\":null,\"data\":[{\"col1\":\"2\",\"col2\":\"2\"}],\"type\":\"DELETE\",\"database\":\"default_schema\",\"table\":\"table1\",\"pkNames\":[\"col1\"]}"); actual = mapper.readTree(serializationSchema.serialize(deleteEvent)); Assertions.assertEquals(expected, actual); diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/test/java/org/apache/flink/cdc/connectors/kafka/sink/KafkaDataSinkITCase.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/test/java/org/apache/flink/cdc/connectors/kafka/sink/KafkaDataSinkITCase.java index 7936c0ef3..ddce0a9e4 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/test/java/org/apache/flink/cdc/connectors/kafka/sink/KafkaDataSinkITCase.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/test/java/org/apache/flink/cdc/connectors/kafka/sink/KafkaDataSinkITCase.java @@ -356,7 +356,7 @@ class KafkaDataSinkITCase extends TestLogger { table1.getTableName())), mapper.readTree( String.format( - "{\"old\":[{\"col1\":\"1\",\"newCol3\":\"1\"}],\"data\":null,\"type\":\"DELETE\",\"database\":\"default_schema\",\"table\":\"%s\",\"pkNames\":[\"col1\"]}", + "{\"old\":null,\"data\":[{\"col1\":\"1\",\"newCol3\":\"1\"}],\"type\":\"DELETE\",\"database\":\"default_schema\",\"table\":\"%s\",\"pkNames\":[\"col1\"]}", table1.getTableName())), mapper.readTree( String.format( @@ -449,7 +449,7 @@ class KafkaDataSinkITCase extends TestLogger { table1.toString())), mapper.readTree( String.format( - "{\"old\":[{\"col1\":\"1\",\"newCol3\":\"1\"}],\"data\":null,\"type\":\"DELETE\",\"database\":\"default_schema\",\"table\":\"%s\",\"pkNames\":[\"col1\"]})", + "{\"old\":null,\"data\":[{\"col1\":\"1\",\"newCol3\":\"1\"}],\"type\":\"DELETE\",\"database\":\"default_schema\",\"table\":\"%s\",\"pkNames\":[\"col1\"]})", table1.getTableName()))), Tuple2.of( mapper.readTree(