Hisoka-X commented on code in PR #5948:
URL: https://github.com/apache/seatunnel/pull/5948#discussion_r1418660962


##########
seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/JsonDeserializationSchema.java:
##########
@@ -148,31 +155,25 @@ public SeaTunnelRow convertToRowData(JsonNode message) {
         return (SeaTunnelRow) runtimeConverter.convert(message);
     }
 
-    private JsonNode convertBytes(byte[] message) {
+    private JsonNode convertMayIgnoreError(byte[] message) {

Review Comment:
   The method name should be reverted too.



##########
seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/JsonToRowConverters.java:
##########
@@ -270,10 +273,7 @@ private byte[] convertToBytes(JsonNode jsonNode) {
         try {
             return jsonNode.binaryValue();
         } catch (IOException e) {
-            throw new SeaTunnelJsonFormatException(
-                    CommonErrorCodeDeprecated.JSON_OPERATION_FAILED,
-                    "Unable to deserialize byte array.",
-                    e);
+            throw CommonError.jsonOperationError("", jsonNode.toString(), e);

Review Comment:
   ```suggestion
               throw CommonError.jsonOperationError(FORMAT, jsonNode.toString(), e);
   ```



##########
seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/ogg/OggJsonDeserializationSchema.java:
##########
@@ -193,11 +194,10 @@ public void deserialize(byte[] message, 
Collector<SeaTunnelRow> out) throws IOEx
         try {
             jsonNode = convertBytes(message);
         } catch (Throwable cause) {
-            if (!ignoreParseErrors) {
-                throw cause;
-            } else {
+            if (ignoreParseErrors) {
                 return;
             }
+            throw CommonError.jsonOperationError(FORMAT, new String(message), 
cause);

Review Comment:
   ditto



##########
seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/debezium/DebeziumJsonDeserializationSchema.java:
##########
@@ -106,28 +108,18 @@ public void deserialize(byte[] message, 
Collector<SeaTunnelRow> out) throws IOEx
             } else if (OP_DELETE.equals(op)) {
                 SeaTunnelRow delete = convertJsonNode(payload.get("before"));
                 if (delete == null) {
-                    throw new SeaTunnelJsonFormatException(
-                            CommonErrorCodeDeprecated.UNSUPPORTED_DATA_TYPE,
+                    throw new IllegalStateException(
                             String.format(REPLICA_IDENTITY_EXCEPTION, 
"UPDATE"));
                 }
                 delete.setRowKind(RowKind.DELETE);
                 out.collect(delete);
             } else {
-                if (!ignoreParseErrors) {
-                    throw new SeaTunnelJsonFormatException(
-                            CommonErrorCodeDeprecated.UNSUPPORTED_DATA_TYPE,
-                            String.format(
-                                    "Unknown \"op\" value \"%s\". The Debezium 
JSON message is '%s'",
-                                    op, new String(message)));
-                }
+                throw new IllegalStateException(format("Unknown operation type 
'%s'.", op));

Review Comment:
   ditto



##########
seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/debezium/DebeziumJsonDeserializationSchema.java:
##########
@@ -93,8 +96,7 @@ public void deserialize(byte[] message, 
Collector<SeaTunnelRow> out) throws IOEx
             } else if (OP_UPDATE.equals(op)) {
                 SeaTunnelRow before = convertJsonNode(payload.get("before"));
                 if (before == null) {
-                    throw new SeaTunnelJsonFormatException(
-                            CommonErrorCodeDeprecated.UNSUPPORTED_DATA_TYPE,
+                    throw new IllegalStateException(

Review Comment:
   Why was this changed?



##########
seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/canal/CanalJsonDeserializationSchema.java:
##########
@@ -170,17 +169,18 @@ public void deserialize(ObjectNode jsonNode, 
Collector<SeaTunnelRow> out) throws
                 throw new IllegalStateException(format("Unknown operation type 
'%s'.", type));
             }
         } catch (Throwable t) {
-            if (!ignoreParseErrors) {
-                throw CommonError.jsonOperationError(FORMAT, 
jsonNode.toString(), t);
+            if (ignoreParseErrors) {
+                return;
             }
+            throw CommonError.jsonOperationError(FORMAT, jsonNode.toString(), 
t);

Review Comment:
   How about reverting this change? It seems the logic did not change.



##########
seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/JsonDeserializationSchema.java:
##########
@@ -148,31 +155,25 @@ public SeaTunnelRow convertToRowData(JsonNode message) {
         return (SeaTunnelRow) runtimeConverter.convert(message);
     }
 
-    private JsonNode convertBytes(byte[] message) {
+    private JsonNode convertMayIgnoreError(byte[] message) {
         try {
             return objectMapper.readTree(message);
         } catch (Throwable t) {
             if (ignoreParseErrors) {
                 return NullNode.getInstance();
             }
-            throw new SeaTunnelJsonFormatException(
-                    CommonErrorCodeDeprecated.JSON_OPERATION_FAILED,
-                    String.format("Failed to deserialize JSON '%s'.", new 
String(message)),
-                    t);
+            throw CommonError.jsonOperationError(FORMAT, new String(message), 
t);
         }
     }
 
-    private JsonNode convert(String message) {
+    private JsonNode convertMayIgnoreError(String message) {

Review Comment:
   ditto



##########
seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/canal/CanalJsonDeserializationSchema.java:
##########
@@ -190,18 +190,19 @@ public void deserialize(byte[] message, 
Collector<SeaTunnelRow> out) throws IOEx
             return;
         }
 
-        ObjectNode jsonNode;
+        ObjectNode jsonNode = convertMayIgnoreError(message);
+        deserialize(jsonNode, out);
+    }
+
+    private ObjectNode convertMayIgnoreError(byte[] message) {

Review Comment:
   ditto



##########
seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/ogg/OggJsonDeserializationSchema.java:
##########
@@ -177,9 +177,10 @@ public void deserialize(ObjectNode jsonNode, 
Collector<SeaTunnelRow> out) throws
                 throw new IllegalStateException(format("Unknown operation type 
'%s'.", op));
             }
         } catch (Throwable t) {
-            if (!ignoreParseErrors) {
-                throw CommonError.jsonOperationError(FORMAT, 
jsonNode.toString(), t);
+            if (ignoreParseErrors) {
+                return;
             }
+            throw CommonError.jsonOperationError(FORMAT, jsonNode.toString(), 
t);

Review Comment:
   ditto



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to