This is an automated email from the ASF dual-hosted git repository.
lidongdai pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 418759d1db [Improve][Formats] Support not primary-key table for
debezium format (#7836)
418759d1db is described below
commit 418759d1dbd479b31ade3daee0fc4c1363fc02ba
Author: hailin0 <[email protected]>
AuthorDate: Wed Oct 16 23:30:34 2024 +0800
[Improve][Formats] Support not primary-key table for debezium format (#7836)
---
.../CompatibleDebeziumJsonSerializationSchema.java | 5 ++++
...CompatibleDebeziumJsonSerializationSchema.java} | 27 +++++++++-------------
2 files changed, 16 insertions(+), 16 deletions(-)
diff --git
a/seatunnel-formats/seatunnel-format-compatible-debezium-json/src/main/java/org/apache/seatunnel/format/compatible/debezium/json/CompatibleDebeziumJsonSerializationSchema.java
b/seatunnel-formats/seatunnel-format-compatible-debezium-json/src/main/java/org/apache/seatunnel/format/compatible/debezium/json/CompatibleDebeziumJsonSerializationSchema.java
index 4d692663fe..b4b4f47cf5 100644
---
a/seatunnel-formats/seatunnel-format-compatible-debezium-json/src/main/java/org/apache/seatunnel/format/compatible/debezium/json/CompatibleDebeziumJsonSerializationSchema.java
+++
b/seatunnel-formats/seatunnel-format-compatible-debezium-json/src/main/java/org/apache/seatunnel/format/compatible/debezium/json/CompatibleDebeziumJsonSerializationSchema.java
@@ -30,15 +30,20 @@ import static
org.apache.seatunnel.format.compatible.debezium.json.CompatibleDeb
@RequiredArgsConstructor
public class CompatibleDebeziumJsonSerializationSchema implements
SerializationSchema {
+ private final boolean isKey;
private final int index;
public CompatibleDebeziumJsonSerializationSchema(SeaTunnelRowType rowType,
boolean isKey) {
+ this.isKey = isKey;
this.index = rowType.indexOf(isKey ? FIELD_KEY : FIELD_VALUE);
}
@Override
public byte[] serialize(SeaTunnelRow row) {
String field = (String) row.getField(index);
+ if (isKey && field == null) {
+ return null;
+ }
return field.getBytes();
}
}
diff --git
a/seatunnel-formats/seatunnel-format-compatible-debezium-json/src/main/java/org/apache/seatunnel/format/compatible/debezium/json/CompatibleDebeziumJsonSerializationSchema.java
b/seatunnel-formats/seatunnel-format-compatible-debezium-json/src/test/java/org/apache/seatunnel/format/compatible/debezium/json/TestCompatibleDebeziumJsonSerializationSchema.java
similarity index 55%
copy from
seatunnel-formats/seatunnel-format-compatible-debezium-json/src/main/java/org/apache/seatunnel/format/compatible/debezium/json/CompatibleDebeziumJsonSerializationSchema.java
copy to
seatunnel-formats/seatunnel-format-compatible-debezium-json/src/test/java/org/apache/seatunnel/format/compatible/debezium/json/TestCompatibleDebeziumJsonSerializationSchema.java
index 4d692663fe..35117080f0 100644
---
a/seatunnel-formats/seatunnel-format-compatible-debezium-json/src/main/java/org/apache/seatunnel/format/compatible/debezium/json/CompatibleDebeziumJsonSerializationSchema.java
+++
b/seatunnel-formats/seatunnel-format-compatible-debezium-json/src/test/java/org/apache/seatunnel/format/compatible/debezium/json/TestCompatibleDebeziumJsonSerializationSchema.java
@@ -18,27 +18,22 @@
package org.apache.seatunnel.format.compatible.debezium.json;
-import org.apache.seatunnel.api.serialization.SerializationSchema;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
-import lombok.RequiredArgsConstructor;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
-import static
org.apache.seatunnel.format.compatible.debezium.json.CompatibleDebeziumJsonDeserializationSchema.FIELD_KEY;
-import static
org.apache.seatunnel.format.compatible.debezium.json.CompatibleDebeziumJsonDeserializationSchema.FIELD_VALUE;
+public class TestCompatibleDebeziumJsonSerializationSchema {
-@RequiredArgsConstructor
-public class CompatibleDebeziumJsonSerializationSchema implements
SerializationSchema {
+ @Test
+ public void testDebeziumSerializeKeyIsNull() {
+ SeaTunnelRowType rowType =
+
CompatibleDebeziumJsonDeserializationSchema.DEBEZIUM_DATA_ROW_TYPE;
+ SeaTunnelRow row = new SeaTunnelRow(new Object[] {"test_topic", null,
"value"});
- private final int index;
-
- public CompatibleDebeziumJsonSerializationSchema(SeaTunnelRowType rowType,
boolean isKey) {
- this.index = rowType.indexOf(isKey ? FIELD_KEY : FIELD_VALUE);
- }
-
- @Override
- public byte[] serialize(SeaTunnelRow row) {
- String field = (String) row.getField(index);
- return field.getBytes();
+ CompatibleDebeziumJsonSerializationSchema serializationSchema =
+ new CompatibleDebeziumJsonSerializationSchema(rowType, true);
+ Assertions.assertNull(serializationSchema.serialize(row));
}
}