This is an automated email from the ASF dual-hosted git repository.

wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new ebb64f14bd [Improve][Formats] Replace `CommonErrorCodeDeprecated.JSON_OPERATION_FAILED` (#5948)
ebb64f14bd is described below

commit ebb64f14bd57ef70c0da15a192c3b6af1d02318d
Author: Chengyu Yan <[email protected]>
AuthorDate: Thu Dec 14 13:53:27 2023 +0800

    [Improve][Formats] Replace `CommonErrorCodeDeprecated.JSON_OPERATION_FAILED` (#5948)
---
 release-note.md                                    |   1 +
 ...ompatibleKafkaConnectDeserializationSchema.java |  10 +-
 .../format/json/JsonDeserializationSchema.java     |  18 +--
 .../format/json/JsonSerializationSchema.java       |  11 +-
 .../seatunnel/format/json/JsonToRowConverters.java |  17 ++-
 .../json/canal/CanalJsonSerializationSchema.java   |   8 +-
 .../DebeziumJsonDeserializationSchema.java         |  38 ++-----
 .../debezium/DebeziumJsonSerializationSchema.java  |   9 +-
 .../json/ogg/OggJsonSerializationSchema.java       |   7 +-
 .../format/json/JsonRowDataSerDeSchemaTest.java    | 121 ++++++++++++++++-----
 .../json/debezium/DebeziumJsonSerDeSchemaTest.java |  92 ++++++++++++++++
 11 files changed, 233 insertions(+), 99 deletions(-)

diff --git a/release-note.md b/release-note.md
index acfeb8fe8e..5c6d5738a8 100644
--- a/release-note.md
+++ b/release-note.md
@@ -104,6 +104,7 @@
 ### Formats
 
 - [Json] Remove assert key word. (#5919)
+- [Formats] Replace CommonErrorCodeDeprecated.JSON_OPERATION_FAILED. (#5948)
 
 ### Connector-V2
 
diff --git 
a/seatunnel-formats/seatunnel-format-compatible-connect-json/src/main/java/org/apache/seatunnel/format/compatible/kafka/connect/json/CompatibleKafkaConnectDeserializationSchema.java
 
b/seatunnel-formats/seatunnel-format-compatible-connect-json/src/main/java/org/apache/seatunnel/format/compatible/kafka/connect/json/CompatibleKafkaConnectDeserializationSchema.java
index da2f8e039c..45072c32dc 100644
--- 
a/seatunnel-formats/seatunnel-format-compatible-connect-json/src/main/java/org/apache/seatunnel/format/compatible/kafka/connect/json/CompatibleKafkaConnectDeserializationSchema.java
+++ 
b/seatunnel-formats/seatunnel-format-compatible-connect-json/src/main/java/org/apache/seatunnel/format/compatible/kafka/connect/json/CompatibleKafkaConnectDeserializationSchema.java
@@ -25,10 +25,9 @@ import org.apache.seatunnel.api.table.type.RowKind;
 import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
-import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
+import org.apache.seatunnel.common.exception.CommonError;
 import org.apache.seatunnel.common.utils.ReflectionUtils;
 import org.apache.seatunnel.format.json.JsonToRowConverters;
-import org.apache.seatunnel.format.json.exception.SeaTunnelJsonFormatException;
 
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.connect.data.Schema;
@@ -58,6 +57,7 @@ public class CompatibleKafkaConnectDeserializationSchema
     private static final String INCLUDE_SCHEMA_METHOD = 
"convertToJsonWithEnvelope";
     private static final String EXCLUDE_SCHEMA_METHOD = 
"convertToJsonWithoutEnvelope";
     private static final String KAFKA_CONNECT_SINK_RECORD_PAYLOAD = "payload";
+    public static final String FORMAT = "Kafka.Connect";
     private transient JsonConverter keyConverter;
     private transient JsonConverter valueConverter;
     private transient Method keyConverterMethod;
@@ -126,15 +126,13 @@ public class CompatibleKafkaConnectDeserializationSchema
         if (jsonNode.isNull()) {
             return null;
         }
+
         try {
             org.apache.seatunnel.shade.com.fasterxml.jackson.databind.JsonNode 
jsonData =
                     objectMapper.readTree(jsonNode.toString());
             return (SeaTunnelRow) runtimeConverter.convert(jsonData);
         } catch (Throwable t) {
-            throw new SeaTunnelJsonFormatException(
-                    CommonErrorCodeDeprecated.JSON_OPERATION_FAILED,
-                    String.format("Failed to deserialize JSON '%s'.", 
jsonNode),
-                    t);
+            throw CommonError.jsonOperationError(FORMAT, jsonNode.toString(), 
t);
         }
     }
 
diff --git 
a/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/JsonDeserializationSchema.java
 
b/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/JsonDeserializationSchema.java
index fe74643f37..90387c08ef 100644
--- 
a/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/JsonDeserializationSchema.java
+++ 
b/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/JsonDeserializationSchema.java
@@ -32,6 +32,7 @@ import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import org.apache.seatunnel.api.table.type.SqlType;
+import org.apache.seatunnel.common.exception.CommonError;
 import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
 import org.apache.seatunnel.format.json.exception.SeaTunnelJsonFormatException;
 
@@ -42,6 +43,8 @@ import static 
com.google.common.base.Preconditions.checkNotNull;
 public class JsonDeserializationSchema implements 
DeserializationSchema<SeaTunnelRow> {
     private static final long serialVersionUID = 1L;
 
+    private static final String FORMAT = "Common";
+
     /** Flag indicating whether to fail if a field is missing. */
     private final boolean failOnMissingField;
 
@@ -133,10 +136,7 @@ public class JsonDeserializationSchema implements 
DeserializationSchema<SeaTunne
             if (ignoreParseErrors) {
                 return null;
             }
-            throw new SeaTunnelJsonFormatException(
-                    CommonErrorCodeDeprecated.JSON_OPERATION_FAILED,
-                    String.format("Failed to deserialize JSON '%s'.", 
jsonNode),
-                    t);
+            throw CommonError.jsonOperationError(FORMAT, jsonNode.toString(), 
t);
         }
     }
 
@@ -155,10 +155,7 @@ public class JsonDeserializationSchema implements 
DeserializationSchema<SeaTunne
             if (ignoreParseErrors) {
                 return NullNode.getInstance();
             }
-            throw new SeaTunnelJsonFormatException(
-                    CommonErrorCodeDeprecated.JSON_OPERATION_FAILED,
-                    String.format("Failed to deserialize JSON '%s'.", new 
String(message)),
-                    t);
+            throw CommonError.jsonOperationError(FORMAT, new String(message), 
t);
         }
     }
 
