This is an automated email from the ASF dual-hosted git repository.

wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 701fdaf890 [Improve][Formats][Json] Remove assert key word. (#5919)
701fdaf890 is described below

commit 701fdaf89068ae662feb757e195228d70674fec1
Author: Chengyu Yan <[email protected]>
AuthorDate: Sat Dec 2 11:57:49 2023 +0800

    [Improve][Formats][Json] Remove assert key word. (#5919)
---
 release-note.md                                    |   4 +
 .../seatunnel/common/exception/CommonError.java    |  19 +++
 .../common/exception/CommonErrorCode.java          |   2 +
 .../exception/SeaTunnelRuntimeException.java       |   9 ++
 .../json/canal/CanalJsonDeserializationSchema.java | 156 ++++++++++----------
 .../json/ogg/OggJsonDeserializationSchema.java     | 160 +++++++++++----------
 .../json/canal/CanalJsonSerDeSchemaTest.java       |  85 +++++++++++
 .../format/json/ogg/OggJsonSerDeSchemaTest.java    |  91 +++++++++++-
 8 files changed, 370 insertions(+), 156 deletions(-)

diff --git a/release-note.md b/release-note.md
index 9282dd97b7..147823065f 100644
--- a/release-note.md
+++ b/release-note.md
@@ -101,6 +101,10 @@
 - [Core] [Starter] Add check of sink and source config to avoid null pointer 
exception. (#4734)
 - [Core] [Flink] Remove useless stage type related codes. (#5650)
 
+### Formats
+
+- [Json] Remove assert key word. (#5919)
+
 ### Connector-V2
 
 - [Connector-V2] [CDC] Improve startup.mode/stop.mode options (#4360)
diff --git 
a/seatunnel-common/src/main/java/org/apache/seatunnel/common/exception/CommonError.java
 
b/seatunnel-common/src/main/java/org/apache/seatunnel/common/exception/CommonError.java
index 74d7c0efd3..a62247f3d9 100644
--- 
a/seatunnel-common/src/main/java/org/apache/seatunnel/common/exception/CommonError.java
+++ 
b/seatunnel-common/src/main/java/org/apache/seatunnel/common/exception/CommonError.java
@@ -31,6 +31,7 @@ import static 
org.apache.seatunnel.common.exception.CommonErrorCode.CONVERT_TO_S
 import static 
org.apache.seatunnel.common.exception.CommonErrorCode.CONVERT_TO_SEATUNNEL_TYPE_ERROR_SIMPLE;
 import static 
org.apache.seatunnel.common.exception.CommonErrorCode.GET_CATALOG_TABLES_WITH_UNSUPPORTED_TYPE_ERROR;
 import static 
org.apache.seatunnel.common.exception.CommonErrorCode.GET_CATALOG_TABLE_WITH_UNSUPPORTED_TYPE_ERROR;
+import static 
org.apache.seatunnel.common.exception.CommonErrorCode.JSON_OPERATION_FAILED;
 import static 
org.apache.seatunnel.common.exception.CommonErrorCode.UNSUPPORTED_DATA_TYPE;
 
 /**
@@ -118,4 +119,22 @@ public class CommonError {
         return new SeaTunnelRuntimeException(
                 GET_CATALOG_TABLES_WITH_UNSUPPORTED_TYPE_ERROR, params);
     }
+
+    public static SeaTunnelRuntimeException jsonOperationError(String format, 
String payload) {
+        return jsonOperationError(format, payload, null);
+    }
+
+    public static SeaTunnelRuntimeException jsonOperationError(
+            String format, String payload, Throwable cause) {
+        Map<String, String> params = new HashMap<>();
+        params.put("format", format);
+        params.put("payload", payload);
+        SeaTunnelErrorCode code = JSON_OPERATION_FAILED;
+
+        if (cause != null) {
+            return new SeaTunnelRuntimeException(code, params, cause);
+        } else {
+            return new SeaTunnelRuntimeException(code, params);
+        }
+    }
 }
diff --git 
a/seatunnel-common/src/main/java/org/apache/seatunnel/common/exception/CommonErrorCode.java
 
b/seatunnel-common/src/main/java/org/apache/seatunnel/common/exception/CommonErrorCode.java
index a0817a4e72..1fe001c07b 100644
--- 
a/seatunnel-common/src/main/java/org/apache/seatunnel/common/exception/CommonErrorCode.java
+++ 
b/seatunnel-common/src/main/java/org/apache/seatunnel/common/exception/CommonErrorCode.java
@@ -19,6 +19,8 @@ package org.apache.seatunnel.common.exception;
 
 /** SeaTunnel connector error code interface */
 public enum CommonErrorCode implements SeaTunnelErrorCode {
+    JSON_OPERATION_FAILED("COMMON-02", "<format> JSON convert/parse 
'<payload>' operation failed."),
+
     UNSUPPORTED_DATA_TYPE(
             "COMMON-07", "'<identifier>' unsupported data type '<dataType>' of 
'<field>'"),
     CONVERT_TO_SEATUNNEL_TYPE_ERROR(
diff --git 
a/seatunnel-common/src/main/java/org/apache/seatunnel/common/exception/SeaTunnelRuntimeException.java
 
b/seatunnel-common/src/main/java/org/apache/seatunnel/common/exception/SeaTunnelRuntimeException.java
index 4f6f021522..65a421bcd9 100644
--- 
a/seatunnel-common/src/main/java/org/apache/seatunnel/common/exception/SeaTunnelRuntimeException.java
+++ 
b/seatunnel-common/src/main/java/org/apache/seatunnel/common/exception/SeaTunnelRuntimeException.java
@@ -64,6 +64,15 @@ public class SeaTunnelRuntimeException extends 
RuntimeException {
         this.params = params;
     }
 
+    public SeaTunnelRuntimeException(
+            SeaTunnelErrorCode seaTunnelErrorCode, Map<String, String> params, 
Throwable cause) {
+        super(
+                
ExceptionParamsUtil.getDescription(seaTunnelErrorCode.getErrorMessage(), 
params),
+                cause);
+        this.seaTunnelErrorCode = seaTunnelErrorCode;
+        this.params = params;
+    }
+
     public SeaTunnelErrorCode getSeaTunnelErrorCode() {
         return seaTunnelErrorCode;
     }
diff --git 
a/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/canal/CanalJsonDeserializationSchema.java
 
b/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/canal/CanalJsonDeserializationSchema.java
index a1b3f74dce..96482291ec 100644
--- 
a/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/canal/CanalJsonDeserializationSchema.java
+++ 
b/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/canal/CanalJsonDeserializationSchema.java
@@ -28,9 +28,9 @@ import org.apache.seatunnel.api.table.type.RowKind;
 import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
-import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
+import org.apache.seatunnel.common.exception.CommonError;
+import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
 import org.apache.seatunnel.format.json.JsonDeserializationSchema;
-import org.apache.seatunnel.format.json.exception.SeaTunnelJsonFormatException;
 
 import java.io.IOException;
 import java.util.regex.Pattern;
@@ -38,9 +38,10 @@ import java.util.regex.Pattern;
 import static java.lang.String.format;
 
 public class CanalJsonDeserializationSchema implements 
DeserializationSchema<SeaTunnelRow> {
-
     private static final long serialVersionUID = 1L;
 
+    private static final String FORMAT = "Canal";
+
     private static final String FIELD_OLD = "old";
 
     private static final String FIELD_DATA = "data";
@@ -105,7 +106,8 @@ public class CanalJsonDeserializationSchema implements 
DeserializationSchema<Sea
 
     @Override
     public SeaTunnelRow deserialize(byte[] message) throws IOException {
-        throw new UnsupportedOperationException();
+        throw new UnsupportedOperationException(
+                "Please invoke DeserializationSchema#deserialize(byte[], 
Collector<SeaTunnelRow>) instead.");
     }
 
     @Override
@@ -113,93 +115,93 @@ public class CanalJsonDeserializationSchema implements 
DeserializationSchema<Sea
         return this.physicalRowType;
     }
 
-    @Override
-    public void deserialize(byte[] message, Collector<SeaTunnelRow> out) {
-        if (message == null) {
-            return;
-        }
-        ObjectNode jsonNode = (ObjectNode) convertBytes(message);
-        assert jsonNode != null;
-        deserialize(jsonNode, out);
-    }
-
-    public void deserialize(ObjectNode jsonNode, Collector<SeaTunnelRow> out) {
-        if (database != null
-                && 
!databasePattern.matcher(jsonNode.get(FIELD_DATABASE).asText()).matches()) {
-            return;
-        }
-        if (table != null && 
!tablePattern.matcher(jsonNode.get(FIELD_TABLE).asText()).matches()) {
-            return;
-        }
-        JsonNode dataNode = jsonNode.get(FIELD_DATA);
-        String type = jsonNode.get(FIELD_TYPE).asText();
-        // When a null value is encountered, an exception needs to be thrown 
for easy sensing
-        if (dataNode == null || dataNode.isNull()) {
-            // We'll skip the query or create or alter event data
-            if (OP_QUERY.equals(type) || OP_CREATE.equals(type) || 
OP_ALTER.equals(type)) {
+    public void deserialize(ObjectNode jsonNode, Collector<SeaTunnelRow> out) 
throws IOException {
+        try {
+            if (database != null
+                    && 
!databasePattern.matcher(jsonNode.get(FIELD_DATABASE).asText()).matches()) {
                 return;
             }
-            throw new SeaTunnelJsonFormatException(
-                    CommonErrorCodeDeprecated.JSON_OPERATION_FAILED,
-                    format("Null data value \"%s\" Cannot send downstream", 
jsonNode));
-        }
-        if (OP_INSERT.equals(type)) {
-            for (int i = 0; i < dataNode.size(); i++) {
-                SeaTunnelRow row = convertJsonNode(dataNode.get(i));
-                out.collect(row);
+            if (table != null
+                    && 
!tablePattern.matcher(jsonNode.get(FIELD_TABLE).asText()).matches()) {
+                return;
             }
-        } else if (OP_UPDATE.equals(type)) {
-            final ArrayNode oldNode = (ArrayNode) jsonNode.get(FIELD_OLD);
-            for (int i = 0; i < dataNode.size(); i++) {
-                SeaTunnelRow after = convertJsonNode(dataNode.get(i));
-                SeaTunnelRow before = convertJsonNode(oldNode.get(i));
-                for (int f = 0; f < fieldCount; f++) {
-                    assert before != null;
-                    if (before.isNullAt(f) && oldNode.findValue(fieldNames[f]) 
== null) {
-                        // fields in "old" (before) means the fields are 
changed
-                        // fields not in "old" (before) means the fields are 
not changed
-                        // so we just copy the not changed fields into before
-                        assert after != null;
-                        before.setField(f, after.getField(f));
-                    }
+
+            JsonNode dataNode = jsonNode.get(FIELD_DATA);
+            String type = jsonNode.get(FIELD_TYPE).asText();
+            // When a null value is encountered, an exception needs to be 
thrown for easy sensing
+            if (dataNode == null || dataNode.isNull()) {
+                // We'll skip the query or create or alter event data
+                if (OP_QUERY.equals(type) || OP_CREATE.equals(type) || 
OP_ALTER.equals(type)) {
+                    return;
                 }
-                assert before != null;
-                before.setRowKind(RowKind.UPDATE_BEFORE);
-                assert after != null;
-                after.setRowKind(RowKind.UPDATE_AFTER);
-                out.collect(before);
-                out.collect(after);
+                throw new IllegalStateException(
+                        format("Null data value '%s' Cannot send downstream", 
jsonNode));
             }
-        } else if (OP_DELETE.equals(type)) {
-            for (int i = 0; i < dataNode.size(); i++) {
-                SeaTunnelRow row = convertJsonNode(dataNode.get(i));
-                assert row != null;
-                row.setRowKind(RowKind.DELETE);
-                out.collect(row);
+            if (OP_INSERT.equals(type)) {
+                for (int i = 0; i < dataNode.size(); i++) {
+                    SeaTunnelRow row = convertJsonNode(dataNode.get(i));
+                    out.collect(row);
+                }
+            } else if (OP_UPDATE.equals(type)) {
+                final ArrayNode oldNode = (ArrayNode) jsonNode.get(FIELD_OLD);
+                for (int i = 0; i < dataNode.size(); i++) {
+                    SeaTunnelRow after = convertJsonNode(dataNode.get(i));
+                    SeaTunnelRow before = convertJsonNode(oldNode.get(i));
+                    for (int f = 0; f < fieldCount; f++) {
+                        if (before.isNullAt(f) && 
oldNode.findValue(fieldNames[f]) == null) {
+                            // fields in "old" (before) means the fields are 
changed
+                            // fields not in "old" (before) means the fields 
are not changed
+                            // so we just copy the not changed fields into 
before
+                            before.setField(f, after.getField(f));
+                        }
+                    }
+                    before.setRowKind(RowKind.UPDATE_BEFORE);
+                    after.setRowKind(RowKind.UPDATE_AFTER);
+                    out.collect(before);
+                    out.collect(after);
+                }
+            } else if (OP_DELETE.equals(type)) {
+                for (int i = 0; i < dataNode.size(); i++) {
+                    SeaTunnelRow row = convertJsonNode(dataNode.get(i));
+                    row.setRowKind(RowKind.DELETE);
+                    out.collect(row);
+                }
+            } else {
+                throw new IllegalStateException(format("Unknown operation type 
'%s'.", type));
             }
-        } else {
+        } catch (Throwable t) {
             if (!ignoreParseErrors) {
-                throw new SeaTunnelJsonFormatException(
-                        CommonErrorCodeDeprecated.UNSUPPORTED_DATA_TYPE,
-                        format(
-                                "Unknown \"type\" value \"%s\". The Canal JSON 
message is '%s'",
-                                type, jsonNode.asText()));
+                throw CommonError.jsonOperationError(FORMAT, 
jsonNode.toString(), t);
             }
         }
     }
 
-    private JsonNode convertBytes(byte[] message) {
+    private ObjectNode convertBytes(byte[] message) throws 
SeaTunnelRuntimeException {
         try {
-            return jsonDeserializer.deserializeToJsonNode(message);
-        } catch (Exception t) {
-            if (ignoreParseErrors) {
-                return null;
+            return (ObjectNode) 
jsonDeserializer.deserializeToJsonNode(message);
+        } catch (Throwable t) {
+            throw CommonError.jsonOperationError(FORMAT, new String(message), 
t);
+        }
+    }
+
+    @Override
+    public void deserialize(byte[] message, Collector<SeaTunnelRow> out) 
throws IOException {
+        if (message == null) {
+            return;
+        }
+
+        ObjectNode jsonNode;
+        try {
+            jsonNode = convertBytes(message);
+        } catch (SeaTunnelRuntimeException cause) {
+            if (!ignoreParseErrors) {
+                throw cause;
+            } else {
+                return;
             }
-            throw new SeaTunnelJsonFormatException(
-                    CommonErrorCodeDeprecated.JSON_OPERATION_FAILED,
-                    String.format("Failed to deserialize JSON '%s'.", new 
String(message)),
-                    t);
         }
+
+        deserialize(jsonNode, out);
     }
 
     private SeaTunnelRow convertJsonNode(JsonNode root) {
diff --git 
a/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/ogg/OggJsonDeserializationSchema.java
 
b/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/ogg/OggJsonDeserializationSchema.java
index dc78566bbe..36e6e541c4 100644
--- 
a/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/ogg/OggJsonDeserializationSchema.java
+++ 
b/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/ogg/OggJsonDeserializationSchema.java
@@ -27,9 +27,9 @@ import org.apache.seatunnel.api.table.type.RowKind;
 import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
-import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
+import org.apache.seatunnel.common.exception.CommonError;
+import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
 import org.apache.seatunnel.format.json.JsonDeserializationSchema;
-import org.apache.seatunnel.format.json.exception.SeaTunnelJsonFormatException;
 
 import java.io.IOException;
 import java.util.regex.Pattern;
@@ -40,6 +40,8 @@ public class OggJsonDeserializationSchema implements 
DeserializationSchema<SeaTu
 
     private static final long serialVersionUID = 1L;
 
+    private static final String FORMAT = "Ogg";
+
     private static final String FIELD_TYPE = "op_type";
 
     private static final String FIELD_DATABASE_TABLE = "table";
@@ -55,7 +57,7 @@ public class OggJsonDeserializationSchema implements 
DeserializationSchema<SeaTu
     private static final String OP_DELETE = "D"; // DELETE
 
     private static final String REPLICA_IDENTITY_EXCEPTION =
-            "The \"before\" field of %s message is null, "
+            "The \"before\" field of %s operation message is null, "
                     + "if you are using Ogg Postgres Connector, "
                     + "please check the Postgres table has been set REPLICA 
IDENTITY to FULL level.";
 
@@ -101,9 +103,8 @@ public class OggJsonDeserializationSchema implements 
DeserializationSchema<SeaTu
 
     @Override
     public SeaTunnelRow deserialize(byte[] message) throws IOException {
-        throw new SeaTunnelJsonFormatException(
-                CommonErrorCodeDeprecated.JSON_OPERATION_FAILED,
-                String.format("Failed to deserialize JSON '%s'.", new 
String(message)));
+        throw new UnsupportedOperationException(
+                "Please invoke DeserializationSchema#deserialize(byte[], 
Collector<SeaTunnelRow>) instead.");
     }
 
     @Override
@@ -111,91 +112,94 @@ public class OggJsonDeserializationSchema implements 
DeserializationSchema<SeaTu
         return this.physicalRowType;
     }
 
-    public void deserialize(byte[] message, Collector<SeaTunnelRow> out) {
-        if (message == null || message.length == 0) {
-            // skip tombstone messages
-            return;
+    private ObjectNode convertBytes(byte[] message) throws 
SeaTunnelRuntimeException {
+        try {
+            return (ObjectNode) 
jsonDeserializer.deserializeToJsonNode(message);
+        } catch (Throwable t) {
+            throw CommonError.jsonOperationError(FORMAT, new String(message), 
t);
         }
-        ObjectNode jsonNode = (ObjectNode) convertBytes(message);
-        assert jsonNode != null;
+    }
 
-        if (database != null
-                && !databasePattern
-                        
.matcher(jsonNode.get(FIELD_DATABASE_TABLE).asText().split("\\.")[0])
-                        .matches()) {
-            return;
-        }
-        if (table != null
-                && !tablePattern
-                        
.matcher(jsonNode.get(FIELD_DATABASE_TABLE).asText().split("\\.")[1])
-                        .matches()) {
-            return;
-        }
-        String op = jsonNode.get(FIELD_TYPE).asText().trim();
-        if (OP_INSERT.equals(op)) {
-            // Gets the data for the INSERT operation
-            JsonNode dataBefore = jsonNode.get(DATA_AFTER);
-            SeaTunnelRow row = convertJsonNode(dataBefore);
-            out.collect(row);
-        } else if (OP_UPDATE.equals(op)) {
-            JsonNode dataBefore = jsonNode.get(DATA_BEFORE);
-            // Modify Operation Data cannot be empty before modification
-            if (dataBefore == null || dataBefore.isNull()) {
-                throw new IllegalStateException(
-                        String.format(REPLICA_IDENTITY_EXCEPTION, "UPDATE"));
+    public void deserialize(ObjectNode jsonNode, Collector<SeaTunnelRow> out) 
throws IOException {
+        try {
+            if (database != null
+                    && !databasePattern
+                            
.matcher(jsonNode.get(FIELD_DATABASE_TABLE).asText().split("\\.")[0])
+                            .matches()) {
+                return;
             }
-            JsonNode dataAfter = jsonNode.get(DATA_AFTER);
-            // Gets the data for the UPDATE BEFORE operation
-            SeaTunnelRow before = convertJsonNode(dataBefore);
-            // Gets the data for the UPDATE AFTER operation
-            SeaTunnelRow after = convertJsonNode(dataAfter);
-            assert before != null;
-            before.setRowKind(RowKind.UPDATE_BEFORE);
-            out.collect(before);
-            assert after != null;
-            after.setRowKind(RowKind.UPDATE_AFTER);
-            out.collect(after);
-
-        } else if (OP_DELETE.equals(op)) {
-            JsonNode dataBefore = jsonNode.get(DATA_BEFORE);
-            if (dataBefore == null || dataBefore.isNull()) {
-                throw new IllegalStateException(
-                        String.format(REPLICA_IDENTITY_EXCEPTION, "DELETE"));
+            if (table != null
+                    && !tablePattern
+                            
.matcher(jsonNode.get(FIELD_DATABASE_TABLE).asText().split("\\.")[1])
+                            .matches()) {
+                return;
             }
-            // Gets the data for the DELETE BEFORE operation
-            SeaTunnelRow before = convertJsonNode(dataBefore);
-            if (before == null) {
-                throw new SeaTunnelJsonFormatException(
-                        CommonErrorCodeDeprecated.JSON_OPERATION_FAILED,
-                        format(
-                                "The data %s the %s cannot be null \"%s\" ",
-                                "BEFORE", "DELETE", new String(message)));
+
+            String op = jsonNode.get(FIELD_TYPE).asText().trim();
+            if (OP_INSERT.equals(op)) {
+                // Gets the data for the INSERT operation
+                JsonNode dataAfter = jsonNode.get(DATA_AFTER);
+                SeaTunnelRow row = convertJsonNode(dataAfter);
+                out.collect(row);
+            } else if (OP_UPDATE.equals(op)) {
+                JsonNode dataBefore = jsonNode.get(DATA_BEFORE);
+                // Modify Operation Data cannot be empty before modification
+                if (dataBefore == null || dataBefore.isNull()) {
+                    throw new IllegalStateException(
+                            String.format(REPLICA_IDENTITY_EXCEPTION, 
"UPDATE"));
+                }
+                JsonNode dataAfter = jsonNode.get(DATA_AFTER);
+                // Gets the data for the UPDATE BEFORE operation
+                SeaTunnelRow before = convertJsonNode(dataBefore);
+                // Gets the data for the UPDATE AFTER operation
+                SeaTunnelRow after = convertJsonNode(dataAfter);
+
+                before.setRowKind(RowKind.UPDATE_BEFORE);
+                out.collect(before);
+
+                after.setRowKind(RowKind.UPDATE_AFTER);
+                out.collect(after);
+            } else if (OP_DELETE.equals(op)) {
+                JsonNode dataBefore = jsonNode.get(DATA_BEFORE);
+                if (dataBefore == null || dataBefore.isNull()) {
+                    throw new IllegalStateException(
+                            String.format(REPLICA_IDENTITY_EXCEPTION, 
"DELETE"));
+                }
+                // Gets the data for the DELETE BEFORE operation
+                SeaTunnelRow before = convertJsonNode(dataBefore);
+                if (before == null) {
+                    throw new IllegalStateException(
+                            String.format(REPLICA_IDENTITY_EXCEPTION, 
"DELETE"));
+                }
+                before.setRowKind(RowKind.DELETE);
+                out.collect(before);
+            } else {
+                throw new IllegalStateException(format("Unknown operation type 
'%s'.", op));
             }
-            before.setRowKind(RowKind.DELETE);
-            out.collect(before);
-        } else {
+        } catch (Throwable t) {
             if (!ignoreParseErrors) {
-                throw new SeaTunnelJsonFormatException(
-                        CommonErrorCodeDeprecated.UNSUPPORTED_DATA_TYPE,
-                        format(
-                                "Unknown \"type\" value \"%s\". The Canal JSON 
message is '%s'",
-                                op, new String(message)));
+                throw CommonError.jsonOperationError(FORMAT, 
jsonNode.toString(), t);
             }
         }
     }
 
-    private JsonNode convertBytes(byte[] message) {
+    public void deserialize(byte[] message, Collector<SeaTunnelRow> out) 
throws IOException {
+        if (message == null || message.length == 0) {
+            // skip tombstone messages
+            return;
+        }
+
+        ObjectNode jsonNode;
         try {
-            return jsonDeserializer.deserializeToJsonNode(message);
-        } catch (Exception t) {
-            if (ignoreParseErrors) {
-                return null;
+            jsonNode = convertBytes(message);
+        } catch (Throwable cause) {
+            if (!ignoreParseErrors) {
+                throw cause;
+            } else {
+                return;
             }
-            throw new SeaTunnelJsonFormatException(
-                    CommonErrorCodeDeprecated.JSON_OPERATION_FAILED,
-                    String.format("Failed to deserialize JSON '%s'.", new 
String(message)),
-                    t);
         }
+        deserialize(jsonNode, out);
     }
 
     private SeaTunnelRow convertJsonNode(JsonNode root) {
diff --git 
a/seatunnel-formats/seatunnel-format-json/src/test/java/org/apache/seatunnel/format/json/canal/CanalJsonSerDeSchemaTest.java
 
b/seatunnel-formats/seatunnel-format-json/src/test/java/org/apache/seatunnel/format/json/canal/CanalJsonSerDeSchemaTest.java
index c7544f6db8..601e9c738c 100644
--- 
a/seatunnel-formats/seatunnel-format-json/src/test/java/org/apache/seatunnel/format/json/canal/CanalJsonSerDeSchemaTest.java
+++ 
b/seatunnel-formats/seatunnel-format-json/src/test/java/org/apache/seatunnel/format/json/canal/CanalJsonSerDeSchemaTest.java
@@ -22,6 +22,8 @@ import org.apache.seatunnel.api.source.Collector;
 import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.common.exception.CommonError;
+import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
 
 import org.junit.jupiter.api.Test;
 
@@ -40,8 +42,10 @@ import static 
org.apache.seatunnel.api.table.type.BasicType.FLOAT_TYPE;
 import static org.apache.seatunnel.api.table.type.BasicType.INT_TYPE;
 import static org.apache.seatunnel.api.table.type.BasicType.STRING_TYPE;
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
 
 public class CanalJsonSerDeSchemaTest {
+    private static final String FORMAT = "Canal";
 
     private static final SeaTunnelRowType PHYSICAL_DATA_TYPE =
             new SeaTunnelRowType(
@@ -69,6 +73,87 @@ public class CanalJsonSerDeSchemaTest {
         assertEquals(0, collector.list.size());
     }
 
+    @Test
+    public void testDeserializeNoJson() throws Exception {
+        final CanalJsonDeserializationSchema deserializationSchema =
+                createCanalJsonDeserializationSchema(null, null);
+        final SimpleCollector collector = new SimpleCollector();
+        String noJsonMsg = "{]";
+
+        SeaTunnelRuntimeException expected = 
CommonError.jsonOperationError(FORMAT, noJsonMsg);
+        SeaTunnelRuntimeException cause =
+                assertThrows(
+                        expected.getClass(),
+                        () -> {
+                            
deserializationSchema.deserialize(noJsonMsg.getBytes(), collector);
+                        });
+        assertEquals(cause.getMessage(), expected.getMessage());
+    }
+
+    @Test
+    public void testDeserializeEmptyJson() throws Exception {
+        final CanalJsonDeserializationSchema deserializationSchema =
+                createCanalJsonDeserializationSchema(null, null);
+        final SimpleCollector collector = new SimpleCollector();
+        String emptyMsg = "{}";
+        SeaTunnelRuntimeException expected = 
CommonError.jsonOperationError(FORMAT, emptyMsg);
+        SeaTunnelRuntimeException cause =
+                assertThrows(
+                        expected.getClass(),
+                        () -> {
+                            
deserializationSchema.deserialize(emptyMsg.getBytes(), collector);
+                        });
+        assertEquals(cause.getMessage(), expected.getMessage());
+    }
+
+    @Test
+    public void testDeserializeNoDataJson() throws Exception {
+        final CanalJsonDeserializationSchema deserializationSchema =
+                createCanalJsonDeserializationSchema(null, null);
+        final SimpleCollector collector = new SimpleCollector();
+        String noDataMsg = "{\"type\":\"INSERT\"}";
+        SeaTunnelRuntimeException expected = 
CommonError.jsonOperationError(FORMAT, noDataMsg);
+        SeaTunnelRuntimeException cause =
+                assertThrows(
+                        expected.getClass(),
+                        () -> {
+                            
deserializationSchema.deserialize(noDataMsg.getBytes(), collector);
+                        });
+        assertEquals(cause.getMessage(), expected.getMessage());
+
+        Throwable noDataCause = cause.getCause();
+        assertEquals(noDataCause.getClass(), IllegalStateException.class);
+        assertEquals(
+                noDataCause.getMessage(),
+                String.format("Null data value '%s' Cannot send downstream", 
noDataMsg));
+    }
+
+    @Test
+    public void testDeserializeUnknownTypeJson() throws Exception {
+        final CanalJsonDeserializationSchema deserializationSchema =
+                createCanalJsonDeserializationSchema(null, null);
+        final SimpleCollector collector = new SimpleCollector();
+        String unknownType = "XX";
+        String unknownOperationMsg =
+                "{\"data\":{\"id\":101,\"name\":\"scooter\"},\"type\":\"" + 
unknownType + "\"}";
+        SeaTunnelRuntimeException expected =
+                CommonError.jsonOperationError(FORMAT, unknownOperationMsg);
+        SeaTunnelRuntimeException cause =
+                assertThrows(
+                        expected.getClass(),
+                        () -> {
+                            deserializationSchema.deserialize(
+                                    unknownOperationMsg.getBytes(), collector);
+                        });
+        assertEquals(cause.getMessage(), expected.getMessage());
+
+        Throwable unknownTypeCause = cause.getCause();
+        assertEquals(unknownTypeCause.getClass(), IllegalStateException.class);
+        assertEquals(
+                unknownTypeCause.getMessage(),
+                String.format("Unknown operation type '%s'.", unknownType));
+    }
+
     public void runTest(List<String> lines, CanalJsonDeserializationSchema 
deserializationSchema)
             throws IOException {
         SimpleCollector collector = new SimpleCollector();
diff --git 
a/seatunnel-formats/seatunnel-format-json/src/test/java/org/apache/seatunnel/format/json/ogg/OggJsonSerDeSchemaTest.java
 
b/seatunnel-formats/seatunnel-format-json/src/test/java/org/apache/seatunnel/format/json/ogg/OggJsonSerDeSchemaTest.java
index a08927737f..1f44c7cfbc 100644
--- 
a/seatunnel-formats/seatunnel-format-json/src/test/java/org/apache/seatunnel/format/json/ogg/OggJsonSerDeSchemaTest.java
+++ 
b/seatunnel-formats/seatunnel-format-json/src/test/java/org/apache/seatunnel/format/json/ogg/OggJsonSerDeSchemaTest.java
@@ -22,6 +22,8 @@ import org.apache.seatunnel.api.source.Collector;
 import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.common.exception.CommonError;
+import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
 
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
@@ -41,8 +43,10 @@ import static 
org.apache.seatunnel.api.table.type.BasicType.FLOAT_TYPE;
 import static org.apache.seatunnel.api.table.type.BasicType.INT_TYPE;
 import static org.apache.seatunnel.api.table.type.BasicType.STRING_TYPE;
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
 
 public class OggJsonSerDeSchemaTest {
+    private static final String FORMAT = "Ogg";
 
     private static final SeaTunnelRowType PHYSICAL_DATA_TYPE =
             new SeaTunnelRowType(
@@ -66,10 +70,95 @@ public class OggJsonSerDeSchemaTest {
                 createOggJsonDeserializationSchema(null, null);
         final SimpleCollector collector = new SimpleCollector();
 
-        deserializationSchema.deserialize(null, collector);
+        deserializationSchema.deserialize((byte[]) null, collector);
         assertEquals(0, collector.list.size());
     }
 
+    @Test
+    public void testDeserializeNoJson() throws Exception {
+        final OggJsonDeserializationSchema deserializationSchema =
+                createOggJsonDeserializationSchema(null, null);
+        final SimpleCollector collector = new SimpleCollector();
+        String noJsonMsg = "{]";
+
+        SeaTunnelRuntimeException expected = 
CommonError.jsonOperationError(FORMAT, noJsonMsg);
+        SeaTunnelRuntimeException cause =
+                assertThrows(
+                        expected.getClass(),
+                        () -> {
+                            
deserializationSchema.deserialize(noJsonMsg.getBytes(), collector);
+                        });
+        assertEquals(cause.getMessage(), expected.getMessage());
+    }
+
+    @Test
+    public void testDeserializeEmptyJson() throws Exception {
+        final OggJsonDeserializationSchema deserializationSchema =
+                createOggJsonDeserializationSchema(null, null);
+        final SimpleCollector collector = new SimpleCollector();
+        String emptyMsg = "{}";
+        SeaTunnelRuntimeException expected = 
CommonError.jsonOperationError(FORMAT, emptyMsg);
+        SeaTunnelRuntimeException cause =
+                assertThrows(
+                        expected.getClass(),
+                        () -> {
+                            
deserializationSchema.deserialize(emptyMsg.getBytes(), collector);
+                        });
+        assertEquals(cause.getMessage(), expected.getMessage());
+    }
+
+    @Test
+    public void testDeserializeNoDataJson() throws Exception {
+        final OggJsonDeserializationSchema deserializationSchema =
+                createOggJsonDeserializationSchema(null, null);
+        final SimpleCollector collector = new SimpleCollector();
+        String noDataMsg = "{\"op_type\":\"U\"}";
+        SeaTunnelRuntimeException expected = 
CommonError.jsonOperationError(FORMAT, noDataMsg);
+        SeaTunnelRuntimeException cause =
+                assertThrows(
+                        expected.getClass(),
+                        () -> {
+                            
deserializationSchema.deserialize(noDataMsg.getBytes(), collector);
+                        });
+        assertEquals(cause.getMessage(), expected.getMessage());
+
+        Throwable noDataCause = cause.getCause();
+        assertEquals(noDataCause.getClass(), IllegalStateException.class);
+        assertEquals(
+                noDataCause.getMessage(),
+                String.format(
+                        "The \"before\" field of %s operation message is null, 
"
+                                + "if you are using Ogg Postgres Connector, "
+                                + "please check the Postgres table has been 
set REPLICA IDENTITY to FULL level.",
+                        "UPDATE"));
+    }
+
+    @Test
+    public void testDeserializeUnknownTypeJson() throws Exception {
+        final OggJsonDeserializationSchema deserializationSchema =
+                createOggJsonDeserializationSchema(null, null);
+        final SimpleCollector collector = new SimpleCollector();
+        String unknownType = "XX";
+        String unknownOperationMsg =
+                "{\"data\":{\"id\":101,\"name\":\"scooter\"},\"op_type\":\"" + 
unknownType + "\"}";
+        SeaTunnelRuntimeException expected =
+                CommonError.jsonOperationError(FORMAT, unknownOperationMsg);
+        SeaTunnelRuntimeException cause =
+                assertThrows(
+                        expected.getClass(),
+                        () -> {
+                            deserializationSchema.deserialize(
+                                    unknownOperationMsg.getBytes(), collector);
+                        });
+        assertEquals(cause.getMessage(), expected.getMessage());
+
+        Throwable unknownTypeCause = cause.getCause();
+        assertEquals(unknownTypeCause.getClass(), IllegalStateException.class);
+        assertEquals(
+                unknownTypeCause.getMessage(),
+                String.format("Unknown operation type '%s'.", unknownType));
+    }
+
     public void runTest(List<String> lines, OggJsonDeserializationSchema 
deserializationSchema)
             throws IOException {
         SimpleCollector collector = new SimpleCollector();


Reply via email to