This is an automated email from the ASF dual-hosted git repository.
liugddx pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new f976d14f9 [Feature][connector][kafka] Supplement the test case in json
format (#3889)
f976d14f9 is described below
commit f976d14f9e3a1d68eee41fd3939bcfa63972dc3a
Author: monster <[email protected]>
AuthorDate: Tue Jan 10 14:43:35 2023 +0800
[Feature][connector][kafka] Supplement the test case in json format (#3889)
* [Feature][connector][kafka] Supplement the test case in json format
* fix
* fix
* fix
* fix
---
.../format/json/JsonRowDataSerDeSchemaTest.java | 299 +++++++++++++++++++++
1 file changed, 299 insertions(+)
diff --git
a/seatunnel-formats/seatunnel-format-json/src/test/java/org/apache/seatunnel/format/json/JsonRowDataSerDeSchemaTest.java
b/seatunnel-formats/seatunnel-format-json/src/test/java/org/apache/seatunnel/format/json/JsonRowDataSerDeSchemaTest.java
new file mode 100644
index 000000000..c605048e9
--- /dev/null
+++
b/seatunnel-formats/seatunnel-format-json/src/test/java/org/apache/seatunnel/format/json/JsonRowDataSerDeSchemaTest.java
@@ -0,0 +1,299 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.format.json;
+
+import static org.apache.seatunnel.api.table.type.ArrayType.INT_ARRAY_TYPE;
+import static org.apache.seatunnel.api.table.type.ArrayType.STRING_ARRAY_TYPE;
+import static org.apache.seatunnel.api.table.type.BasicType.BOOLEAN_TYPE;
+import static org.apache.seatunnel.api.table.type.BasicType.DOUBLE_TYPE;
+import static org.apache.seatunnel.api.table.type.BasicType.FLOAT_TYPE;
+import static org.apache.seatunnel.api.table.type.BasicType.INT_TYPE;
+import static org.apache.seatunnel.api.table.type.BasicType.LONG_TYPE;
+import static org.apache.seatunnel.api.table.type.BasicType.STRING_TYPE;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.fail;
+
+import org.apache.seatunnel.api.table.type.LocalTimeType;
+import org.apache.seatunnel.api.table.type.MapType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.sql.Timestamp;
+import java.time.LocalDate;
+import java.time.LocalTime;
+import java.util.HashMap;
+import java.util.Map;
+
+public class JsonRowDataSerDeSchemaTest {
+
+ @Test
+ public void testSerDe() throws Exception {
+ int intValue = 45536;
+ float floatValue = 33.333F;
+ long longValue = 1238123899121L;
+ String name = "asdlkjasjkdla998y1122";
+ LocalDate date = LocalDate.parse("1990-10-14");
+ LocalTime time = LocalTime.parse("12:12:43");
+ Timestamp timestamp3 = Timestamp.valueOf("1990-10-14 12:12:43.123");
+ Timestamp timestamp9 = Timestamp.valueOf("1990-10-14
12:12:43.123456789");
+ Map<String, Long> map = new HashMap<>();
+ map.put("element", 123L);
+
+ Map<String, Integer> multiSet = new HashMap<>();
+ multiSet.put("element", 2);
+
+ Map<String, Map<String, Integer>> nestedMap = new HashMap<>();
+ Map<String, Integer> innerMap = new HashMap<>();
+ innerMap.put("key", 234);
+ nestedMap.put("inner_map", innerMap);
+
+ ObjectMapper objectMapper = new ObjectMapper();
+
+ // Root
+ ObjectNode root = objectMapper.createObjectNode();
+ root.put("bool", true);
+ root.put("int", intValue);
+ root.put("longValue", longValue);
+ root.put("float", floatValue);
+ root.put("name", name);
+ root.put("date", "1990-10-14");
+ root.put("time", "12:12:43");
+ root.put("timestamp3", "1990-10-14T12:12:43.123");
+ root.put("timestamp9", "1990-10-14T12:12:43.123456789");
+ root.putObject("map").put("element", 123);
+ root.putObject("multiSet").put("element", 2);
+ root.putObject("map2map").putObject("inner_map").put("key", 234);
+
+ byte[] serializedJson = objectMapper.writeValueAsBytes(root);
+
+ SeaTunnelRowType schema = new SeaTunnelRowType(
+ new String[]{"bool", "int",
+ "longValue", "float", "name", "date", "time",
"timestamp3", "timestamp9", "map", "multiSet", "map2map"}, new
SeaTunnelDataType[]{
+ BOOLEAN_TYPE,
+ INT_TYPE,
+ LONG_TYPE,
+ FLOAT_TYPE,
+ STRING_TYPE,
+ LocalTimeType.LOCAL_DATE_TYPE,
+ LocalTimeType.LOCAL_TIME_TYPE,
+ LocalTimeType.LOCAL_DATE_TIME_TYPE,
+ LocalTimeType.LOCAL_DATE_TIME_TYPE,
+ new MapType(STRING_TYPE, LONG_TYPE),
+ new MapType(STRING_TYPE, INT_TYPE),
+ new MapType(STRING_TYPE, new MapType(STRING_TYPE,
INT_TYPE))}
+ );
+
+ JsonDeserializationSchema deserializationSchema =
+ new JsonDeserializationSchema(false, false, schema);
+
+ SeaTunnelRow expected = new SeaTunnelRow(12);
+ expected.setField(0, true);
+ expected.setField(1, intValue);
+ expected.setField(2, longValue);
+ expected.setField(3, floatValue);
+ expected.setField(4, name);
+ expected.setField(5, date);
+ expected.setField(6, time);
+ expected.setField(7, timestamp3.toLocalDateTime());
+ expected.setField(8, timestamp9.toLocalDateTime());
+ expected.setField(9, map);
+ expected.setField(10, multiSet);
+ expected.setField(11, nestedMap);
+
+ SeaTunnelRow seaTunnelRow =
deserializationSchema.deserialize(serializedJson);
+ assertEquals(expected, seaTunnelRow);
+
+ // test serialization
+ JsonSerializationSchema serializationSchema =
+ new JsonSerializationSchema(schema);
+
+ byte[] actualBytes = serializationSchema.serialize(seaTunnelRow);
+ assertEquals(new String(serializedJson), new String(actualBytes));
+ }
+
+ @Test
+ public void testSerDeMultiRows() throws Exception {
+ SeaTunnelRowType schema = new SeaTunnelRowType(new String[]{"f1",
"f2", "f3", "f4", "f5", "f6"},
+ new SeaTunnelDataType[]{INT_TYPE,
+ BOOLEAN_TYPE,
+ STRING_TYPE,
+ new MapType(STRING_TYPE, STRING_TYPE),
+ STRING_ARRAY_TYPE,
+ new SeaTunnelRowType(new String[]{"f1", "f2"}, new
SeaTunnelDataType[]{STRING_TYPE, INT_TYPE})});
+
+ JsonDeserializationSchema deserializationSchema =
+ new JsonDeserializationSchema(false, false, schema);
+ JsonSerializationSchema serializationSchema =
+ new JsonSerializationSchema(schema);
+
+ ObjectMapper objectMapper = new ObjectMapper();
+
+ // the first row
+ {
+ ObjectNode root = objectMapper.createObjectNode();
+ root.put("f1", 1);
+ root.put("f2", true);
+ root.put("f3", "str");
+ ObjectNode map = root.putObject("f4");
+ map.put("hello1", "flink");
+ ArrayNode array = root.putArray("f5");
+ array.add("element1");
+ array.add("element2");
+ ObjectNode row = root.putObject("f6");
+ row.put("f1", "this is row1");
+ row.put("f2", 12);
+ byte[] serializedJson = objectMapper.writeValueAsBytes(root);
+ SeaTunnelRow rowData =
deserializationSchema.deserialize(serializedJson);
+ byte[] actual = serializationSchema.serialize(rowData);
+ assertEquals(new String(serializedJson), new String(actual));
+ }
+
+ // the second row
+ {
+ ObjectNode root = objectMapper.createObjectNode();
+ root.put("f1", 10);
+ root.put("f2", false);
+ root.put("f3", "newStr");
+ ObjectNode map = root.putObject("f4");
+ map.put("hello2", "json");
+ ArrayNode array = root.putArray("f5");
+ array.add("element3");
+ array.add("element4");
+ ObjectNode row = root.putObject("f6");
+ row.put("f1", "this is row2");
+ row.putNull("f2");
+ byte[] serializedJson = objectMapper.writeValueAsBytes(root);
+ SeaTunnelRow rowData =
deserializationSchema.deserialize(serializedJson);
+ byte[] actual = serializationSchema.serialize(rowData);
+ assertEquals(new String(serializedJson), new String(actual));
+ }
+ }
+
+ @Test
+ public void testSerDeMultiRowsWithNullValues() throws Exception {
+ String[] jsons =
+ new String[] {
+
"{\"svt\":\"2020-02-24T12:58:09.209+0800\",\"metrics\":{\"k1\":10.01,\"k2\":\"invalid\"}}",
+
"{\"svt\":\"2020-02-24T12:58:09.209+0800\",\"ops\":{\"id\":\"281708d0-4092-4c21-9233-931950b6eccf\"},"
+ + "\"ids\":[1,2,3]}",
+
"{\"svt\":\"2020-02-24T12:58:09.209+0800\",\"metrics\":{}}",
+ };
+
+ String[] expected =
+ new String[] {
+
"{\"svt\":\"2020-02-24T12:58:09.209+0800\",\"ops\":null,\"ids\":null,\"metrics\":{\"k1\":10.01,\"k2\":null}}",
+
"{\"svt\":\"2020-02-24T12:58:09.209+0800\",\"ops\":{\"id\":\"281708d0-4092-4c21-9233-931950b6eccf\"},"
+ + "\"ids\":[1,2,3],\"metrics\":null}",
+
"{\"svt\":\"2020-02-24T12:58:09.209+0800\",\"ops\":null,\"ids\":null,\"metrics\":{}}",
+ };
+
+ SeaTunnelRowType rowType = new SeaTunnelRowType(new String[]{"svt",
"ops", "ids", "metrics"},
+ new SeaTunnelDataType[]{STRING_TYPE,
+ new SeaTunnelRowType(new String[]{"id"}, new
SeaTunnelDataType[]{STRING_TYPE}),
+ INT_ARRAY_TYPE,
+ new MapType(STRING_TYPE, DOUBLE_TYPE)});
+
+ JsonDeserializationSchema deserializationSchema =
+ new JsonDeserializationSchema(false, true, rowType);
+ JsonSerializationSchema serializationSchema =
+ new JsonSerializationSchema(rowType);
+
+ for (int i = 0; i < jsons.length; i++) {
+ String json = jsons[i];
+ SeaTunnelRow row =
deserializationSchema.deserialize(json.getBytes());
+ String result = new String(serializationSchema.serialize(row));
+ assertEquals(expected[i], result);
+ }
+ }
+
+ @Test
+ public void testDeserializationNullRow() throws Exception {
+ SeaTunnelRowType schema = new SeaTunnelRowType(new String[]{"name"},
new SeaTunnelDataType[]{STRING_TYPE});
+ JsonDeserializationSchema deserializationSchema =
+ new JsonDeserializationSchema(true, false, schema);
+
+ assertNull(deserializationSchema.deserialize(null));
+ }
+
+ @Test
+ public void testDeserializationMissingNode() throws Exception {
+ SeaTunnelRowType schema = new SeaTunnelRowType(new String[]{"name"},
new SeaTunnelDataType[]{STRING_TYPE});
+
+ JsonDeserializationSchema deserializationSchema =
+ new JsonDeserializationSchema(true, false, schema);
+ SeaTunnelRow rowData =
deserializationSchema.deserialize("".getBytes());
+ assertEquals(null, rowData);
+ }
+
+ @Test
+ public void testDeserializationMissingField() throws Exception {
+ ObjectMapper objectMapper = new ObjectMapper();
+
+ // Root
+ ObjectNode root = objectMapper.createObjectNode();
+ root.put("id", 123123123);
+ byte[] serializedJson = objectMapper.writeValueAsBytes(root);
+
+ SeaTunnelRowType schema = new SeaTunnelRowType(new String[]{"name"},
new SeaTunnelDataType[]{STRING_TYPE});
+
+ // pass on missing field
+ JsonDeserializationSchema deserializationSchema =
+ new JsonDeserializationSchema(false, false, schema);
+
+ SeaTunnelRow expected = new SeaTunnelRow(1);
+ SeaTunnelRow actual =
deserializationSchema.deserialize(serializedJson);
+ assertEquals(expected, actual);
+
+ // fail on missing field
+ deserializationSchema =
+ new JsonDeserializationSchema(true, false, schema);
+
+ String errorMessage = "ErrorCode:[COMMON-02], ErrorDescription:[Json
covert/parse operation failed] - Failed to deserialize JSON
'{\"id\":123123123}'.";
+ try {
+ deserializationSchema.deserialize(serializedJson);
+ fail("expecting exception message: " + errorMessage);
+ } catch (Throwable t) {
+ assertEquals(errorMessage, t.getMessage());
+ }
+
+ // ignore on parse error
+ deserializationSchema =
+ new JsonDeserializationSchema(false, true, schema);
+ assertEquals(expected,
deserializationSchema.deserialize(serializedJson));
+
+ errorMessage =
+ "ErrorCode:[COMMON-06], ErrorDescription:[Illegal argument] -
JSON format doesn't support failOnMissingField and ignoreParseErrors are both
enabled.";
+ try {
+ // failOnMissingField and ignoreParseErrors both enabled
+ new JsonDeserializationSchema(true, true, schema);
+ Assertions.fail("expecting exception message: " + errorMessage);
+ } catch (Throwable t) {
+ assertEquals(errorMessage, t.getMessage());
+ }
+ }
+
+}