This is an automated email from the ASF dual-hosted git repository.

lidongdai pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 418759d1db [Improve][Formats] Support not primary-key table for debezium format (#7836)
418759d1db is described below

commit 418759d1dbd479b31ade3daee0fc4c1363fc02ba
Author: hailin0 <[email protected]>
AuthorDate: Wed Oct 16 23:30:34 2024 +0800

    [Improve][Formats] Support not primary-key table for debezium format (#7836)
---
 .../CompatibleDebeziumJsonSerializationSchema.java |  5 ++++
 ...CompatibleDebeziumJsonSerializationSchema.java} | 27 +++++++++-------------
 2 files changed, 16 insertions(+), 16 deletions(-)

diff --git a/seatunnel-formats/seatunnel-format-compatible-debezium-json/src/main/java/org/apache/seatunnel/format/compatible/debezium/json/CompatibleDebeziumJsonSerializationSchema.java b/seatunnel-formats/seatunnel-format-compatible-debezium-json/src/main/java/org/apache/seatunnel/format/compatible/debezium/json/CompatibleDebeziumJsonSerializationSchema.java
index 4d692663fe..b4b4f47cf5 100644
--- a/seatunnel-formats/seatunnel-format-compatible-debezium-json/src/main/java/org/apache/seatunnel/format/compatible/debezium/json/CompatibleDebeziumJsonSerializationSchema.java
+++ b/seatunnel-formats/seatunnel-format-compatible-debezium-json/src/main/java/org/apache/seatunnel/format/compatible/debezium/json/CompatibleDebeziumJsonSerializationSchema.java
@@ -30,15 +30,20 @@ import static 
org.apache.seatunnel.format.compatible.debezium.json.CompatibleDeb
 @RequiredArgsConstructor
 public class CompatibleDebeziumJsonSerializationSchema implements 
SerializationSchema {
 
+    private final boolean isKey;
     private final int index;
 
     public CompatibleDebeziumJsonSerializationSchema(SeaTunnelRowType rowType, 
boolean isKey) {
+        this.isKey = isKey;
         this.index = rowType.indexOf(isKey ? FIELD_KEY : FIELD_VALUE);
     }
 
     @Override
     public byte[] serialize(SeaTunnelRow row) {
         String field = (String) row.getField(index);
+        if (isKey && field == null) {
+            return null;
+        }
         return field.getBytes();
     }
 }
diff --git a/seatunnel-formats/seatunnel-format-compatible-debezium-json/src/main/java/org/apache/seatunnel/format/compatible/debezium/json/CompatibleDebeziumJsonSerializationSchema.java b/seatunnel-formats/seatunnel-format-compatible-debezium-json/src/test/java/org/apache/seatunnel/format/compatible/debezium/json/TestCompatibleDebeziumJsonSerializationSchema.java
similarity index 55%
copy from seatunnel-formats/seatunnel-format-compatible-debezium-json/src/main/java/org/apache/seatunnel/format/compatible/debezium/json/CompatibleDebeziumJsonSerializationSchema.java
copy to seatunnel-formats/seatunnel-format-compatible-debezium-json/src/test/java/org/apache/seatunnel/format/compatible/debezium/json/TestCompatibleDebeziumJsonSerializationSchema.java
index 4d692663fe..35117080f0 100644
--- a/seatunnel-formats/seatunnel-format-compatible-debezium-json/src/main/java/org/apache/seatunnel/format/compatible/debezium/json/CompatibleDebeziumJsonSerializationSchema.java
+++ b/seatunnel-formats/seatunnel-format-compatible-debezium-json/src/test/java/org/apache/seatunnel/format/compatible/debezium/json/TestCompatibleDebeziumJsonSerializationSchema.java
@@ -18,27 +18,22 @@
 
 package org.apache.seatunnel.format.compatible.debezium.json;
 
-import org.apache.seatunnel.api.serialization.SerializationSchema;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 
-import lombok.RequiredArgsConstructor;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
 
-import static 
org.apache.seatunnel.format.compatible.debezium.json.CompatibleDebeziumJsonDeserializationSchema.FIELD_KEY;
-import static 
org.apache.seatunnel.format.compatible.debezium.json.CompatibleDebeziumJsonDeserializationSchema.FIELD_VALUE;
+public class TestCompatibleDebeziumJsonSerializationSchema {
 
-@RequiredArgsConstructor
-public class CompatibleDebeziumJsonSerializationSchema implements 
SerializationSchema {
+    @Test
+    public void testDebeziumSerializeKeyIsNull() {
+        SeaTunnelRowType rowType =
+                
CompatibleDebeziumJsonDeserializationSchema.DEBEZIUM_DATA_ROW_TYPE;
+        SeaTunnelRow row = new SeaTunnelRow(new Object[] {"test_topic", null, 
"value"});
 
-    private final int index;
-
-    public CompatibleDebeziumJsonSerializationSchema(SeaTunnelRowType rowType, 
boolean isKey) {
-        this.index = rowType.indexOf(isKey ? FIELD_KEY : FIELD_VALUE);
-    }
-
-    @Override
-    public byte[] serialize(SeaTunnelRow row) {
-        String field = (String) row.getField(index);
-        return field.getBytes();
+        CompatibleDebeziumJsonSerializationSchema serializationSchema =
+                new CompatibleDebeziumJsonSerializationSchema(rowType, true);
+        Assertions.assertNull(serializationSchema.serialize(row));
     }
 }

Reply via email to