This is an automated email from the ASF dual-hosted git repository.

kirs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new a5447528c [Improve][Fake-Connector-V2] support user-defined-schema and random data for fake-table (#2406)
a5447528c is described below

commit a5447528c34e9c0b5452ef4f175dd32d3636c0e6
Author: Laglangyue <[email protected]>
AuthorDate: Tue Aug 23 09:45:05 2022 +0800

    [Improve][Fake-Connector-V2] support user-defined-schema and random data for fake-table (#2406)
    
    * [Connector-V2][JDBC-connector] optimization fake
    Co-authored-by: tangjiafu <[email protected]>
---
 docs/en/connector-v2/sink/Assert.md                |  12 +-
 docs/en/connector-v2/source/FakeSource.md          |  76 ++++++++++++
 .../seatunnel/common/schema/SeatunnelSchema.java   |   3 +-
 .../seatunnel/fake/source/FakeRandomData.java      | 133 +++++++++++++++++++++
 .../seatunnel/fake/source/FakeSource.java          |  12 +-
 .../seatunnel/fake/source/FakeSourceReader.java    |  16 +--
 .../FakeRandomDataTest.java                        |  87 ++++++++++++++
 .../src/test/resources/complex.schema.conf         |  56 ++++-----
 .../src/test/resources/simple.schema.conf          |  55 ++++-----
 .../resources/assertion/fakesource_to_assert.conf  |  20 ++--
 .../test/resources/fake/fakesource_to_console.conf |  18 +--
 .../resources/file/fakesource_to_hdfs_json.conf    |  11 +-
 .../resources/file/fakesource_to_hdfs_parquet.conf |  11 +-
 .../resources/file/fakesource_to_hdfs_text.conf    |  11 +-
 .../resources/file/fakesource_to_local_json.conf   |  11 +-
 .../file/fakesource_to_local_parquet.conf          |  11 +-
 .../resources/file/fakesource_to_local_text.conf   |  13 +-
 .../test/resources/iotdb/fakesource_to_iotdb.conf  |   9 +-
 .../test/resources/jdbc/fakesource_to_jdbc.conf    |  12 +-
 .../test/resources/jdbc/jdbcsource_to_console.conf |   4 +-
 .../test/resources/fake/fakesource_to_console.conf |  10 +-
 .../resources/file/fakesource_to_hdfs_json.conf    |  13 +-
 .../resources/file/fakesource_to_hdfs_parquet.conf |  13 +-
 .../resources/file/fakesource_to_hdfs_text.conf    |  13 +-
 .../resources/file/fakesource_to_local_json.conf   |  11 +-
 .../file/fakesource_to_local_parquet.conf          |  11 +-
 .../resources/file/fakesource_to_local_text.conf   |  11 +-
 .../test/resources/iotdb/fakesource_to_iotdb.conf  |   7 +-
 .../main/resources/examples/fake_to_console.conf   |  13 +-
 .../main/resources/examples/fake_to_dingtalk.conf  |  13 +-
 .../resources/examples/fakesource_to_file.conf     |  13 +-
 .../resources/examples/spark.batch.clickhouse.conf |   9 +-
 .../src/main/resources/examples/spark.batch.conf   |  29 ++++-
 .../translation/serialization/RowConverter.java    |   4 +-
 34 files changed, 570 insertions(+), 181 deletions(-)

diff --git a/docs/en/connector-v2/sink/Assert.md 
b/docs/en/connector-v2/sink/Assert.md
index ca9332d0c..9e5c49acf 100644
--- a/docs/en/connector-v2/sink/Assert.md
+++ b/docs/en/connector-v2/sink/Assert.md
@@ -37,13 +37,11 @@ A list value rule define the data value validation
 ### rule_type [string]
 
 The following rules are supported for now
-`
-NOT_NULL,   // value can't be null
-MIN,        // define the minimum value of data
-MAX,        // define the maximum value of data
-MIN_LENGTH, // define the minimum string length of a string data
-MAX_LENGTH  // define the maximum string length of a string data
-`
+- NOT_NULL `value can't be null`
+- MIN `define the minimum value of data`
+- MAX `define the maximum value of data`
+- MIN_LENGTH `define the minimum string length of a string data`
+- MAX_LENGTH `define the maximum string length of a string data`
 
 ### rule_value [double]
 
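For reference, here is a minimal sketch of a value-rule list that uses the rule types documented above, modeled on the assertion e2e config (fakesource_to_assert.conf) updated later in this commit; the bounds shown are illustrative, and MIN/MAX would apply to a numeric field rather than a string one:

```hocon
# Value rules for a single string field (bounds are illustrative)
[
  { rule_type = NOT_NULL },
  { rule_type = MIN_LENGTH, rule_value = 10 },
  { rule_type = MAX_LENGTH, rule_value = 10 }
]
```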
diff --git a/docs/en/connector-v2/source/FakeSource.md 
b/docs/en/connector-v2/source/FakeSource.md
new file mode 100644
index 000000000..9c4bf4ffd
--- /dev/null
+++ b/docs/en/connector-v2/source/FakeSource.md
@@ -0,0 +1,76 @@
+# FakeSource
+
+> FakeSource connector
+
+## Description
+
+FakeSource is a virtual data source that randomly generates rows according to the data structure of the user-defined schema.
+It is intended only for testing, such as type conversion and feature testing.
+
+## Options
+
+| name              | type   | required | default value |
+|-------------------|--------|----------|---------------|
+| result_table_name | string | yes      | -             |
+| schema            | config | yes      | -             |
+
+### result_table_name [string]
+
+The name of the result table, which downstream transform and sink plugins can reference.
+
+### schema [config]
+Table structure description. You should assign the schema option to tell the connector how to parse data into the rows you want.
+**Tips**: Most unstructured data sources contain this param, such as LocalFile and HdfsFile.
+**Example**:
+```hocon
+schema = {
+      fields {
+        c_map = "map<string, string>"
+        c_array = "array<tinyint>"
+        c_string = string
+        c_boolean = boolean
+        c_tinyint = tinyint
+        c_smallint = smallint
+        c_int = int
+        c_bigint = bigint
+        c_float = float
+        c_double = double
+        c_decimal = "decimal(30, 8)"
+        c_null = "null"
+        c_bytes = bytes
+        c_date = date
+        c_time = time
+        c_timestamp = timestamp
+      }
+    }
+```
+
+## Example
+A simple FakeSource configuration that covers all supported data types:
+```hocon
+source {
+  FakeSource {
+    schema = {
+      fields {
+        c_map = "map<string, string>"
+        c_array = "array<tinyint>"
+        c_string = string
+        c_boolean = boolean
+        c_tinyint = tinyint
+        c_smallint = smallint
+        c_int = int
+        c_bigint = bigint
+        c_float = float
+        c_double = double
+        c_decimal = "decimal(30, 8)"
+        c_null = "null"
+        c_bytes = bytes
+        c_date = date
+        c_time = time
+        c_timestamp = timestamp
+      }
+    }
+    result_table_name = "fake"
+  }
+}
+```
\ No newline at end of file
diff --git 
a/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/schema/SeatunnelSchema.java
 
b/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/schema/SeatunnelSchema.java
index 4c4799b21..835d1d0ca 100644
--- 
a/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/schema/SeatunnelSchema.java
+++ 
b/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/schema/SeatunnelSchema.java
@@ -32,9 +32,10 @@ import org.apache.seatunnel.common.utils.JsonUtils;
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
 import org.apache.seatunnel.shade.com.typesafe.config.ConfigRenderOptions;
 
+import java.io.Serializable;
 import java.util.Map;
 
-public class SeatunnelSchema {
+public class SeatunnelSchema implements Serializable {
     public static final String SCHEMA = "schema";
     private static final String FIELD_KEY = "fields";
     private static final String SIMPLE_SCHEMA_FILED = "content";
diff --git 
a/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeRandomData.java
 
b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeRandomData.java
new file mode 100644
index 000000000..efafde5ca
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeRandomData.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.fake.source;
+
+import static org.apache.seatunnel.api.table.type.BasicType.BOOLEAN_TYPE;
+import static org.apache.seatunnel.api.table.type.BasicType.BYTE_TYPE;
+import static org.apache.seatunnel.api.table.type.BasicType.DOUBLE_TYPE;
+import static org.apache.seatunnel.api.table.type.BasicType.FLOAT_TYPE;
+import static org.apache.seatunnel.api.table.type.BasicType.INT_TYPE;
+import static org.apache.seatunnel.api.table.type.BasicType.LONG_TYPE;
+import static org.apache.seatunnel.api.table.type.BasicType.SHORT_TYPE;
+import static org.apache.seatunnel.api.table.type.BasicType.STRING_TYPE;
+import static org.apache.seatunnel.api.table.type.BasicType.VOID_TYPE;
+
+import org.apache.seatunnel.api.table.type.ArrayType;
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.DecimalType;
+import org.apache.seatunnel.api.table.type.LocalTimeType;
+import org.apache.seatunnel.api.table.type.MapType;
+import org.apache.seatunnel.api.table.type.PrimitiveByteArrayType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.connectors.seatunnel.common.schema.SeatunnelSchema;
+
+import org.apache.commons.lang3.RandomStringUtils;
+import org.apache.commons.lang3.RandomUtils;
+
+import java.lang.reflect.Array;
+import java.math.BigDecimal;
+import java.time.LocalDateTime;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+
+public class FakeRandomData {
+    public static final String SCHEMA = "schema";
+    private final SeatunnelSchema schema;
+
+    public FakeRandomData(SeatunnelSchema schema) {
+        this.schema = schema;
+    }
+
+    public SeaTunnelRow randomRow() {
+        SeaTunnelRowType seaTunnelRowType = schema.getSeaTunnelRowType();
+        String[] fieldNames = seaTunnelRowType.getFieldNames();
+        SeaTunnelDataType<?>[] fieldTypes = seaTunnelRowType.getFieldTypes();
+        List<Object> randomRow = new ArrayList<>(fieldNames.length);
+        for (SeaTunnelDataType<?> fieldType : fieldTypes) {
+            randomRow.add(randomColumnValue(fieldType));
+        }
+        return new SeaTunnelRow(randomRow.toArray());
+    }
+
+    @SuppressWarnings("magicnumber")
+    private Object randomColumnValue(SeaTunnelDataType<?> fieldType) {
+        if (BOOLEAN_TYPE.equals(fieldType)) {
+            return RandomUtils.nextInt(0, 2) == 1;
+        } else if (BYTE_TYPE.equals(fieldType)) {
+            return (byte) RandomUtils.nextInt(0, 255);
+        } else if (SHORT_TYPE.equals(fieldType)) {
+            return (short) RandomUtils.nextInt(Byte.MAX_VALUE, Short.MAX_VALUE);
+        } else if (INT_TYPE.equals(fieldType)) {
+            return RandomUtils.nextInt(Short.MAX_VALUE, Integer.MAX_VALUE);
+        } else if (LONG_TYPE.equals(fieldType)) {
+            return RandomUtils.nextLong(Integer.MAX_VALUE, Long.MAX_VALUE);
+        } else if (FLOAT_TYPE.equals(fieldType)) {
+            return RandomUtils.nextFloat(Float.MIN_VALUE, Float.MAX_VALUE);
+        } else if (DOUBLE_TYPE.equals(fieldType)) {
+            return RandomUtils.nextDouble(Float.MAX_VALUE, Double.MAX_VALUE);
+        } else if (STRING_TYPE.equals(fieldType)) {
+            return RandomStringUtils.randomAlphabetic(10);
+        } else if (LocalTimeType.LOCAL_DATE_TYPE.equals(fieldType)) {
+            return randomLocalDateTime().toLocalDate();
+        } else if (LocalTimeType.LOCAL_TIME_TYPE.equals(fieldType)) {
+            return randomLocalDateTime().toLocalTime();
+        } else if (LocalTimeType.LOCAL_DATE_TIME_TYPE.equals(fieldType)) {
+            return randomLocalDateTime();
+        } else if (fieldType instanceof DecimalType) {
+            DecimalType decimalType = (DecimalType) fieldType;
+            return new BigDecimal(RandomStringUtils.randomNumeric(decimalType.getPrecision() - decimalType.getScale()) + "." +
+                RandomStringUtils.randomNumeric(decimalType.getScale()));
+        } else if (fieldType instanceof ArrayType) {
+            ArrayType<?, ?> arrayType = (ArrayType<?, ?>) fieldType;
+            BasicType<?> elementType = arrayType.getElementType();
+            Object value = randomColumnValue(elementType);
+            Object arr = Array.newInstance(elementType.getTypeClass(), 1);
+            Array.set(arr, 0, value);
+            return arr;
+        } else if (fieldType instanceof MapType) {
+            MapType<?, ?> mapType = (MapType<?, ?>) fieldType;
+            SeaTunnelDataType<?> keyType = mapType.getKeyType();
+            Object key = randomColumnValue(keyType);
+            SeaTunnelDataType<?> valueType = mapType.getValueType();
+            Object value = randomColumnValue(valueType);
+            HashMap<Object, Object> objectObjectHashMap = new HashMap<>();
+            objectObjectHashMap.put(key, value);
+            return objectObjectHashMap;
+        } else if (fieldType instanceof PrimitiveByteArrayType) {
+            return RandomUtils.nextBytes(100);
+        } else if (VOID_TYPE.equals(fieldType) || fieldType == null) {
+            return Void.TYPE;
+        } else {
+            throw new UnsupportedOperationException("Unexpected value: " + fieldType);
+        }
+    }
+
+    @SuppressWarnings("magicnumber")
+    private LocalDateTime randomLocalDateTime() {
+        return LocalDateTime.of(
+            LocalDateTime.now().getYear(),
+            RandomUtils.nextInt(1, 12),
+            RandomUtils.nextInt(1, LocalDateTime.now().getDayOfMonth()),
+            RandomUtils.nextInt(0, 24),
+            RandomUtils.nextInt(0, 59)
+        );
+    }
+}
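As a quick orientation to the new class, here is a minimal usage sketch that mirrors the FakeRandomDataTest added later in this commit: parse a HOCON `schema` block, build a SeatunnelSchema from it, and ask FakeRandomData for a random row. The example class name, the inline field names, and the printing at the end are illustrative and not part of the commit.

```java
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.connectors.seatunnel.common.schema.SeatunnelSchema;
import org.apache.seatunnel.connectors.seatunnel.fake.source.FakeRandomData;

import org.apache.seatunnel.shade.com.typesafe.config.Config;
import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;

import java.util.Arrays;

public class FakeRandomDataExample {
    public static void main(String[] args) {
        // A user-defined schema, inlined as HOCON (field names are illustrative).
        Config config = ConfigFactory.parseString(
            "schema { fields { name = \"string\", age = \"int\" } }");
        // Build the schema description and generate one random row from it.
        SeatunnelSchema schema = SeatunnelSchema.buildWithConfig(config.getConfig("schema"));
        SeaTunnelRow row = new FakeRandomData(schema).randomRow();
        System.out.println(Arrays.toString(row.getFields()));
    }
}
```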
diff --git 
a/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSource.java
 
b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSource.java
index 65e6587f4..4e7b1317e 100644
--- 
a/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSource.java
+++ 
b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSource.java
@@ -20,11 +20,10 @@ package 
org.apache.seatunnel.connectors.seatunnel.fake.source;
 import org.apache.seatunnel.api.common.SeaTunnelContext;
 import org.apache.seatunnel.api.source.Boundedness;
 import org.apache.seatunnel.api.source.SeaTunnelSource;
-import org.apache.seatunnel.api.table.type.BasicType;
-import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import org.apache.seatunnel.common.constants.JobMode;
+import org.apache.seatunnel.connectors.seatunnel.common.schema.SeatunnelSchema;
 import org.apache.seatunnel.connectors.seatunnel.common.source.AbstractSingleSplitReader;
 import org.apache.seatunnel.connectors.seatunnel.common.source.AbstractSingleSplitSource;
 import org.apache.seatunnel.connectors.seatunnel.common.source.SingleSplitReaderContext;
@@ -38,6 +37,7 @@ public class FakeSource extends 
AbstractSingleSplitSource<SeaTunnelRow> {
 
     private Config pluginConfig;
     private SeaTunnelContext seaTunnelContext;
+    private SeatunnelSchema schema;
 
     @Override
     public Boundedness getBoundedness() {
@@ -46,14 +46,12 @@ public class FakeSource extends 
AbstractSingleSplitSource<SeaTunnelRow> {
 
     @Override
     public SeaTunnelRowType getProducedType() {
-        return new SeaTunnelRowType(
-            new String[]{"name", "age", "timestamp"},
-            new SeaTunnelDataType<?>[]{BasicType.STRING_TYPE, BasicType.INT_TYPE, BasicType.LONG_TYPE});
+        return schema.getSeaTunnelRowType();
     }
 
     @Override
     public AbstractSingleSplitReader<SeaTunnelRow> createReader(SingleSplitReaderContext readerContext) throws Exception {
-        return new FakeSourceReader(readerContext);
+        return new FakeSourceReader(readerContext, new FakeRandomData(schema));
     }
 
     @Override
@@ -64,6 +62,8 @@ public class FakeSource extends 
AbstractSingleSplitSource<SeaTunnelRow> {
     @Override
     public void prepare(Config pluginConfig) {
         this.pluginConfig = pluginConfig;
+        assert pluginConfig.hasPath(FakeRandomData.SCHEMA);
+        this.schema = SeatunnelSchema.buildWithConfig(pluginConfig.getConfig(FakeRandomData.SCHEMA));
     }
 
     @Override
diff --git 
a/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceReader.java
 
b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceReader.java
index 7007eaf25..d67e1f4b2 100644
--- 
a/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceReader.java
+++ 
b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceReader.java
@@ -26,20 +26,17 @@ import 
org.apache.seatunnel.connectors.seatunnel.common.source.SingleSplitReader
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.Random;
-import java.util.concurrent.ThreadLocalRandom;
-
 public class FakeSourceReader extends AbstractSingleSplitReader<SeaTunnelRow> {
 
     private static final Logger LOGGER = LoggerFactory.getLogger(FakeSourceReader.class);
 
     private final SingleSplitReaderContext context;
 
-    private final String[] names = {"Wenjun", "Fanjia", "Zongwen", "CalvinKirs"};
-    private final int[] ages = {11, 22, 33, 44};
+    private final FakeRandomData fakeRandomData;
 
-    public FakeSourceReader(SingleSplitReaderContext context) {
+    public FakeSourceReader(SingleSplitReaderContext context, FakeRandomData randomData) {
         this.context = context;
+        this.fakeRandomData = randomData;
     }
 
     @Override
@@ -56,11 +53,8 @@ public class FakeSourceReader extends 
AbstractSingleSplitReader<SeaTunnelRow> {
     @SuppressWarnings("magicnumber")
     public void pollNext(Collector<SeaTunnelRow> output) throws InterruptedException {
         // Generate a random number of rows to emit.
-        Random random = ThreadLocalRandom.current();
-        int size = random.nextInt(10) + 1;
-        for (int i = 0; i < size; i++) {
-            int randomIndex = random.nextInt(names.length);
-            SeaTunnelRow seaTunnelRow = new SeaTunnelRow(new Object[]{names[randomIndex], ages[randomIndex], System.currentTimeMillis()});
+        for (int i = 0; i < 10; i++) {
+            SeaTunnelRow seaTunnelRow = fakeRandomData.randomRow();
             output.collect(seaTunnelRow);
         }
         if (Boundedness.BOUNDED.equals(context.getBoundedness())) {
diff --git 
a/seatunnel-connectors-v2/connector-fake/src/test/java/org.apache.seatunnel.connectors.seatunnel.fake.source/FakeRandomDataTest.java
 
b/seatunnel-connectors-v2/connector-fake/src/test/java/org.apache.seatunnel.connectors.seatunnel.fake.source/FakeRandomDataTest.java
new file mode 100644
index 000000000..67906a161
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-fake/src/test/java/org.apache.seatunnel.connectors.seatunnel.fake.source/FakeRandomDataTest.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.fake.source;
+
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.api.table.type.SqlType;
+import org.apache.seatunnel.connectors.seatunnel.common.schema.SeatunnelSchema;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.lang.reflect.Array;
+import java.net.URISyntaxException;
+import java.net.URL;
+import java.nio.file.Paths;
+import java.util.Map;
+
+public class FakeRandomDataTest {
+
+    @ParameterizedTest
+    @ValueSource(strings = {"complex.schema.conf", "simple.schema.conf"})
+    public void testComplexSchemaParse(String conf) throws FileNotFoundException, URISyntaxException {
+        Config testConfigFile = getTestConfigFile(conf);
+        SeatunnelSchema seatunnelSchema = SeatunnelSchema.buildWithConfig(testConfigFile);
+        FakeRandomData fakeRandomData = new FakeRandomData(seatunnelSchema);
+        SeaTunnelRow seaTunnelRow = fakeRandomData.randomRow();
+        Assertions.assertNotNull(seaTunnelRow);
+        Object[] fields = seaTunnelRow.getFields();
+        Assertions.assertNotNull(fields);
+        SeaTunnelRowType seaTunnelRowType = seatunnelSchema.getSeaTunnelRowType();
+        SeaTunnelDataType<?>[] fieldTypes = seaTunnelRowType.getFieldTypes();
+        for (int i = 0; i < fieldTypes.length; i++) {
+            if (fieldTypes[i].getSqlType() != SqlType.NULL) {
+                Assertions.assertNotNull(fields[i]);
+            } else {
+                Assertions.assertSame(fields[i], Void.TYPE);
+            }
+            if (fieldTypes[i].getSqlType() == SqlType.MAP) {
+                Assertions.assertTrue(fields[i] instanceof Map);
+                Map<?, ?> field = (Map) fields[i];
+                field.forEach((k, v) -> Assertions.assertTrue(k != null && v != null));
+            }
+            if (fieldTypes[i].getSqlType() == SqlType.ARRAY) {
+                Assertions.assertTrue(fields[i].getClass().isArray());
+                Assertions.assertNotNull(Array.get(fields[i], 0));
+            }
+        }
+    }
+
+    private Config getTestConfigFile(String configFile) throws FileNotFoundException, URISyntaxException {
+        if (!configFile.startsWith("/")) {
+            configFile = "/" + configFile;
+        }
+        URL resource = FakeRandomDataTest.class.getResource(configFile);
+        if (resource == null) {
+            throw new FileNotFoundException("Can't find config file: " + configFile);
+        }
+        String path = Paths.get(resource.toURI()).toString();
+        Config config = ConfigFactory.parseFile(new File(path));
+        assert config.hasPath("schema");
+        return config.getConfig("schema");
+    }
+
+}
diff --git 
a/seatunnel-examples/seatunnel-spark-connector-v2-example/src/main/resources/examples/spark.batch.clickhouse.conf
 b/seatunnel-connectors-v2/connector-fake/src/test/resources/complex.schema.conf
similarity index 53%
copy from 
seatunnel-examples/seatunnel-spark-connector-v2-example/src/main/resources/examples/spark.batch.clickhouse.conf
copy to 
seatunnel-connectors-v2/connector-fake/src/test/resources/complex.schema.conf
index 2e27410ee..6a06dbf06 100644
--- 
a/seatunnel-examples/seatunnel-spark-connector-v2-example/src/main/resources/examples/spark.batch.clickhouse.conf
+++ 
b/seatunnel-connectors-v2/connector-fake/src/test/resources/complex.schema.conf
@@ -6,7 +6,7 @@
 # (the "License"); you may not use this file except in compliance with
 # the License.  You may obtain a copy of the License at
 #
-#     http://www.apache.org/licenses/LICENSE-2.0
+#    http://www.apache.org/licenses/LICENSE-2.0
 #
 # Unless required by applicable law or agreed to in writing, software
 # distributed under the License is distributed on an "AS IS" BASIS,
@@ -15,38 +15,24 @@
 # limitations under the License.
 #
 
-env {
-  spark.app.name = "SeaTunnelToClickHouseV2"
-  spark.executor.instances = 2
-  spark.executor.cores = 1
-  spark.executor.memory = "1g"
-  spark.master = local
-}
-
-source {
-  FakeSource {
-    result_table_name = "fake"
-    field_name = "name,age"
-  }
-}
-
-
-transform {
-  sql {
-    sql = "select name,age from fake"
-    result_table_name = "sql"
-  }
-}
-
-sink {
-  ClickHouse {
-    host = "139.198.158.103:8123"
-    database = "default"
-    table = "test_clickhouse_table_v2"
-    fields = ["name", "age"]
-    username = 'default'
-    bulk_size = 20000
-    retry_codes = [209, 210]
-    retry = 3
+schema {
+  fields {
+    map = "map<string, map<string, string>>"
+    map_array = "map<string, map<string, array<int>>>"
+    array = "array<tinyint>"
+    string = string
+    boolean = boolean
+    tinyint = tinyint
+    smallint = smallint
+    int = int
+    bigint = bigint
+    float = float
+    double = double
+    decimal = "decimal(30, 8)"
+    null = "null"
+    bytes = bytes
+    date = date
+    time = time
+    timestamp = timestamp
   }
-}
+}
\ No newline at end of file
diff --git 
a/seatunnel-examples/seatunnel-spark-connector-v2-example/src/main/resources/examples/spark.batch.clickhouse.conf
 b/seatunnel-connectors-v2/connector-fake/src/test/resources/simple.schema.conf
similarity index 53%
copy from 
seatunnel-examples/seatunnel-spark-connector-v2-example/src/main/resources/examples/spark.batch.clickhouse.conf
copy to 
seatunnel-connectors-v2/connector-fake/src/test/resources/simple.schema.conf
index 2e27410ee..6716f00cd 100644
--- 
a/seatunnel-examples/seatunnel-spark-connector-v2-example/src/main/resources/examples/spark.batch.clickhouse.conf
+++ 
b/seatunnel-connectors-v2/connector-fake/src/test/resources/simple.schema.conf
@@ -6,7 +6,7 @@
 # (the "License"); you may not use this file except in compliance with
 # the License.  You may obtain a copy of the License at
 #
-#     http://www.apache.org/licenses/LICENSE-2.0
+#    http://www.apache.org/licenses/LICENSE-2.0
 #
 # Unless required by applicable law or agreed to in writing, software
 # distributed under the License is distributed on an "AS IS" BASIS,
@@ -15,38 +15,23 @@
 # limitations under the License.
 #
 
-env {
-  spark.app.name = "SeaTunnelToClickHouseV2"
-  spark.executor.instances = 2
-  spark.executor.cores = 1
-  spark.executor.memory = "1g"
-  spark.master = local
-}
-
-source {
-  FakeSource {
-    result_table_name = "fake"
-    field_name = "name,age"
-  }
-}
-
-
-transform {
-  sql {
-    sql = "select name,age from fake"
-    result_table_name = "sql"
-  }
-}
-
-sink {
-  ClickHouse {
-    host = "139.198.158.103:8123"
-    database = "default"
-    table = "test_clickhouse_table_v2"
-    fields = ["name", "age"]
-    username = 'default'
-    bulk_size = 20000
-    retry_codes = [209, 210]
-    retry = 3
+schema {
+  fields {
+    map = "map<string, string>"
+    array = "array<tinyint>"
+    string = string
+    boolean = boolean
+    tinyint = tinyint
+    smallint = smallint
+    int = int
+    bigint = bigint
+    float = float
+    double = double
+    decimal = "decimal(30, 8)"
+    null = "null"
+    bytes = bytes
+    date = date
+    time = time
+    timestamp = timestamp
   }
-}
+}
\ No newline at end of file
diff --git 
a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/resources/assertion/fakesource_to_assert.conf
 
b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/resources/assertion/fakesource_to_assert.conf
index d1e9c4583..b2fda476b 100644
--- 
a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/resources/assertion/fakesource_to_assert.conf
+++ 
b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/resources/assertion/fakesource_to_assert.conf
@@ -29,9 +29,13 @@ source {
   # This is a example source plugin **only for test and demonstrate the 
feature source plugin**
     FakeSource {
       result_table_name = "fake"
-      field_name = "name,age"
-    }
-
+      schema = {
+        fields {
+          name = "string"
+          age = "int"
+        }
+      }
+  }
   # If you would like to get more information about how to configure seatunnel 
and see full list of source plugins,
   # please go to 
https://seatunnel.apache.org/docs/flink/configuration/source-plugins/Fake
 }
@@ -57,11 +61,11 @@ sink {
                    },
                    {
                        rule_type = MIN_LENGTH
-                       rule_value = 3
+                       rule_value = 10
                    },
                    {
                         rule_type = MAX_LENGTH
-                        rule_value = 20
+                        rule_value = 10
                    }
                ]
            },{
@@ -73,16 +77,16 @@ sink {
                    },
                    {
                        rule_type = MIN
-                       rule_value = 1
+                       rule_value = 32767
                    },
                    {
                         rule_type = MAX
-                        rule_value = 100
+                        rule_value = 2147483647
                    }
                ]
            }
            ]
    }
   # If you would like to get more information about how to configure seatunnel 
and see full list of sink plugins,
-  # please go to 
https://seatunnel.apache.org/docs/flink/configuration/sink-plugins/Console
+  # please go to https://seatunnel.apache.org/docs/connector-v2/sink/Assert
 }
\ No newline at end of file
diff --git 
a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/resources/fake/fakesource_to_console.conf
 
b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/resources/fake/fakesource_to_console.conf
index 49be0920f..d9eb0ff86 100644
--- 
a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/resources/fake/fakesource_to_console.conf
+++ 
b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/resources/fake/fakesource_to_console.conf
@@ -28,13 +28,17 @@ env {
 
 source {
   # This is a example source plugin **only for test and demonstrate the 
feature source plugin**
-    FakeSource {
-      result_table_name = "fake"
-      field_name = "name,age"
+  FakeSource {
+    result_table_name = "fake"
+    schema = {
+      fields {
+        name = "string"
+        age = "int"
+      }
     }
-
+  }
   # If you would like to get more information about how to configure seatunnel 
and see full list of source plugins,
-  # please go to 
https://seatunnel.apache.org/docs/flink/configuration/source-plugins/Fake
+  # please go to 
https://seatunnel.apache.org/docs/connector-v2/source/FakeSource
 }
 
 transform {
@@ -43,12 +47,12 @@ transform {
     }
 
   # If you would like to get more information about how to configure seatunnel 
and see full list of transform plugins,
-  # please go to 
https://seatunnel.apache.org/docs/flink/configuration/transform-plugins/Sql
+  # please go to https://seatunnel.apache.org/docs/transform/sql
 }
 
 sink {
   Console {}
 
   # If you would like to get more information about how to configure seatunnel 
and see full list of sink plugins,
-  # please go to 
https://seatunnel.apache.org/docs/flink/configuration/sink-plugins/Console
+  # please go to https://seatunnel.apache.org/docs/category/sink-v2
 }
\ No newline at end of file
diff --git 
a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/resources/file/fakesource_to_hdfs_json.conf
 
b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/resources/file/fakesource_to_hdfs_json.conf
index 769c8760d..269b85d08 100644
--- 
a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/resources/file/fakesource_to_hdfs_json.conf
+++ 
b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/resources/file/fakesource_to_hdfs_json.conf
@@ -29,11 +29,16 @@ env {
 source {
   FakeSource {
     result_table_name = "fake"
-    field_name = "name,age"
+    schema = {
+      fields {
+        name = "string"
+        age = "int"
+      }
+    }
   }
 
   # If you would like to get more information about how to configure seatunnel 
and see full list of source plugins,
-  # please go to https://seatunnel.apache.org/docs/connector-v2/source/Fake
+  # please go to 
https://seatunnel.apache.org/docs/connector-v2/source/FakeSource
 }
 
 transform {
@@ -61,5 +66,5 @@ sink {
   }
 
   # If you would like to get more information about how to configure seatunnel 
and see full list of sink plugins,
-  # please go to https://seatunnel.apache.org/docs/connector-v2/sink/File
+  # please go to https://seatunnel.apache.org/docs/connector-v2/sink/HdfsFile
 }
\ No newline at end of file
diff --git 
a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/resources/file/fakesource_to_hdfs_parquet.conf
 
b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/resources/file/fakesource_to_hdfs_parquet.conf
index 9f5fd0b17..5e1ea5c01 100644
--- 
a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/resources/file/fakesource_to_hdfs_parquet.conf
+++ 
b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/resources/file/fakesource_to_hdfs_parquet.conf
@@ -29,11 +29,16 @@ env {
 source {
   FakeSource {
     result_table_name = "fake"
-    field_name = "name,age"
+    schema = {
+      fields {
+        name = "string"
+        age = "int"
+      }
+    }
   }
 
   # If you would like to get more information about how to configure seatunnel 
and see full list of source plugins,
-  # please go to https://seatunnel.apache.org/docs/connector-v2/source/Fake
+  # please go to 
https://seatunnel.apache.org/docs/connector-v2/source/FakeSource
 }
 
 transform {
@@ -62,5 +67,5 @@ sink {
   }
 
   # If you would like to get more information about how to configure seatunnel 
and see full list of sink plugins,
-  # please go to https://seatunnel.apache.org/docs/connector-v2/sink/File
+  # please go to https://seatunnel.apache.org/docs/connector-v2/sink/HdfsFile
 }
\ No newline at end of file
diff --git 
a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/resources/file/fakesource_to_hdfs_text.conf
 
b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/resources/file/fakesource_to_hdfs_text.conf
index ef83dfd4e..d4a8a745c 100644
--- 
a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/resources/file/fakesource_to_hdfs_text.conf
+++ 
b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/resources/file/fakesource_to_hdfs_text.conf
@@ -29,11 +29,16 @@ env {
 source {
   FakeSource {
     result_table_name = "fake"
-    field_name = "name,age"
+    schema = {
+      fields {
+        name = "string"
+        age = "int"
+      }
+    }
   }
 
   # If you would like to get more information about how to configure seatunnel 
and see full list of source plugins,
-  # please go to https://seatunnel.apache.org/docs/connector-v2/source/Fake
+  # please go to 
https://seatunnel.apache.org/docs/connector-v2/source/FakeSource
 }
 
 transform {
@@ -62,5 +67,5 @@ sink {
   }
 
   # If you would like to get more information about how to configure seatunnel 
and see full list of sink plugins,
-  # please go to https://seatunnel.apache.org/docs/connector-v2/sink/File
+  # please go to https://seatunnel.apache.org/docs/connector-v2/sink/HdfsFile
 }
\ No newline at end of file
diff --git 
a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/resources/file/fakesource_to_local_json.conf
 
b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/resources/file/fakesource_to_local_json.conf
index b18b472f6..c118c33c8 100644
--- 
a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/resources/file/fakesource_to_local_json.conf
+++ 
b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/resources/file/fakesource_to_local_json.conf
@@ -29,11 +29,16 @@ env {
 source {
   FakeSource {
     result_table_name = "fake"
-    field_name = "name,age"
+    schema = {
+      fields {
+        name = "string"
+        age = "int"
+      }
+    }
   }
 
   # If you would like to get more information about how to configure seatunnel 
and see full list of source plugins,
-  # please go to 
https://seatunnel.apache.org/docs/flink/configuration/source-plugins/Fake
+  # please go to 
https://seatunnel.apache.org/docs/connector-v2/source/FakeSource
 }
 
 transform {
@@ -61,5 +66,5 @@ sink {
   }
 
   # If you would like to get more information about how to configure seatunnel 
and see full list of sink plugins,
-  # please go to https://seatunnel.apache.org/docs/connector-v2/sink/File
+  # please go to https://seatunnel.apache.org/docs/connector-v2/sink/LocalFile
 }
\ No newline at end of file
diff --git 
a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/resources/file/fakesource_to_local_parquet.conf
 
b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/resources/file/fakesource_to_local_parquet.conf
index 9e2d5ad96..c02c16145 100644
--- 
a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/resources/file/fakesource_to_local_parquet.conf
+++ 
b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/resources/file/fakesource_to_local_parquet.conf
@@ -29,11 +29,16 @@ env {
 source {
   FakeSource {
     result_table_name = "fake"
-    field_name = "name,age"
+    schema = {
+      fields {
+        name = "string"
+        age = "int"
+      }
+    }
   }
 
   # If you would like to get more information about how to configure seatunnel 
and see full list of source plugins,
-  # please go to https://seatunnel.apache.org/docs/connector-v2/source/Fake
+  # please go to 
https://seatunnel.apache.org/docs/connector-v2/source/FakeSource
 }
 
 transform {
@@ -62,5 +67,5 @@ sink {
   }
 
   # If you would like to get more information about how to configure seatunnel 
and see full list of sink plugins,
-  # please go to https://seatunnel.apache.org/docs/connector-v2/sink/File
+  # please go to https://seatunnel.apache.org/docs/connector-v2/sink/LocalFile
 }
\ No newline at end of file
diff --git 
a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/resources/file/fakesource_to_local_text.conf
 
b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/resources/file/fakesource_to_local_text.conf
index d162b101f..7d1a5d42c 100644
--- 
a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/resources/file/fakesource_to_local_text.conf
+++ 
b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/resources/file/fakesource_to_local_text.conf
@@ -29,11 +29,16 @@ env {
 source {
   FakeSource {
     result_table_name = "fake"
-    field_name = "name,age"
+    schema = {
+      fields {
+        name = "string"
+        age = "int"
+      }
+    }
   }
 
   # If you would like to get more information about how to configure seatunnel 
and see full list of source plugins,
-  # please go to 
https://seatunnel.apache.org/docs/flink/configuration/source-plugins/Fake
+  # please go to 
https://seatunnel.apache.org/docs/connector-v2/source/FakeSource
 }
 
 transform {
@@ -42,7 +47,7 @@ transform {
   }
 
   # If you would like to get more information about how to configure seatunnel 
and see full list of transform plugins,
-  # please go to 
https://seatunnel.apache.org/docs/flink/configuration/transform-plugins/Sql
+  # please go to https://seatunnel.apache.org/docs/transform/sql
 }
 
 sink {
@@ -62,5 +67,5 @@ sink {
   }
 
   # If you would like to get more information about how to configure seatunnel 
and see full list of sink plugins,
-  # please go to https://seatunnel.apache.org/docs/connector-v2/sink/File
+  # please go to https://seatunnel.apache.org/docs/connector-v2/sink/LocalFile
 }
\ No newline at end of file
diff --git 
a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/resources/iotdb/fakesource_to_iotdb.conf
 
b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/resources/iotdb/fakesource_to_iotdb.conf
index da1ae4936..4c5e0fe4e 100644
--- 
a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/resources/iotdb/fakesource_to_iotdb.conf
+++ 
b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/resources/iotdb/fakesource_to_iotdb.conf
@@ -29,13 +29,18 @@ env {
 source {
     FakeSource {
         result_table_name = "fake"
-        field_name = "name, age"
+        schema = {
+            fields {
+                name = "string"
+                age = "int"
+            }
+        }
     }
-
     # If you would like to get more information about how to configure 
seatunnel and see full list of source plugins,
     # please go to 
https://seatunnel.apache.org/docs/connector-v2/source/FakeSource
 }
 
+
 transform {
     sql {
         sql = "select * from (values('root.ln.d1', '1660147200000', 
'status,value', 'true,1001'), ('root.ln.d1', '1660233600000', 'status,value', 
'false,1002')) t (device, `timestamp`, measurements, `values`)"
diff --git 
a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/resources/jdbc/fakesource_to_jdbc.conf
 
b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/resources/jdbc/fakesource_to_jdbc.conf
index 9640e19c2..0f732b63b 100644
--- 
a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/resources/jdbc/fakesource_to_jdbc.conf
+++ 
b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/resources/jdbc/fakesource_to_jdbc.conf
@@ -30,11 +30,15 @@ source {
   # This is a example source plugin **only for test and demonstrate the 
feature source plugin**
     FakeSource {
       result_table_name = "fake"
-      field_name = "name"
+      schema = {
+        fields {
+          name = "string"
+        }
+      }
     }
 
   # If you would like to get more information about how to configure seatunnel 
and see full list of source plugins,
-  # please go to 
https://seatunnel.apache.org/docs/flink/configuration/source-plugins/Fake
+  # please go to 
https://seatunnel.apache.org/docs/connector-v2/source/FakeSource
 }
 
 transform {
@@ -43,7 +47,7 @@ transform {
     }
 
   # If you would like to get more information about how to configure seatunnel 
and see full list of transform plugins,
-  # please go to 
https://seatunnel.apache.org/docs/flink/configuration/transform-plugins/Sql
+  # please go to https://seatunnel.apache.org/docs/transform/sql
 }
 
 sink {
@@ -57,5 +61,5 @@ sink {
   }
 
   # If you would like to get more information about how to configure seatunnel 
and see full list of sink plugins,
-  # please go to 
https://seatunnel.apache.org/docs/flink/configuration/sink-plugins/Console
+  # please go to https://seatunnel.apache.org/docs/connector-v2/sink/Jdbc
 }
\ No newline at end of file
diff --git 
a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/resources/jdbc/jdbcsource_to_console.conf
 
b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/resources/jdbc/jdbcsource_to_console.conf
index 6862abc04..4d65c569e 100644
--- 
a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/resources/jdbc/jdbcsource_to_console.conf
+++ 
b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/resources/jdbc/jdbcsource_to_console.conf
@@ -43,11 +43,11 @@ source {
 transform {
 
   # If you would like to get more information about how to configure seatunnel 
and see full list of transform plugins,
-  # please go to 
https://seatunnel.apache.org/docs/flink/configuration/transform-plugins/Sql
+  # please go to https://seatunnel.apache.org/docs/transform/sql
 }
 
 sink {
   Console {}
   # If you would like to get more information about how to configure seatunnel 
and see full list of sink plugins,
-  # please go to 
https://seatunnel.apache.org/docs/flink/configuration/sink-plugins/Console
+  # please go to https://seatunnel.apache.org/docs/category/sink-v2
 }
\ No newline at end of file
diff --git 
a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/src/test/resources/fake/fakesource_to_console.conf
 
b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/src/test/resources/fake/fakesource_to_console.conf
index 7b8127620..f469338a5 100644
--- 
a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/src/test/resources/fake/fakesource_to_console.conf
+++ 
b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/src/test/resources/fake/fakesource_to_console.conf
@@ -33,6 +33,12 @@ source {
   # This is a example input plugin **only for test and demonstrate the feature 
input plugin**
   FakeSource {
     result_table_name = "my_dataset"
+    schema = {
+      fields {
+        name = "string"
+        age = "int"
+      }
+    }
   }
 
   # You can also use other input plugins, such as hdfs
@@ -42,8 +48,8 @@ source {
   #   format = "json"
   # }
 
-  # If you would like to get more information about how to configure seatunnel 
and see full list of input plugins,
-  # please go to 
https://seatunnel.apache.org/docs/spark/configuration/source-plugins/Fake
+  # If you would like to get more information about how to configure seatunnel 
and see full list of source plugins,
+  # please go to 
https://seatunnel.apache.org/docs/connector-v2/source/FakeSource
 }
 
 transform {
diff --git 
a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/src/test/resources/file/fakesource_to_hdfs_json.conf
 
b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/src/test/resources/file/fakesource_to_hdfs_json.conf
index c4d1aabe5..40454bce0 100644
--- 
a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/src/test/resources/file/fakesource_to_hdfs_json.conf
+++ 
b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/src/test/resources/file/fakesource_to_hdfs_json.conf
@@ -28,11 +28,16 @@ env {
 source {
   FakeSource {
     result_table_name = "fake"
-    field_name = "name,age"
+    schema = {
+      fields {
+        name = "string"
+        age = "int"
+      }
+    }
   }
 
   # If you would like to get more information about how to configure seatunnel 
and see full list of source plugins,
-  # please go to https://seatunnel.apache.org/docs/connector-v2/source/Fake
+  # please go to 
https://seatunnel.apache.org/docs/connector-v2/source/FakeSource
 }
 
 transform {
@@ -41,7 +46,7 @@ transform {
   }
 
   # If you would like to get more information about how to configure seatunnel 
and see full list of transform plugins,
-  # please go to https://seatunnel.apache.org/docs/transform/sql
+  # please go to https://seatunnel.apache.org/docs/category/transform
 }
 
 sink {
@@ -60,5 +65,5 @@ sink {
   }
 
   # If you would like to get more information about how to configure seatunnel 
and see full list of sink plugins,
-  # please go to https://seatunnel.apache.org/docs/connector-v2/sink/File
+  # please go to https://seatunnel.apache.org/docs/connector-v2/sink/HdfsFile
 }
\ No newline at end of file
diff --git 
a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/src/test/resources/file/fakesource_to_hdfs_parquet.conf
 
b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/src/test/resources/file/fakesource_to_hdfs_parquet.conf
index bdae80d74..550990eea 100644
--- 
a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/src/test/resources/file/fakesource_to_hdfs_parquet.conf
+++ 
b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/src/test/resources/file/fakesource_to_hdfs_parquet.conf
@@ -28,11 +28,16 @@ env {
 source {
   FakeSource {
     result_table_name = "fake"
-    field_name = "name,age"
+    schema = {
+      fields {
+        name = "string"
+        age = "int"
+      }
+    }
   }
 
   # If you would like to get more information about how to configure seatunnel 
and see full list of source plugins,
-  # please go to https://seatunnel.apache.org/docs/connector-v2/source/Fake
+  # please go to 
https://seatunnel.apache.org/docs/connector-v2/source/FakeSource
 }
 
 transform {
@@ -41,7 +46,7 @@ transform {
   }
 
   # If you would like to get more information about how to configure seatunnel 
and see full list of transform plugins,
-  # please go to https://seatunnel.apache.org/docs/transform/sql
+  # please go to https://seatunnel.apache.org/docs/category/transform
 }
 
 sink {
@@ -61,5 +66,5 @@ sink {
   }
 
   # If you would like to get more information about how to configure seatunnel 
and see full list of sink plugins,
-  # please go to https://seatunnel.apache.org/docs/connector-v2/sink/File
+  # please go to https://seatunnel.apache.org/docs/connector-v2/sink/HdfsFile
 }
\ No newline at end of file
diff --git 
a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/src/test/resources/file/fakesource_to_hdfs_text.conf
 
b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/src/test/resources/file/fakesource_to_hdfs_text.conf
index b682d3831..2bf6afba6 100644
--- 
a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/src/test/resources/file/fakesource_to_hdfs_text.conf
+++ 
b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/src/test/resources/file/fakesource_to_hdfs_text.conf
@@ -28,11 +28,16 @@ env {
 source {
   FakeSource {
     result_table_name = "fake"
-    field_name = "name,age"
+    schema = {
+      fields {
+        name = "string"
+        age = "int"
+      }
+    }
   }
 
   # If you would like to get more information about how to configure seatunnel 
and see full list of source plugins,
-  # please go to https://seatunnel.apache.org/docs/connector-v2/source/Fake
+  # please go to 
https://seatunnel.apache.org/docs/connector-v2/source/FakeSource
 }
 
 transform {
@@ -41,7 +46,7 @@ transform {
   }
 
   # If you would like to get more information about how to configure seatunnel 
and see full list of transform plugins,
-  # please go to https://seatunnel.apache.org/docs/transform/sql
+  # please go to https://seatunnel.apache.org/docs/category/transform
 }
 
 sink {
@@ -61,5 +66,5 @@ sink {
   }
 
   # If you would like to get more information about how to configure seatunnel 
and see full list of sink plugins,
-  # please go to https://seatunnel.apache.org/docs/connector-v2/sink/File
+  # please go to https://seatunnel.apache.org/docs/connector-v2/sink/HdfsFile
 }
\ No newline at end of file
diff --git 
a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/src/test/resources/file/fakesource_to_local_json.conf
 
b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/src/test/resources/file/fakesource_to_local_json.conf
index d257f81bb..f8dab0cbc 100644
--- 
a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/src/test/resources/file/fakesource_to_local_json.conf
+++ 
b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/src/test/resources/file/fakesource_to_local_json.conf
@@ -28,11 +28,16 @@ env {
 source {
   FakeSource {
     result_table_name = "fake"
-    field_name = "name,age"
+    schema = {
+      fields {
+        name = "string"
+        age = "int"
+      }
+    }
   }
 
   # If you would like to get more information about how to configure seatunnel 
and see full list of source plugins,
-  # please go to 
https://seatunnel.apache.org/docs/flink/configuration/source-plugins/Fake
+  # please go to 
https://seatunnel.apache.org/docs/connector-v2/source/FakeSource
 }
 
 transform {
@@ -41,7 +46,7 @@ transform {
   }
 
   # If you would like to get more information about how to configure seatunnel 
and see full list of transform plugins,
-  # please go to 
https://seatunnel.apache.org/docs/flink/configuration/transform-plugins/Sql
+  # please go to https://seatunnel.apache.org/docs/category/transform
 }
 
 sink {
diff --git 
a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/src/test/resources/file/fakesource_to_local_parquet.conf
 
b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/src/test/resources/file/fakesource_to_local_parquet.conf
index b5d141212..edbad7ec6 100644
--- 
a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/src/test/resources/file/fakesource_to_local_parquet.conf
+++ 
b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/src/test/resources/file/fakesource_to_local_parquet.conf
@@ -28,11 +28,16 @@ env {
 source {
   FakeSource {
     result_table_name = "fake"
-    field_name = "name,age"
+    schema = {
+      fields {
+        name = "string"
+        age = "int"
+      }
+    }
   }
 
   # If you would like to get more information about how to configure seatunnel 
and see full list of source plugins,
-  # please go to https://seatunnel.apache.org/docs/connector-v2/source/Fake
+  # please go to 
https://seatunnel.apache.org/docs/connector-v2/source/FakeSource
 }
 
 transform {
@@ -41,7 +46,7 @@ transform {
   }
 
   # If you would like to get more information about how to configure seatunnel 
and see full list of transform plugins,
-  # please go to https://seatunnel.apache.org/docs/transform/sql
+  # please go to https://seatunnel.apache.org/docs/category/transform
 }
 
 sink {
diff --git 
a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/src/test/resources/file/fakesource_to_local_text.conf
 
b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/src/test/resources/file/fakesource_to_local_text.conf
index 733a48e61..41ff5e8f6 100644
--- 
a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/src/test/resources/file/fakesource_to_local_text.conf
+++ 
b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/src/test/resources/file/fakesource_to_local_text.conf
@@ -28,11 +28,16 @@ env {
 source {
   FakeSource {
     result_table_name = "fake"
-    field_name = "name,age"
+    schema = {
+      fields {
+        name = "string"
+        age = "int"
+      }
+    }
   }
 
   # If you would like to get more information about how to configure seatunnel 
and see full list of source plugins,
-  # please go to 
https://seatunnel.apache.org/docs/flink/configuration/source-plugins/Fake
+  # please go to 
https://seatunnel.apache.org/docs/connector-v2/source/FakeSource
 }
 
 transform {
@@ -41,7 +46,7 @@ transform {
   }
 
   # If you would like to get more information about how to configure seatunnel 
and see full list of transform plugins,
-  # please go to 
https://seatunnel.apache.org/docs/flink/configuration/transform-plugins/Sql
+  # please go to https://seatunnel.apache.org/docs/category/transform
 }
 
 sink {
diff --git 
a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/src/test/resources/iotdb/fakesource_to_iotdb.conf
 
b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/src/test/resources/iotdb/fakesource_to_iotdb.conf
index 9c7e521b7..503668487 100644
--- 
a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/src/test/resources/iotdb/fakesource_to_iotdb.conf
+++ 
b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/src/test/resources/iotdb/fakesource_to_iotdb.conf
@@ -28,7 +28,12 @@ env {
 source {
     FakeSource {
         result_table_name = "fake"
-        field_name = "name, age"
+        schema = {
+            fields {
+                name = "string"
+                age = "int"
+            }
+        }
     }
 
     # If you would like to get more information about how to configure 
seatunnel and see full list of source plugins,
diff --git 
a/seatunnel-examples/seatunnel-flink-connector-v2-example/src/main/resources/examples/fake_to_console.conf
 
b/seatunnel-examples/seatunnel-flink-connector-v2-example/src/main/resources/examples/fake_to_console.conf
index feb394127..6a89b64a6 100644
--- 
a/seatunnel-examples/seatunnel-flink-connector-v2-example/src/main/resources/examples/fake_to_console.conf
+++ 
b/seatunnel-examples/seatunnel-flink-connector-v2-example/src/main/resources/examples/fake_to_console.conf
@@ -30,11 +30,16 @@ source {
   # This is a example source plugin **only for test and demonstrate the 
feature source plugin**
     FakeSource {
       result_table_name = "fake"
-      field_name = "name,age"
+      schema = {
+        fields {
+          name = "string"
+          age = "int"
+        }
+      }
     }
 
   # If you would like to get more information about how to configure seatunnel 
and see full list of source plugins,
-  # please go to 
https://seatunnel.apache.org/docs/flink/configuration/source-plugins/Fake
+  # please go to https://seatunnel.apache.org/docs/category/source-v2
 }
 
 transform {
@@ -43,12 +48,12 @@ transform {
     }
 
   # If you would like to get more information about how to configure seatunnel and see full list of transform plugins,
-  # please go to https://seatunnel.apache.org/docs/flink/configuration/transform-plugins/Sql
+  # please go to https://seatunnel.apache.org/docs/category/transform
 }
 
 sink {
   Console {}
 
   # If you would like to get more information about how to configure seatunnel and see full list of sink plugins,
-  # please go to https://seatunnel.apache.org/docs/flink/configuration/sink-plugins/Console
+  # please go to https://seatunnel.apache.org/docs/category/sink-v2
 }
\ No newline at end of file
diff --git a/seatunnel-examples/seatunnel-flink-connector-v2-example/src/main/resources/examples/fake_to_dingtalk.conf b/seatunnel-examples/seatunnel-flink-connector-v2-example/src/main/resources/examples/fake_to_dingtalk.conf
index aed134290..d681985d0 100644
--- a/seatunnel-examples/seatunnel-flink-connector-v2-example/src/main/resources/examples/fake_to_dingtalk.conf
+++ b/seatunnel-examples/seatunnel-flink-connector-v2-example/src/main/resources/examples/fake_to_dingtalk.conf
@@ -30,11 +30,16 @@ source {
   # This is a example source plugin **only for test and demonstrate the feature source plugin**
     FakeSource {
       result_table_name = "fake"
-      field_name = "name,age"
+      schema = {
+        fields {
+          name = "string"
+          age = "int"
+        }
+      }
     }
 
   # If you would like to get more information about how to configure seatunnel and see full list of source plugins,
-  # please go to https://seatunnel.apache.org/docs/flink/configuration/source-plugins/Fake
+  # please go to https://seatunnel.apache.org/docs/category/source-v2
 }
 
 transform {
@@ -43,7 +48,7 @@ transform {
     }
 
   # If you would like to get more information about how to configure seatunnel and see full list of transform plugins,
-  # please go to https://seatunnel.apache.org/docs/flink/configuration/transform-plugins/Sql
+  # please go to https://seatunnel.apache.org/docs/category/transform
 }
 
 sink {
@@ -53,5 +58,5 @@ sink {
   }
 
   # If you would like to get more information about how to configure seatunnel and see full list of sink plugins,
-  # please go to https://seatunnel.apache.org/docs/flink/configuration/sink-plugins/Console
+  # please go to https://seatunnel.apache.org/docs/category/sink-v2
 }
\ No newline at end of file
diff --git a/seatunnel-examples/seatunnel-flink-connector-v2-example/src/main/resources/examples/fakesource_to_file.conf b/seatunnel-examples/seatunnel-flink-connector-v2-example/src/main/resources/examples/fakesource_to_file.conf
index f7b790c40..d640281cc 100644
--- a/seatunnel-examples/seatunnel-flink-connector-v2-example/src/main/resources/examples/fakesource_to_file.conf
+++ b/seatunnel-examples/seatunnel-flink-connector-v2-example/src/main/resources/examples/fakesource_to_file.conf
@@ -30,11 +30,16 @@ source {
   # This is a example source plugin **only for test and demonstrate the feature source plugin**
     FakeSource {
       result_table_name = "fake"
-      field_name = "name,age"
+      schema = {
+        fields {
+          name = "string"
+          age = "int"
+        }
+      }
     }
 
   # If you would like to get more information about how to configure seatunnel and see full list of source plugins,
-  # please go to https://seatunnel.apache.org/docs/flink/configuration/source-plugins/Fake
+  # please go to https://seatunnel.apache.org/docs/category/source-v2
 }
 
 transform {
@@ -43,7 +48,7 @@ transform {
       sql = "select name,age from fake"
     }
   # If you would like to get more information about how to configure seatunnel and see full list of transform plugins,
-  # please go to https://seatunnel.apache.org/docs/flink/configuration/transform-plugins/Sql
+  # please go to https://seatunnel.apache.org/docs/category/transform
 }
 
 sink {
@@ -64,5 +69,5 @@ sink {
   }
 
   # If you would like to get more information about how to configure seatunnel and see full list of sink plugins,
-  # please go to https://seatunnel.apache.org/docs/flink/configuration/sink-plugins/Console
+  # please go to https://seatunnel.apache.org/docs/category/sink-v2
 }
\ No newline at end of file
diff --git a/seatunnel-examples/seatunnel-spark-connector-v2-example/src/main/resources/examples/spark.batch.clickhouse.conf b/seatunnel-examples/seatunnel-spark-connector-v2-example/src/main/resources/examples/spark.batch.clickhouse.conf
index 2e27410ee..8d57e1f3b 100644
--- a/seatunnel-examples/seatunnel-spark-connector-v2-example/src/main/resources/examples/spark.batch.clickhouse.conf
+++ b/seatunnel-examples/seatunnel-spark-connector-v2-example/src/main/resources/examples/spark.batch.clickhouse.conf
@@ -26,8 +26,15 @@ env {
 source {
   FakeSource {
     result_table_name = "fake"
-    field_name = "name,age"
+    schema = {
+      fields {
+        name = "string"
+        age = "int"
+      }
+    }
   }
+  # If you would like to get more information about how to configure seatunnel and see full list of input plugins,
+  # please go to https://seatunnel.apache.org/docs/category/source-v2
 }
 
 
diff --git a/seatunnel-examples/seatunnel-spark-connector-v2-example/src/main/resources/examples/spark.batch.conf b/seatunnel-examples/seatunnel-spark-connector-v2-example/src/main/resources/examples/spark.batch.conf
index 45ab040b4..6e7755d8c 100644
--- a/seatunnel-examples/seatunnel-spark-connector-v2-example/src/main/resources/examples/spark.batch.conf
+++ b/seatunnel-examples/seatunnel-spark-connector-v2-example/src/main/resources/examples/spark.batch.conf
@@ -33,8 +33,27 @@ env {
 source {
   # This is a example input plugin **only for test and demonstrate the feature input plugin**
   FakeSource {
+    schema = {
+      fields {
+        c_map = "map<string, string>"
+        c_array = "array<tinyint>"
+        c_string = string
+        c_boolean = boolean
+        c_tinyint = tinyint
+        c_smallint = smallint
+        c_int = int
+        c_bigint = bigint
+        c_float = float
+        c_double = double
+        c_decimal = "decimal(30, 8)"
+        c_null = "null"
+        c_bytes = bytes
+        c_date = date
+        c_time = time
+        c_timestamp = timestamp
+      }
+    }
     result_table_name = "fake"
-    field_name = "name,age,timestamp"
   }
 
   # You can also use other input plugins, such as hdfs
@@ -45,7 +64,7 @@ source {
   # }
 
   # If you would like to get more information about how to configure seatunnel and see full list of input plugins,
-  # please go to https://seatunnel.apache.org/docs/spark/configuration/source-plugins/Fake
+  # please go to https://seatunnel.apache.org/docs/category/source-v2
 }
 
 transform {
@@ -53,12 +72,12 @@ transform {
 
   # you can also use other transform plugins, such as sql
   sql {
-    sql = "select name,age from fake"
+    sql = "select c_string,c_boolean,c_tinyint,c_smallint,c_int,c_bigint,c_float,c_double,c_null,c_bytes from fake"
     result_table_name = "sql"
   }
 
   # If you would like to get more information about how to configure seatunnel and see full list of transform plugins,
-  # please go to https://seatunnel.apache.org/docs/spark/configuration/transform-plugins/Split
+  # please go to https://seatunnel.apache.org/docs/category/transform
 }
 
 sink {
@@ -72,5 +91,5 @@ sink {
   # }
 
   # If you would like to get more information about how to configure seatunnel and see full list of output plugins,
-  # please go to https://seatunnel.apache.org/docs/spark/configuration/sink-plugins/Console
+  # please go to https://seatunnel.apache.org/docs/category/sink-v2
 }
diff --git a/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/serialization/RowConverter.java b/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/serialization/RowConverter.java
index 4b6495e7f..f642cfe45 100644
--- a/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/serialization/RowConverter.java
+++ b/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/serialization/RowConverter.java
@@ -57,12 +57,12 @@ public abstract class RowConverter<T> {
             }
         }
         if (errors.size() > 0) {
-            throw new UnsupportedOperationException("");
+            throw new UnsupportedOperationException(String.join(",", errors));
         }
     }
 
     protected boolean validate(Object field, SeaTunnelDataType<?> dataType) {
-        if (field == null) {
+        if (field == null || dataType.getSqlType() == SqlType.NULL) {
             return true;
         }
         SqlType sqlType = dataType.getSqlType();
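
The RowConverter hunk above changes two things: accumulated validation errors are now joined into the exception message instead of throwing with an empty string, and a field whose declared type is the NULL SQL type is always treated as valid. The following is only a minimal sketch of that behaviour; the enum, the DataType class, and the sample row are hypothetical stand-ins and not SeaTunnel's API.

```java
import java.util.ArrayList;
import java.util.List;

public class ValidationSketch {

    // Hypothetical stand-in for the SqlType enum referenced in the patch.
    enum SqlType { NULL, STRING, INT }

    // Hypothetical stand-in for a SeaTunnel data type: the declared SQL type
    // plus the Java class a value of that type is expected to be.
    static final class DataType {
        final SqlType sqlType;
        final Class<?> javaType;
        DataType(SqlType sqlType, Class<?> javaType) {
            this.sqlType = sqlType;
            this.javaType = javaType;
        }
    }

    // Mirrors the patched validate(): null values and NULL-typed columns are
    // always accepted; otherwise the value must match the declared Java type.
    static boolean validate(Object field, DataType dataType) {
        if (field == null || dataType.sqlType == SqlType.NULL) {
            return true;
        }
        return dataType.javaType.isInstance(field);
    }

    public static void main(String[] args) {
        Object[] row = {"seatunnel", 42, "ignored"};
        DataType[] types = {
            new DataType(SqlType.STRING, String.class),
            new DataType(SqlType.STRING, String.class), // mismatch: value is an Integer
            new DataType(SqlType.NULL, Void.class)      // NULL type: always valid
        };

        List<String> errors = new ArrayList<>();
        for (int i = 0; i < row.length; i++) {
            if (!validate(row[i], types[i])) {
                errors.add("field " + i + " does not match declared type " + types[i].sqlType);
            }
        }
        if (!errors.isEmpty()) {
            // As in the patch, the collected errors are joined into the message
            // rather than throwing an exception with an empty message.
            throw new UnsupportedOperationException(String.join(",", errors));
        }
    }
}
```

Running this sketch throws `UnsupportedOperationException: field 1 does not match declared type STRING`, which illustrates why the empty exception message was replaced with the joined error list.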