@@ -169,10 +166,7 @@ public class JsonDeserializationSchema implements 
DeserializationSchema<SeaTunne
             if (ignoreParseErrors) {
                 return NullNode.getInstance();
             }
-            throw new SeaTunnelJsonFormatException(
-                    CommonErrorCodeDeprecated.JSON_OPERATION_FAILED,
-                    String.format("Failed to deserialize JSON '%s'.", message),
-                    t);
+            throw CommonError.jsonOperationError(FORMAT, new String(message), 
t);
         }
     }
 
diff --git 
a/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/JsonSerializationSchema.java
 
b/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/JsonSerializationSchema.java
index 44a2b6ba86..f743dc3dbe 100644
--- 
a/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/JsonSerializationSchema.java
+++ 
b/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/JsonSerializationSchema.java
@@ -24,8 +24,7 @@ import 
org.apache.seatunnel.shade.com.fasterxml.jackson.databind.node.ObjectNode
 import org.apache.seatunnel.api.serialization.SerializationSchema;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
-import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
-import org.apache.seatunnel.format.json.exception.SeaTunnelJsonFormatException;
+import org.apache.seatunnel.common.exception.CommonError;
 
 import lombok.Getter;
 
@@ -33,6 +32,7 @@ import static 
com.google.common.base.Preconditions.checkNotNull;
 
 public class JsonSerializationSchema implements SerializationSchema {
 
+    public static final String FORMAT = "Common";
     /** RowType to generate the runtime converter. */
     private final SeaTunnelRowType rowType;
 
@@ -58,11 +58,8 @@ public class JsonSerializationSchema implements 
SerializationSchema {
         try {
             runtimeConverter.convert(mapper, node, row);
             return mapper.writeValueAsBytes(node);
-        } catch (Throwable e) {
-            throw new SeaTunnelJsonFormatException(
-                    CommonErrorCodeDeprecated.JSON_OPERATION_FAILED,
-                    String.format("Failed to deserialize JSON '%s'.", row),
-                    e);
+        } catch (Throwable t) {
+            throw CommonError.jsonOperationError(FORMAT, row.toString(), t);
         }
     }
 }
