Hisoka-X commented on code in PR #5919:
URL: https://github.com/apache/seatunnel/pull/5919#discussion_r1410175510
##########
seatunnel-formats/seatunnel-format-json/src/test/java/org/apache/seatunnel/format/json/canal/CanalJsonSerDeSchemaTest.java:
##########
@@ -69,6 +74,105 @@ public void testDeserializeNullRow() throws Exception {
assertEquals(0, collector.list.size());
}
+ @Test
+ public void testDeserializeNoJson() throws Exception {
+ final CanalJsonDeserializationSchema deserializationSchema =
+ createCanalJsonDeserializationSchema(null, null);
+ final SimpleCollector collector = new SimpleCollector();
+ String noJsonMsg = "{]";
+ boolean isExcept = false;
+
+ SeaTunnelRuntimeException expected =
CommonError.jsonOperationError(FORMAT, noJsonMsg);
+ try {
+ deserializationSchema.deserialize(noJsonMsg.getBytes(), collector);
+ } catch (Throwable t) {
+ isExcept = true;
+ assertEquals(t.getClass(), IOException.class);
+ Throwable cause = t.getCause();
+ assertSame(cause.getClass(), expected.getClass());
+ assertEquals(cause.getMessage(), expected.getMessage());
+ }
Review Comment:
You can use `Assertions.assertThrows` to catch the exception. Please refer to
https://github.com/apache/seatunnel/blob/af2acb16fe315f670c5909f7489042ed1c9b191d/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-7/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcErrorIT.java#L155-L169
##########
seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/ogg/OggJsonDeserializationSchema.java:
##########
@@ -101,101 +103,103 @@ public OggJsonDeserializationSchema(
@Override
public SeaTunnelRow deserialize(byte[] message) throws IOException {
- throw new SeaTunnelJsonFormatException(
- CommonErrorCodeDeprecated.JSON_OPERATION_FAILED,
- String.format("Failed to deserialize JSON '%s'.", new
String(message)));
+ throw new UnsupportedOperationException();
}
@Override
public SeaTunnelDataType<SeaTunnelRow> getProducedType() {
return this.physicalRowType;
}
- public void deserialize(byte[] message, Collector<SeaTunnelRow> out) {
- if (message == null || message.length == 0) {
- // skip tombstone messages
- return;
+ private ObjectNode convertBytes(byte[] message) throws
SeaTunnelRuntimeException {
+ try {
+ return (ObjectNode)
jsonDeserializer.deserializeToJsonNode(message);
+ } catch (Throwable t) {
+ throw CommonError.jsonOperationError(FORMAT, new String(message),
t);
}
- ObjectNode jsonNode = (ObjectNode) convertBytes(message);
- assert jsonNode != null;
+ }
- if (database != null
- && !databasePattern
-
.matcher(jsonNode.get(FIELD_DATABASE_TABLE).asText().split("\\.")[0])
- .matches()) {
- return;
- }
- if (table != null
- && !tablePattern
-
.matcher(jsonNode.get(FIELD_DATABASE_TABLE).asText().split("\\.")[1])
- .matches()) {
- return;
- }
- String op = jsonNode.get(FIELD_TYPE).asText().trim();
- if (OP_INSERT.equals(op)) {
- // Gets the data for the INSERT operation
- JsonNode dataBefore = jsonNode.get(DATA_AFTER);
- SeaTunnelRow row = convertJsonNode(dataBefore);
- out.collect(row);
- } else if (OP_UPDATE.equals(op)) {
- JsonNode dataBefore = jsonNode.get(DATA_BEFORE);
- // Modify Operation Data cannot be empty before modification
- if (dataBefore == null || dataBefore.isNull()) {
- throw new IllegalStateException(
- String.format(REPLICA_IDENTITY_EXCEPTION, "UPDATE"));
+ public void deserialize(ObjectNode jsonNode, Collector<SeaTunnelRow> out)
throws IOException {
+ try {
+ if (database != null
+ && !databasePattern
+
.matcher(jsonNode.get(FIELD_DATABASE_TABLE).asText().split("\\.")[0])
+ .matches()) {
+ return;
}
- JsonNode dataAfter = jsonNode.get(DATA_AFTER);
- // Gets the data for the UPDATE BEFORE operation
- SeaTunnelRow before = convertJsonNode(dataBefore);
- // Gets the data for the UPDATE AFTER operation
- SeaTunnelRow after = convertJsonNode(dataAfter);
- assert before != null;
- before.setRowKind(RowKind.UPDATE_BEFORE);
- out.collect(before);
- assert after != null;
- after.setRowKind(RowKind.UPDATE_AFTER);
- out.collect(after);
-
- } else if (OP_DELETE.equals(op)) {
- JsonNode dataBefore = jsonNode.get(DATA_BEFORE);
- if (dataBefore == null || dataBefore.isNull()) {
- throw new IllegalStateException(
- String.format(REPLICA_IDENTITY_EXCEPTION, "DELETE"));
+ if (table != null
+ && !tablePattern
+
.matcher(jsonNode.get(FIELD_DATABASE_TABLE).asText().split("\\.")[1])
+ .matches()) {
+ return;
}
- // Gets the data for the DELETE BEFORE operation
- SeaTunnelRow before = convertJsonNode(dataBefore);
- if (before == null) {
- throw new SeaTunnelJsonFormatException(
- CommonErrorCodeDeprecated.JSON_OPERATION_FAILED,
- format(
- "The data %s the %s cannot be null \"%s\" ",
- "BEFORE", "DELETE", new String(message)));
+
+ String op = jsonNode.get(FIELD_TYPE).asText().trim();
+ if (OP_INSERT.equals(op)) {
+ // Gets the data for the INSERT operation
+ JsonNode dataAfter = jsonNode.get(DATA_AFTER);
+ SeaTunnelRow row = convertJsonNode(dataAfter);
+ out.collect(row);
+ } else if (OP_UPDATE.equals(op)) {
+ JsonNode dataBefore = jsonNode.get(DATA_BEFORE);
+ // Modify Operation Data cannot be empty before modification
+ if (dataBefore == null || dataBefore.isNull()) {
+ throw new IllegalStateException(
+ String.format(REPLICA_IDENTITY_EXCEPTION,
"UPDATE"));
+ }
+ JsonNode dataAfter = jsonNode.get(DATA_AFTER);
+ // Gets the data for the UPDATE BEFORE operation
+ SeaTunnelRow before = convertJsonNode(dataBefore);
+ // Gets the data for the UPDATE AFTER operation
+ SeaTunnelRow after = convertJsonNode(dataAfter);
+
+ before.setRowKind(RowKind.UPDATE_BEFORE);
+ out.collect(before);
+
+ after.setRowKind(RowKind.UPDATE_AFTER);
+ out.collect(after);
+ } else if (OP_DELETE.equals(op)) {
+ JsonNode dataBefore = jsonNode.get(DATA_BEFORE);
+ if (dataBefore == null || dataBefore.isNull()) {
+ throw new IllegalStateException(
+ String.format(REPLICA_IDENTITY_EXCEPTION,
"DELETE"));
+ }
+ // Gets the data for the DELETE BEFORE operation
+ SeaTunnelRow before = convertJsonNode(dataBefore);
+ if (before == null) {
+ throw new IllegalStateException(
+ String.format(REPLICA_IDENTITY_EXCEPTION,
"DELETE"));
+ }
+ before.setRowKind(RowKind.DELETE);
+ out.collect(before);
+ } else {
+ throw new IllegalStateException(format("Unknown operation type
'%s'.", op));
}
- before.setRowKind(RowKind.DELETE);
- out.collect(before);
- } else {
+ } catch (Throwable t) {
if (!ignoreParseErrors) {
- throw new SeaTunnelJsonFormatException(
- CommonErrorCodeDeprecated.UNSUPPORTED_DATA_TYPE,
- format(
- "Unknown \"type\" value \"%s\". The Canal JSON
message is '%s'",
- op, new String(message)));
+ Throwable cause = CommonError.jsonOperationError(FORMAT,
jsonNode.toString(), t);
+ throw new IOException(cause);
Review Comment:
ditto
##########
seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/canal/CanalJsonDeserializationSchema.java:
##########
@@ -113,93 +114,93 @@ public SeaTunnelDataType<SeaTunnelRow> getProducedType() {
return this.physicalRowType;
}
- @Override
- public void deserialize(byte[] message, Collector<SeaTunnelRow> out) {
- if (message == null) {
- return;
- }
- ObjectNode jsonNode = (ObjectNode) convertBytes(message);
- assert jsonNode != null;
- deserialize(jsonNode, out);
- }
-
- public void deserialize(ObjectNode jsonNode, Collector<SeaTunnelRow> out) {
- if (database != null
- &&
!databasePattern.matcher(jsonNode.get(FIELD_DATABASE).asText()).matches()) {
- return;
- }
- if (table != null &&
!tablePattern.matcher(jsonNode.get(FIELD_TABLE).asText()).matches()) {
- return;
- }
- JsonNode dataNode = jsonNode.get(FIELD_DATA);
- String type = jsonNode.get(FIELD_TYPE).asText();
- // When a null value is encountered, an exception needs to be thrown
for easy sensing
- if (dataNode == null || dataNode.isNull()) {
- // We'll skip the query or create or alter event data
- if (OP_QUERY.equals(type) || OP_CREATE.equals(type) ||
OP_ALTER.equals(type)) {
+ public void deserialize(ObjectNode jsonNode, Collector<SeaTunnelRow> out)
throws IOException {
+ try {
+ if (database != null
+ &&
!databasePattern.matcher(jsonNode.get(FIELD_DATABASE).asText()).matches()) {
return;
}
- throw new SeaTunnelJsonFormatException(
- CommonErrorCodeDeprecated.JSON_OPERATION_FAILED,
- format("Null data value \"%s\" Cannot send downstream",
jsonNode));
- }
- if (OP_INSERT.equals(type)) {
- for (int i = 0; i < dataNode.size(); i++) {
- SeaTunnelRow row = convertJsonNode(dataNode.get(i));
- out.collect(row);
+ if (table != null
+ &&
!tablePattern.matcher(jsonNode.get(FIELD_TABLE).asText()).matches()) {
+ return;
}
- } else if (OP_UPDATE.equals(type)) {
- final ArrayNode oldNode = (ArrayNode) jsonNode.get(FIELD_OLD);
- for (int i = 0; i < dataNode.size(); i++) {
- SeaTunnelRow after = convertJsonNode(dataNode.get(i));
- SeaTunnelRow before = convertJsonNode(oldNode.get(i));
- for (int f = 0; f < fieldCount; f++) {
- assert before != null;
- if (before.isNullAt(f) && oldNode.findValue(fieldNames[f])
== null) {
- // fields in "old" (before) means the fields are
changed
- // fields not in "old" (before) means the fields are
not changed
- // so we just copy the not changed fields into before
- assert after != null;
- before.setField(f, after.getField(f));
- }
+
+ JsonNode dataNode = jsonNode.get(FIELD_DATA);
+ String type = jsonNode.get(FIELD_TYPE).asText();
+ // When a null value is encountered, an exception needs to be
thrown for easy sensing
+ if (dataNode == null || dataNode.isNull()) {
+ // We'll skip the query or create or alter event data
+ if (OP_QUERY.equals(type) || OP_CREATE.equals(type) ||
OP_ALTER.equals(type)) {
+ return;
}
- assert before != null;
- before.setRowKind(RowKind.UPDATE_BEFORE);
- assert after != null;
- after.setRowKind(RowKind.UPDATE_AFTER);
- out.collect(before);
- out.collect(after);
+ throw new IllegalStateException(
+ format("Null data value '%s' Cannot send downstream",
jsonNode));
}
- } else if (OP_DELETE.equals(type)) {
- for (int i = 0; i < dataNode.size(); i++) {
- SeaTunnelRow row = convertJsonNode(dataNode.get(i));
- assert row != null;
- row.setRowKind(RowKind.DELETE);
- out.collect(row);
+ if (OP_INSERT.equals(type)) {
+ for (int i = 0; i < dataNode.size(); i++) {
+ SeaTunnelRow row = convertJsonNode(dataNode.get(i));
+ out.collect(row);
+ }
+ } else if (OP_UPDATE.equals(type)) {
+ final ArrayNode oldNode = (ArrayNode) jsonNode.get(FIELD_OLD);
+ for (int i = 0; i < dataNode.size(); i++) {
+ SeaTunnelRow after = convertJsonNode(dataNode.get(i));
+ SeaTunnelRow before = convertJsonNode(oldNode.get(i));
+ for (int f = 0; f < fieldCount; f++) {
+ if (before.isNullAt(f) &&
oldNode.findValue(fieldNames[f]) == null) {
+ // fields in "old" (before) means the fields are
changed
+ // fields not in "old" (before) means the fields
are not changed
+ // so we just copy the not changed fields into
before
+ before.setField(f, after.getField(f));
+ }
+ }
+ before.setRowKind(RowKind.UPDATE_BEFORE);
+ after.setRowKind(RowKind.UPDATE_AFTER);
+ out.collect(before);
+ out.collect(after);
+ }
+ } else if (OP_DELETE.equals(type)) {
+ for (int i = 0; i < dataNode.size(); i++) {
+ SeaTunnelRow row = convertJsonNode(dataNode.get(i));
+ row.setRowKind(RowKind.DELETE);
+ out.collect(row);
+ }
+ } else {
+ throw new IllegalStateException(format("Unknown operation type
'%s'.", type));
}
- } else {
+ } catch (Throwable t) {
if (!ignoreParseErrors) {
- throw new SeaTunnelJsonFormatException(
- CommonErrorCodeDeprecated.UNSUPPORTED_DATA_TYPE,
- format(
- "Unknown \"type\" value \"%s\". The Canal JSON
message is '%s'",
- type, jsonNode.asText()));
+ Throwable cause = CommonError.jsonOperationError(FORMAT,
jsonNode.toString(), t);
+ throw new IOException(cause);
Review Comment:
```suggestion
throw CommonError.jsonOperationError(FORMAT,
jsonNode.toString(), t);
```
##########
seatunnel-formats/seatunnel-format-json/src/test/java/org/apache/seatunnel/format/json/ogg/OggJsonSerDeSchemaTest.java:
##########
@@ -66,10 +71,113 @@ public void testDeserializeNullRow() throws Exception {
createOggJsonDeserializationSchema(null, null);
final SimpleCollector collector = new SimpleCollector();
- deserializationSchema.deserialize(null, collector);
+ deserializationSchema.deserialize((byte[]) null, collector);
assertEquals(0, collector.list.size());
}
+ @Test
+ public void testDeserializeNoJson() throws Exception {
+ final OggJsonDeserializationSchema deserializationSchema =
+ createOggJsonDeserializationSchema(null, null);
+ final SimpleCollector collector = new SimpleCollector();
+ String noJsonMsg = "{]";
+ boolean isExcept = false;
+
+ SeaTunnelRuntimeException expected =
CommonError.jsonOperationError(FORMAT, noJsonMsg);
+ try {
+ deserializationSchema.deserialize(noJsonMsg.getBytes(), collector);
+ } catch (Throwable t) {
+ isExcept = true;
+ assertEquals(t.getClass(), IOException.class);
+ Throwable cause = t.getCause();
+ assertSame(cause.getClass(), expected.getClass());
+ assertEquals(cause.getMessage(), expected.getMessage());
+ }
+ assertTrue(isExcept);
+ }
+
+ @Test
+ public void testDeserializeEmptyJson() throws Exception {
+ final OggJsonDeserializationSchema deserializationSchema =
+ createOggJsonDeserializationSchema(null, null);
+ final SimpleCollector collector = new SimpleCollector();
+ String emptyMsg = "{}";
+ SeaTunnelRuntimeException expected =
CommonError.jsonOperationError(FORMAT, emptyMsg);
+ boolean isExcept = false;
+
+ try {
+ deserializationSchema.deserialize(emptyMsg.getBytes(), collector);
+ } catch (Throwable t) {
+ isExcept = true;
+ assertEquals(t.getClass(), IOException.class);
+ Throwable cause = t.getCause();
+ assertSame(cause.getClass(), expected.getClass());
+ assertEquals(cause.getMessage(), expected.getMessage());
+ }
+ assertTrue(isExcept);
+ }
+
+ @Test
+ public void testDeserializeNoDataJson() throws Exception {
+ final OggJsonDeserializationSchema deserializationSchema =
+ createOggJsonDeserializationSchema(null, null);
+ final SimpleCollector collector = new SimpleCollector();
+ String noDataMsg = "{\"op_type\":\"U\"}";
+ SeaTunnelRuntimeException expected =
CommonError.jsonOperationError(FORMAT, noDataMsg);
+ boolean isExcept = false;
+
+ try {
+ deserializationSchema.deserialize(noDataMsg.getBytes(), collector);
+ } catch (Throwable t) {
+ isExcept = true;
+ assertEquals(t.getClass(), IOException.class);
+ Throwable cause = t.getCause();
+ assertSame(cause.getClass(), expected.getClass());
+ assertEquals(cause.getMessage(), expected.getMessage());
+
+ Throwable noDataCause = cause.getCause();
+ assertEquals(noDataCause.getClass(), IllegalStateException.class);
+ assertEquals(
+ noDataCause.getMessage(),
+ String.format(
+ "The \"before\" field of %s operation message is
null, "
+ + "if you are using Ogg Postgres
Connector, "
+ + "please check the Postgres table has
been set REPLICA IDENTITY to FULL level.",
+ "UPDATE"));
+ }
+ assertTrue(isExcept);
+ }
+
+ @Test
+ public void testDeserializeUnknownTypeJson() throws Exception {
+ final OggJsonDeserializationSchema deserializationSchema =
+ createOggJsonDeserializationSchema(null, null);
+ final SimpleCollector collector = new SimpleCollector();
+ String unknownType = "XX";
+ String unknownOperationMsg =
+ "{\"data\":{\"id\":101,\"name\":\"scooter\"},\"op_type\":\"" +
unknownType + "\"}";
+ SeaTunnelRuntimeException expected =
+ CommonError.jsonOperationError(FORMAT, unknownOperationMsg);
+ boolean isExcept = false;
+
+ try {
+ deserializationSchema.deserialize(unknownOperationMsg.getBytes(),
collector);
+ } catch (Throwable t) {
+ isExcept = true;
+ assertEquals(t.getClass(), IOException.class);
+ Throwable cause = t.getCause();
+ assertSame(cause.getClass(), expected.getClass());
+ assertEquals(cause.getMessage(), expected.getMessage());
+
+ Throwable unknownTypeCause = cause.getCause();
+ assertEquals(unknownTypeCause.getClass(),
IllegalStateException.class);
+ assertEquals(
+ unknownTypeCause.getMessage(),
+ String.format("Unknown operation type '%s'.",
unknownType));
+ }
Review Comment:
ditto
##########
seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/canal/CanalJsonDeserializationSchema.java:
##########
@@ -113,93 +114,93 @@ public SeaTunnelDataType<SeaTunnelRow> getProducedType() {
return this.physicalRowType;
}
- @Override
- public void deserialize(byte[] message, Collector<SeaTunnelRow> out) {
- if (message == null) {
- return;
- }
- ObjectNode jsonNode = (ObjectNode) convertBytes(message);
- assert jsonNode != null;
- deserialize(jsonNode, out);
- }
-
- public void deserialize(ObjectNode jsonNode, Collector<SeaTunnelRow> out) {
- if (database != null
- &&
!databasePattern.matcher(jsonNode.get(FIELD_DATABASE).asText()).matches()) {
- return;
- }
- if (table != null &&
!tablePattern.matcher(jsonNode.get(FIELD_TABLE).asText()).matches()) {
- return;
- }
- JsonNode dataNode = jsonNode.get(FIELD_DATA);
- String type = jsonNode.get(FIELD_TYPE).asText();
- // When a null value is encountered, an exception needs to be thrown
for easy sensing
- if (dataNode == null || dataNode.isNull()) {
- // We'll skip the query or create or alter event data
- if (OP_QUERY.equals(type) || OP_CREATE.equals(type) ||
OP_ALTER.equals(type)) {
+ public void deserialize(ObjectNode jsonNode, Collector<SeaTunnelRow> out)
throws IOException {
+ try {
+ if (database != null
+ &&
!databasePattern.matcher(jsonNode.get(FIELD_DATABASE).asText()).matches()) {
return;
}
- throw new SeaTunnelJsonFormatException(
- CommonErrorCodeDeprecated.JSON_OPERATION_FAILED,
- format("Null data value \"%s\" Cannot send downstream",
jsonNode));
- }
- if (OP_INSERT.equals(type)) {
- for (int i = 0; i < dataNode.size(); i++) {
- SeaTunnelRow row = convertJsonNode(dataNode.get(i));
- out.collect(row);
+ if (table != null
+ &&
!tablePattern.matcher(jsonNode.get(FIELD_TABLE).asText()).matches()) {
+ return;
}
- } else if (OP_UPDATE.equals(type)) {
- final ArrayNode oldNode = (ArrayNode) jsonNode.get(FIELD_OLD);
- for (int i = 0; i < dataNode.size(); i++) {
- SeaTunnelRow after = convertJsonNode(dataNode.get(i));
- SeaTunnelRow before = convertJsonNode(oldNode.get(i));
- for (int f = 0; f < fieldCount; f++) {
- assert before != null;
- if (before.isNullAt(f) && oldNode.findValue(fieldNames[f])
== null) {
- // fields in "old" (before) means the fields are
changed
- // fields not in "old" (before) means the fields are
not changed
- // so we just copy the not changed fields into before
- assert after != null;
- before.setField(f, after.getField(f));
- }
+
+ JsonNode dataNode = jsonNode.get(FIELD_DATA);
+ String type = jsonNode.get(FIELD_TYPE).asText();
+ // When a null value is encountered, an exception needs to be
thrown for easy sensing
+ if (dataNode == null || dataNode.isNull()) {
+ // We'll skip the query or create or alter event data
+ if (OP_QUERY.equals(type) || OP_CREATE.equals(type) ||
OP_ALTER.equals(type)) {
+ return;
}
- assert before != null;
- before.setRowKind(RowKind.UPDATE_BEFORE);
- assert after != null;
- after.setRowKind(RowKind.UPDATE_AFTER);
- out.collect(before);
- out.collect(after);
+ throw new IllegalStateException(
+ format("Null data value '%s' Cannot send downstream",
jsonNode));
}
- } else if (OP_DELETE.equals(type)) {
- for (int i = 0; i < dataNode.size(); i++) {
- SeaTunnelRow row = convertJsonNode(dataNode.get(i));
- assert row != null;
- row.setRowKind(RowKind.DELETE);
- out.collect(row);
+ if (OP_INSERT.equals(type)) {
+ for (int i = 0; i < dataNode.size(); i++) {
+ SeaTunnelRow row = convertJsonNode(dataNode.get(i));
+ out.collect(row);
+ }
+ } else if (OP_UPDATE.equals(type)) {
+ final ArrayNode oldNode = (ArrayNode) jsonNode.get(FIELD_OLD);
+ for (int i = 0; i < dataNode.size(); i++) {
+ SeaTunnelRow after = convertJsonNode(dataNode.get(i));
+ SeaTunnelRow before = convertJsonNode(oldNode.get(i));
+ for (int f = 0; f < fieldCount; f++) {
+ if (before.isNullAt(f) &&
oldNode.findValue(fieldNames[f]) == null) {
+ // fields in "old" (before) means the fields are
changed
+ // fields not in "old" (before) means the fields
are not changed
+ // so we just copy the not changed fields into
before
+ before.setField(f, after.getField(f));
+ }
+ }
+ before.setRowKind(RowKind.UPDATE_BEFORE);
+ after.setRowKind(RowKind.UPDATE_AFTER);
+ out.collect(before);
+ out.collect(after);
+ }
+ } else if (OP_DELETE.equals(type)) {
+ for (int i = 0; i < dataNode.size(); i++) {
+ SeaTunnelRow row = convertJsonNode(dataNode.get(i));
+ row.setRowKind(RowKind.DELETE);
+ out.collect(row);
+ }
+ } else {
+ throw new IllegalStateException(format("Unknown operation type
'%s'.", type));
}
- } else {
+ } catch (Throwable t) {
if (!ignoreParseErrors) {
- throw new SeaTunnelJsonFormatException(
- CommonErrorCodeDeprecated.UNSUPPORTED_DATA_TYPE,
- format(
- "Unknown \"type\" value \"%s\". The Canal JSON
message is '%s'",
- type, jsonNode.asText()));
+ Throwable cause = CommonError.jsonOperationError(FORMAT,
jsonNode.toString(), t);
+ throw new IOException(cause);
Review Comment:
Just throw the `SeaTunnelRuntimeException` directly instead of wrapping it in an `IOException`.
##########
seatunnel-formats/seatunnel-format-json/src/test/java/org/apache/seatunnel/format/json/canal/CanalJsonSerDeSchemaTest.java:
##########
@@ -69,6 +74,105 @@ public void testDeserializeNullRow() throws Exception {
assertEquals(0, collector.list.size());
}
+ @Test
+ public void testDeserializeNoJson() throws Exception {
+ final CanalJsonDeserializationSchema deserializationSchema =
+ createCanalJsonDeserializationSchema(null, null);
+ final SimpleCollector collector = new SimpleCollector();
+ String noJsonMsg = "{]";
+ boolean isExcept = false;
+
+ SeaTunnelRuntimeException expected =
CommonError.jsonOperationError(FORMAT, noJsonMsg);
+ try {
+ deserializationSchema.deserialize(noJsonMsg.getBytes(), collector);
+ } catch (Throwable t) {
+ isExcept = true;
+ assertEquals(t.getClass(), IOException.class);
+ Throwable cause = t.getCause();
+ assertSame(cause.getClass(), expected.getClass());
+ assertEquals(cause.getMessage(), expected.getMessage());
+ }
+ assertTrue(isExcept);
+ }
+
+ @Test
+ public void testDeserializeEmptyJson() throws Exception {
+ final CanalJsonDeserializationSchema deserializationSchema =
+ createCanalJsonDeserializationSchema(null, null);
+ final SimpleCollector collector = new SimpleCollector();
+ String emptyMsg = "{}";
+ SeaTunnelRuntimeException expected =
CommonError.jsonOperationError(FORMAT, emptyMsg);
+ boolean isExcept = false;
+
+ try {
+ deserializationSchema.deserialize(emptyMsg.getBytes(), collector);
+ } catch (Throwable t) {
+ isExcept = true;
+ assertEquals(t.getClass(), IOException.class);
+ Throwable cause = t.getCause();
+ assertSame(cause.getClass(), expected.getClass());
+ assertEquals(cause.getMessage(), expected.getMessage());
+ }
Review Comment:
ditto
##########
seatunnel-formats/seatunnel-format-json/src/test/java/org/apache/seatunnel/format/json/ogg/OggJsonSerDeSchemaTest.java:
##########
@@ -66,10 +71,113 @@ public void testDeserializeNullRow() throws Exception {
createOggJsonDeserializationSchema(null, null);
final SimpleCollector collector = new SimpleCollector();
- deserializationSchema.deserialize(null, collector);
+ deserializationSchema.deserialize((byte[]) null, collector);
assertEquals(0, collector.list.size());
}
+ @Test
+ public void testDeserializeNoJson() throws Exception {
+ final OggJsonDeserializationSchema deserializationSchema =
+ createOggJsonDeserializationSchema(null, null);
+ final SimpleCollector collector = new SimpleCollector();
+ String noJsonMsg = "{]";
+ boolean isExcept = false;
+
+ SeaTunnelRuntimeException expected =
CommonError.jsonOperationError(FORMAT, noJsonMsg);
+ try {
+ deserializationSchema.deserialize(noJsonMsg.getBytes(), collector);
+ } catch (Throwable t) {
+ isExcept = true;
+ assertEquals(t.getClass(), IOException.class);
+ Throwable cause = t.getCause();
+ assertSame(cause.getClass(), expected.getClass());
+ assertEquals(cause.getMessage(), expected.getMessage());
+ }
+ assertTrue(isExcept);
+ }
+
+ @Test
+ public void testDeserializeEmptyJson() throws Exception {
+ final OggJsonDeserializationSchema deserializationSchema =
+ createOggJsonDeserializationSchema(null, null);
+ final SimpleCollector collector = new SimpleCollector();
+ String emptyMsg = "{}";
+ SeaTunnelRuntimeException expected =
CommonError.jsonOperationError(FORMAT, emptyMsg);
+ boolean isExcept = false;
+
+ try {
+ deserializationSchema.deserialize(emptyMsg.getBytes(), collector);
+ } catch (Throwable t) {
+ isExcept = true;
+ assertEquals(t.getClass(), IOException.class);
+ Throwable cause = t.getCause();
+ assertSame(cause.getClass(), expected.getClass());
+ assertEquals(cause.getMessage(), expected.getMessage());
+ }
Review Comment:
ditto
##########
seatunnel-formats/seatunnel-format-json/src/test/java/org/apache/seatunnel/format/json/canal/CanalJsonSerDeSchemaTest.java:
##########
@@ -69,6 +74,105 @@ public void testDeserializeNullRow() throws Exception {
assertEquals(0, collector.list.size());
}
+ @Test
+ public void testDeserializeNoJson() throws Exception {
+ final CanalJsonDeserializationSchema deserializationSchema =
+ createCanalJsonDeserializationSchema(null, null);
+ final SimpleCollector collector = new SimpleCollector();
+ String noJsonMsg = "{]";
+ boolean isExcept = false;
+
+ SeaTunnelRuntimeException expected =
CommonError.jsonOperationError(FORMAT, noJsonMsg);
+ try {
+ deserializationSchema.deserialize(noJsonMsg.getBytes(), collector);
+ } catch (Throwable t) {
+ isExcept = true;
+ assertEquals(t.getClass(), IOException.class);
+ Throwable cause = t.getCause();
+ assertSame(cause.getClass(), expected.getClass());
+ assertEquals(cause.getMessage(), expected.getMessage());
+ }
+ assertTrue(isExcept);
+ }
+
+ @Test
+ public void testDeserializeEmptyJson() throws Exception {
+ final CanalJsonDeserializationSchema deserializationSchema =
+ createCanalJsonDeserializationSchema(null, null);
+ final SimpleCollector collector = new SimpleCollector();
+ String emptyMsg = "{}";
+ SeaTunnelRuntimeException expected =
CommonError.jsonOperationError(FORMAT, emptyMsg);
+ boolean isExcept = false;
+
+ try {
+ deserializationSchema.deserialize(emptyMsg.getBytes(), collector);
+ } catch (Throwable t) {
+ isExcept = true;
+ assertEquals(t.getClass(), IOException.class);
+ Throwable cause = t.getCause();
+ assertSame(cause.getClass(), expected.getClass());
+ assertEquals(cause.getMessage(), expected.getMessage());
+ }
+ assertTrue(isExcept);
+ }
+
+ @Test
+ public void testDeserializeNoDataJson() throws Exception {
+ final CanalJsonDeserializationSchema deserializationSchema =
+ createCanalJsonDeserializationSchema(null, null);
+ final SimpleCollector collector = new SimpleCollector();
+ String noDataMsg = "{\"type\":\"INSERT\"}";
+ SeaTunnelRuntimeException expected =
CommonError.jsonOperationError(FORMAT, noDataMsg);
+ boolean isExcept = false;
+
+ try {
+ deserializationSchema.deserialize(noDataMsg.getBytes(), collector);
+ } catch (Throwable t) {
+ isExcept = true;
+ assertEquals(t.getClass(), IOException.class);
+ Throwable cause = t.getCause();
+ assertSame(cause.getClass(), expected.getClass());
+ assertEquals(cause.getMessage(), expected.getMessage());
+
+ Throwable noDataCause = cause.getCause();
+ assertEquals(noDataCause.getClass(), IllegalStateException.class);
+ assertEquals(
+ noDataCause.getMessage(),
+ String.format("Null data value '%s' Cannot send
downstream", noDataMsg));
+ }
Review Comment:
ditto
##########
seatunnel-formats/seatunnel-format-json/src/test/java/org/apache/seatunnel/format/json/ogg/OggJsonSerDeSchemaTest.java:
##########
@@ -66,10 +71,113 @@ public void testDeserializeNullRow() throws Exception {
createOggJsonDeserializationSchema(null, null);
final SimpleCollector collector = new SimpleCollector();
- deserializationSchema.deserialize(null, collector);
+ deserializationSchema.deserialize((byte[]) null, collector);
assertEquals(0, collector.list.size());
}
+ @Test
+ public void testDeserializeNoJson() throws Exception {
+ final OggJsonDeserializationSchema deserializationSchema =
+ createOggJsonDeserializationSchema(null, null);
+ final SimpleCollector collector = new SimpleCollector();
+ String noJsonMsg = "{]";
+ boolean isExcept = false;
+
+ SeaTunnelRuntimeException expected =
CommonError.jsonOperationError(FORMAT, noJsonMsg);
+ try {
+ deserializationSchema.deserialize(noJsonMsg.getBytes(), collector);
+ } catch (Throwable t) {
+ isExcept = true;
+ assertEquals(t.getClass(), IOException.class);
+ Throwable cause = t.getCause();
+ assertSame(cause.getClass(), expected.getClass());
+ assertEquals(cause.getMessage(), expected.getMessage());
+ }
Review Comment:
ditto
##########
seatunnel-formats/seatunnel-format-json/src/test/java/org/apache/seatunnel/format/json/canal/CanalJsonSerDeSchemaTest.java:
##########
@@ -69,6 +74,105 @@ public void testDeserializeNullRow() throws Exception {
assertEquals(0, collector.list.size());
}
+ @Test
+ public void testDeserializeNoJson() throws Exception {
+ final CanalJsonDeserializationSchema deserializationSchema =
+ createCanalJsonDeserializationSchema(null, null);
+ final SimpleCollector collector = new SimpleCollector();
+ String noJsonMsg = "{]";
+ boolean isExcept = false;
+
+ SeaTunnelRuntimeException expected =
CommonError.jsonOperationError(FORMAT, noJsonMsg);
+ try {
+ deserializationSchema.deserialize(noJsonMsg.getBytes(), collector);
+ } catch (Throwable t) {
+ isExcept = true;
+ assertEquals(t.getClass(), IOException.class);
+ Throwable cause = t.getCause();
+ assertSame(cause.getClass(), expected.getClass());
+ assertEquals(cause.getMessage(), expected.getMessage());
+ }
+ assertTrue(isExcept);
+ }
+
+ @Test
+ public void testDeserializeEmptyJson() throws Exception {
+ final CanalJsonDeserializationSchema deserializationSchema =
+ createCanalJsonDeserializationSchema(null, null);
+ final SimpleCollector collector = new SimpleCollector();
+ String emptyMsg = "{}";
+ SeaTunnelRuntimeException expected =
CommonError.jsonOperationError(FORMAT, emptyMsg);
+ boolean isExcept = false;
+
+ try {
+ deserializationSchema.deserialize(emptyMsg.getBytes(), collector);
+ } catch (Throwable t) {
+ isExcept = true;
+ assertEquals(t.getClass(), IOException.class);
+ Throwable cause = t.getCause();
+ assertSame(cause.getClass(), expected.getClass());
+ assertEquals(cause.getMessage(), expected.getMessage());
+ }
+ assertTrue(isExcept);
+ }
+
+ @Test
+ public void testDeserializeNoDataJson() throws Exception {
+ final CanalJsonDeserializationSchema deserializationSchema =
+ createCanalJsonDeserializationSchema(null, null);
+ final SimpleCollector collector = new SimpleCollector();
+ String noDataMsg = "{\"type\":\"INSERT\"}";
+ SeaTunnelRuntimeException expected =
CommonError.jsonOperationError(FORMAT, noDataMsg);
+ boolean isExcept = false;
+
+ try {
+ deserializationSchema.deserialize(noDataMsg.getBytes(), collector);
+ } catch (Throwable t) {
+ isExcept = true;
+ assertEquals(t.getClass(), IOException.class);
+ Throwable cause = t.getCause();
+ assertSame(cause.getClass(), expected.getClass());
+ assertEquals(cause.getMessage(), expected.getMessage());
+
+ Throwable noDataCause = cause.getCause();
+ assertEquals(noDataCause.getClass(), IllegalStateException.class);
+ assertEquals(
+ noDataCause.getMessage(),
+ String.format("Null data value '%s' Cannot send
downstream", noDataMsg));
+ }
+ assertTrue(isExcept);
+ }
+
+ @Test
+ public void testDeserializeUnknownTypeJson() throws Exception {
+ final CanalJsonDeserializationSchema deserializationSchema =
+ createCanalJsonDeserializationSchema(null, null);
+ final SimpleCollector collector = new SimpleCollector();
+ String unknownType = "XX";
+ String unknownOperationMsg =
+ "{\"data\":{\"id\":101,\"name\":\"scooter\"},\"type\":\"" +
unknownType + "\"}";
+ SeaTunnelRuntimeException expected =
+ CommonError.jsonOperationError(FORMAT, unknownOperationMsg);
+ boolean isExcept = false;
+
+ try {
+ deserializationSchema.deserialize(unknownOperationMsg.getBytes(),
collector);
+ } catch (Throwable t) {
+ isExcept = true;
+ assertEquals(t.getClass(), IOException.class);
+ Throwable cause = t.getCause();
+ assertSame(cause.getClass(), expected.getClass());
+ assertEquals(cause.getMessage(), expected.getMessage());
+
+ Throwable unknownTypeCause = cause.getCause();
+ assertEquals(unknownTypeCause.getClass(),
IllegalStateException.class);
+ assertEquals(
+ unknownTypeCause.getMessage(),
+ String.format("Unknown operation type '%s'.",
unknownType));
+ }
Review Comment:
ditto
##########
seatunnel-formats/seatunnel-format-json/src/test/java/org/apache/seatunnel/format/json/ogg/OggJsonSerDeSchemaTest.java:
##########
@@ -66,10 +71,113 @@ public void testDeserializeNullRow() throws Exception {
createOggJsonDeserializationSchema(null, null);
final SimpleCollector collector = new SimpleCollector();
- deserializationSchema.deserialize(null, collector);
+ deserializationSchema.deserialize((byte[]) null, collector);
assertEquals(0, collector.list.size());
}
+ @Test
+ public void testDeserializeNoJson() throws Exception {
+ final OggJsonDeserializationSchema deserializationSchema =
+ createOggJsonDeserializationSchema(null, null);
+ final SimpleCollector collector = new SimpleCollector();
+ String noJsonMsg = "{]";
+ boolean isExcept = false;
+
+ SeaTunnelRuntimeException expected =
CommonError.jsonOperationError(FORMAT, noJsonMsg);
+ try {
+ deserializationSchema.deserialize(noJsonMsg.getBytes(), collector);
+ } catch (Throwable t) {
+ isExcept = true;
+ assertEquals(t.getClass(), IOException.class);
+ Throwable cause = t.getCause();
+ assertSame(cause.getClass(), expected.getClass());
+ assertEquals(cause.getMessage(), expected.getMessage());
+ }
+ assertTrue(isExcept);
+ }
+
+ @Test
+ public void testDeserializeEmptyJson() throws Exception {
+ final OggJsonDeserializationSchema deserializationSchema =
+ createOggJsonDeserializationSchema(null, null);
+ final SimpleCollector collector = new SimpleCollector();
+ String emptyMsg = "{}";
+ SeaTunnelRuntimeException expected =
CommonError.jsonOperationError(FORMAT, emptyMsg);
+ boolean isExcept = false;
+
+ try {
+ deserializationSchema.deserialize(emptyMsg.getBytes(), collector);
+ } catch (Throwable t) {
+ isExcept = true;
+ assertEquals(t.getClass(), IOException.class);
+ Throwable cause = t.getCause();
+ assertSame(cause.getClass(), expected.getClass());
+ assertEquals(cause.getMessage(), expected.getMessage());
+ }
+ assertTrue(isExcept);
+ }
+
+ @Test
+ public void testDeserializeNoDataJson() throws Exception {
+ final OggJsonDeserializationSchema deserializationSchema =
+ createOggJsonDeserializationSchema(null, null);
+ final SimpleCollector collector = new SimpleCollector();
+ String noDataMsg = "{\"op_type\":\"U\"}";
+ SeaTunnelRuntimeException expected =
CommonError.jsonOperationError(FORMAT, noDataMsg);
+ boolean isExcept = false;
+
+ try {
+ deserializationSchema.deserialize(noDataMsg.getBytes(), collector);
+ } catch (Throwable t) {
+ isExcept = true;
+ assertEquals(t.getClass(), IOException.class);
+ Throwable cause = t.getCause();
+ assertSame(cause.getClass(), expected.getClass());
+ assertEquals(cause.getMessage(), expected.getMessage());
+
+ Throwable noDataCause = cause.getCause();
+ assertEquals(noDataCause.getClass(), IllegalStateException.class);
+ assertEquals(
+ noDataCause.getMessage(),
+ String.format(
+ "The \"before\" field of %s operation message is
null, "
+ + "if you are using Ogg Postgres
Connector, "
+ + "please check the Postgres table has
been set REPLICA IDENTITY to FULL level.",
+ "UPDATE"));
+ }
Review Comment:
ditto
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]