This is an automated email from the ASF dual-hosted git repository.

leonard pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git


The following commit(s) were added to refs/heads/master by this push:
     new 28985db06 [FLINK-36877][pipeline-connector/kafka] Correct canal-json output for delete record
28985db06 is described below

commit 28985db062dbeccf5b71f4165bbd580ac3fd0749
Author: Kunni <lvyanquan....@alibaba-inc.com>
AuthorDate: Thu Jan 9 12:04:57 2025 +0800

    [FLINK-36877][pipeline-connector/kafka] Correct canal-json output for delete record
    
    This closes #3788
---
 .../cdc/connectors/kafka/json/canal/CanalJsonSerializationSchema.java | 4 ++--
 .../connectors/kafka/json/canal/CanalJsonSerializationSchemaTest.java | 2 +-
 .../apache/flink/cdc/connectors/kafka/sink/KafkaDataSinkITCase.java   | 4 ++--
 3 files changed, 5 insertions(+), 5 deletions(-)

diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/json/canal/CanalJsonSerializationSchema.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/json/canal/CanalJsonSerializationSchema.java
index d0c617975..a55bcf802 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/json/canal/CanalJsonSerializationSchema.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/json/canal/CanalJsonSerializationSchema.java
@@ -163,8 +163,9 @@ public class CanalJsonSerializationSchema implements SerializationSchema<Event>
                             .getSerializationSchema()
                             .serialize(reuseGenericRowData);
                 case DELETE:
+                    reuseGenericRowData.setField(0, null);
                     reuseGenericRowData.setField(
-                            0,
+                            1,
                             new GenericArrayData(
                                     new RowData[] {
                                         jsonSerializers
@@ -172,7 +173,6 @@ public class CanalJsonSerializationSchema implements SerializationSchema<Event>
                                                 .getRowDataFromRecordData(
                                                         dataChangeEvent.before(), false)
                                     }));
-                    reuseGenericRowData.setField(1, null);
                     reuseGenericRowData.setField(2, OP_DELETE);
                     return jsonSerializers
                             .get(dataChangeEvent.tableId())
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/test/java/org/apache/flink/cdc/connectors/kafka/json/canal/CanalJsonSerializationSchemaTest.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/test/java/org/apache/flink/cdc/connectors/kafka/json/canal/CanalJsonSerializationSchemaTest.java
index 362354c6e..136762982 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/test/java/org/apache/flink/cdc/connectors/kafka/json/canal/CanalJsonSerializationSchemaTest.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/test/java/org/apache/flink/cdc/connectors/kafka/json/canal/CanalJsonSerializationSchemaTest.java
@@ -111,7 +111,7 @@ public class CanalJsonSerializationSchemaTest {
                                 }));
         expected =
                 mapper.readTree(
-                        "{\"old\":[{\"col1\":\"2\",\"col2\":\"2\"}],\"data\":null,\"type\":\"DELETE\",\"database\":\"default_schema\",\"table\":\"table1\",\"pkNames\":[\"col1\"]}");
+                        "{\"old\":null,\"data\":[{\"col1\":\"2\",\"col2\":\"2\"}],\"type\":\"DELETE\",\"database\":\"default_schema\",\"table\":\"table1\",\"pkNames\":[\"col1\"]}");
         actual = mapper.readTree(serializationSchema.serialize(deleteEvent));
         Assertions.assertEquals(expected, actual);
 
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/test/java/org/apache/flink/cdc/connectors/kafka/sink/KafkaDataSinkITCase.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/test/java/org/apache/flink/cdc/connectors/kafka/sink/KafkaDataSinkITCase.java
index 7936c0ef3..ddce0a9e4 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/test/java/org/apache/flink/cdc/connectors/kafka/sink/KafkaDataSinkITCase.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/test/java/org/apache/flink/cdc/connectors/kafka/sink/KafkaDataSinkITCase.java
@@ -356,7 +356,7 @@ class KafkaDataSinkITCase extends TestLogger {
                                         table1.getTableName())),
                         mapper.readTree(
                                 String.format(
-                                        "{\"old\":[{\"col1\":\"1\",\"newCol3\":\"1\"}],\"data\":null,\"type\":\"DELETE\",\"database\":\"default_schema\",\"table\":\"%s\",\"pkNames\":[\"col1\"]}",
+                                        "{\"old\":null,\"data\":[{\"col1\":\"1\",\"newCol3\":\"1\"}],\"type\":\"DELETE\",\"database\":\"default_schema\",\"table\":\"%s\",\"pkNames\":[\"col1\"]}",
                                         table1.getTableName())),
                         mapper.readTree(
                                 String.format(
@@ -449,7 +449,7 @@ class KafkaDataSinkITCase extends TestLogger {
                                                 table1.toString())),
                                 mapper.readTree(
                                         String.format(
-                                                "{\"old\":[{\"col1\":\"1\",\"newCol3\":\"1\"}],\"data\":null,\"type\":\"DELETE\",\"database\":\"default_schema\",\"table\":\"%s\",\"pkNames\":[\"col1\"]})",
+                                                "{\"old\":null,\"data\":[{\"col1\":\"1\",\"newCol3\":\"1\"}],\"type\":\"DELETE\",\"database\":\"default_schema\",\"table\":\"%s\",\"pkNames\":[\"col1\"]})",
                                                 table1.getTableName()))),
                         Tuple2.of(
                                 mapper.readTree(

Reply via email to