This is an automated email from the ASF dual-hosted git repository.
tyrantlucifer pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 421baf5bb [Hotfix][Format][Canal-Json] Fix json deserialize NPE (#4195)
421baf5bb is described below
commit 421baf5bb0292e26970e3a27b24744ec631f1164
Author: zhilinli <[email protected]>
AuthorDate: Fri Mar 17 18:03:51 2023 +0800
[Hotfix][Format][Canal-Json] Fix json deserialize NPE (#4195)
* [Bug][seatunnel-format-json][canal-json] Fix JSON deserialize data Null
pointer prevention
* [Bug][seatunnel-format-json][canal-json] Fix JSON deserialize data Null
pointer prevention
* [Bug][seatunnel-format-json][canal-json] Fix JSON deserialize data Null
pointer prevention
* [Hotfix][Format][Canal-Json] Fix json deserialize NPE
* [Hotfix][Format][Canal-Json] Fix json deserialize NPE #4195
* [Hotfix][Format][Canal-Json] Fix json deserialize NPE #4195
* [Hotfix][Format][Canal-Json] Fix json deserialize NPE #4195
* [Hotfix][Format][Canal-Json] Fix json deserialize NPE #4195
* [Hotfix][Format][Canal-Json] Fix json deserialize NPE #4195
* [Hotfix][Format][Canal-Json] Fix json deserialize NPE #4195
* [Hotfix][Format][Canal-Json] Fix json deserialize NPE #4195
---
.../json/canal/CanalJsonDeserializationSchema.java | 33 ++++++++++++++--------
1 file changed, 21 insertions(+), 12 deletions(-)
diff --git
a/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/canal/CanalJsonDeserializationSchema.java
b/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/canal/CanalJsonDeserializationSchema.java
index 2c478f619..948c0e91e 100644
---
a/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/canal/CanalJsonDeserializationSchema.java
+++
b/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/canal/CanalJsonDeserializationSchema.java
@@ -59,6 +59,10 @@ public class CanalJsonDeserializationSchema implements
DeserializationSchema<Sea
private static final String OP_CREATE = "CREATE";
+ private static final String OP_QUERY = "QUERY";
+
+ private static final String OP_ALTER = "ALTER";
+
private String database;
private String table;
@@ -109,24 +113,32 @@ public class CanalJsonDeserializationSchema implements
DeserializationSchema<Sea
return this.physicalRowType;
}
+ @Override
public void deserialize(byte[] message, Collector<SeaTunnelRow> out) {
if (message == null) {
return;
}
ObjectNode jsonNode = (ObjectNode) convertBytes(message);
assert jsonNode != null;
- if (database != null) {
- if
(!databasePattern.matcher(jsonNode.get(FIELD_DATABASE).asText()).matches()) {
- return;
- }
+ if (database != null
+ &&
!databasePattern.matcher(jsonNode.get(FIELD_DATABASE).asText()).matches()) {
+ return;
}
- if (table != null) {
- if
(!tablePattern.matcher(jsonNode.get(FIELD_TABLE).asText()).matches()) {
- return;
- }
+ if (table != null &&
!tablePattern.matcher(jsonNode.get(FIELD_TABLE).asText()).matches()) {
+ return;
}
JsonNode dataNode = jsonNode.get(FIELD_DATA);
String type = jsonNode.get(FIELD_TYPE).asText();
+ // When a null value is encountered, an exception needs to be thrown
for easy sensing
+ if (dataNode == null || dataNode.isNull()) {
+ // We'll skip the query or create or alter event data
+ if (OP_QUERY.equals(type) || OP_CREATE.equals(type) ||
OP_ALTER.equals(type)) {
+ return;
+ }
+ throw new SeaTunnelJsonFormatException(
+ CommonErrorCode.JSON_OPERATION_FAILED,
+ format("Null data value \"%s\" Cannot send downstream",
new String(message)));
+ }
if (OP_INSERT.equals(type)) {
for (int i = 0; i < dataNode.size(); i++) {
SeaTunnelRow row = convertJsonNode(dataNode.get(i));
@@ -161,9 +173,6 @@ public class CanalJsonDeserializationSchema implements
DeserializationSchema<Sea
row.setRowKind(RowKind.DELETE);
out.collect(row);
}
- } else if (OP_CREATE.equals(type)) {
- // "data" field is null and "type" is "CREATE" which means
- // this is a DDL change event, and we should skip it.
} else {
if (!ignoreParseErrors) {
throw new SeaTunnelJsonFormatException(
@@ -178,7 +187,7 @@ public class CanalJsonDeserializationSchema implements
DeserializationSchema<Sea
private JsonNode convertBytes(byte[] message) {
try {
return jsonDeserializer.deserializeToJsonNode(message);
- } catch (Throwable t) {
+ } catch (Exception t) {
if (ignoreParseErrors) {
return null;
}