diff --git 
a/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/JsonToRowConverters.java
 
b/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/JsonToRowConverters.java
index 9437442481..aee3c1a896 100644
--- 
a/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/JsonToRowConverters.java
+++ 
b/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/JsonToRowConverters.java
@@ -26,6 +26,7 @@ import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import org.apache.seatunnel.api.table.type.SqlType;
+import org.apache.seatunnel.common.exception.CommonError;
 import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
 import org.apache.seatunnel.format.json.exception.SeaTunnelJsonFormatException;
 
@@ -65,6 +66,8 @@ public class JsonToRowConverters implements Serializable {
                     .appendFraction(ChronoField.NANO_OF_SECOND, 0, 9, true)
                     .toFormatter();
 
+    public static final String FORMAT = "Common";
+
     /** Flag indicating whether to fail if a field is missing. */
     private final boolean failOnMissingField;
 
@@ -270,10 +273,7 @@ public class JsonToRowConverters implements Serializable {
         try {
             return jsonNode.binaryValue();
         } catch (IOException e) {
-            throw new SeaTunnelJsonFormatException(
-                    CommonErrorCodeDeprecated.JSON_OPERATION_FAILED,
-                    "Unable to deserialize byte array.",
-                    e);
+            throw CommonError.jsonOperationError(FORMAT, jsonNode.toString(), 
e);
         }
     }
 
@@ -324,9 +324,9 @@ public class JsonToRowConverters implements Serializable {
                         Object convertedField = 
convertField(fieldConverters[i], fieldName, field);
                         row.setField(i, convertedField);
                     } catch (Throwable t) {
-                        throw new SeaTunnelJsonFormatException(
-                                
CommonErrorCodeDeprecated.JSON_OPERATION_FAILED,
-                                String.format("Fail to deserialize at field: 
%s.", fieldName),
+                        throw CommonError.jsonOperationError(
+                                FORMAT,
+                                String.format("Field $.%s in %s", fieldName, 
jsonNode.toString()),
                                 t);
                     }
                 }
@@ -375,8 +375,7 @@ public class JsonToRowConverters implements Serializable {
             JsonToRowConverter fieldConverter, String fieldName, JsonNode 
field) {
         if (field == null) {
             if (failOnMissingField) {
-                throw new SeaTunnelJsonFormatException(
-                        CommonErrorCodeDeprecated.JSON_OPERATION_FAILED,
+                throw new IllegalArgumentException(
                         String.format("Could not find field with name %s .", 
fieldName));
             } else {
                 return null;
diff --git 
a/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/canal/CanalJsonSerializationSchema.java
 
b/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/canal/CanalJsonSerializationSchema.java
index 765f6e7599..c6cb1cfdd8 100644
--- 
a/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/canal/CanalJsonSerializationSchema.java
+++ 
b/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/canal/CanalJsonSerializationSchema.java
@@ -23,6 +23,7 @@ import org.apache.seatunnel.api.table.type.RowKind;
 import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.common.exception.CommonError;
 import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
 import org.apache.seatunnel.format.json.JsonSerializationSchema;
 import org.apache.seatunnel.format.json.exception.SeaTunnelJsonFormatException;
@@ -33,6 +34,8 @@ public class CanalJsonSerializationSchema implements 
SerializationSchema {
 
     private static final long serialVersionUID = 1L;
 
+    private static final String FORMAT = "Canal";
+
     private static final String OP_INSERT = "INSERT";
     private static final String OP_DELETE = "DELETE";
 
@@ -53,10 +56,7 @@ public class CanalJsonSerializationSchema implements 
SerializationSchema {
             reuse.setField(1, opType);
             return jsonSerializer.serialize(reuse);
         } catch (Throwable t) {
-            throw new SeaTunnelJsonFormatException(
-                    CommonErrorCodeDeprecated.JSON_OPERATION_FAILED,
-                    String.format("Could not serialize row %s.", row),
-                    t);
+            throw CommonError.jsonOperationError(FORMAT, row.toString(), t);
         }
     }
 
diff --git 
a/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/debezium/DebeziumJsonDeserializationSchema.java
 
b/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/debezium/DebeziumJsonDeserializationSchema.java
index 1dece25824..2859afc122 100644
--- 
a/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/debezium/DebeziumJsonDeserializationSchema.java
+++ 
b/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/debezium/DebeziumJsonDeserializationSchema.java
@@ -25,12 +25,13 @@ import org.apache.seatunnel.api.table.type.RowKind;
 import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
-import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
+import org.apache.seatunnel.common.exception.CommonError;
 import org.apache.seatunnel.format.json.JsonDeserializationSchema;
-import org.apache.seatunnel.format.json.exception.SeaTunnelJsonFormatException;
 
 import java.io.IOException;
 
+import static java.lang.String.format;
+
 public class DebeziumJsonDeserializationSchema implements 
DeserializationSchema<SeaTunnelRow> {
     private static final long serialVersionUID = 1L;
 
@@ -40,10 +41,12 @@ public class DebeziumJsonDeserializationSchema implements 
DeserializationSchema<
     private static final String OP_DELETE = "d"; // delete
 
     private static final String REPLICA_IDENTITY_EXCEPTION =
-            "The \"before\" field of %s message is null, "
+            "The \"before\" field of %s operation is null, "
                     + "if you are using Debezium Postgres Connector, "
                     + "please check the Postgres table has been set REPLICA 
IDENTITY to FULL level.";
 
+    public static final String FORMAT = "Debezium";
+
     private final SeaTunnelRowType rowType;
 
     private final JsonDeserializationSchema jsonDeserializer;
@@ -93,8 +96,7 @@ public class DebeziumJsonDeserializationSchema implements 
DeserializationSchema<
             } else if (OP_UPDATE.equals(op)) {
                 SeaTunnelRow before = convertJsonNode(payload.get("before"));
                 if (before == null) {
-                    throw new SeaTunnelJsonFormatException(
-                            CommonErrorCodeDeprecated.UNSUPPORTED_DATA_TYPE,
+                    throw new IllegalStateException(
                             String.format(REPLICA_IDENTITY_EXCEPTION, 
"UPDATE"));
                 }
                 before.setRowKind(RowKind.UPDATE_BEFORE);
@@ -106,28 +108,18 @@ public class DebeziumJsonDeserializationSchema implements 
DeserializationSchema<
             } else if (OP_DELETE.equals(op)) {
                 SeaTunnelRow delete = convertJsonNode(payload.get("before"));
                 if (delete == null) {
-                    throw new SeaTunnelJsonFormatException(
-                            CommonErrorCodeDeprecated.UNSUPPORTED_DATA_TYPE,
+                    throw new IllegalStateException(
                             String.format(REPLICA_IDENTITY_EXCEPTION, 
"UPDATE"));
                 }
                 delete.setRowKind(RowKind.DELETE);
                 out.collect(delete);
             } else {
-                if (!ignoreParseErrors) {
-                    throw new SeaTunnelJsonFormatException(
-                            CommonErrorCodeDeprecated.UNSUPPORTED_DATA_TYPE,
-                            String.format(
-                                    "Unknown \"op\" value \"%s\". The Debezium 
JSON message is '%s'",
-                                    op, new String(message)));
-                }
+                throw new IllegalStateException(format("Unknown operation type 
'%s'.", op));
             }
         } catch (Throwable t) {
             // a big try catch to protect the processing.
             if (!ignoreParseErrors) {
-                throw new SeaTunnelJsonFormatException(
-                        CommonErrorCodeDeprecated.UNSUPPORTED_DATA_TYPE,
-                        String.format("Corrupt Debezium JSON message '%s'.", 
new String(message)),
-                        t);
+                throw CommonError.jsonOperationError(FORMAT, new 
String(message), t);
             }
         }
     }
@@ -142,14 +134,8 @@ public class DebeziumJsonDeserializationSchema implements 
DeserializationSchema<
     private JsonNode convertBytes(byte[] message) {
         try {
             return jsonDeserializer.deserializeToJsonNode(message);
-        } catch (Exception t) {
-            if (ignoreParseErrors) {
-                return null;
-            }
-            throw new SeaTunnelJsonFormatException(
-                    CommonErrorCodeDeprecated.JSON_OPERATION_FAILED,
-                    String.format("Failed to deserialize JSON '%s'.", new 
String(message)),
-                    t);
+        } catch (IOException t) {
+            throw CommonError.jsonOperationError(FORMAT, new String(message), 
t);
         }
     }
 
diff --git 
a/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/debezium/DebeziumJsonSerializationSchema.java
 
b/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/debezium/DebeziumJsonSerializationSchema.java
index aac9c46fb5..0b1773e403 100644
--- 
a/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/debezium/DebeziumJsonSerializationSchema.java
+++ 
b/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/debezium/DebeziumJsonSerializationSchema.java
@@ -21,9 +21,8 @@ import 
org.apache.seatunnel.api.serialization.SerializationSchema;
 import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
-import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
+import org.apache.seatunnel.common.exception.CommonError;
 import org.apache.seatunnel.format.json.JsonSerializationSchema;
-import org.apache.seatunnel.format.json.exception.SeaTunnelJsonFormatException;
 
 import static org.apache.seatunnel.api.table.type.BasicType.STRING_TYPE;
 import static 
org.apache.seatunnel.format.json.debezium.DebeziumJsonFormatOptions.GENERATE_ROW_SIZE;
@@ -33,6 +32,7 @@ public class DebeziumJsonSerializationSchema implements 
SerializationSchema {
 
     private static final String OP_INSERT = "c"; // insert
     private static final String OP_DELETE = "d"; // delete
+    public static final String FORMAT = "Debezium";
 
     private final JsonSerializationSchema jsonSerializer;
 
@@ -65,10 +65,7 @@ public class DebeziumJsonSerializationSchema implements 
SerializationSchema {
                                     "Unsupported operation '%s' for row 
kind.", row.getRowKind()));
             }
         } catch (Throwable t) {
-            throw new SeaTunnelJsonFormatException(
-                    CommonErrorCodeDeprecated.JSON_OPERATION_FAILED,
-                    String.format("Could not serialize row %s.", row),
-                    t);
+            throw CommonError.jsonOperationError(FORMAT, row.toString(), t);
         }
     }
 
diff --git 
a/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/ogg/OggJsonSerializationSchema.java
 
b/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/ogg/OggJsonSerializationSchema.java
index 32eb9d2fe0..2499736fa9 100644
--- 
a/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/ogg/OggJsonSerializationSchema.java
+++ 
b/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/ogg/OggJsonSerializationSchema.java
@@ -23,6 +23,7 @@ import org.apache.seatunnel.api.table.type.RowKind;
 import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.common.exception.CommonError;
 import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
 import org.apache.seatunnel.format.json.JsonSerializationSchema;
 import org.apache.seatunnel.format.json.exception.SeaTunnelJsonFormatException;
@@ -35,6 +36,7 @@ public class OggJsonSerializationSchema implements 
SerializationSchema {
 
     private static final String OP_INSERT = "INSERT";
     private static final String OP_DELETE = "DELETE";
+    public static final String FORMAT = "Ogg";
 
     private transient SeaTunnelRow reuse;
 
@@ -53,10 +55,7 @@ public class OggJsonSerializationSchema implements 
SerializationSchema {
             reuse.setField(1, opType);
             return jsonSerializer.serialize(reuse);
         } catch (Throwable t) {
-            throw new SeaTunnelJsonFormatException(
-                    CommonErrorCodeDeprecated.JSON_OPERATION_FAILED,
-                    String.format("Could not serialize row %s.", row),
-                    t);
+            throw CommonError.jsonOperationError(FORMAT, row.toString(), t);
         }
     }
 
diff --git 
a/seatunnel-formats/seatunnel-format-json/src/test/java/org/apache/seatunnel/format/json/JsonRowDataSerDeSchemaTest.java
 
b/seatunnel-formats/seatunnel-format-json/src/test/java/org/apache/seatunnel/format/json/JsonRowDataSerDeSchemaTest.java
index d9ff06965b..109f8b496a 100644
--- 
a/seatunnel-formats/seatunnel-format-json/src/test/java/org/apache/seatunnel/format/json/JsonRowDataSerDeSchemaTest.java
+++ 
b/seatunnel-formats/seatunnel-format-json/src/test/java/org/apache/seatunnel/format/json/JsonRowDataSerDeSchemaTest.java
@@ -27,8 +27,10 @@ import org.apache.seatunnel.api.table.type.MapType;
 import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.common.exception.CommonError;
+import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
+import org.apache.seatunnel.format.json.exception.SeaTunnelJsonFormatException;
 
-import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 
 import java.sql.Timestamp;
@@ -47,7 +49,7 @@ import static 
org.apache.seatunnel.api.table.type.BasicType.LONG_TYPE;
 import static org.apache.seatunnel.api.table.type.BasicType.STRING_TYPE;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNull;
-import static org.junit.jupiter.api.Assertions.fail;
+import static org.junit.jupiter.api.Assertions.assertThrows;
 
 public class JsonRowDataSerDeSchemaTest {
 
@@ -275,7 +277,7 @@ public class JsonRowDataSerDeSchemaTest {
     }
 
     @Test
-    public void testDeserializationMissingField() throws Exception {
+    public void testDeserializationPassMissingField() throws Exception {
         ObjectMapper objectMapper = new ObjectMapper();
 
         // Root
@@ -287,37 +289,106 @@ public class JsonRowDataSerDeSchemaTest {
                 new SeaTunnelRowType(new String[] {"name"}, new 
SeaTunnelDataType[] {STRING_TYPE});
 
         // pass on missing field
-        JsonDeserializationSchema deserializationSchema =
-                new JsonDeserializationSchema(false, false, schema);
+        final JsonDeserializationSchema deser = new 
JsonDeserializationSchema(false, false, schema);
 
         SeaTunnelRow expected = new SeaTunnelRow(1);
-        SeaTunnelRow actual = 
deserializationSchema.deserialize(serializedJson);
+        SeaTunnelRow actual = deser.deserialize(serializedJson);
         assertEquals(expected, actual);
+    }
+
+    @Test
+    public void testDeserializationMissingField() throws Exception {
+        ObjectMapper objectMapper = new ObjectMapper();
+
+        // Root
+        ObjectNode root = objectMapper.createObjectNode();
+        root.put("id", 123123123);
+        byte[] serializedJson = objectMapper.writeValueAsBytes(root);
+
+        SeaTunnelRowType schema =
+                new SeaTunnelRowType(new String[] {"name"}, new 
SeaTunnelDataType[] {STRING_TYPE});
 
         // fail on missing field
-        deserializationSchema = new JsonDeserializationSchema(true, false, 
schema);
+        final JsonDeserializationSchema deser = new 
JsonDeserializationSchema(true, false, schema);
+
+        SeaTunnelRuntimeException expected =
+                CommonError.jsonOperationError("Common", root.toString());
+        SeaTunnelRuntimeException actual =
+                assertThrows(
+                        SeaTunnelRuntimeException.class,
+                        () -> {
+                            deser.deserialize(serializedJson);
+                        },
+                        "expecting exception message: " + 
expected.getMessage());
+        assertEquals(actual.getMessage(), expected.getMessage());
+
+        SeaTunnelRuntimeException expectedCause =
+                CommonError.jsonOperationError("Common", "Field $.name in " + 
root.toString());
+        Throwable cause = actual.getCause();
+        assertEquals(cause.getClass(), expectedCause.getClass());
+        assertEquals(cause.getMessage(), expectedCause.getMessage());
+    }
 
-        String errorMessage =
-                "ErrorCode:[COMMON-02], ErrorDescription:[Json covert/parse 
operation failed] - Failed to deserialize JSON '{\"id\":123123123}'.";
-        try {
-            deserializationSchema.deserialize(serializedJson);
-            fail("expecting exception message: " + errorMessage);
-        } catch (Throwable t) {
-            assertEquals(errorMessage, t.getMessage());
-        }
+    @Test
+    public void testDeserializationIgnoreParseError() throws Exception {
+        ObjectMapper objectMapper = new ObjectMapper();
+
+        // Root
+        ObjectNode root = objectMapper.createObjectNode();
+        root.put("id", 123123123);
+        byte[] serializedJson = objectMapper.writeValueAsBytes(root);
+
+        SeaTunnelRowType schema =
+                new SeaTunnelRowType(new String[] {"name"}, new 
SeaTunnelDataType[] {STRING_TYPE});
+        SeaTunnelRow expected = new SeaTunnelRow(1);
 
         // ignore on parse error
-        deserializationSchema = new JsonDeserializationSchema(false, true, 
schema);
-        assertEquals(expected, 
deserializationSchema.deserialize(serializedJson));
+        final JsonDeserializationSchema deser = new 
JsonDeserializationSchema(false, true, schema);
+        assertEquals(expected, deser.deserialize(serializedJson));
+    }
 
-        errorMessage =
+    @Test
+    public void testDeserializationFailOnMissingFieldIgnoreParseError() throws 
Exception {
+        String errorMessage =
                 "ErrorCode:[COMMON-06], ErrorDescription:[Illegal argument] - 
JSON format doesn't support failOnMissingField and ignoreParseErrors are both 
enabled.";
-        try {
-            // failOnMissingField and ignoreParseErrors both enabled
-            new JsonDeserializationSchema(true, true, schema);
-            Assertions.fail("expecting exception message: " + errorMessage);
-        } catch (Throwable t) {
-            assertEquals(errorMessage, t.getMessage());
-        }
+
+        SeaTunnelJsonFormatException actual =
+                assertThrows(
+                        SeaTunnelJsonFormatException.class,
+                        () -> {
+                            new JsonDeserializationSchema(true, true, null);
+                        },
+                        "expecting exception message: " + errorMessage);
+        assertEquals(actual.getMessage(), errorMessage);
+    }
+
+    @Test
+    public void testDeserializationNoJson() throws Exception {
+        SeaTunnelRowType schema =
+                new SeaTunnelRowType(new String[] {"name"}, new 
SeaTunnelDataType[] {STRING_TYPE});
+
+        String noJson = "{]";
+        final JsonDeserializationSchema deser = new 
JsonDeserializationSchema(false, false, schema);
+        SeaTunnelRuntimeException expected = 
CommonError.jsonOperationError("Common", noJson);
+
+        SeaTunnelRuntimeException actual =
+                assertThrows(
+                        SeaTunnelRuntimeException.class,
+                        () -> {
+                            deser.deserialize(noJson);
+                        },
+                        "expecting exception message: " + 
expected.getMessage());
+
+        assertEquals(actual.getMessage(), expected.getMessage());
+
+        actual =
+                assertThrows(
+                        SeaTunnelRuntimeException.class,
+                        () -> {
+                            deser.deserialize(noJson.getBytes());
+                        },
+                        "expecting exception message: " + 
expected.getMessage());
+
+        assertEquals(actual.getMessage(), expected.getMessage());
     }
 }
diff --git 
a/seatunnel-formats/seatunnel-format-json/src/test/java/org/apache/seatunnel/format/json/debezium/DebeziumJsonSerDeSchemaTest.java
 
b/seatunnel-formats/seatunnel-format-json/src/test/java/org/apache/seatunnel/format/json/debezium/DebeziumJsonSerDeSchemaTest.java
index 20088e525b..e74e58e19e 100644
--- 
a/seatunnel-formats/seatunnel-format-json/src/test/java/org/apache/seatunnel/format/json/debezium/DebeziumJsonSerDeSchemaTest.java
+++ 
b/seatunnel-formats/seatunnel-format-json/src/test/java/org/apache/seatunnel/format/json/debezium/DebeziumJsonSerDeSchemaTest.java
@@ -22,6 +22,8 @@ import org.apache.seatunnel.api.source.Collector;
 import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.common.exception.CommonError;
+import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
 
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
@@ -41,8 +43,10 @@ import static org.apache.seatunnel.api.table.type.BasicType.FLOAT_TYPE;
 import static org.apache.seatunnel.api.table.type.BasicType.INT_TYPE;
 import static org.apache.seatunnel.api.table.type.BasicType.STRING_TYPE;
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
 
 public class DebeziumJsonSerDeSchemaTest {
+    private static final String FORMAT = "Debezium";
 
     private static final SeaTunnelRowType PHYSICAL_DATA_TYPE =
             new SeaTunnelRowType(
@@ -65,6 +69,94 @@ public class DebeziumJsonSerDeSchemaTest {
         testSerializationDeserialization("debezium-data.txt", false);
     }
 
+    @Test
+    public void testDeserializeNoJson() throws Exception {
+        final DebeziumJsonDeserializationSchema deserializationSchema =
+                new DebeziumJsonDeserializationSchema(PHYSICAL_DATA_TYPE, false);
+        final SimpleCollector collector = new SimpleCollector();
+
+        String noJsonMsg = "{]";
+
+        SeaTunnelRuntimeException expected = CommonError.jsonOperationError(FORMAT, noJsonMsg);
+        SeaTunnelRuntimeException cause =
+                assertThrows(
+                        expected.getClass(),
+                        () -> {
+                            deserializationSchema.deserialize(noJsonMsg.getBytes(), collector);
+                        });
+        assertEquals(cause.getMessage(), expected.getMessage());
+    }
+
+    @Test
+    public void testDeserializeEmptyJson() throws Exception {
+        final DebeziumJsonDeserializationSchema deserializationSchema =
+                new DebeziumJsonDeserializationSchema(PHYSICAL_DATA_TYPE, false);
+        final SimpleCollector collector = new SimpleCollector();
+        String emptyMsg = "{}";
+        SeaTunnelRuntimeException expected = CommonError.jsonOperationError(FORMAT, emptyMsg);
+        SeaTunnelRuntimeException cause =
+                assertThrows(
+                        expected.getClass(),
+                        () -> {
+                            deserializationSchema.deserialize(emptyMsg.getBytes(), collector);
+                        });
+        assertEquals(cause.getMessage(), expected.getMessage());
+    }
+
+    @Test
+    public void testDeserializeNoDataJson() throws Exception {
+        final DebeziumJsonDeserializationSchema deserializationSchema =
+                new DebeziumJsonDeserializationSchema(PHYSICAL_DATA_TYPE, false);
+        final SimpleCollector collector = new SimpleCollector();
+        String noDataMsg = "{\"op\":\"u\"}";
+        SeaTunnelRuntimeException expected = CommonError.jsonOperationError(FORMAT, noDataMsg);
+        SeaTunnelRuntimeException cause =
+                assertThrows(
+                        expected.getClass(),
+                        () -> {
+                            deserializationSchema.deserialize(noDataMsg.getBytes(), collector);
+                        });
+        assertEquals(cause.getMessage(), expected.getMessage());
+
+        Throwable noDataCause = cause.getCause();
+        assertEquals(noDataCause.getClass(), IllegalStateException.class);
+        assertEquals(
+                noDataCause.getMessage(),
+                String.format(
+                        "The \"before\" field of %s operation is null, "
+                                + "if you are using Debezium Postgres Connector, "
+                                + "please check the Postgres table has been set REPLICA IDENTITY to FULL level.",
+                        "UPDATE"));
+    }
+
+    @Test
+    public void testDeserializeUnknownOperationTypeJson() throws Exception {
+        final DebeziumJsonDeserializationSchema deserializationSchema =
+                new DebeziumJsonDeserializationSchema(PHYSICAL_DATA_TYPE, false);
+        final SimpleCollector collector = new SimpleCollector();
+        String unknownType = "XX";
+        String unknownOperationMsg =
+                "{\"before\":null,\"after\":{\"id\":101,\"name\":\"scooter\",\"description\":\"Small 2-wheel scooter\",\"weight\":3.14},\"op\":\""
+                        + unknownType
+                        + "\"}";
+        SeaTunnelRuntimeException expected =
+                CommonError.jsonOperationError(FORMAT, unknownOperationMsg);
+        SeaTunnelRuntimeException cause =
+                assertThrows(
+                        expected.getClass(),
+                        () -> {
+                            deserializationSchema.deserialize(
+                                    unknownOperationMsg.getBytes(), collector);
+                        });
+        assertEquals(cause.getMessage(), expected.getMessage());
+
+        Throwable unknownTypeCause = cause.getCause();
+        assertEquals(unknownTypeCause.getClass(), IllegalStateException.class);
+        assertEquals(
+                unknownTypeCause.getMessage(),
+                String.format("Unknown operation type '%s'.", unknownType));
+    }
+
     private void testSerializationDeserialization(String resourceFile, boolean schemaInclude)
             throws Exception {
         List<String> lines = readLines(resourceFile);


Reply via email to