This is an automated email from the ASF dual-hosted git repository.

tyrantlucifer pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 421baf5bb [Hotfix][Format][Canal-Json] Fix json deserialize NPE (#4195)
421baf5bb is described below

commit 421baf5bb0292e26970e3a27b24744ec631f1164
Author: zhilinli <[email protected]>
AuthorDate: Fri Mar 17 18:03:51 2023 +0800

    [Hotfix][Format][Canal-Json] Fix json deserialize NPE (#4195)
    
    * [Bug][seatunnel-format-json][canal-json] Fix JSON deserialize data Null pointer prevention
    
    * [Bug][seatunnel-format-json][canal-json] Fix JSON deserialize data Null pointer prevention
    
    * [Bug][seatunnel-format-json][canal-json] Fix JSON deserialize data Null pointer prevention
    
    * [Hotfix][Format][Canal-Json] Fix json deserialize NPE
    
    * [Hotfix][Format][Canal-Json] Fix json deserialize NPE #4195
    
    * [Hotfix][Format][Canal-Json] Fix json deserialize NPE #4195
    
    * [Hotfix][Format][Canal-Json] Fix json deserialize NPE #4195
    
    * [Hotfix][Format][Canal-Json] Fix json deserialize NPE #4195
    
    * [Hotfix][Format][Canal-Json] Fix json deserialize NPE #4195
    
    * [Hotfix][Format][Canal-Json] Fix json deserialize NPE #4195
    
    * [Hotfix][Format][Canal-Json] Fix json deserialize NPE #4195
---
 .../json/canal/CanalJsonDeserializationSchema.java | 33 ++++++++++++++--------
 1 file changed, 21 insertions(+), 12 deletions(-)

diff --git a/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/canal/CanalJsonDeserializationSchema.java b/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/canal/CanalJsonDeserializationSchema.java
index 2c478f619..948c0e91e 100644
--- a/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/canal/CanalJsonDeserializationSchema.java
+++ b/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/canal/CanalJsonDeserializationSchema.java
@@ -59,6 +59,10 @@ public class CanalJsonDeserializationSchema implements DeserializationSchema<Sea
 
     private static final String OP_CREATE = "CREATE";
 
+    private static final String OP_QUERY = "QUERY";
+
+    private static final String OP_ALTER = "ALTER";
+
     private String database;
 
     private String table;
@@ -109,24 +113,32 @@ public class CanalJsonDeserializationSchema implements DeserializationSchema<Sea
         return this.physicalRowType;
     }
 
+    @Override
     public void deserialize(byte[] message, Collector<SeaTunnelRow> out) {
         if (message == null) {
             return;
         }
         ObjectNode jsonNode = (ObjectNode) convertBytes(message);
         assert jsonNode != null;
-        if (database != null) {
-            if 
(!databasePattern.matcher(jsonNode.get(FIELD_DATABASE).asText()).matches()) {
-                return;
-            }
+        if (database != null
+                && 
!databasePattern.matcher(jsonNode.get(FIELD_DATABASE).asText()).matches()) {
+            return;
         }
-        if (table != null) {
-            if 
(!tablePattern.matcher(jsonNode.get(FIELD_TABLE).asText()).matches()) {
-                return;
-            }
+        if (table != null && 
!tablePattern.matcher(jsonNode.get(FIELD_TABLE).asText()).matches()) {
+            return;
         }
         JsonNode dataNode = jsonNode.get(FIELD_DATA);
         String type = jsonNode.get(FIELD_TYPE).asText();
+        // When a null value is encountered, an exception needs to be thrown 
for easy sensing
+        if (dataNode == null || dataNode.isNull()) {
+            // We'll skip the query or create or alter event data
+            if (OP_QUERY.equals(type) || OP_CREATE.equals(type) || 
OP_ALTER.equals(type)) {
+                return;
+            }
+            throw new SeaTunnelJsonFormatException(
+                    CommonErrorCode.JSON_OPERATION_FAILED,
+                    format("Null data value \"%s\" Cannot send downstream", 
new String(message)));
+        }
         if (OP_INSERT.equals(type)) {
             for (int i = 0; i < dataNode.size(); i++) {
                 SeaTunnelRow row = convertJsonNode(dataNode.get(i));
@@ -161,9 +173,6 @@ public class CanalJsonDeserializationSchema implements DeserializationSchema<Sea
                 row.setRowKind(RowKind.DELETE);
                 out.collect(row);
             }
-        } else if (OP_CREATE.equals(type)) {
-            // "data" field is null and "type" is "CREATE" which means
-            // this is a DDL change event, and we should skip it.
         } else {
             if (!ignoreParseErrors) {
                 throw new SeaTunnelJsonFormatException(
@@ -178,7 +187,7 @@ public class CanalJsonDeserializationSchema implements DeserializationSchema<Sea
     private JsonNode convertBytes(byte[] message) {
         try {
             return jsonDeserializer.deserializeToJsonNode(message);
-        } catch (Throwable t) {
+        } catch (Exception t) {
             if (ignoreParseErrors) {
                 return null;
             }

Reply via email to