This is an automated email from the ASF dual-hosted git repository.
wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 701fdaf890 [Improve][Formats][Json] Remove assert key word. (#5919)
701fdaf890 is described below
commit 701fdaf89068ae662feb757e195228d70674fec1
Author: Chengyu Yan <[email protected]>
AuthorDate: Sat Dec 2 11:57:49 2023 +0800
[Improve][Formats][Json] Remove assert key word. (#5919)
---
release-note.md | 4 +
.../seatunnel/common/exception/CommonError.java | 19 +++
.../common/exception/CommonErrorCode.java | 2 +
.../exception/SeaTunnelRuntimeException.java | 9 ++
.../json/canal/CanalJsonDeserializationSchema.java | 156 ++++++++++----------
.../json/ogg/OggJsonDeserializationSchema.java | 160 +++++++++++----------
.../json/canal/CanalJsonSerDeSchemaTest.java | 85 +++++++++++
.../format/json/ogg/OggJsonSerDeSchemaTest.java | 91 +++++++++++-
8 files changed, 370 insertions(+), 156 deletions(-)
diff --git a/release-note.md b/release-note.md
index 9282dd97b7..147823065f 100644
--- a/release-note.md
+++ b/release-note.md
@@ -101,6 +101,10 @@
- [Core] [Starter] Add check of sink and source config to avoid null pointer
exception. (#4734)
- [Core] [Flink] Remove useless stage type related codes. (#5650)
+### Formats
+
+- [Json] Remove assert key word. (#5919)
+
### Connector-V2
- [Connector-V2] [CDC] Improve startup.mode/stop.mode options (#4360)
diff --git a/seatunnel-common/src/main/java/org/apache/seatunnel/common/exception/CommonError.java b/seatunnel-common/src/main/java/org/apache/seatunnel/common/exception/CommonError.java
index 74d7c0efd3..a62247f3d9 100644
--- a/seatunnel-common/src/main/java/org/apache/seatunnel/common/exception/CommonError.java
+++ b/seatunnel-common/src/main/java/org/apache/seatunnel/common/exception/CommonError.java
@@ -31,6 +31,7 @@ import static
org.apache.seatunnel.common.exception.CommonErrorCode.CONVERT_TO_S
import static
org.apache.seatunnel.common.exception.CommonErrorCode.CONVERT_TO_SEATUNNEL_TYPE_ERROR_SIMPLE;
import static
org.apache.seatunnel.common.exception.CommonErrorCode.GET_CATALOG_TABLES_WITH_UNSUPPORTED_TYPE_ERROR;
import static
org.apache.seatunnel.common.exception.CommonErrorCode.GET_CATALOG_TABLE_WITH_UNSUPPORTED_TYPE_ERROR;
+import static
org.apache.seatunnel.common.exception.CommonErrorCode.JSON_OPERATION_FAILED;
import static
org.apache.seatunnel.common.exception.CommonErrorCode.UNSUPPORTED_DATA_TYPE;
/**
@@ -118,4 +119,22 @@ public class CommonError {
return new SeaTunnelRuntimeException(
GET_CATALOG_TABLES_WITH_UNSUPPORTED_TYPE_ERROR, params);
}
+
+ public static SeaTunnelRuntimeException jsonOperationError(String format,
String payload) {
+ return jsonOperationError(format, payload, null);
+ }
+
+ public static SeaTunnelRuntimeException jsonOperationError(
+ String format, String payload, Throwable cause) {
+ Map<String, String> params = new HashMap<>();
+ params.put("format", format);
+ params.put("payload", payload);
+ SeaTunnelErrorCode code = JSON_OPERATION_FAILED;
+
+ if (cause != null) {
+ return new SeaTunnelRuntimeException(code, params, cause);
+ } else {
+ return new SeaTunnelRuntimeException(code, params);
+ }
+ }
}
diff --git a/seatunnel-common/src/main/java/org/apache/seatunnel/common/exception/CommonErrorCode.java b/seatunnel-common/src/main/java/org/apache/seatunnel/common/exception/CommonErrorCode.java
index a0817a4e72..1fe001c07b 100644
--- a/seatunnel-common/src/main/java/org/apache/seatunnel/common/exception/CommonErrorCode.java
+++ b/seatunnel-common/src/main/java/org/apache/seatunnel/common/exception/CommonErrorCode.java
@@ -19,6 +19,8 @@ package org.apache.seatunnel.common.exception;
/** SeaTunnel connector error code interface */
public enum CommonErrorCode implements SeaTunnelErrorCode {
+ JSON_OPERATION_FAILED("COMMON-02", "<format> JSON convert/parse
'<payload>' operation failed."),
+
UNSUPPORTED_DATA_TYPE(
"COMMON-07", "'<identifier>' unsupported data type '<dataType>' of
'<field>'"),
CONVERT_TO_SEATUNNEL_TYPE_ERROR(
diff --git a/seatunnel-common/src/main/java/org/apache/seatunnel/common/exception/SeaTunnelRuntimeException.java b/seatunnel-common/src/main/java/org/apache/seatunnel/common/exception/SeaTunnelRuntimeException.java
index 4f6f021522..65a421bcd9 100644
--- a/seatunnel-common/src/main/java/org/apache/seatunnel/common/exception/SeaTunnelRuntimeException.java
+++ b/seatunnel-common/src/main/java/org/apache/seatunnel/common/exception/SeaTunnelRuntimeException.java
@@ -64,6 +64,15 @@ public class SeaTunnelRuntimeException extends
RuntimeException {
this.params = params;
}
+ public SeaTunnelRuntimeException(
+ SeaTunnelErrorCode seaTunnelErrorCode, Map<String, String> params,
Throwable cause) {
+ super(
+
ExceptionParamsUtil.getDescription(seaTunnelErrorCode.getErrorMessage(),
params),
+ cause);
+ this.seaTunnelErrorCode = seaTunnelErrorCode;
+ this.params = params;
+ }
+
public SeaTunnelErrorCode getSeaTunnelErrorCode() {
return seaTunnelErrorCode;
}
diff --git a/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/canal/CanalJsonDeserializationSchema.java b/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/canal/CanalJsonDeserializationSchema.java
index a1b3f74dce..96482291ec 100644
--- a/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/canal/CanalJsonDeserializationSchema.java
+++ b/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/canal/CanalJsonDeserializationSchema.java
@@ -28,9 +28,9 @@ import org.apache.seatunnel.api.table.type.RowKind;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
-import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
+import org.apache.seatunnel.common.exception.CommonError;
+import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
import org.apache.seatunnel.format.json.JsonDeserializationSchema;
-import org.apache.seatunnel.format.json.exception.SeaTunnelJsonFormatException;
import java.io.IOException;
import java.util.regex.Pattern;
@@ -38,9 +38,10 @@ import java.util.regex.Pattern;
import static java.lang.String.format;
public class CanalJsonDeserializationSchema implements
DeserializationSchema<SeaTunnelRow> {
-
private static final long serialVersionUID = 1L;
+ private static final String FORMAT = "Canal";
+
private static final String FIELD_OLD = "old";
private static final String FIELD_DATA = "data";
@@ -105,7 +106,8 @@ public class CanalJsonDeserializationSchema implements
DeserializationSchema<Sea
@Override
public SeaTunnelRow deserialize(byte[] message) throws IOException {
- throw new UnsupportedOperationException();
+ throw new UnsupportedOperationException(
+ "Please invoke DeserializationSchema#deserialize(byte[],
Collector<SeaTunnelRow>) instead.");
}
@Override
@@ -113,93 +115,93 @@ public class CanalJsonDeserializationSchema implements
DeserializationSchema<Sea
return this.physicalRowType;
}
- @Override
- public void deserialize(byte[] message, Collector<SeaTunnelRow> out) {
- if (message == null) {
- return;
- }
- ObjectNode jsonNode = (ObjectNode) convertBytes(message);
- assert jsonNode != null;
- deserialize(jsonNode, out);
- }
-
- public void deserialize(ObjectNode jsonNode, Collector<SeaTunnelRow> out) {
- if (database != null
- &&
!databasePattern.matcher(jsonNode.get(FIELD_DATABASE).asText()).matches()) {
- return;
- }
- if (table != null &&
!tablePattern.matcher(jsonNode.get(FIELD_TABLE).asText()).matches()) {
- return;
- }
- JsonNode dataNode = jsonNode.get(FIELD_DATA);
- String type = jsonNode.get(FIELD_TYPE).asText();
- // When a null value is encountered, an exception needs to be thrown
for easy sensing
- if (dataNode == null || dataNode.isNull()) {
- // We'll skip the query or create or alter event data
- if (OP_QUERY.equals(type) || OP_CREATE.equals(type) ||
OP_ALTER.equals(type)) {
+ public void deserialize(ObjectNode jsonNode, Collector<SeaTunnelRow> out)
throws IOException {
+ try {
+ if (database != null
+ &&
!databasePattern.matcher(jsonNode.get(FIELD_DATABASE).asText()).matches()) {
return;
}
- throw new SeaTunnelJsonFormatException(
- CommonErrorCodeDeprecated.JSON_OPERATION_FAILED,
- format("Null data value \"%s\" Cannot send downstream",
jsonNode));
- }
- if (OP_INSERT.equals(type)) {
- for (int i = 0; i < dataNode.size(); i++) {
- SeaTunnelRow row = convertJsonNode(dataNode.get(i));
- out.collect(row);
+ if (table != null
+ &&
!tablePattern.matcher(jsonNode.get(FIELD_TABLE).asText()).matches()) {
+ return;
}
- } else if (OP_UPDATE.equals(type)) {
- final ArrayNode oldNode = (ArrayNode) jsonNode.get(FIELD_OLD);
- for (int i = 0; i < dataNode.size(); i++) {
- SeaTunnelRow after = convertJsonNode(dataNode.get(i));
- SeaTunnelRow before = convertJsonNode(oldNode.get(i));
- for (int f = 0; f < fieldCount; f++) {
- assert before != null;
- if (before.isNullAt(f) && oldNode.findValue(fieldNames[f])
== null) {
- // fields in "old" (before) means the fields are
changed
- // fields not in "old" (before) means the fields are
not changed
- // so we just copy the not changed fields into before
- assert after != null;
- before.setField(f, after.getField(f));
- }
+
+ JsonNode dataNode = jsonNode.get(FIELD_DATA);
+ String type = jsonNode.get(FIELD_TYPE).asText();
+ // When a null value is encountered, an exception needs to be
thrown for easy sensing
+ if (dataNode == null || dataNode.isNull()) {
+ // We'll skip the query or create or alter event data
+ if (OP_QUERY.equals(type) || OP_CREATE.equals(type) ||
OP_ALTER.equals(type)) {
+ return;
}
- assert before != null;
- before.setRowKind(RowKind.UPDATE_BEFORE);
- assert after != null;
- after.setRowKind(RowKind.UPDATE_AFTER);
- out.collect(before);
- out.collect(after);
+ throw new IllegalStateException(
+ format("Null data value '%s' Cannot send downstream",
jsonNode));
}
- } else if (OP_DELETE.equals(type)) {
- for (int i = 0; i < dataNode.size(); i++) {
- SeaTunnelRow row = convertJsonNode(dataNode.get(i));
- assert row != null;
- row.setRowKind(RowKind.DELETE);
- out.collect(row);
+ if (OP_INSERT.equals(type)) {
+ for (int i = 0; i < dataNode.size(); i++) {
+ SeaTunnelRow row = convertJsonNode(dataNode.get(i));
+ out.collect(row);
+ }
+ } else if (OP_UPDATE.equals(type)) {
+ final ArrayNode oldNode = (ArrayNode) jsonNode.get(FIELD_OLD);
+ for (int i = 0; i < dataNode.size(); i++) {
+ SeaTunnelRow after = convertJsonNode(dataNode.get(i));
+ SeaTunnelRow before = convertJsonNode(oldNode.get(i));
+ for (int f = 0; f < fieldCount; f++) {
+ if (before.isNullAt(f) &&
oldNode.findValue(fieldNames[f]) == null) {
+ // fields in "old" (before) means the fields are
changed
+ // fields not in "old" (before) means the fields
are not changed
+ // so we just copy the not changed fields into
before
+ before.setField(f, after.getField(f));
+ }
+ }
+ before.setRowKind(RowKind.UPDATE_BEFORE);
+ after.setRowKind(RowKind.UPDATE_AFTER);
+ out.collect(before);
+ out.collect(after);
+ }
+ } else if (OP_DELETE.equals(type)) {
+ for (int i = 0; i < dataNode.size(); i++) {
+ SeaTunnelRow row = convertJsonNode(dataNode.get(i));
+ row.setRowKind(RowKind.DELETE);
+ out.collect(row);
+ }
+ } else {
+ throw new IllegalStateException(format("Unknown operation type
'%s'.", type));
}
- } else {
+ } catch (Throwable t) {
if (!ignoreParseErrors) {
- throw new SeaTunnelJsonFormatException(
- CommonErrorCodeDeprecated.UNSUPPORTED_DATA_TYPE,
- format(
- "Unknown \"type\" value \"%s\". The Canal JSON
message is '%s'",
- type, jsonNode.asText()));
+ throw CommonError.jsonOperationError(FORMAT,
jsonNode.toString(), t);
}
}
}
- private JsonNode convertBytes(byte[] message) {
+ private ObjectNode convertBytes(byte[] message) throws
SeaTunnelRuntimeException {
try {
- return jsonDeserializer.deserializeToJsonNode(message);
- } catch (Exception t) {
- if (ignoreParseErrors) {
- return null;
+ return (ObjectNode)
jsonDeserializer.deserializeToJsonNode(message);
+ } catch (Throwable t) {
+ throw CommonError.jsonOperationError(FORMAT, new String(message),
t);
+ }
+ }
+
+ @Override
+ public void deserialize(byte[] message, Collector<SeaTunnelRow> out)
throws IOException {
+ if (message == null) {
+ return;
+ }
+
+ ObjectNode jsonNode;
+ try {
+ jsonNode = convertBytes(message);
+ } catch (SeaTunnelRuntimeException cause) {
+ if (!ignoreParseErrors) {
+ throw cause;
+ } else {
+ return;
}
- throw new SeaTunnelJsonFormatException(
- CommonErrorCodeDeprecated.JSON_OPERATION_FAILED,
- String.format("Failed to deserialize JSON '%s'.", new
String(message)),
- t);
}
+
+ deserialize(jsonNode, out);
}
private SeaTunnelRow convertJsonNode(JsonNode root) {
diff --git a/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/ogg/OggJsonDeserializationSchema.java b/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/ogg/OggJsonDeserializationSchema.java
index dc78566bbe..36e6e541c4 100644
--- a/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/ogg/OggJsonDeserializationSchema.java
+++ b/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/ogg/OggJsonDeserializationSchema.java
@@ -27,9 +27,9 @@ import org.apache.seatunnel.api.table.type.RowKind;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
-import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
+import org.apache.seatunnel.common.exception.CommonError;
+import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
import org.apache.seatunnel.format.json.JsonDeserializationSchema;
-import org.apache.seatunnel.format.json.exception.SeaTunnelJsonFormatException;
import java.io.IOException;
import java.util.regex.Pattern;
@@ -40,6 +40,8 @@ public class OggJsonDeserializationSchema implements
DeserializationSchema<SeaTu
private static final long serialVersionUID = 1L;
+ private static final String FORMAT = "Ogg";
+
private static final String FIELD_TYPE = "op_type";
private static final String FIELD_DATABASE_TABLE = "table";
@@ -55,7 +57,7 @@ public class OggJsonDeserializationSchema implements
DeserializationSchema<SeaTu
private static final String OP_DELETE = "D"; // DELETE
private static final String REPLICA_IDENTITY_EXCEPTION =
- "The \"before\" field of %s message is null, "
+ "The \"before\" field of %s operation message is null, "
+ "if you are using Ogg Postgres Connector, "
+ "please check the Postgres table has been set REPLICA
IDENTITY to FULL level.";
@@ -101,9 +103,8 @@ public class OggJsonDeserializationSchema implements
DeserializationSchema<SeaTu
@Override
public SeaTunnelRow deserialize(byte[] message) throws IOException {
- throw new SeaTunnelJsonFormatException(
- CommonErrorCodeDeprecated.JSON_OPERATION_FAILED,
- String.format("Failed to deserialize JSON '%s'.", new
String(message)));
+ throw new UnsupportedOperationException(
+ "Please invoke DeserializationSchema#deserialize(byte[],
Collector<SeaTunnelRow>) instead.");
}
@Override
@@ -111,91 +112,94 @@ public class OggJsonDeserializationSchema implements
DeserializationSchema<SeaTu
return this.physicalRowType;
}
- public void deserialize(byte[] message, Collector<SeaTunnelRow> out) {
- if (message == null || message.length == 0) {
- // skip tombstone messages
- return;
+ private ObjectNode convertBytes(byte[] message) throws
SeaTunnelRuntimeException {
+ try {
+ return (ObjectNode)
jsonDeserializer.deserializeToJsonNode(message);
+ } catch (Throwable t) {
+ throw CommonError.jsonOperationError(FORMAT, new String(message),
t);
}
- ObjectNode jsonNode = (ObjectNode) convertBytes(message);
- assert jsonNode != null;
+ }
- if (database != null
- && !databasePattern
-
.matcher(jsonNode.get(FIELD_DATABASE_TABLE).asText().split("\\.")[0])
- .matches()) {
- return;
- }
- if (table != null
- && !tablePattern
-
.matcher(jsonNode.get(FIELD_DATABASE_TABLE).asText().split("\\.")[1])
- .matches()) {
- return;
- }
- String op = jsonNode.get(FIELD_TYPE).asText().trim();
- if (OP_INSERT.equals(op)) {
- // Gets the data for the INSERT operation
- JsonNode dataBefore = jsonNode.get(DATA_AFTER);
- SeaTunnelRow row = convertJsonNode(dataBefore);
- out.collect(row);
- } else if (OP_UPDATE.equals(op)) {
- JsonNode dataBefore = jsonNode.get(DATA_BEFORE);
- // Modify Operation Data cannot be empty before modification
- if (dataBefore == null || dataBefore.isNull()) {
- throw new IllegalStateException(
- String.format(REPLICA_IDENTITY_EXCEPTION, "UPDATE"));
+ public void deserialize(ObjectNode jsonNode, Collector<SeaTunnelRow> out)
throws IOException {
+ try {
+ if (database != null
+ && !databasePattern
+
.matcher(jsonNode.get(FIELD_DATABASE_TABLE).asText().split("\\.")[0])
+ .matches()) {
+ return;
}
- JsonNode dataAfter = jsonNode.get(DATA_AFTER);
- // Gets the data for the UPDATE BEFORE operation
- SeaTunnelRow before = convertJsonNode(dataBefore);
- // Gets the data for the UPDATE AFTER operation
- SeaTunnelRow after = convertJsonNode(dataAfter);
- assert before != null;
- before.setRowKind(RowKind.UPDATE_BEFORE);
- out.collect(before);
- assert after != null;
- after.setRowKind(RowKind.UPDATE_AFTER);
- out.collect(after);
-
- } else if (OP_DELETE.equals(op)) {
- JsonNode dataBefore = jsonNode.get(DATA_BEFORE);
- if (dataBefore == null || dataBefore.isNull()) {
- throw new IllegalStateException(
- String.format(REPLICA_IDENTITY_EXCEPTION, "DELETE"));
+ if (table != null
+ && !tablePattern
+
.matcher(jsonNode.get(FIELD_DATABASE_TABLE).asText().split("\\.")[1])
+ .matches()) {
+ return;
}
- // Gets the data for the DELETE BEFORE operation
- SeaTunnelRow before = convertJsonNode(dataBefore);
- if (before == null) {
- throw new SeaTunnelJsonFormatException(
- CommonErrorCodeDeprecated.JSON_OPERATION_FAILED,
- format(
- "The data %s the %s cannot be null \"%s\" ",
- "BEFORE", "DELETE", new String(message)));
+
+ String op = jsonNode.get(FIELD_TYPE).asText().trim();
+ if (OP_INSERT.equals(op)) {
+ // Gets the data for the INSERT operation
+ JsonNode dataAfter = jsonNode.get(DATA_AFTER);
+ SeaTunnelRow row = convertJsonNode(dataAfter);
+ out.collect(row);
+ } else if (OP_UPDATE.equals(op)) {
+ JsonNode dataBefore = jsonNode.get(DATA_BEFORE);
+ // Modify Operation Data cannot be empty before modification
+ if (dataBefore == null || dataBefore.isNull()) {
+ throw new IllegalStateException(
+ String.format(REPLICA_IDENTITY_EXCEPTION,
"UPDATE"));
+ }
+ JsonNode dataAfter = jsonNode.get(DATA_AFTER);
+ // Gets the data for the UPDATE BEFORE operation
+ SeaTunnelRow before = convertJsonNode(dataBefore);
+ // Gets the data for the UPDATE AFTER operation
+ SeaTunnelRow after = convertJsonNode(dataAfter);
+
+ before.setRowKind(RowKind.UPDATE_BEFORE);
+ out.collect(before);
+
+ after.setRowKind(RowKind.UPDATE_AFTER);
+ out.collect(after);
+ } else if (OP_DELETE.equals(op)) {
+ JsonNode dataBefore = jsonNode.get(DATA_BEFORE);
+ if (dataBefore == null || dataBefore.isNull()) {
+ throw new IllegalStateException(
+ String.format(REPLICA_IDENTITY_EXCEPTION,
"DELETE"));
+ }
+ // Gets the data for the DELETE BEFORE operation
+ SeaTunnelRow before = convertJsonNode(dataBefore);
+ if (before == null) {
+ throw new IllegalStateException(
+ String.format(REPLICA_IDENTITY_EXCEPTION,
"DELETE"));
+ }
+ before.setRowKind(RowKind.DELETE);
+ out.collect(before);
+ } else {
+ throw new IllegalStateException(format("Unknown operation type
'%s'.", op));
}
- before.setRowKind(RowKind.DELETE);
- out.collect(before);
- } else {
+ } catch (Throwable t) {
if (!ignoreParseErrors) {
- throw new SeaTunnelJsonFormatException(
- CommonErrorCodeDeprecated.UNSUPPORTED_DATA_TYPE,
- format(
- "Unknown \"type\" value \"%s\". The Canal JSON
message is '%s'",
- op, new String(message)));
+ throw CommonError.jsonOperationError(FORMAT,
jsonNode.toString(), t);
}
}
}
- private JsonNode convertBytes(byte[] message) {
+ public void deserialize(byte[] message, Collector<SeaTunnelRow> out)
throws IOException {
+ if (message == null || message.length == 0) {
+ // skip tombstone messages
+ return;
+ }
+
+ ObjectNode jsonNode;
try {
- return jsonDeserializer.deserializeToJsonNode(message);
- } catch (Exception t) {
- if (ignoreParseErrors) {
- return null;
+ jsonNode = convertBytes(message);
+ } catch (Throwable cause) {
+ if (!ignoreParseErrors) {
+ throw cause;
+ } else {
+ return;
}
- throw new SeaTunnelJsonFormatException(
- CommonErrorCodeDeprecated.JSON_OPERATION_FAILED,
- String.format("Failed to deserialize JSON '%s'.", new
String(message)),
- t);
}
+ deserialize(jsonNode, out);
}
private SeaTunnelRow convertJsonNode(JsonNode root) {
diff --git a/seatunnel-formats/seatunnel-format-json/src/test/java/org/apache/seatunnel/format/json/canal/CanalJsonSerDeSchemaTest.java b/seatunnel-formats/seatunnel-format-json/src/test/java/org/apache/seatunnel/format/json/canal/CanalJsonSerDeSchemaTest.java
index c7544f6db8..601e9c738c 100644
--- a/seatunnel-formats/seatunnel-format-json/src/test/java/org/apache/seatunnel/format/json/canal/CanalJsonSerDeSchemaTest.java
+++ b/seatunnel-formats/seatunnel-format-json/src/test/java/org/apache/seatunnel/format/json/canal/CanalJsonSerDeSchemaTest.java
@@ -22,6 +22,8 @@ import org.apache.seatunnel.api.source.Collector;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.common.exception.CommonError;
+import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
import org.junit.jupiter.api.Test;
@@ -40,8 +42,10 @@ import static
org.apache.seatunnel.api.table.type.BasicType.FLOAT_TYPE;
import static org.apache.seatunnel.api.table.type.BasicType.INT_TYPE;
import static org.apache.seatunnel.api.table.type.BasicType.STRING_TYPE;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
public class CanalJsonSerDeSchemaTest {
+ private static final String FORMAT = "Canal";
private static final SeaTunnelRowType PHYSICAL_DATA_TYPE =
new SeaTunnelRowType(
@@ -69,6 +73,87 @@ public class CanalJsonSerDeSchemaTest {
assertEquals(0, collector.list.size());
}
+ @Test
+ public void testDeserializeNoJson() throws Exception {
+ final CanalJsonDeserializationSchema deserializationSchema =
+ createCanalJsonDeserializationSchema(null, null);
+ final SimpleCollector collector = new SimpleCollector();
+ String noJsonMsg = "{]";
+
+ SeaTunnelRuntimeException expected =
CommonError.jsonOperationError(FORMAT, noJsonMsg);
+ SeaTunnelRuntimeException cause =
+ assertThrows(
+ expected.getClass(),
+ () -> {
+
deserializationSchema.deserialize(noJsonMsg.getBytes(), collector);
+ });
+ assertEquals(cause.getMessage(), expected.getMessage());
+ }
+
+ @Test
+ public void testDeserializeEmptyJson() throws Exception {
+ final CanalJsonDeserializationSchema deserializationSchema =
+ createCanalJsonDeserializationSchema(null, null);
+ final SimpleCollector collector = new SimpleCollector();
+ String emptyMsg = "{}";
+ SeaTunnelRuntimeException expected =
CommonError.jsonOperationError(FORMAT, emptyMsg);
+ SeaTunnelRuntimeException cause =
+ assertThrows(
+ expected.getClass(),
+ () -> {
+
deserializationSchema.deserialize(emptyMsg.getBytes(), collector);
+ });
+ assertEquals(cause.getMessage(), expected.getMessage());
+ }
+
+ @Test
+ public void testDeserializeNoDataJson() throws Exception {
+ final CanalJsonDeserializationSchema deserializationSchema =
+ createCanalJsonDeserializationSchema(null, null);
+ final SimpleCollector collector = new SimpleCollector();
+ String noDataMsg = "{\"type\":\"INSERT\"}";
+ SeaTunnelRuntimeException expected =
CommonError.jsonOperationError(FORMAT, noDataMsg);
+ SeaTunnelRuntimeException cause =
+ assertThrows(
+ expected.getClass(),
+ () -> {
+
deserializationSchema.deserialize(noDataMsg.getBytes(), collector);
+ });
+ assertEquals(cause.getMessage(), expected.getMessage());
+
+ Throwable noDataCause = cause.getCause();
+ assertEquals(noDataCause.getClass(), IllegalStateException.class);
+ assertEquals(
+ noDataCause.getMessage(),
+ String.format("Null data value '%s' Cannot send downstream",
noDataMsg));
+ }
+
+ @Test
+ public void testDeserializeUnknownTypeJson() throws Exception {
+ final CanalJsonDeserializationSchema deserializationSchema =
+ createCanalJsonDeserializationSchema(null, null);
+ final SimpleCollector collector = new SimpleCollector();
+ String unknownType = "XX";
+ String unknownOperationMsg =
+ "{\"data\":{\"id\":101,\"name\":\"scooter\"},\"type\":\"" +
unknownType + "\"}";
+ SeaTunnelRuntimeException expected =
+ CommonError.jsonOperationError(FORMAT, unknownOperationMsg);
+ SeaTunnelRuntimeException cause =
+ assertThrows(
+ expected.getClass(),
+ () -> {
+ deserializationSchema.deserialize(
+ unknownOperationMsg.getBytes(), collector);
+ });
+ assertEquals(cause.getMessage(), expected.getMessage());
+
+ Throwable unknownTypeCause = cause.getCause();
+ assertEquals(unknownTypeCause.getClass(), IllegalStateException.class);
+ assertEquals(
+ unknownTypeCause.getMessage(),
+ String.format("Unknown operation type '%s'.", unknownType));
+ }
+
public void runTest(List<String> lines, CanalJsonDeserializationSchema
deserializationSchema)
throws IOException {
SimpleCollector collector = new SimpleCollector();
diff --git a/seatunnel-formats/seatunnel-format-json/src/test/java/org/apache/seatunnel/format/json/ogg/OggJsonSerDeSchemaTest.java b/seatunnel-formats/seatunnel-format-json/src/test/java/org/apache/seatunnel/format/json/ogg/OggJsonSerDeSchemaTest.java
index a08927737f..1f44c7cfbc 100644
--- a/seatunnel-formats/seatunnel-format-json/src/test/java/org/apache/seatunnel/format/json/ogg/OggJsonSerDeSchemaTest.java
+++ b/seatunnel-formats/seatunnel-format-json/src/test/java/org/apache/seatunnel/format/json/ogg/OggJsonSerDeSchemaTest.java
@@ -22,6 +22,8 @@ import org.apache.seatunnel.api.source.Collector;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.common.exception.CommonError;
+import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
@@ -41,8 +43,10 @@ import static
org.apache.seatunnel.api.table.type.BasicType.FLOAT_TYPE;
import static org.apache.seatunnel.api.table.type.BasicType.INT_TYPE;
import static org.apache.seatunnel.api.table.type.BasicType.STRING_TYPE;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
public class OggJsonSerDeSchemaTest {
+ private static final String FORMAT = "Ogg";
private static final SeaTunnelRowType PHYSICAL_DATA_TYPE =
new SeaTunnelRowType(
@@ -66,10 +70,95 @@ public class OggJsonSerDeSchemaTest {
createOggJsonDeserializationSchema(null, null);
final SimpleCollector collector = new SimpleCollector();
- deserializationSchema.deserialize(null, collector);
+ deserializationSchema.deserialize((byte[]) null, collector);
assertEquals(0, collector.list.size());
}
+ @Test
+ public void testDeserializeNoJson() throws Exception {
+ final OggJsonDeserializationSchema deserializationSchema =
+ createOggJsonDeserializationSchema(null, null);
+ final SimpleCollector collector = new SimpleCollector();
+ String noJsonMsg = "{]";
+
+ SeaTunnelRuntimeException expected =
CommonError.jsonOperationError(FORMAT, noJsonMsg);
+ SeaTunnelRuntimeException cause =
+ assertThrows(
+ expected.getClass(),
+ () -> {
+
deserializationSchema.deserialize(noJsonMsg.getBytes(), collector);
+ });
+ assertEquals(cause.getMessage(), expected.getMessage());
+ }
+
+ @Test
+ public void testDeserializeEmptyJson() throws Exception {
+ final OggJsonDeserializationSchema deserializationSchema =
+ createOggJsonDeserializationSchema(null, null);
+ final SimpleCollector collector = new SimpleCollector();
+ String emptyMsg = "{}";
+ SeaTunnelRuntimeException expected =
CommonError.jsonOperationError(FORMAT, emptyMsg);
+ SeaTunnelRuntimeException cause =
+ assertThrows(
+ expected.getClass(),
+ () -> {
+
deserializationSchema.deserialize(emptyMsg.getBytes(), collector);
+ });
+ assertEquals(cause.getMessage(), expected.getMessage());
+ }
+
+ @Test
+ public void testDeserializeNoDataJson() throws Exception {
+ final OggJsonDeserializationSchema deserializationSchema =
+ createOggJsonDeserializationSchema(null, null);
+ final SimpleCollector collector = new SimpleCollector();
+ String noDataMsg = "{\"op_type\":\"U\"}";
+ SeaTunnelRuntimeException expected =
CommonError.jsonOperationError(FORMAT, noDataMsg);
+ SeaTunnelRuntimeException cause =
+ assertThrows(
+ expected.getClass(),
+ () -> {
+
deserializationSchema.deserialize(noDataMsg.getBytes(), collector);
+ });
+ assertEquals(cause.getMessage(), expected.getMessage());
+
+ Throwable noDataCause = cause.getCause();
+ assertEquals(noDataCause.getClass(), IllegalStateException.class);
+ assertEquals(
+ noDataCause.getMessage(),
+ String.format(
+ "The \"before\" field of %s operation message is null,
"
+ + "if you are using Ogg Postgres Connector, "
+ + "please check the Postgres table has been
set REPLICA IDENTITY to FULL level.",
+ "UPDATE"));
+ }
+
+ @Test
+ public void testDeserializeUnknownTypeJson() throws Exception {
+ final OggJsonDeserializationSchema deserializationSchema =
+ createOggJsonDeserializationSchema(null, null);
+ final SimpleCollector collector = new SimpleCollector();
+ String unknownType = "XX";
+ String unknownOperationMsg =
+ "{\"data\":{\"id\":101,\"name\":\"scooter\"},\"op_type\":\"" +
unknownType + "\"}";
+ SeaTunnelRuntimeException expected =
+ CommonError.jsonOperationError(FORMAT, unknownOperationMsg);
+ SeaTunnelRuntimeException cause =
+ assertThrows(
+ expected.getClass(),
+ () -> {
+ deserializationSchema.deserialize(
+ unknownOperationMsg.getBytes(), collector);
+ });
+ assertEquals(cause.getMessage(), expected.getMessage());
+
+ Throwable unknownTypeCause = cause.getCause();
+ assertEquals(unknownTypeCause.getClass(), IllegalStateException.class);
+ assertEquals(
+ unknownTypeCause.getMessage(),
+ String.format("Unknown operation type '%s'.", unknownType));
+ }
+
public void runTest(List<String> lines, OggJsonDeserializationSchema
deserializationSchema)
throws IOException {
SimpleCollector collector = new SimpleCollector();