This is an automated email from the ASF dual-hosted git repository.

fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 044f62ef3 [Improve][Connector-V2] Improve fake source connector (#2944)
044f62ef3 is described below

commit 044f62ef32cd0d195bc4bb6c8492d48d15379ff4
Author: TyrantLucifer <[email protected]>
AuthorDate: Fri Sep 30 18:23:26 2022 +0800

    [Improve][Connector-V2] Improve fake source connector (#2944)
    
    * [Improve][Connector-V2] Improve fake source connector
    
    * [Improve][Connector-V2] Update doc
    
    * [Improve][Connector-V2] Fix e2e test cases
    
    * [Improve][Connector-V2] Fix integration error.
    
    * Revert "[Improve][Connector-V2] Fix integration error."
    
    This reverts commit c840c696ceef27bfa2360e08efc626a77a26316a.
    
    * [Improve][Connector-V2] Fix flink e2e test cases
    
    * [Improve][Connector-V2] Optimize class name
---
 docs/en/connector-v2/source/FakeSource.md          | 111 +++++++++++-----
 .../seatunnel/fake/source/FakeConfig.java          |  70 ++++++++++
 .../seatunnel/fake/source/FakeDataGenerator.java   | 147 +++++++++++++++++++++
 .../seatunnel/fake/source/FakeOptions.java         |  40 ------
 .../seatunnel/fake/source/FakeRandomData.java      | 133 -------------------
 .../seatunnel/fake/source/FakeSource.java          |  10 +-
 .../seatunnel/fake/source/FakeSourceReader.java    |  14 +-
 ...domDataTest.java => FakeDataGeneratorTest.java} |  62 ++++-----
 .../src/test/resources/complex.schema.conf         |  63 ++++++---
 .../src/test/resources/simple.schema.conf          |  43 +++---
 .../resources/assertion/fakesource_to_assert.conf  |   4 +-
 .../resources/assertion/fakesource_to_assert.conf  |   2 +-
 12 files changed, 412 insertions(+), 287 deletions(-)

diff --git a/docs/en/connector-v2/source/FakeSource.md 
b/docs/en/connector-v2/source/FakeSource.md
index c4bc5057c..ce8aef406 100644
--- a/docs/en/connector-v2/source/FakeSource.md
+++ b/docs/en/connector-v2/source/FakeSource.md
@@ -5,7 +5,7 @@
 ## Description
 
 The FakeSource is a virtual data source, which randomly generates the number 
of rows according to the data structure of the user-defined schema,
-just for testing, such as type conversion and feature testing
+just for some test cases such as type conversion or connector new feature 
testing
 
 ## Key features
 
@@ -18,30 +18,43 @@ just for testing, such as type conversion and feature 
testing
 
 ## Options
 
-| name              | type   | required | default value |
-|-------------------|--------|----------|---------------|
-| result_table_name | string | yes      | -             |
-| schema            | config | yes      | -             |
-| row.num           | long   | no       | 10            |
+| name          | type   | required | default value |
+|---------------|--------|----------|---------------|
+| schema        | config | yes      | -             |
+| row.num       | int    | no       | 5             |
+| map.size      | int    | no       | 5             |
+| array.size    | int    | no       | 5             |
+| bytes.length  | int    | no       | 5             |
+| string.length | int    | no       | 5             |
 
-### result_table_name [string]
 
-The table name.
+### schema [config]
 
-### type [string]
+The schema of fake data that you want to generate
 
-Table structure description ,you should assign schema option to tell connector 
how to parse data to the row you want.  
-**Tips**: Most of Unstructured-Datasource contain this param, such as 
LocalFile,HdfsFile.  
-**Example**:
-
-### row.num
-Number of additional rows of generated data
+For example:
 
 ```hocon
-schema = {
-      fields {
-        c_map = "map<string, string>"
-        c_array = "array<tinyint>"
+  schema = {
+    fields {
+      c_map = "map<string, array<int>>"
+      c_array = "array<int>"
+      c_string = string
+      c_boolean = boolean
+      c_tinyint = tinyint
+      c_smallint = smallint
+      c_int = int
+      c_bigint = bigint
+      c_float = float
+      c_double = double
+      c_decimal = "decimal(30, 8)"
+      c_null = "null"
+      c_bytes = bytes
+      c_date = date
+      c_timestamp = timestamp
+      c_row = {
+        c_map = "map<string, map<string, string>>"
+        c_array = "array<int>"
         c_string = string
         c_boolean = boolean
         c_tinyint = tinyint
@@ -54,23 +67,61 @@ schema = {
         c_null = "null"
         c_bytes = bytes
         c_date = date
-        c_time = time
         c_timestamp = timestamp
       }
     }
+  }
 ```
 
-## Example
+### row.num
+
+Total num of data that connector generated
+
+### map.size
+
+The size of `map` type that connector generated
 
-Simple source for FakeSource which contains enough datatype
+### array.size
+
+The size of `array` type that connector generated
+
+### bytes.length
+
+The length of `bytes` type that connector generated
+
+### string.length
+
+The length of `string` type that connector generated
+
+## Example
 
 ```hocon
-source {
-  FakeSource {
-    schema = {
-      fields {
-        c_map = "map<string, string>"
-        c_array = "array<tinyint>"
+FakeSource {
+  row.num = 10
+  map.size = 10
+  array.size = 10
+  bytes.length = 10
+  string.length = 10
+  schema = {
+    fields {
+      c_map = "map<string, array<int>>"
+      c_array = "array<int>"
+      c_string = string
+      c_boolean = boolean
+      c_tinyint = tinyint
+      c_smallint = smallint
+      c_int = int
+      c_bigint = bigint
+      c_float = float
+      c_double = double
+      c_decimal = "decimal(30, 8)"
+      c_null = "null"
+      c_bytes = bytes
+      c_date = date
+      c_timestamp = timestamp
+      c_row = {
+        c_map = "map<string, map<string, string>>"
+        c_array = "array<int>"
         c_string = string
         c_boolean = boolean
         c_tinyint = tinyint
@@ -83,11 +134,9 @@ source {
         c_null = "null"
         c_bytes = bytes
         c_date = date
-        c_time = time
         c_timestamp = timestamp
       }
     }
-    result_table_name = "fake"
   }
 }
-```
\ No newline at end of file
+```
diff --git 
a/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeConfig.java
 
b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeConfig.java
new file mode 100644
index 000000000..8d4fe1d77
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeConfig.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.fake.source;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import lombok.Builder;
+import lombok.Getter;
+
+import java.io.Serializable;
+
+@Builder
+@Getter
+public class FakeConfig implements Serializable {
+    private static final String ROW_NUM = "row.num";
+    private static final String MAP_SIZE = "map.size";
+    private static final String ARRAY_SIZE = "array.size";
+    private static final String BYTES_LENGTH = "bytes.length";
+    private static final String STRING_LENGTH = "string.length";
+    private static final int DEFAULT_ROW_NUM = 5;
+    private static final int DEFAULT_MAP_SIZE = 5;
+    private static final int DEFAULT_ARRAY_SIZE = 5;
+    private static final int DEFAULT_BYTES_LENGTH = 5;
+    private static final int DEFAULT_STRING_LENGTH = 5;
+    @Builder.Default
+    private int rowNum = DEFAULT_ROW_NUM;
+    @Builder.Default
+    private int mapSize = DEFAULT_MAP_SIZE;
+    @Builder.Default
+    private int arraySize = DEFAULT_ARRAY_SIZE;
+    @Builder.Default
+    private int bytesLength = DEFAULT_BYTES_LENGTH;
+    @Builder.Default
+    private int stringLength = DEFAULT_STRING_LENGTH;
+
+    public static FakeConfig buildWithConfig(Config config) {
+        FakeConfigBuilder builder = FakeConfig.builder();
+        if (config.hasPath(ROW_NUM)) {
+            builder.rowNum(config.getInt(ROW_NUM));
+        }
+        if (config.hasPath(MAP_SIZE)) {
+            builder.mapSize(config.getInt(MAP_SIZE));
+        }
+        if (config.hasPath(ARRAY_SIZE)) {
+            builder.arraySize(config.getInt(ARRAY_SIZE));
+        }
+        if (config.hasPath(BYTES_LENGTH)) {
+            builder.bytesLength(config.getInt(BYTES_LENGTH));
+        }
+        if (config.hasPath(STRING_LENGTH)) {
+            builder.stringLength(config.getInt(STRING_LENGTH));
+        }
+        return builder.build();
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeDataGenerator.java
 
b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeDataGenerator.java
new file mode 100644
index 000000000..3adfb885a
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeDataGenerator.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.fake.source;
+
+import org.apache.seatunnel.api.table.type.ArrayType;
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.DecimalType;
+import org.apache.seatunnel.api.table.type.MapType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.connectors.seatunnel.common.schema.SeaTunnelSchema;
+
+import org.apache.commons.lang3.RandomStringUtils;
+import org.apache.commons.lang3.RandomUtils;
+
+import java.lang.reflect.Array;
+import java.math.BigDecimal;
+import java.time.LocalDateTime;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+
+public class FakeDataGenerator {
+    public static final String SCHEMA = "schema";
+    private final SeaTunnelSchema schema;
+    private final FakeConfig fakeConfig;
+
+    public FakeDataGenerator(SeaTunnelSchema schema, FakeConfig fakeConfig) {
+        this.schema = schema;
+        this.fakeConfig = fakeConfig;
+    }
+
+    private SeaTunnelRow randomRow() {
+        SeaTunnelRowType seaTunnelRowType = schema.getSeaTunnelRowType();
+        String[] fieldNames = seaTunnelRowType.getFieldNames();
+        SeaTunnelDataType<?>[] fieldTypes = seaTunnelRowType.getFieldTypes();
+        List<Object> randomRow = new ArrayList<>(fieldNames.length);
+        for (SeaTunnelDataType<?> fieldType : fieldTypes) {
+            randomRow.add(randomColumnValue(fieldType));
+        }
+        return new SeaTunnelRow(randomRow.toArray());
+    }
+
+    public List<SeaTunnelRow> generateFakedRows() {
+        ArrayList<SeaTunnelRow> seaTunnelRows = new ArrayList<>();
+        for (int i = 0; i < fakeConfig.getRowNum(); i++) {
+            seaTunnelRows.add(randomRow());
+        }
+        return seaTunnelRows;
+    }
+
+    @SuppressWarnings("magicnumber")
+    private Object randomColumnValue(SeaTunnelDataType<?> fieldType) {
+        switch (fieldType.getSqlType()) {
+            case ARRAY:
+                ArrayType<?, ?> arrayType = (ArrayType<?, ?>) fieldType;
+                BasicType<?> elementType = arrayType.getElementType();
+                int length = fakeConfig.getArraySize();
+                Object array = Array.newInstance(elementType.getTypeClass(), 
length);
+                for (int i = 0; i < length; i++) {
+                    Object value = randomColumnValue(elementType);
+                    Array.set(array, i, value);
+                }
+                return array;
+            case MAP:
+                MapType<?, ?> mapType = (MapType<?, ?>) fieldType;
+                SeaTunnelDataType<?> keyType = mapType.getKeyType();
+                SeaTunnelDataType<?> valueType = mapType.getValueType();
+                HashMap<Object, Object> objectMap = new HashMap<>();
+                int mapSize = fakeConfig.getMapSize();
+                for (int i = 0; i < mapSize; i++) {
+                    Object key = randomColumnValue(keyType);
+                    Object value = randomColumnValue(valueType);
+                    objectMap.put(key, value);
+                }
+                return objectMap;
+            case STRING:
+                return 
RandomStringUtils.randomAlphabetic(fakeConfig.getStringLength());
+            case BOOLEAN:
+                return RandomUtils.nextInt(0, 2) == 1;
+            case TINYINT:
+                return (byte) RandomUtils.nextInt(0, 255);
+            case SMALLINT:
+                return (short) RandomUtils.nextInt(Byte.MAX_VALUE, 
Short.MAX_VALUE);
+            case INT:
+                return RandomUtils.nextInt(Short.MAX_VALUE, Integer.MAX_VALUE);
+            case BIGINT:
+                return RandomUtils.nextLong(Integer.MAX_VALUE, Long.MAX_VALUE);
+            case FLOAT:
+                return RandomUtils.nextFloat(Float.MIN_VALUE, Float.MAX_VALUE);
+            case DOUBLE:
+                return RandomUtils.nextDouble(Float.MAX_VALUE, 
Double.MAX_VALUE);
+            case DECIMAL:
+                DecimalType decimalType = (DecimalType) fieldType;
+                return new 
BigDecimal(RandomStringUtils.randomNumeric(decimalType.getPrecision() - 
decimalType.getScale()) + "." +
+                        
RandomStringUtils.randomNumeric(decimalType.getScale()));
+            case NULL:
+                return null;
+            case BYTES:
+                return RandomUtils.nextBytes(fakeConfig.getBytesLength());
+            case DATE:
+                return randomLocalDateTime().toLocalDate();
+            case TIME:
+                return randomLocalDateTime().toLocalTime();
+            case TIMESTAMP:
+                return randomLocalDateTime();
+            case ROW:
+                SeaTunnelDataType<?>[] fieldTypes = ((SeaTunnelRowType) 
fieldType).getFieldTypes();
+                Object[] objects = new Object[fieldTypes.length];
+                for (int i = 0; i < fieldTypes.length; i++) {
+                    Object object = randomColumnValue(fieldTypes[i]);
+                    objects[i] = object;
+                }
+                return new SeaTunnelRow(objects);
+            default:
+                // never got in there
+                throw new UnsupportedOperationException("SeaTunnel Fake source 
connector not support this data type");
+        }
+    }
+
+    @SuppressWarnings("magicnumber")
+    private LocalDateTime randomLocalDateTime() {
+        return LocalDateTime.of(
+            LocalDateTime.now().getYear(),
+            RandomUtils.nextInt(1, 12),
+            RandomUtils.nextInt(1, 28),
+            RandomUtils.nextInt(0, 24),
+            RandomUtils.nextInt(0, 59)
+        );
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeOptions.java
 
b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeOptions.java
deleted file mode 100644
index 96dc5e8ac..000000000
--- 
a/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeOptions.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.connectors.seatunnel.fake.source;
-
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
-
-import lombok.Getter;
-import lombok.Setter;
-
-import java.io.Serializable;
-
-public class FakeOptions implements Serializable {
-
-    private static final String ROW_NUM = "row.num";
-    private static final Long DEFAULT_ROW_NUM = 10L;
-    @Getter
-    @Setter
-    private Long rowNum;
-
-    public static FakeOptions parse(Config config) {
-        FakeOptions fakeOptions = new FakeOptions();
-        fakeOptions.setRowNum(config.hasPath(ROW_NUM) ? 
config.getLong(ROW_NUM) : DEFAULT_ROW_NUM);
-        return fakeOptions;
-    }
-}
diff --git 
a/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeRandomData.java
 
b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeRandomData.java
deleted file mode 100644
index 3ea562e4c..000000000
--- 
a/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeRandomData.java
+++ /dev/null
@@ -1,133 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.connectors.seatunnel.fake.source;
-
-import static org.apache.seatunnel.api.table.type.BasicType.BOOLEAN_TYPE;
-import static org.apache.seatunnel.api.table.type.BasicType.BYTE_TYPE;
-import static org.apache.seatunnel.api.table.type.BasicType.DOUBLE_TYPE;
-import static org.apache.seatunnel.api.table.type.BasicType.FLOAT_TYPE;
-import static org.apache.seatunnel.api.table.type.BasicType.INT_TYPE;
-import static org.apache.seatunnel.api.table.type.BasicType.LONG_TYPE;
-import static org.apache.seatunnel.api.table.type.BasicType.SHORT_TYPE;
-import static org.apache.seatunnel.api.table.type.BasicType.STRING_TYPE;
-import static org.apache.seatunnel.api.table.type.BasicType.VOID_TYPE;
-
-import org.apache.seatunnel.api.table.type.ArrayType;
-import org.apache.seatunnel.api.table.type.BasicType;
-import org.apache.seatunnel.api.table.type.DecimalType;
-import org.apache.seatunnel.api.table.type.LocalTimeType;
-import org.apache.seatunnel.api.table.type.MapType;
-import org.apache.seatunnel.api.table.type.PrimitiveByteArrayType;
-import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
-import org.apache.seatunnel.api.table.type.SeaTunnelRow;
-import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
-import org.apache.seatunnel.connectors.seatunnel.common.schema.SeaTunnelSchema;
-
-import org.apache.commons.lang3.RandomStringUtils;
-import org.apache.commons.lang3.RandomUtils;
-
-import java.lang.reflect.Array;
-import java.math.BigDecimal;
-import java.time.LocalDateTime;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-
-public class FakeRandomData {
-    public static final String SCHEMA = "schema";
-    private final SeaTunnelSchema schema;
-
-    public FakeRandomData(SeaTunnelSchema schema) {
-        this.schema = schema;
-    }
-
-    public SeaTunnelRow randomRow() {
-        SeaTunnelRowType seaTunnelRowType = schema.getSeaTunnelRowType();
-        String[] fieldNames = seaTunnelRowType.getFieldNames();
-        SeaTunnelDataType<?>[] fieldTypes = seaTunnelRowType.getFieldTypes();
-        List<Object> randomRow = new ArrayList<>(fieldNames.length);
-        for (SeaTunnelDataType<?> fieldType : fieldTypes) {
-            randomRow.add(randomColumnValue(fieldType));
-        }
-        return new SeaTunnelRow(randomRow.toArray());
-    }
-
-    @SuppressWarnings("magicnumber")
-    private Object randomColumnValue(SeaTunnelDataType<?> fieldType) {
-        if (BOOLEAN_TYPE.equals(fieldType)) {
-            return RandomUtils.nextInt(0, 2) == 1;
-        } else if (BYTE_TYPE.equals(fieldType)) {
-            return (byte) RandomUtils.nextInt(0, 255);
-        } else if (SHORT_TYPE.equals(fieldType)) {
-            return (short) RandomUtils.nextInt(Byte.MAX_VALUE, 
Short.MAX_VALUE);
-        } else if (INT_TYPE.equals(fieldType)) {
-            return RandomUtils.nextInt(Short.MAX_VALUE, Integer.MAX_VALUE);
-        } else if (LONG_TYPE.equals(fieldType)) {
-            return RandomUtils.nextLong(Integer.MAX_VALUE, Long.MAX_VALUE);
-        } else if (FLOAT_TYPE.equals(fieldType)) {
-            return RandomUtils.nextFloat(Float.MIN_VALUE, Float.MAX_VALUE);
-        } else if (DOUBLE_TYPE.equals(fieldType)) {
-            return RandomUtils.nextDouble(Float.MAX_VALUE, Double.MAX_VALUE);
-        } else if (STRING_TYPE.equals(fieldType)) {
-            return RandomStringUtils.randomAlphabetic(10);
-        } else if (LocalTimeType.LOCAL_DATE_TYPE.equals(fieldType)) {
-            return randomLocalDateTime().toLocalDate();
-        } else if (LocalTimeType.LOCAL_TIME_TYPE.equals(fieldType)) {
-            return randomLocalDateTime().toLocalTime();
-        } else if (LocalTimeType.LOCAL_DATE_TIME_TYPE.equals(fieldType)) {
-            return randomLocalDateTime();
-        } else if (fieldType instanceof DecimalType) {
-            DecimalType decimalType = (DecimalType) fieldType;
-            return new 
BigDecimal(RandomStringUtils.randomNumeric(decimalType.getPrecision() - 
decimalType.getScale()) + "." +
-                RandomStringUtils.randomNumeric(decimalType.getScale()));
-        } else if (fieldType instanceof ArrayType) {
-            ArrayType<?, ?> arrayType = (ArrayType<?, ?>) fieldType;
-            BasicType<?> elementType = arrayType.getElementType();
-            Object value = randomColumnValue(elementType);
-            Object arr = Array.newInstance(elementType.getTypeClass(), 1);
-            Array.set(arr, 0, value);
-            return arr;
-        } else if (fieldType instanceof MapType) {
-            MapType<?, ?> mapType = (MapType<?, ?>) fieldType;
-            SeaTunnelDataType<?> keyType = mapType.getKeyType();
-            Object key = randomColumnValue(keyType);
-            SeaTunnelDataType<?> valueType = mapType.getValueType();
-            Object value = randomColumnValue(valueType);
-            HashMap<Object, Object> objectObjectHashMap = new HashMap<>();
-            objectObjectHashMap.put(key, value);
-            return objectObjectHashMap;
-        } else if (fieldType instanceof PrimitiveByteArrayType) {
-            return RandomUtils.nextBytes(3);
-        } else if (VOID_TYPE.equals(fieldType) || fieldType == null) {
-            return null;
-        } else {
-            throw new UnsupportedOperationException("Unexpected value: " + 
fieldType);
-        }
-    }
-
-    @SuppressWarnings("magicnumber")
-    private LocalDateTime randomLocalDateTime() {
-        return LocalDateTime.of(
-            LocalDateTime.now().getYear(),
-            RandomUtils.nextInt(1, 12),
-            RandomUtils.nextInt(1, 28),
-            RandomUtils.nextInt(0, 24),
-            RandomUtils.nextInt(0, 59)
-        );
-    }
-}
diff --git 
a/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSource.java
 
b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSource.java
index eeede4439..5e22ce5fc 100644
--- 
a/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSource.java
+++ 
b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSource.java
@@ -38,7 +38,7 @@ public class FakeSource extends 
AbstractSingleSplitSource<SeaTunnelRow> {
     private Config pluginConfig;
     private JobContext jobContext;
     private SeaTunnelSchema schema;
-    private FakeOptions fakeOptions;
+    private FakeConfig fakeConfig;
 
     @Override
     public Boundedness getBoundedness() {
@@ -52,7 +52,7 @@ public class FakeSource extends 
AbstractSingleSplitSource<SeaTunnelRow> {
 
     @Override
     public AbstractSingleSplitReader<SeaTunnelRow> 
createReader(SingleSplitReaderContext readerContext) throws Exception {
-        return new FakeSourceReader(readerContext, new FakeRandomData(schema), 
fakeOptions);
+        return new FakeSourceReader(readerContext, new 
FakeDataGenerator(schema, fakeConfig));
     }
 
     @Override
@@ -63,9 +63,9 @@ public class FakeSource extends 
AbstractSingleSplitSource<SeaTunnelRow> {
     @Override
     public void prepare(Config pluginConfig) {
         this.pluginConfig = pluginConfig;
-        assert pluginConfig.hasPath(FakeRandomData.SCHEMA);
-        this.schema = 
SeaTunnelSchema.buildWithConfig(pluginConfig.getConfig(FakeRandomData.SCHEMA));
-        this.fakeOptions = FakeOptions.parse(pluginConfig);
+        assert pluginConfig.hasPath(FakeDataGenerator.SCHEMA);
+        this.schema = 
SeaTunnelSchema.buildWithConfig(pluginConfig.getConfig(FakeDataGenerator.SCHEMA));
+        this.fakeConfig = FakeConfig.buildWithConfig(pluginConfig);
     }
 
     @Override
diff --git 
a/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceReader.java
 
b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceReader.java
index 6301a284f..b847838c0 100644
--- 
a/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceReader.java
+++ 
b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceReader.java
@@ -26,19 +26,19 @@ import 
org.apache.seatunnel.connectors.seatunnel.common.source.SingleSplitReader
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.List;
+
 public class FakeSourceReader extends AbstractSingleSplitReader<SeaTunnelRow> {
 
     private static final Logger LOGGER = 
LoggerFactory.getLogger(FakeSourceReader.class);
 
     private final SingleSplitReaderContext context;
 
-    private final FakeRandomData fakeRandomData;
-    private final FakeOptions options;
+    private final FakeDataGenerator fakeDataGenerator;
 
-    public FakeSourceReader(SingleSplitReaderContext context, FakeRandomData 
randomData, FakeOptions options) {
+    public FakeSourceReader(SingleSplitReaderContext context, 
FakeDataGenerator randomData) {
         this.context = context;
-        this.fakeRandomData = randomData;
-        this.options = options;
+        this.fakeDataGenerator = randomData;
     }
 
     @Override
@@ -55,8 +55,8 @@ public class FakeSourceReader extends 
AbstractSingleSplitReader<SeaTunnelRow> {
     @SuppressWarnings("magicnumber")
     public void pollNext(Collector<SeaTunnelRow> output) throws 
InterruptedException {
         // Generate a random number of rows to emit.
-        for (int i = 0; i < options.getRowNum(); i++) {
-            SeaTunnelRow seaTunnelRow = fakeRandomData.randomRow();
+        List<SeaTunnelRow> seaTunnelRows = 
fakeDataGenerator.generateFakedRows();
+        for (SeaTunnelRow seaTunnelRow : seaTunnelRows) {
             output.collect(seaTunnelRow);
         }
         if (Boundedness.BOUNDED.equals(context.getBoundedness())) {
diff --git 
a/seatunnel-connectors-v2/connector-fake/src/test/java/org.apache.seatunnel.connectors.seatunnel.fake.source/FakeRandomDataTest.java
 
b/seatunnel-connectors-v2/connector-fake/src/test/java/org.apache.seatunnel.connectors.seatunnel.fake.source/FakeDataGeneratorTest.java
similarity index 55%
rename from 
seatunnel-connectors-v2/connector-fake/src/test/java/org.apache.seatunnel.connectors.seatunnel.fake.source/FakeRandomDataTest.java
rename to 
seatunnel-connectors-v2/connector-fake/src/test/java/org.apache.seatunnel.connectors.seatunnel.fake.source/FakeDataGeneratorTest.java
index 520718b1b..bc4d991b6 100644
--- 
a/seatunnel-connectors-v2/connector-fake/src/test/java/org.apache.seatunnel.connectors.seatunnel.fake.source/FakeRandomDataTest.java
+++ 
b/seatunnel-connectors-v2/connector-fake/src/test/java/org.apache.seatunnel.connectors.seatunnel.fake.source/FakeDataGeneratorTest.java
@@ -17,10 +17,8 @@
 
 package org.apache.seatunnel.connectors.seatunnel.fake.source;
 
-import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
-import org.apache.seatunnel.api.table.type.SqlType;
 import org.apache.seatunnel.connectors.seatunnel.common.schema.SeaTunnelSchema;
 
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
@@ -32,40 +30,44 @@ import org.junit.jupiter.params.provider.ValueSource;
 
 import java.io.File;
 import java.io.FileNotFoundException;
-import java.lang.reflect.Array;
 import java.net.URISyntaxException;
 import java.net.URL;
 import java.nio.file.Paths;
+import java.util.List;
 import java.util.Map;
 
-public class FakeRandomDataTest {
+public class FakeDataGeneratorTest {
 
     @ParameterizedTest
     @ValueSource(strings = {"complex.schema.conf", "simple.schema.conf"})
     public void testComplexSchemaParse(String conf) throws 
FileNotFoundException, URISyntaxException {
-        Config testConfigFile = getTestConfigFile(conf);
-        SeaTunnelSchema seatunnelSchema = 
SeaTunnelSchema.buildWithConfig(testConfigFile);
-        FakeRandomData fakeRandomData = new FakeRandomData(seatunnelSchema);
-        SeaTunnelRow seaTunnelRow = fakeRandomData.randomRow();
-        Assertions.assertNotNull(seaTunnelRow);
-        Object[] fields = seaTunnelRow.getFields();
-        Assertions.assertNotNull(fields);
-        SeaTunnelRowType seaTunnelRowType = 
seatunnelSchema.getSeaTunnelRowType();
-        SeaTunnelDataType<?>[] fieldTypes = seaTunnelRowType.getFieldTypes();
-        for (int i = 0; i < fieldTypes.length; i++) {
-            if (fieldTypes[i].getSqlType() != SqlType.NULL) {
-                Assertions.assertNotNull(fields[i]);
-            } else {
-                Assertions.assertSame(fields[i], null);
-            }
-            if (fieldTypes[i].getSqlType() == SqlType.MAP) {
-                Assertions.assertTrue(fields[i] instanceof Map);
-                Map<?, ?> field = (Map) fields[i];
-                field.forEach((k, v) -> Assertions.assertTrue(k != null && v 
!= null));
-            }
-            if (fieldTypes[i].getSqlType() == SqlType.ARRAY) {
-                Assertions.assertTrue(fields[i].getClass().isArray());
-                Assertions.assertNotNull(Array.get(fields[i], 0));
+        Config testConfig = getTestConfigFile(conf);
+        SeaTunnelSchema seaTunnelSchema = 
SeaTunnelSchema.buildWithConfig(testConfig.getConfig(SeaTunnelSchema.SCHEMA));
+        SeaTunnelRowType seaTunnelRowType = 
seaTunnelSchema.getSeaTunnelRowType();
+        FakeConfig fakeConfig = FakeConfig.buildWithConfig(testConfig);
+        FakeDataGenerator fakeDataGenerator = new 
FakeDataGenerator(seaTunnelSchema, fakeConfig);
+        List<SeaTunnelRow> seaTunnelRows = 
fakeDataGenerator.generateFakedRows();
+        Assertions.assertNotNull(seaTunnelRows);
+        Assertions.assertEquals(seaTunnelRows.size(), 10);
+        for (SeaTunnelRow seaTunnelRow : seaTunnelRows) {
+            for (int i = 0; i < seaTunnelRowType.getFieldTypes().length; i++) {
+                switch (seaTunnelRowType.getFieldType(i).getSqlType()) {
+                    case STRING:
+                        Assertions.assertEquals(((String) 
seaTunnelRow.getField(i)).length(), 10);
+                        break;
+                    case BYTES:
+                        Assertions.assertEquals(((byte[]) 
seaTunnelRow.getField(i)).length, 10);
+                        break;
+                    case ARRAY:
+                        Assertions.assertEquals(((Object[]) 
seaTunnelRow.getField(i)).length, 10);
+                        break;
+                    case MAP:
+                        Assertions.assertEquals(((Map<?, ?>) 
seaTunnelRow.getField(i)).size(), 10);
+                        break;
+                    default:
+                        // do nothing
+                        break;
+                }
             }
         }
     }
@@ -74,14 +76,14 @@ public class FakeRandomDataTest {
         if (!configFile.startsWith("/")) {
             configFile = "/" + configFile;
         }
-        URL resource = FakeRandomDataTest.class.getResource(configFile);
+        URL resource = FakeDataGeneratorTest.class.getResource(configFile);
         if (resource == null) {
             throw new FileNotFoundException("Can't find config file: " + 
configFile);
         }
         String path = Paths.get(resource.toURI()).toString();
         Config config = ConfigFactory.parseFile(new File(path));
-        assert config.hasPath("schema");
-        return config.getConfig("schema");
+        assert config.hasPath("FakeSource");
+        return config.getConfig("FakeSource");
     }
 
 }
diff --git 
a/seatunnel-connectors-v2/connector-fake/src/test/resources/complex.schema.conf 
b/seatunnel-connectors-v2/connector-fake/src/test/resources/complex.schema.conf
index 6a06dbf06..96e82ee41 100644
--- 
a/seatunnel-connectors-v2/connector-fake/src/test/resources/complex.schema.conf
+++ 
b/seatunnel-connectors-v2/connector-fake/src/test/resources/complex.schema.conf
@@ -15,24 +15,47 @@
 # limitations under the License.
 #
 
-schema {
-  fields {
-    map = "map<string, map<string, string>>"
-    map_array = "map<string, map<string, array<int>>>"
-    array = "array<tinyint>"
-    string = string
-    boolean = boolean
-    tinyint = tinyint
-    smallint = smallint
-    int = int
-    bigint = bigint
-    float = float
-    double = double
-    decimal = "decimal(30, 8)"
-    null = "null"
-    bytes = bytes
-    date = date
-    time = time
-    timestamp = timestamp
+FakeSource {
+  row.num = 10
+  map.size = 10
+  array.size = 10
+  bytes.length = 10
+  string.length = 10
+  schema = {
+    fields {
+      c_map = "map<string, array<int>>"
+      c_array = "array<int>"
+      c_string = string
+      c_boolean = boolean
+      c_tinyint = tinyint
+      c_smallint = smallint
+      c_int = int
+      c_bigint = bigint
+      c_float = float
+      c_double = double
+      c_decimal = "decimal(30, 8)"
+      c_null = "null"
+      c_bytes = bytes
+      c_date = date
+      c_timestamp = timestamp
+      c_row = {
+        c_map = "map<string, map<string, string>>"
+        c_array = "array<int>"
+        c_string = string
+        c_boolean = boolean
+        c_tinyint = tinyint
+        c_smallint = smallint
+        c_int = int
+        c_bigint = bigint
+        c_float = float
+        c_double = double
+        c_decimal = "decimal(30, 8)"
+        c_null = "null"
+        c_bytes = bytes
+        c_date = date
+        c_timestamp = timestamp
+      }
+    }
   }
-}
\ No newline at end of file
+  result_table_name = "fake"
+}
diff --git 
a/seatunnel-connectors-v2/connector-fake/src/test/resources/simple.schema.conf 
b/seatunnel-connectors-v2/connector-fake/src/test/resources/simple.schema.conf
index 6716f00cd..58848a91c 100644
--- 
a/seatunnel-connectors-v2/connector-fake/src/test/resources/simple.schema.conf
+++ 
b/seatunnel-connectors-v2/connector-fake/src/test/resources/simple.schema.conf
@@ -15,23 +15,30 @@
 # limitations under the License.
 #
 
-schema {
-  fields {
-    map = "map<string, string>"
-    array = "array<tinyint>"
-    string = string
-    boolean = boolean
-    tinyint = tinyint
-    smallint = smallint
-    int = int
-    bigint = bigint
-    float = float
-    double = double
-    decimal = "decimal(30, 8)"
-    null = "null"
-    bytes = bytes
-    date = date
-    time = time
-    timestamp = timestamp
+FakeSource {
+  row.num = 10
+  map.size = 10
+  array.size = 10
+  bytes.length = 10
+  string.length = 10
+  schema = {
+    fields {
+      c_map = "map<string, string>"
+      c_array = "array<int>"
+      c_string = string
+      c_boolean = boolean
+      c_tinyint = tinyint
+      c_smallint = smallint
+      c_int = int
+      c_bigint = bigint
+      c_float = float
+      c_double = double
+      c_decimal = "decimal(30, 8)"
+      c_null = "null"
+      c_bytes = bytes
+      c_date = date
+      c_timestamp = timestamp
+    }
   }
+  result_table_name = "fake"
 }
\ No newline at end of file
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-assert-e2e/src/test/resources/assertion/fakesource_to_assert.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-assert-e2e/src/test/resources/assertion/fakesource_to_assert.conf
index 33ed73bc5..3134cfb40 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-assert-e2e/src/test/resources/assertion/fakesource_to_assert.conf
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-assert-e2e/src/test/resources/assertion/fakesource_to_assert.conf
@@ -50,11 +50,11 @@ sink {
                    },
                    {
                        rule_type = MIN_LENGTH
-                       rule_value = 10
+                       rule_value = 5
                    },
                    {
                         rule_type = MAX_LENGTH
-                        rule_value = 10
+                        rule_value = 65535
                    }
                ]
            },{
diff --git 
a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-assert-flink-e2e/src/test/resources/assertion/fakesource_to_assert.conf
 
b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-assert-flink-e2e/src/test/resources/assertion/fakesource_to_assert.conf
index b2fda476b..e2a1cb128 100644
--- 
a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-assert-flink-e2e/src/test/resources/assertion/fakesource_to_assert.conf
+++ 
b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-assert-flink-e2e/src/test/resources/assertion/fakesource_to_assert.conf
@@ -61,7 +61,7 @@ sink {
                    },
                    {
                        rule_type = MIN_LENGTH
-                       rule_value = 10
+                       rule_value = 5
                    },
                    {
                         rule_type = MAX_LENGTH


Reply via email to