This is an automated email from the ASF dual-hosted git repository.
wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new ebb64f14bd [Improve][Formats] Replace
`CommonErrorCodeDeprecated.JSON_OPERATION_FAILED` (#5948)
ebb64f14bd is described below
commit ebb64f14bd57ef70c0da15a192c3b6af1d02318d
Author: Chengyu Yan <[email protected]>
AuthorDate: Thu Dec 14 13:53:27 2023 +0800
[Improve][Formats] Replace
`CommonErrorCodeDeprecated.JSON_OPERATION_FAILED` (#5948)
---
release-note.md | 1 +
...ompatibleKafkaConnectDeserializationSchema.java | 10 +-
.../format/json/JsonDeserializationSchema.java | 18 +--
.../format/json/JsonSerializationSchema.java | 11 +-
.../seatunnel/format/json/JsonToRowConverters.java | 17 ++-
.../json/canal/CanalJsonSerializationSchema.java | 8 +-
.../DebeziumJsonDeserializationSchema.java | 38 ++-----
.../debezium/DebeziumJsonSerializationSchema.java | 9 +-
.../json/ogg/OggJsonSerializationSchema.java | 7 +-
.../format/json/JsonRowDataSerDeSchemaTest.java | 121 ++++++++++++++++-----
.../json/debezium/DebeziumJsonSerDeSchemaTest.java | 92 ++++++++++++++++
11 files changed, 233 insertions(+), 99 deletions(-)
diff --git a/release-note.md b/release-note.md
index acfeb8fe8e..5c6d5738a8 100644
--- a/release-note.md
+++ b/release-note.md
@@ -104,6 +104,7 @@
### Formats
- [Json] Remove assert key word. (#5919)
+- [Formats] Replace CommonErrorCodeDeprecated.JSON_OPERATION_FAILED. (#5948)
### Connector-V2
diff --git
a/seatunnel-formats/seatunnel-format-compatible-connect-json/src/main/java/org/apache/seatunnel/format/compatible/kafka/connect/json/CompatibleKafkaConnectDeserializationSchema.java
b/seatunnel-formats/seatunnel-format-compatible-connect-json/src/main/java/org/apache/seatunnel/format/compatible/kafka/connect/json/CompatibleKafkaConnectDeserializationSchema.java
index da2f8e039c..45072c32dc 100644
---
a/seatunnel-formats/seatunnel-format-compatible-connect-json/src/main/java/org/apache/seatunnel/format/compatible/kafka/connect/json/CompatibleKafkaConnectDeserializationSchema.java
+++
b/seatunnel-formats/seatunnel-format-compatible-connect-json/src/main/java/org/apache/seatunnel/format/compatible/kafka/connect/json/CompatibleKafkaConnectDeserializationSchema.java
@@ -25,10 +25,9 @@ import org.apache.seatunnel.api.table.type.RowKind;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
-import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
+import org.apache.seatunnel.common.exception.CommonError;
import org.apache.seatunnel.common.utils.ReflectionUtils;
import org.apache.seatunnel.format.json.JsonToRowConverters;
-import org.apache.seatunnel.format.json.exception.SeaTunnelJsonFormatException;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.connect.data.Schema;
@@ -58,6 +57,7 @@ public class CompatibleKafkaConnectDeserializationSchema
private static final String INCLUDE_SCHEMA_METHOD =
"convertToJsonWithEnvelope";
private static final String EXCLUDE_SCHEMA_METHOD =
"convertToJsonWithoutEnvelope";
private static final String KAFKA_CONNECT_SINK_RECORD_PAYLOAD = "payload";
+ public static final String FORMAT = "Kafka.Connect";
private transient JsonConverter keyConverter;
private transient JsonConverter valueConverter;
private transient Method keyConverterMethod;
@@ -126,15 +126,13 @@ public class CompatibleKafkaConnectDeserializationSchema
if (jsonNode.isNull()) {
return null;
}
+
try {
org.apache.seatunnel.shade.com.fasterxml.jackson.databind.JsonNode
jsonData =
objectMapper.readTree(jsonNode.toString());
return (SeaTunnelRow) runtimeConverter.convert(jsonData);
} catch (Throwable t) {
- throw new SeaTunnelJsonFormatException(
- CommonErrorCodeDeprecated.JSON_OPERATION_FAILED,
- String.format("Failed to deserialize JSON '%s'.",
jsonNode),
- t);
+ throw CommonError.jsonOperationError(FORMAT, jsonNode.toString(),
t);
}
}
diff --git
a/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/JsonDeserializationSchema.java
b/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/JsonDeserializationSchema.java
index fe74643f37..90387c08ef 100644
---
a/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/JsonDeserializationSchema.java
+++
b/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/JsonDeserializationSchema.java
@@ -32,6 +32,7 @@ import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.api.table.type.SqlType;
+import org.apache.seatunnel.common.exception.CommonError;
import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
import org.apache.seatunnel.format.json.exception.SeaTunnelJsonFormatException;
@@ -42,6 +43,8 @@ import static
com.google.common.base.Preconditions.checkNotNull;
public class JsonDeserializationSchema implements
DeserializationSchema<SeaTunnelRow> {
private static final long serialVersionUID = 1L;
+ private static final String FORMAT = "Common";
+
/** Flag indicating whether to fail if a field is missing. */
private final boolean failOnMissingField;
@@ -133,10 +136,7 @@ public class JsonDeserializationSchema implements
DeserializationSchema<SeaTunne
if (ignoreParseErrors) {
return null;
}
- throw new SeaTunnelJsonFormatException(
- CommonErrorCodeDeprecated.JSON_OPERATION_FAILED,
- String.format("Failed to deserialize JSON '%s'.",
jsonNode),
- t);
+ throw CommonError.jsonOperationError(FORMAT, jsonNode.toString(),
t);
}
}
@@ -155,10 +155,7 @@ public class JsonDeserializationSchema implements
DeserializationSchema<SeaTunne
if (ignoreParseErrors) {
return NullNode.getInstance();
}
- throw new SeaTunnelJsonFormatException(
- CommonErrorCodeDeprecated.JSON_OPERATION_FAILED,
- String.format("Failed to deserialize JSON '%s'.", new
String(message)),
- t);
+ throw CommonError.jsonOperationError(FORMAT, new String(message),
t);
}
}
@@ -169,10 +166,7 @@ public class JsonDeserializationSchema implements
DeserializationSchema<SeaTunne
if (ignoreParseErrors) {
return NullNode.getInstance();
}
- throw new SeaTunnelJsonFormatException(
- CommonErrorCodeDeprecated.JSON_OPERATION_FAILED,
- String.format("Failed to deserialize JSON '%s'.", message),
- t);
+ throw CommonError.jsonOperationError(FORMAT, new String(message),
t);
}
}
diff --git
a/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/JsonSerializationSchema.java
b/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/JsonSerializationSchema.java
index 44a2b6ba86..f743dc3dbe 100644
---
a/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/JsonSerializationSchema.java
+++
b/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/JsonSerializationSchema.java
@@ -24,8 +24,7 @@ import
org.apache.seatunnel.shade.com.fasterxml.jackson.databind.node.ObjectNode
import org.apache.seatunnel.api.serialization.SerializationSchema;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
-import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
-import org.apache.seatunnel.format.json.exception.SeaTunnelJsonFormatException;
+import org.apache.seatunnel.common.exception.CommonError;
import lombok.Getter;
@@ -33,6 +32,7 @@ import static
com.google.common.base.Preconditions.checkNotNull;
public class JsonSerializationSchema implements SerializationSchema {
+ public static final String FORMAT = "Common";
/** RowType to generate the runtime converter. */
private final SeaTunnelRowType rowType;
@@ -58,11 +58,8 @@ public class JsonSerializationSchema implements
SerializationSchema {
try {
runtimeConverter.convert(mapper, node, row);
return mapper.writeValueAsBytes(node);
- } catch (Throwable e) {
- throw new SeaTunnelJsonFormatException(
- CommonErrorCodeDeprecated.JSON_OPERATION_FAILED,
- String.format("Failed to deserialize JSON '%s'.", row),
- e);
+ } catch (Throwable t) {
+ throw CommonError.jsonOperationError(FORMAT, row.toString(), t);
}
}
}
diff --git
a/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/JsonToRowConverters.java
b/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/JsonToRowConverters.java
index 9437442481..aee3c1a896 100644
---
a/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/JsonToRowConverters.java
+++
b/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/JsonToRowConverters.java
@@ -26,6 +26,7 @@ import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.api.table.type.SqlType;
+import org.apache.seatunnel.common.exception.CommonError;
import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
import org.apache.seatunnel.format.json.exception.SeaTunnelJsonFormatException;
@@ -65,6 +66,8 @@ public class JsonToRowConverters implements Serializable {
.appendFraction(ChronoField.NANO_OF_SECOND, 0, 9, true)
.toFormatter();
+ public static final String FORMAT = "Common";
+
/** Flag indicating whether to fail if a field is missing. */
private final boolean failOnMissingField;
@@ -270,10 +273,7 @@ public class JsonToRowConverters implements Serializable {
try {
return jsonNode.binaryValue();
} catch (IOException e) {
- throw new SeaTunnelJsonFormatException(
- CommonErrorCodeDeprecated.JSON_OPERATION_FAILED,
- "Unable to deserialize byte array.",
- e);
+ throw CommonError.jsonOperationError(FORMAT, jsonNode.toString(),
e);
}
}
@@ -324,9 +324,9 @@ public class JsonToRowConverters implements Serializable {
Object convertedField =
convertField(fieldConverters[i], fieldName, field);
row.setField(i, convertedField);
} catch (Throwable t) {
- throw new SeaTunnelJsonFormatException(
-
CommonErrorCodeDeprecated.JSON_OPERATION_FAILED,
- String.format("Fail to deserialize at field:
%s.", fieldName),
+ throw CommonError.jsonOperationError(
+ FORMAT,
+ String.format("Field $.%s in %s", fieldName,
jsonNode.toString()),
t);
}
}
@@ -375,8 +375,7 @@ public class JsonToRowConverters implements Serializable {
JsonToRowConverter fieldConverter, String fieldName, JsonNode
field) {
if (field == null) {
if (failOnMissingField) {
- throw new SeaTunnelJsonFormatException(
- CommonErrorCodeDeprecated.JSON_OPERATION_FAILED,
+ throw new IllegalArgumentException(
String.format("Could not find field with name %s .",
fieldName));
} else {
return null;
diff --git
a/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/canal/CanalJsonSerializationSchema.java
b/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/canal/CanalJsonSerializationSchema.java
index 765f6e7599..c6cb1cfdd8 100644
---
a/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/canal/CanalJsonSerializationSchema.java
+++
b/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/canal/CanalJsonSerializationSchema.java
@@ -23,6 +23,7 @@ import org.apache.seatunnel.api.table.type.RowKind;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.common.exception.CommonError;
import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
import org.apache.seatunnel.format.json.JsonSerializationSchema;
import org.apache.seatunnel.format.json.exception.SeaTunnelJsonFormatException;
@@ -33,6 +34,8 @@ public class CanalJsonSerializationSchema implements
SerializationSchema {
private static final long serialVersionUID = 1L;
+ private static final String FORMAT = "Canal";
+
private static final String OP_INSERT = "INSERT";
private static final String OP_DELETE = "DELETE";
@@ -53,10 +56,7 @@ public class CanalJsonSerializationSchema implements
SerializationSchema {
reuse.setField(1, opType);
return jsonSerializer.serialize(reuse);
} catch (Throwable t) {
- throw new SeaTunnelJsonFormatException(
- CommonErrorCodeDeprecated.JSON_OPERATION_FAILED,
- String.format("Could not serialize row %s.", row),
- t);
+ throw CommonError.jsonOperationError(FORMAT, row.toString(), t);
}
}
diff --git
a/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/debezium/DebeziumJsonDeserializationSchema.java
b/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/debezium/DebeziumJsonDeserializationSchema.java
index 1dece25824..2859afc122 100644
---
a/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/debezium/DebeziumJsonDeserializationSchema.java
+++
b/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/debezium/DebeziumJsonDeserializationSchema.java
@@ -25,12 +25,13 @@ import org.apache.seatunnel.api.table.type.RowKind;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
-import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
+import org.apache.seatunnel.common.exception.CommonError;
import org.apache.seatunnel.format.json.JsonDeserializationSchema;
-import org.apache.seatunnel.format.json.exception.SeaTunnelJsonFormatException;
import java.io.IOException;
+import static java.lang.String.format;
+
public class DebeziumJsonDeserializationSchema implements
DeserializationSchema<SeaTunnelRow> {
private static final long serialVersionUID = 1L;
@@ -40,10 +41,12 @@ public class DebeziumJsonDeserializationSchema implements
DeserializationSchema<
private static final String OP_DELETE = "d"; // delete
private static final String REPLICA_IDENTITY_EXCEPTION =
- "The \"before\" field of %s message is null, "
+ "The \"before\" field of %s operation is null, "
+ "if you are using Debezium Postgres Connector, "
+ "please check the Postgres table has been set REPLICA
IDENTITY to FULL level.";
+ public static final String FORMAT = "Debezium";
+
private final SeaTunnelRowType rowType;
private final JsonDeserializationSchema jsonDeserializer;
@@ -93,8 +96,7 @@ public class DebeziumJsonDeserializationSchema implements
DeserializationSchema<
} else if (OP_UPDATE.equals(op)) {
SeaTunnelRow before = convertJsonNode(payload.get("before"));
if (before == null) {
- throw new SeaTunnelJsonFormatException(
- CommonErrorCodeDeprecated.UNSUPPORTED_DATA_TYPE,
+ throw new IllegalStateException(
String.format(REPLICA_IDENTITY_EXCEPTION,
"UPDATE"));
}
before.setRowKind(RowKind.UPDATE_BEFORE);
@@ -106,28 +108,18 @@ public class DebeziumJsonDeserializationSchema implements
DeserializationSchema<
} else if (OP_DELETE.equals(op)) {
SeaTunnelRow delete = convertJsonNode(payload.get("before"));
if (delete == null) {
- throw new SeaTunnelJsonFormatException(
- CommonErrorCodeDeprecated.UNSUPPORTED_DATA_TYPE,
+ throw new IllegalStateException(
String.format(REPLICA_IDENTITY_EXCEPTION,
"UPDATE"));
}
delete.setRowKind(RowKind.DELETE);
out.collect(delete);
} else {
- if (!ignoreParseErrors) {
- throw new SeaTunnelJsonFormatException(
- CommonErrorCodeDeprecated.UNSUPPORTED_DATA_TYPE,
- String.format(
- "Unknown \"op\" value \"%s\". The Debezium
JSON message is '%s'",
- op, new String(message)));
- }
+ throw new IllegalStateException(format("Unknown operation type
'%s'.", op));
}
} catch (Throwable t) {
// a big try catch to protect the processing.
if (!ignoreParseErrors) {
- throw new SeaTunnelJsonFormatException(
- CommonErrorCodeDeprecated.UNSUPPORTED_DATA_TYPE,
- String.format("Corrupt Debezium JSON message '%s'.",
new String(message)),
- t);
+ throw CommonError.jsonOperationError(FORMAT, new
String(message), t);
}
}
}
@@ -142,14 +134,8 @@ public class DebeziumJsonDeserializationSchema implements
DeserializationSchema<
private JsonNode convertBytes(byte[] message) {
try {
return jsonDeserializer.deserializeToJsonNode(message);
- } catch (Exception t) {
- if (ignoreParseErrors) {
- return null;
- }
- throw new SeaTunnelJsonFormatException(
- CommonErrorCodeDeprecated.JSON_OPERATION_FAILED,
- String.format("Failed to deserialize JSON '%s'.", new
String(message)),
- t);
+ } catch (IOException t) {
+ throw CommonError.jsonOperationError(FORMAT, new String(message),
t);
}
}
diff --git
a/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/debezium/DebeziumJsonSerializationSchema.java
b/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/debezium/DebeziumJsonSerializationSchema.java
index aac9c46fb5..0b1773e403 100644
---
a/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/debezium/DebeziumJsonSerializationSchema.java
+++
b/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/debezium/DebeziumJsonSerializationSchema.java
@@ -21,9 +21,8 @@ import
org.apache.seatunnel.api.serialization.SerializationSchema;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
-import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
+import org.apache.seatunnel.common.exception.CommonError;
import org.apache.seatunnel.format.json.JsonSerializationSchema;
-import org.apache.seatunnel.format.json.exception.SeaTunnelJsonFormatException;
import static org.apache.seatunnel.api.table.type.BasicType.STRING_TYPE;
import static
org.apache.seatunnel.format.json.debezium.DebeziumJsonFormatOptions.GENERATE_ROW_SIZE;
@@ -33,6 +32,7 @@ public class DebeziumJsonSerializationSchema implements
SerializationSchema {
private static final String OP_INSERT = "c"; // insert
private static final String OP_DELETE = "d"; // delete
+ public static final String FORMAT = "Debezium";
private final JsonSerializationSchema jsonSerializer;
@@ -65,10 +65,7 @@ public class DebeziumJsonSerializationSchema implements
SerializationSchema {
"Unsupported operation '%s' for row
kind.", row.getRowKind()));
}
} catch (Throwable t) {
- throw new SeaTunnelJsonFormatException(
- CommonErrorCodeDeprecated.JSON_OPERATION_FAILED,
- String.format("Could not serialize row %s.", row),
- t);
+ throw CommonError.jsonOperationError(FORMAT, row.toString(), t);
}
}
diff --git
a/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/ogg/OggJsonSerializationSchema.java
b/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/ogg/OggJsonSerializationSchema.java
index 32eb9d2fe0..2499736fa9 100644
---
a/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/ogg/OggJsonSerializationSchema.java
+++
b/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/ogg/OggJsonSerializationSchema.java
@@ -23,6 +23,7 @@ import org.apache.seatunnel.api.table.type.RowKind;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.common.exception.CommonError;
import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
import org.apache.seatunnel.format.json.JsonSerializationSchema;
import org.apache.seatunnel.format.json.exception.SeaTunnelJsonFormatException;
@@ -35,6 +36,7 @@ public class OggJsonSerializationSchema implements
SerializationSchema {
private static final String OP_INSERT = "INSERT";
private static final String OP_DELETE = "DELETE";
+ public static final String FORMAT = "Ogg";
private transient SeaTunnelRow reuse;
@@ -53,10 +55,7 @@ public class OggJsonSerializationSchema implements
SerializationSchema {
reuse.setField(1, opType);
return jsonSerializer.serialize(reuse);
} catch (Throwable t) {
- throw new SeaTunnelJsonFormatException(
- CommonErrorCodeDeprecated.JSON_OPERATION_FAILED,
- String.format("Could not serialize row %s.", row),
- t);
+ throw CommonError.jsonOperationError(FORMAT, row.toString(), t);
}
}
diff --git
a/seatunnel-formats/seatunnel-format-json/src/test/java/org/apache/seatunnel/format/json/JsonRowDataSerDeSchemaTest.java
b/seatunnel-formats/seatunnel-format-json/src/test/java/org/apache/seatunnel/format/json/JsonRowDataSerDeSchemaTest.java
index d9ff06965b..109f8b496a 100644
---
a/seatunnel-formats/seatunnel-format-json/src/test/java/org/apache/seatunnel/format/json/JsonRowDataSerDeSchemaTest.java
+++
b/seatunnel-formats/seatunnel-format-json/src/test/java/org/apache/seatunnel/format/json/JsonRowDataSerDeSchemaTest.java
@@ -27,8 +27,10 @@ import org.apache.seatunnel.api.table.type.MapType;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.common.exception.CommonError;
+import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
+import org.apache.seatunnel.format.json.exception.SeaTunnelJsonFormatException;
-import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import java.sql.Timestamp;
@@ -47,7 +49,7 @@ import static
org.apache.seatunnel.api.table.type.BasicType.LONG_TYPE;
import static org.apache.seatunnel.api.table.type.BasicType.STRING_TYPE;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNull;
-import static org.junit.jupiter.api.Assertions.fail;
+import static org.junit.jupiter.api.Assertions.assertThrows;
public class JsonRowDataSerDeSchemaTest {
@@ -275,7 +277,7 @@ public class JsonRowDataSerDeSchemaTest {
}
@Test
- public void testDeserializationMissingField() throws Exception {
+ public void testDeserializationPassMissingField() throws Exception {
ObjectMapper objectMapper = new ObjectMapper();
// Root
@@ -287,37 +289,106 @@ public class JsonRowDataSerDeSchemaTest {
new SeaTunnelRowType(new String[] {"name"}, new
SeaTunnelDataType[] {STRING_TYPE});
// pass on missing field
- JsonDeserializationSchema deserializationSchema =
- new JsonDeserializationSchema(false, false, schema);
+ final JsonDeserializationSchema deser = new
JsonDeserializationSchema(false, false, schema);
SeaTunnelRow expected = new SeaTunnelRow(1);
- SeaTunnelRow actual =
deserializationSchema.deserialize(serializedJson);
+ SeaTunnelRow actual = deser.deserialize(serializedJson);
assertEquals(expected, actual);
+ }
+
+ @Test
+ public void testDeserializationMissingField() throws Exception {
+ ObjectMapper objectMapper = new ObjectMapper();
+
+ // Root
+ ObjectNode root = objectMapper.createObjectNode();
+ root.put("id", 123123123);
+ byte[] serializedJson = objectMapper.writeValueAsBytes(root);
+
+ SeaTunnelRowType schema =
+ new SeaTunnelRowType(new String[] {"name"}, new
SeaTunnelDataType[] {STRING_TYPE});
// fail on missing field
- deserializationSchema = new JsonDeserializationSchema(true, false,
schema);
+ final JsonDeserializationSchema deser = new
JsonDeserializationSchema(true, false, schema);
+
+ SeaTunnelRuntimeException expected =
+ CommonError.jsonOperationError("Common", root.toString());
+ SeaTunnelRuntimeException actual =
+ assertThrows(
+ SeaTunnelRuntimeException.class,
+ () -> {
+ deser.deserialize(serializedJson);
+ },
+ "expecting exception message: " +
expected.getMessage());
+ assertEquals(actual.getMessage(), expected.getMessage());
+
+ SeaTunnelRuntimeException expectedCause =
+ CommonError.jsonOperationError("Common", "Field $.name in " +
root.toString());
+ Throwable cause = actual.getCause();
+ assertEquals(cause.getClass(), expectedCause.getClass());
+ assertEquals(cause.getMessage(), expectedCause.getMessage());
+ }
- String errorMessage =
- "ErrorCode:[COMMON-02], ErrorDescription:[Json covert/parse
operation failed] - Failed to deserialize JSON '{\"id\":123123123}'.";
- try {
- deserializationSchema.deserialize(serializedJson);
- fail("expecting exception message: " + errorMessage);
- } catch (Throwable t) {
- assertEquals(errorMessage, t.getMessage());
- }
+ @Test
+ public void testDeserializationIgnoreParseError() throws Exception {
+ ObjectMapper objectMapper = new ObjectMapper();
+
+ // Root
+ ObjectNode root = objectMapper.createObjectNode();
+ root.put("id", 123123123);
+ byte[] serializedJson = objectMapper.writeValueAsBytes(root);
+
+ SeaTunnelRowType schema =
+ new SeaTunnelRowType(new String[] {"name"}, new
SeaTunnelDataType[] {STRING_TYPE});
+ SeaTunnelRow expected = new SeaTunnelRow(1);
// ignore on parse error
- deserializationSchema = new JsonDeserializationSchema(false, true,
schema);
- assertEquals(expected,
deserializationSchema.deserialize(serializedJson));
+ final JsonDeserializationSchema deser = new
JsonDeserializationSchema(false, true, schema);
+ assertEquals(expected, deser.deserialize(serializedJson));
+ }
- errorMessage =
+ @Test
+ public void testDeserializationFailOnMissingFieldIgnoreParseError() throws
Exception {
+ String errorMessage =
"ErrorCode:[COMMON-06], ErrorDescription:[Illegal argument] -
JSON format doesn't support failOnMissingField and ignoreParseErrors are both
enabled.";
- try {
- // failOnMissingField and ignoreParseErrors both enabled
- new JsonDeserializationSchema(true, true, schema);
- Assertions.fail("expecting exception message: " + errorMessage);
- } catch (Throwable t) {
- assertEquals(errorMessage, t.getMessage());
- }
+
+ SeaTunnelJsonFormatException actual =
+ assertThrows(
+ SeaTunnelJsonFormatException.class,
+ () -> {
+ new JsonDeserializationSchema(true, true, null);
+ },
+ "expecting exception message: " + errorMessage);
+ assertEquals(actual.getMessage(), errorMessage);
+ }
+
+ @Test
+ public void testDeserializationNoJson() throws Exception {
+ SeaTunnelRowType schema =
+ new SeaTunnelRowType(new String[] {"name"}, new
SeaTunnelDataType[] {STRING_TYPE});
+
+ String noJson = "{]";
+ final JsonDeserializationSchema deser = new
JsonDeserializationSchema(false, false, schema);
+ SeaTunnelRuntimeException expected =
CommonError.jsonOperationError("Common", noJson);
+
+ SeaTunnelRuntimeException actual =
+ assertThrows(
+ SeaTunnelRuntimeException.class,
+ () -> {
+ deser.deserialize(noJson);
+ },
+ "expecting exception message: " +
expected.getMessage());
+
+ assertEquals(actual.getMessage(), expected.getMessage());
+
+ actual =
+ assertThrows(
+ SeaTunnelRuntimeException.class,
+ () -> {
+ deser.deserialize(noJson.getBytes());
+ },
+ "expecting exception message: " +
expected.getMessage());
+
+ assertEquals(actual.getMessage(), expected.getMessage());
}
}
diff --git
a/seatunnel-formats/seatunnel-format-json/src/test/java/org/apache/seatunnel/format/json/debezium/DebeziumJsonSerDeSchemaTest.java
b/seatunnel-formats/seatunnel-format-json/src/test/java/org/apache/seatunnel/format/json/debezium/DebeziumJsonSerDeSchemaTest.java
index 20088e525b..e74e58e19e 100644
---
a/seatunnel-formats/seatunnel-format-json/src/test/java/org/apache/seatunnel/format/json/debezium/DebeziumJsonSerDeSchemaTest.java
+++
b/seatunnel-formats/seatunnel-format-json/src/test/java/org/apache/seatunnel/format/json/debezium/DebeziumJsonSerDeSchemaTest.java
@@ -22,6 +22,8 @@ import org.apache.seatunnel.api.source.Collector;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.common.exception.CommonError;
+import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
@@ -41,8 +43,10 @@ import static
org.apache.seatunnel.api.table.type.BasicType.FLOAT_TYPE;
import static org.apache.seatunnel.api.table.type.BasicType.INT_TYPE;
import static org.apache.seatunnel.api.table.type.BasicType.STRING_TYPE;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
public class DebeziumJsonSerDeSchemaTest {
+ private static final String FORMAT = "Debezium";
private static final SeaTunnelRowType PHYSICAL_DATA_TYPE =
new SeaTunnelRowType(
@@ -65,6 +69,94 @@ public class DebeziumJsonSerDeSchemaTest {
testSerializationDeserialization("debezium-data.txt", false);
}
+ @Test
+ public void testDeserializeNoJson() throws Exception {
+ final DebeziumJsonDeserializationSchema deserializationSchema =
+ new DebeziumJsonDeserializationSchema(PHYSICAL_DATA_TYPE,
false);
+ final SimpleCollector collector = new SimpleCollector();
+
+ String noJsonMsg = "{]";
+
+ SeaTunnelRuntimeException expected =
CommonError.jsonOperationError(FORMAT, noJsonMsg);
+ SeaTunnelRuntimeException cause =
+ assertThrows(
+ expected.getClass(),
+ () -> {
+
deserializationSchema.deserialize(noJsonMsg.getBytes(), collector);
+ });
+ assertEquals(cause.getMessage(), expected.getMessage());
+ }
+
+ @Test
+ public void testDeserializeEmptyJson() throws Exception {
+ final DebeziumJsonDeserializationSchema deserializationSchema =
+ new DebeziumJsonDeserializationSchema(PHYSICAL_DATA_TYPE,
false);
+ final SimpleCollector collector = new SimpleCollector();
+ String emptyMsg = "{}";
+ SeaTunnelRuntimeException expected =
CommonError.jsonOperationError(FORMAT, emptyMsg);
+ SeaTunnelRuntimeException cause =
+ assertThrows(
+ expected.getClass(),
+ () -> {
+
deserializationSchema.deserialize(emptyMsg.getBytes(), collector);
+ });
+ assertEquals(cause.getMessage(), expected.getMessage());
+ }
+
+ @Test
+ public void testDeserializeNoDataJson() throws Exception {
+ final DebeziumJsonDeserializationSchema deserializationSchema =
+ new DebeziumJsonDeserializationSchema(PHYSICAL_DATA_TYPE,
false);
+ final SimpleCollector collector = new SimpleCollector();
+ String noDataMsg = "{\"op\":\"u\"}";
+ SeaTunnelRuntimeException expected =
CommonError.jsonOperationError(FORMAT, noDataMsg);
+ SeaTunnelRuntimeException cause =
+ assertThrows(
+ expected.getClass(),
+ () -> {
+
deserializationSchema.deserialize(noDataMsg.getBytes(), collector);
+ });
+ assertEquals(cause.getMessage(), expected.getMessage());
+
+ Throwable noDataCause = cause.getCause();
+ assertEquals(noDataCause.getClass(), IllegalStateException.class);
+ assertEquals(
+ noDataCause.getMessage(),
+ String.format(
+ "The \"before\" field of %s operation is null, "
+ + "if you are using Debezium Postgres
Connector, "
+ + "please check the Postgres table has been
set REPLICA IDENTITY to FULL level.",
+ "UPDATE"));
+ }
+
+ @Test
+ public void testDeserializeUnknownOperationTypeJson() throws Exception {
+ final DebeziumJsonDeserializationSchema deserializationSchema =
+ new DebeziumJsonDeserializationSchema(PHYSICAL_DATA_TYPE,
false);
+ final SimpleCollector collector = new SimpleCollector();
+ String unknownType = "XX";
+ String unknownOperationMsg =
+
"{\"before\":null,\"after\":{\"id\":101,\"name\":\"scooter\",\"description\":\"Small
2-wheel scooter\",\"weight\":3.14},\"op\":\""
+ + unknownType
+ + "\"}";
+ SeaTunnelRuntimeException expected =
+ CommonError.jsonOperationError(FORMAT, unknownOperationMsg);
+ SeaTunnelRuntimeException cause =
+ assertThrows(
+ expected.getClass(),
+ () -> {
+ deserializationSchema.deserialize(
+ unknownOperationMsg.getBytes(), collector);
+ });
+ assertEquals(cause.getMessage(), expected.getMessage());
+
+ Throwable unknownTypeCause = cause.getCause();
+ assertEquals(unknownTypeCause.getClass(), IllegalStateException.class);
+ assertEquals(
+ unknownTypeCause.getMessage(),
+ String.format("Unknown operation type '%s'.", unknownType));
+ }
+
private void testSerializationDeserialization(String resourceFile, boolean
schemaInclude)
throws Exception {
List<String> lines = readLines(resourceFile);