This is an automated email from the ASF dual-hosted git repository.

fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 6f8709456 [Bug][ConsoleSinkV2]fix fieldToString StackOverflow and add 
Unit-Test (#2545)
6f8709456 is described below

commit 6f87094569f2f1ac614d7661fc953d06a54871f0
Author: Laglangyue <[email protected]>
AuthorDate: Wed Aug 31 12:10:15 2022 +0800

    [Bug][ConsoleSinkV2]fix fieldToString StackOverflow and add Unit-Test 
(#2545)
    
    * [ConsoleSinkV2]fix fieldToString StackOverflow and add Unit-Test
    
    * add bytes type test
    
    * update unit to ensure convert correctly
    
    * update unit to ensure convert correctly
    
    * update unit to ensure convert correctly
---
 .../seatunnel/console/sink/ConsoleSinkWriter.java  |  17 +++-
 .../console/sink/ConsoleSinkWriterIT.java          | 109 +++++++++++++++++++++
 .../seatunnel/fake/source/FakeRandomData.java      |   2 +-
 3 files changed, 124 insertions(+), 4 deletions(-)

diff --git 
a/seatunnel-connectors-v2/connector-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSinkWriter.java
 
b/seatunnel-connectors-v2/connector-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSinkWriter.java
index 921aef06c..3a5b334f8 100644
--- 
a/seatunnel-connectors-v2/connector-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSinkWriter.java
+++ 
b/seatunnel-connectors-v2/connector-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSinkWriter.java
@@ -25,7 +25,9 @@ import 
org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter;
 
 import org.apache.commons.lang3.StringUtils;
 
-import java.util.Arrays;
+import java.lang.reflect.Array;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.concurrent.atomic.AtomicLong;
 
 public class ConsoleSinkWriter extends AbstractSinkWriter<SeaTunnelRow, Void> {
@@ -60,11 +62,20 @@ public class ConsoleSinkWriter extends 
AbstractSinkWriter<SeaTunnelRow, Void> {
         switch (type.getSqlType()) {
             case ARRAY:
             case BYTES:
-                return Arrays.toString((Object[]) value);
+                List<String> arrayData = new ArrayList<>();
+                for (int i = 0; i < Array.getLength(value); i++) {
+                    arrayData.add(String.valueOf(Array.get(value, i)));
+                }
+                return arrayData.toString();
             case MAP:
                 return JsonUtils.toJsonString(value);
             case ROW:
-                return fieldToString(type, value);
+                List<String> rowData = new ArrayList<>();
+                SeaTunnelRowType rowType = (SeaTunnelRowType) type;
+                for (int i = 0; i < rowType.getTotalFields(); i++) {
+                    rowData.add(fieldToString(rowType.getFieldTypes()[i], 
Array.get(value, i)));
+                }
+                return rowData.toString();
             default:
                 return String.valueOf(value);
         }
diff --git 
a/seatunnel-connectors-v2/connector-console/src/test/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSinkWriterIT.java
 
b/seatunnel-connectors-v2/connector-console/src/test/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSinkWriterIT.java
new file mode 100644
index 000000000..47735bbbb
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-console/src/test/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSinkWriterIT.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.console.sink;
+
+import org.apache.seatunnel.api.table.type.ArrayType;
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.MapType;
+import org.apache.seatunnel.api.table.type.PrimitiveByteArrayType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.common.utils.ReflectionUtils;
+
+import org.apache.commons.lang3.RandomStringUtils;
+import org.apache.commons.lang3.RandomUtils;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Optional;
+
+public class ConsoleSinkWriterIT {
+
+    private ConsoleSinkWriter consoleSinkWriter;
+
+    @BeforeEach
+    void setUp() {
+        String[] fieldNames = {};
+        SeaTunnelDataType<?>[] fieldTypes = {};
+        SeaTunnelRowType seaTunnelRowType = new SeaTunnelRowType(fieldNames, 
fieldTypes);
+        consoleSinkWriter = new ConsoleSinkWriter(seaTunnelRowType);
+    }
+
+    private Object fieldToStringTest(SeaTunnelDataType<?> dataType, Object 
value) {
+        Optional<Method> fieldToString = 
ReflectionUtils.getDeclaredMethod(ConsoleSinkWriter.class, "fieldToString", 
SeaTunnelDataType.class, Object.class);
+        Method method = fieldToString.orElseThrow(() -> new 
RuntimeException("method fieldToString not found"));
+        try {
+            return method.invoke(consoleSinkWriter, dataType, value);
+        } catch (IllegalAccessException | InvocationTargetException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Test
+    void arrayIntTest() {
+        Assertions.assertDoesNotThrow(() -> {
+            Integer[] integerArr = {1};
+            Object integerArrString = 
fieldToStringTest(ArrayType.INT_ARRAY_TYPE, integerArr);
+            Assertions.assertEquals(integerArrString, "[1]");
+            int[] intArr = {1, 2};
+            Object intArrString = fieldToStringTest(ArrayType.INT_ARRAY_TYPE, 
intArr);
+            Assertions.assertEquals(intArrString, "[1, 2]");
+        });
+    }
+
+    @Test
+    void stringTest() {
+        Assertions.assertDoesNotThrow(() -> {
+            String str = RandomStringUtils.randomAlphanumeric(10);
+            Object obj = fieldToStringTest(BasicType.STRING_TYPE, str);
+            Assertions.assertTrue(obj instanceof String);
+            Assertions.assertEquals(10, ((String) obj).length());
+        });
+    }
+
+    @Test
+    void hashMapTest() {
+        Assertions.assertDoesNotThrow(() -> {
+            HashMap<Object, Object> map = new HashMap<>();
+            map.put("key", "value");
+            MapType<String, String> mapType = new 
MapType<>(BasicType.STRING_TYPE, BasicType.STRING_TYPE);
+            Object mapString = fieldToStringTest(mapType, map);
+            Assertions.assertNotNull(mapString);
+            Assertions.assertEquals("{\"key\":\"value\"}", mapString);
+        });
+    }
+
+    @Test
+    void rowTypeTest() {
+        Assertions.assertDoesNotThrow(() -> {
+            String[] fieldNames = {"c_byte", "c_array", "bytes"};
+            SeaTunnelDataType<?>[] fieldTypes = {BasicType.BYTE_TYPE, 
ArrayType.BYTE_ARRAY_TYPE, PrimitiveByteArrayType.INSTANCE};
+            SeaTunnelRowType seaTunnelRowType = new 
SeaTunnelRowType(fieldNames, fieldTypes);
+            byte[] bytes = RandomUtils.nextBytes(10);
+            Object[] rowData = {(byte) 1, bytes, bytes};
+            Object rowString = fieldToStringTest(seaTunnelRowType, rowData);
+            Assertions.assertNotNull(rowString);
+            Assertions.assertEquals(String.format("[1, %s, %s]", 
Arrays.toString(bytes), Arrays.toString(bytes)), rowString.toString());
+        });
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeRandomData.java
 
b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeRandomData.java
index a44c05e88..a0dfa4111 100644
--- 
a/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeRandomData.java
+++ 
b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeRandomData.java
@@ -112,7 +112,7 @@ public class FakeRandomData {
             objectObjectHashMap.put(key, value);
             return objectObjectHashMap;
         } else if (fieldType instanceof PrimitiveByteArrayType) {
-            return RandomUtils.nextBytes(100);
+            return RandomUtils.nextBytes(3);
         } else if (VOID_TYPE.equals(fieldType) || fieldType == null) {
             return Void.TYPE;
         } else {

Reply via email to