This is an automated email from the ASF dual-hosted git repository.
fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 6f8709456 [Bug][ConsoleSinkV2]fix fieldToString StackOverflow and add Unit-Test (#2545)
6f8709456 is described below
commit 6f87094569f2f1ac614d7661fc953d06a54871f0
Author: Laglangyue <[email protected]>
AuthorDate: Wed Aug 31 12:10:15 2022 +0800
[Bug][ConsoleSinkV2]fix fieldToString StackOverflow and add Unit-Test (#2545)
* [ConsoleSinkV2]fix fieldToString StackOverflow and add Unit-Test
* add bytes type test
* update unit to ensure convert correctly
* update unit to ensure convert correctly
* update unit to ensure convert correctly
---
.../seatunnel/console/sink/ConsoleSinkWriter.java | 17 +++-
.../console/sink/ConsoleSinkWriterIT.java | 109 +++++++++++++++++++++
.../seatunnel/fake/source/FakeRandomData.java | 2 +-
3 files changed, 124 insertions(+), 4 deletions(-)
diff --git a/seatunnel-connectors-v2/connector-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSinkWriter.java b/seatunnel-connectors-v2/connector-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSinkWriter.java
index 921aef06c..3a5b334f8 100644
--- a/seatunnel-connectors-v2/connector-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSinkWriter.java
+++ b/seatunnel-connectors-v2/connector-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSinkWriter.java
@@ -25,7 +25,9 @@ import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter;
import org.apache.commons.lang3.StringUtils;
-import java.util.Arrays;
+import java.lang.reflect.Array;
+import java.util.ArrayList;
+import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
public class ConsoleSinkWriter extends AbstractSinkWriter<SeaTunnelRow, Void> {
@@ -60,11 +62,20 @@ public class ConsoleSinkWriter extends AbstractSinkWriter<SeaTunnelRow, Void> {
switch (type.getSqlType()) {
case ARRAY:
case BYTES:
- return Arrays.toString((Object[]) value);
+ List<String> arrayData = new ArrayList<>();
+ for (int i = 0; i < Array.getLength(value); i++) {
+ arrayData.add(String.valueOf(Array.get(value, i)));
+ }
+ return arrayData.toString();
case MAP:
return JsonUtils.toJsonString(value);
case ROW:
- return fieldToString(type, value);
+ List<String> rowData = new ArrayList<>();
+ SeaTunnelRowType rowType = (SeaTunnelRowType) type;
+ for (int i = 0; i < rowType.getTotalFields(); i++) {
+ rowData.add(fieldToString(rowType.getFieldTypes()[i], Array.get(value, i)));
+ }
+ return rowData.toString();
default:
return String.valueOf(value);
}
diff --git a/seatunnel-connectors-v2/connector-console/src/test/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSinkWriterIT.java b/seatunnel-connectors-v2/connector-console/src/test/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSinkWriterIT.java
new file mode 100644
index 000000000..47735bbbb
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-console/src/test/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSinkWriterIT.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.console.sink;
+
+import org.apache.seatunnel.api.table.type.ArrayType;
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.MapType;
+import org.apache.seatunnel.api.table.type.PrimitiveByteArrayType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.common.utils.ReflectionUtils;
+
+import org.apache.commons.lang3.RandomStringUtils;
+import org.apache.commons.lang3.RandomUtils;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Optional;
+
+public class ConsoleSinkWriterIT {
+
+ private ConsoleSinkWriter consoleSinkWriter;
+
+ @BeforeEach
+ void setUp() {
+ String[] fieldNames = {};
+ SeaTunnelDataType<?>[] fieldTypes = {};
+ SeaTunnelRowType seaTunnelRowType = new SeaTunnelRowType(fieldNames, fieldTypes);
+ consoleSinkWriter = new ConsoleSinkWriter(seaTunnelRowType);
+ }
+
+ private Object fieldToStringTest(SeaTunnelDataType<?> dataType, Object value) {
+ Optional<Method> fieldToString = ReflectionUtils.getDeclaredMethod(ConsoleSinkWriter.class, "fieldToString", SeaTunnelDataType.class, Object.class);
+ Method method = fieldToString.orElseThrow(() -> new RuntimeException("method fieldToString not found"));
+ try {
+ return method.invoke(consoleSinkWriter, dataType, value);
+ } catch (IllegalAccessException | InvocationTargetException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Test
+ void arrayIntTest() {
+ Assertions.assertDoesNotThrow(() -> {
+ Integer[] integerArr = {1};
+ Object integerArrString = fieldToStringTest(ArrayType.INT_ARRAY_TYPE, integerArr);
+ Assertions.assertEquals(integerArrString, "[1]");
+ int[] intArr = {1, 2};
+ Object intArrString = fieldToStringTest(ArrayType.INT_ARRAY_TYPE, intArr);
+ Assertions.assertEquals(intArrString, "[1, 2]");
+ });
+ }
+
+ @Test
+ void stringTest() {
+ Assertions.assertDoesNotThrow(() -> {
+ String str = RandomStringUtils.randomAlphanumeric(10);
+ Object obj = fieldToStringTest(BasicType.STRING_TYPE, str);
+ Assertions.assertTrue(obj instanceof String);
+ Assertions.assertEquals(10, ((String) obj).length());
+ });
+ }
+
+ @Test
+ void hashMapTest() {
+ Assertions.assertDoesNotThrow(() -> {
+ HashMap<Object, Object> map = new HashMap<>();
+ map.put("key", "value");
+ MapType<String, String> mapType = new MapType<>(BasicType.STRING_TYPE, BasicType.STRING_TYPE);
+ Object mapString = fieldToStringTest(mapType, map);
+ Assertions.assertNotNull(mapString);
+ Assertions.assertEquals("{\"key\":\"value\"}", mapString);
+ });
+ }
+
+ @Test
+ void rowTypeTest() {
+ Assertions.assertDoesNotThrow(() -> {
+ String[] fieldNames = {"c_byte", "c_array", "bytes"};
+ SeaTunnelDataType<?>[] fieldTypes = {BasicType.BYTE_TYPE, ArrayType.BYTE_ARRAY_TYPE, PrimitiveByteArrayType.INSTANCE};
+ SeaTunnelRowType seaTunnelRowType = new SeaTunnelRowType(fieldNames, fieldTypes);
+ byte[] bytes = RandomUtils.nextBytes(10);
+ Object[] rowData = {(byte) 1, bytes, bytes};
+ Object rowString = fieldToStringTest(seaTunnelRowType, rowData);
+ Assertions.assertNotNull(rowString);
+ Assertions.assertEquals(String.format("[1, %s, %s]", Arrays.toString(bytes), Arrays.toString(bytes)), rowString.toString());
+ });
+ }
+}
diff --git a/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeRandomData.java b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeRandomData.java
index a44c05e88..a0dfa4111 100644
--- a/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeRandomData.java
+++ b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeRandomData.java
@@ -112,7 +112,7 @@ public class FakeRandomData {
objectObjectHashMap.put(key, value);
return objectObjectHashMap;
} else if (fieldType instanceof PrimitiveByteArrayType) {
- return RandomUtils.nextBytes(100);
+ return RandomUtils.nextBytes(3);
} else if (VOID_TYPE.equals(fieldType) || fieldType == null) {
return Void.TYPE;
} else {