This is an automated email from the ASF dual-hosted git repository.
wuchunfu pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 39162a4e0b [Improve][Connector-V2] Fake supports column configuration
(#7503)
39162a4e0b is described below
commit 39162a4e0bb9f64309b6a7824829719f74f7c8ea
Author: corgy-w <[email protected]>
AuthorDate: Wed Aug 28 15:18:59 2024 +0800
[Improve][Connector-V2] Fake supports column configuration (#7503)
* [Improve][Connector-V2] Fake supports column configuration
* [Improve][Connector-V2] optimized code
---
.../table/catalog/schema/ReadonlyConfigParser.java | 3 +-
.../seatunnel/fake/source/FakeDataGenerator.java | 72 +++++++++-------
.../seatunnel/fake/utils/FakeDataRandomUtils.java | 93 +++++++++++++--------
.../fake/source/FakeDataGeneratorTest.java | 55 ++++++++++++
.../src/test/resources/complex.schema.conf | 2 +-
.../src/test/resources/fake-data.column.conf | 97 ++++++++++++++++++++++
6 files changed, 253 insertions(+), 69 deletions(-)
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/schema/ReadonlyConfigParser.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/schema/ReadonlyConfigParser.java
index e043c0ecd7..ab85455b34 100644
---
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/schema/ReadonlyConfigParser.java
+++
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/schema/ReadonlyConfigParser.java
@@ -95,7 +95,8 @@ public class ReadonlyConfigParser implements
TableSchemaParser<ReadonlyConfig> {
String value = entry.getValue();
SeaTunnelDataType<?> dataType =
SeaTunnelDataTypeConvertorUtil.deserializeSeaTunnelDataType(key, value);
- PhysicalColumn column = PhysicalColumn.of(key, dataType, 0,
true, null, null);
+ PhysicalColumn column =
+ PhysicalColumn.of(key, dataType, null, null, true,
null, null);
columns.add(column);
}
return columns;
diff --git
a/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeDataGenerator.java
b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeDataGenerator.java
index 9ac392b6a7..524d231063 100644
---
a/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeDataGenerator.java
+++
b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeDataGenerator.java
@@ -18,8 +18,8 @@
package org.apache.seatunnel.connectors.seatunnel.fake.source;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.Column;
import org.apache.seatunnel.api.table.type.ArrayType;
-import org.apache.seatunnel.api.table.type.DecimalType;
import org.apache.seatunnel.api.table.type.MapType;
import org.apache.seatunnel.api.table.type.RowKind;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
@@ -34,9 +34,11 @@ import
org.apache.seatunnel.format.json.JsonDeserializationSchema;
import java.io.IOException;
import java.lang.reflect.Array;
+import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
+import java.util.function.Function;
public class FakeDataGenerator {
private final CatalogTable catalogTable;
@@ -71,12 +73,11 @@ public class FakeDataGenerator {
}
private SeaTunnelRow randomRow() {
- SeaTunnelRowType rowType = catalogTable.getSeaTunnelRowType();
- String[] fieldNames = rowType.getFieldNames();
- SeaTunnelDataType<?>[] fieldTypes = rowType.getFieldTypes();
- List<Object> randomRow = new ArrayList<>(fieldNames.length);
- for (SeaTunnelDataType<?> fieldType : fieldTypes) {
- randomRow.add(randomColumnValue(fieldType));
+ // Generate random data according to the data type and data column of
the table
+ List<Column> physicalColumns =
catalogTable.getTableSchema().getColumns();
+ List<Object> randomRow = new ArrayList<>(physicalColumns.size());
+ for (Column column : physicalColumns) {
+ randomRow.add(randomColumnValue(column));
}
SeaTunnelRow seaTunnelRow = new SeaTunnelRow(randomRow.toArray());
seaTunnelRow.setTableId(tableId);
@@ -103,7 +104,8 @@ public class FakeDataGenerator {
}
@SuppressWarnings("magicnumber")
- private Object randomColumnValue(SeaTunnelDataType<?> fieldType) {
+ private Object randomColumnValue(Column column) {
+ SeaTunnelDataType<?> fieldType = column.getDataType();
switch (fieldType.getSqlType()) {
case ARRAY:
ArrayType<?, ?> arrayType = (ArrayType<?, ?>) fieldType;
@@ -111,7 +113,7 @@ public class FakeDataGenerator {
int length = fakeConfig.getArraySize();
Object array = Array.newInstance(elementType.getTypeClass(),
length);
for (int i = 0; i < length; i++) {
- Object value = randomColumnValue(elementType);
+ Object value = randomColumnValue(column.copy(elementType));
Array.set(array, i, value);
}
return array;
@@ -122,59 +124,57 @@ public class FakeDataGenerator {
HashMap<Object, Object> objectMap = new HashMap<>();
int mapSize = fakeConfig.getMapSize();
for (int i = 0; i < mapSize; i++) {
- Object key = randomColumnValue(keyType);
- Object value = randomColumnValue(valueType);
+ Object key = randomColumnValue(column.copy(keyType));
+ Object value = randomColumnValue(column.copy(valueType));
objectMap.put(key, value);
}
return objectMap;
case STRING:
- return fakeDataRandomUtils.randomString();
+ return value(column, String::toString,
fakeDataRandomUtils::randomString);
case BOOLEAN:
- return fakeDataRandomUtils.randomBoolean();
+ return value(column, Boolean::parseBoolean,
fakeDataRandomUtils::randomBoolean);
case TINYINT:
- return fakeDataRandomUtils.randomTinyint();
+ return value(column, Byte::parseByte,
fakeDataRandomUtils::randomTinyint);
case SMALLINT:
- return fakeDataRandomUtils.randomSmallint();
+ return value(column, Short::parseShort,
fakeDataRandomUtils::randomSmallint);
case INT:
- return fakeDataRandomUtils.randomInt();
+ return value(column, Integer::parseInt,
fakeDataRandomUtils::randomInt);
case BIGINT:
- return fakeDataRandomUtils.randomBigint();
+ return value(column, Long::parseLong,
fakeDataRandomUtils::randomBigint);
case FLOAT:
- return fakeDataRandomUtils.randomFloat();
+ return value(column, Float::parseFloat,
fakeDataRandomUtils::randomFloat);
case DOUBLE:
- return fakeDataRandomUtils.randomDouble();
+ return value(column, Double::parseDouble,
fakeDataRandomUtils::randomDouble);
case DECIMAL:
- DecimalType decimalType = (DecimalType) fieldType;
- return fakeDataRandomUtils.randomBigDecimal(
- decimalType.getPrecision(), decimalType.getScale());
+ return value(column, BigDecimal::new,
fakeDataRandomUtils::randomBigDecimal);
case NULL:
return null;
case BYTES:
- return fakeDataRandomUtils.randomBytes();
+ return value(column, String::getBytes,
fakeDataRandomUtils::randomBytes);
case DATE:
- return fakeDataRandomUtils.randomLocalDate();
+ return value(column, String::toString,
fakeDataRandomUtils::randomLocalDate);
case TIME:
- return fakeDataRandomUtils.randomLocalTime();
+ return value(column, String::toString,
fakeDataRandomUtils::randomLocalTime);
case TIMESTAMP:
- return fakeDataRandomUtils.randomLocalDateTime();
+ return value(column, String::toString,
fakeDataRandomUtils::randomLocalDateTime);
case ROW:
SeaTunnelDataType<?>[] fieldTypes = ((SeaTunnelRowType)
fieldType).getFieldTypes();
Object[] objects = new Object[fieldTypes.length];
for (int i = 0; i < fieldTypes.length; i++) {
- Object object = randomColumnValue(fieldTypes[i]);
+ Object object =
randomColumnValue(column.copy(fieldTypes[i]));
objects[i] = object;
}
return new SeaTunnelRow(objects);
case BINARY_VECTOR:
- return fakeDataRandomUtils.randomBinaryVector();
+ return fakeDataRandomUtils.randomBinaryVector(column);
case FLOAT_VECTOR:
- return fakeDataRandomUtils.randomFloatVector();
+ return fakeDataRandomUtils.randomFloatVector(column);
case FLOAT16_VECTOR:
- return fakeDataRandomUtils.randomFloat16Vector();
+ return fakeDataRandomUtils.randomFloat16Vector(column);
case BFLOAT16_VECTOR:
- return fakeDataRandomUtils.randomBFloat16Vector();
+ return fakeDataRandomUtils.randomBFloat16Vector(column);
case SPARSE_FLOAT_VECTOR:
- return fakeDataRandomUtils.randomSparseFloatVector();
+ return fakeDataRandomUtils.randomSparseFloatVector(column);
default:
// never got in there
throw new FakeConnectorException(
@@ -182,4 +182,12 @@ public class FakeDataGenerator {
"SeaTunnel Fake source connector not support this data
type");
}
}
+
+ private static <T> T value(
+ Column column, Function<String, T> convert, Function<Column, T>
generate) {
+ if (column.getDefaultValue() != null) {
+ return convert.apply(column.getDefaultValue().toString());
+ }
+ return generate.apply(column);
+ }
}
diff --git
a/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/utils/FakeDataRandomUtils.java
b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/utils/FakeDataRandomUtils.java
index 8a8a14dc70..c4a038ff1a 100644
---
a/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/utils/FakeDataRandomUtils.java
+++
b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/utils/FakeDataRandomUtils.java
@@ -17,6 +17,8 @@
package org.apache.seatunnel.connectors.seatunnel.fake.utils;
+import org.apache.seatunnel.api.table.catalog.Column;
+import org.apache.seatunnel.api.table.type.DecimalType;
import org.apache.seatunnel.common.utils.BufferUtils;
import org.apache.seatunnel.connectors.seatunnel.fake.config.FakeConfig;
@@ -25,6 +27,7 @@ import org.apache.commons.lang3.RandomStringUtils;
import org.apache.commons.lang3.RandomUtils;
import java.math.BigDecimal;
+import java.math.RoundingMode;
import java.nio.ByteBuffer;
import java.time.LocalDate;
import java.time.LocalDateTime;
@@ -45,30 +48,34 @@ public class FakeDataRandomUtils {
return list.get(index);
}
- public Boolean randomBoolean() {
+ public Boolean randomBoolean(Column column) {
return RandomUtils.nextInt(0, 2) == 1;
}
- public BigDecimal randomBigDecimal(int precision, int scale) {
+ public BigDecimal randomBigDecimal(Column column) {
+ DecimalType dataType = (DecimalType) column.getDataType();
return new BigDecimal(
- RandomStringUtils.randomNumeric(precision - scale)
+ RandomStringUtils.randomNumeric(dataType.getPrecision() -
dataType.getScale())
+ "."
- + RandomStringUtils.randomNumeric(scale));
+ +
RandomStringUtils.randomNumeric(dataType.getScale()));
}
- public byte[] randomBytes() {
+ public byte[] randomBytes(Column column) {
return
RandomStringUtils.randomAlphabetic(fakeConfig.getBytesLength()).getBytes();
}
- public String randomString() {
+ public String randomString(Column column) {
List<String> stringTemplate = fakeConfig.getStringTemplate();
if (!CollectionUtils.isEmpty(stringTemplate)) {
return randomFromList(stringTemplate);
}
- return
RandomStringUtils.randomAlphabetic(fakeConfig.getStringLength());
+ return RandomStringUtils.randomAlphabetic(
+ column.getColumnLength() != null
+ ? column.getColumnLength().intValue()
+ : fakeConfig.getStringLength());
}
- public Byte randomTinyint() {
+ public Byte randomTinyint(Column column) {
List<Integer> tinyintTemplate = fakeConfig.getTinyintTemplate();
if (!CollectionUtils.isEmpty(tinyintTemplate)) {
return randomFromList(tinyintTemplate).byteValue();
@@ -76,7 +83,7 @@ public class FakeDataRandomUtils {
return (byte) RandomUtils.nextInt(fakeConfig.getTinyintMin(),
fakeConfig.getTinyintMax());
}
- public Short randomSmallint() {
+ public Short randomSmallint(Column column) {
List<Integer> smallintTemplate = fakeConfig.getSmallintTemplate();
if (!CollectionUtils.isEmpty(smallintTemplate)) {
return randomFromList(smallintTemplate).shortValue();
@@ -85,7 +92,7 @@ public class FakeDataRandomUtils {
RandomUtils.nextInt(fakeConfig.getSmallintMin(),
fakeConfig.getSmallintMax());
}
- public Integer randomInt() {
+ public Integer randomInt(Column column) {
List<Integer> intTemplate = fakeConfig.getIntTemplate();
if (!CollectionUtils.isEmpty(intTemplate)) {
return randomFromList(intTemplate);
@@ -93,7 +100,7 @@ public class FakeDataRandomUtils {
return RandomUtils.nextInt(fakeConfig.getIntMin(),
fakeConfig.getIntMax());
}
- public Long randomBigint() {
+ public Long randomBigint(Column column) {
List<Long> bigTemplate = fakeConfig.getBigTemplate();
if (!CollectionUtils.isEmpty(bigTemplate)) {
return randomFromList(bigTemplate);
@@ -101,32 +108,39 @@ public class FakeDataRandomUtils {
return RandomUtils.nextLong(fakeConfig.getBigintMin(),
fakeConfig.getBigintMax());
}
- public Float randomFloat() {
+ public Float randomFloat(Column column) {
List<Double> floatTemplate = fakeConfig.getFloatTemplate();
if (!CollectionUtils.isEmpty(floatTemplate)) {
return randomFromList(floatTemplate).floatValue();
}
- return RandomUtils.nextFloat(
- (float) fakeConfig.getFloatMin(), (float)
fakeConfig.getFloatMax());
+ float v =
+ RandomUtils.nextFloat(
+ (float) fakeConfig.getFloatMin(), (float)
fakeConfig.getFloatMax());
+ return column.getScale() == null
+ ? v
+ : new BigDecimal(v).setScale(column.getScale(),
RoundingMode.HALF_UP).floatValue();
}
- public Double randomDouble() {
+ public Double randomDouble(Column column) {
List<Double> doubleTemplate = fakeConfig.getDoubleTemplate();
if (!CollectionUtils.isEmpty(doubleTemplate)) {
return randomFromList(doubleTemplate);
}
- return RandomUtils.nextDouble(fakeConfig.getDoubleMin(),
fakeConfig.getDoubleMax());
+ double v = RandomUtils.nextDouble(fakeConfig.getDoubleMin(),
fakeConfig.getDoubleMax());
+ return column.getScale() == null
+ ? v
+ : new BigDecimal(v).setScale(column.getScale(),
RoundingMode.HALF_UP).floatValue();
}
- public LocalDate randomLocalDate() {
- return randomLocalDateTime().toLocalDate();
+ public LocalDate randomLocalDate(Column column) {
+ return randomLocalDateTime(column).toLocalDate();
}
- public LocalTime randomLocalTime() {
- return randomLocalDateTime().toLocalTime();
+ public LocalTime randomLocalTime(Column column) {
+ return randomLocalDateTime(column).toLocalTime();
}
- public LocalDateTime randomLocalDateTime() {
+ public LocalDateTime randomLocalDateTime(Column column) {
int year;
int month;
int day;
@@ -172,15 +186,20 @@ public class FakeDataRandomUtils {
return LocalDateTime.of(year, month, day, hour, minute, second);
}
- public ByteBuffer randomBinaryVector() {
- int byteCount = fakeConfig.getBinaryVectorDimension() / 8;
+ public ByteBuffer randomBinaryVector(Column column) {
+ int byteCount =
+ (column.getScale() != null)
+ ? column.getScale() / 8
+ : fakeConfig.getBinaryVectorDimension() / 8;
// binary vector doesn't care endian since each byte is independent
return ByteBuffer.wrap(RandomUtils.nextBytes(byteCount));
}
- public ByteBuffer randomFloatVector() {
- Float[] floatVector = new Float[fakeConfig.getVectorDimension()];
- for (int i = 0; i < fakeConfig.getVectorDimension(); i++) {
+ public ByteBuffer randomFloatVector(Column column) {
+ int count =
+ (column.getScale() != null) ? column.getScale() :
fakeConfig.getVectorDimension();
+ Float[] floatVector = new Float[count];
+ for (int i = 0; i < count; i++) {
floatVector[i] =
RandomUtils.nextFloat(
fakeConfig.getVectorFloatMin(),
fakeConfig.getVectorFloatMax());
@@ -188,9 +207,11 @@ public class FakeDataRandomUtils {
return BufferUtils.toByteBuffer(floatVector);
}
- public ByteBuffer randomFloat16Vector() {
- Short[] float16Vector = new Short[fakeConfig.getVectorDimension()];
- for (int i = 0; i < fakeConfig.getVectorDimension(); i++) {
+ public ByteBuffer randomFloat16Vector(Column column) {
+ int count =
+ (column.getScale() != null) ? column.getScale() :
fakeConfig.getVectorDimension();
+ Short[] float16Vector = new Short[count];
+ for (int i = 0; i < count; i++) {
float value =
RandomUtils.nextFloat(
fakeConfig.getVectorFloatMin(),
fakeConfig.getVectorFloatMax());
@@ -199,9 +220,11 @@ public class FakeDataRandomUtils {
return BufferUtils.toByteBuffer(float16Vector);
}
- public ByteBuffer randomBFloat16Vector() {
- Short[] bfloat16Vector = new Short[fakeConfig.getVectorDimension()];
- for (int i = 0; i < fakeConfig.getVectorDimension(); i++) {
+ public ByteBuffer randomBFloat16Vector(Column column) {
+ int count =
+ (column.getScale() != null) ? column.getScale() :
fakeConfig.getVectorDimension();
+ Short[] bfloat16Vector = new Short[count];
+ for (int i = 0; i < count; i++) {
float value =
RandomUtils.nextFloat(
fakeConfig.getVectorFloatMin(),
fakeConfig.getVectorFloatMax());
@@ -210,10 +233,10 @@ public class FakeDataRandomUtils {
return BufferUtils.toByteBuffer(bfloat16Vector);
}
- public Map<Integer, Float> randomSparseFloatVector() {
+ public Map<Integer, Float> randomSparseFloatVector(Column column) {
Map<Integer, Float> sparseVector = new HashMap<>();
-
- Integer nonZeroElements = fakeConfig.getVectorDimension();
+ int nonZeroElements =
+ (column.getScale() != null) ? column.getScale() :
fakeConfig.getVectorDimension();
while (nonZeroElements > 0) {
Integer index = RandomUtils.nextInt();
Float value =
diff --git
a/seatunnel-connectors-v2/connector-fake/src/test/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeDataGeneratorTest.java
b/seatunnel-connectors-v2/connector-fake/src/test/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeDataGeneratorTest.java
index c1cd826cb0..e33883f554 100644
---
a/seatunnel-connectors-v2/connector-fake/src/test/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeDataGeneratorTest.java
+++
b/seatunnel-connectors-v2/connector-fake/src/test/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeDataGeneratorTest.java
@@ -23,6 +23,7 @@ import
org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
import org.apache.seatunnel.api.table.catalog.TablePath;
+import org.apache.seatunnel.api.table.type.BasicType;
import org.apache.seatunnel.api.table.type.RowKind;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
@@ -38,6 +39,7 @@ import java.io.File;
import java.io.FileNotFoundException;
import java.net.URISyntaxException;
import java.net.URL;
+import java.nio.ByteBuffer;
import java.nio.file.Paths;
import java.util.Arrays;
import java.util.List;
@@ -141,6 +143,59 @@ public class FakeDataGeneratorTest {
Assertions.assertNotNull(seaTunnelRows);
}
+ @ParameterizedTest
+ @ValueSource(strings = {"fake-data.column.conf"})
+ public void testColumnDataParse(String conf) throws FileNotFoundException,
URISyntaxException {
+ ReadonlyConfig testConfig = getTestConfigFile(conf);
+ FakeConfig fakeConfig = FakeConfig.buildWithConfig(testConfig);
+ FakeDataGenerator fakeDataGenerator = new
FakeDataGenerator(fakeConfig);
+ List<SeaTunnelRow> seaTunnelRows =
+ fakeDataGenerator.generateFakedRows(fakeConfig.getRowNum());
+ seaTunnelRows.forEach(
+ seaTunnelRow -> {
+ Assertions.assertEquals(
+ seaTunnelRow.getField(0).toString(), "Andersen's
Fairy Tales");
+
Assertions.assertEquals(seaTunnelRow.getField(1).toString().length(), 100);
+
Assertions.assertEquals(seaTunnelRow.getField(2).toString(), "10.1");
+
Assertions.assertNotNull(seaTunnelRow.getField(3).toString());
+
Assertions.assertNotNull(seaTunnelRow.getField(4).toString());
+ // VectorType.VECTOR_FLOAT_TYPE
+ Assertions.assertEquals(
+ 8, ((ByteBuffer)
seaTunnelRow.getField(5)).capacity() / 4);
+ // VectorType.VECTOR_BINARY_TYPE
+ Assertions.assertEquals(
+ 16, ((ByteBuffer)
seaTunnelRow.getField(6)).capacity() * 8);
+ // VectorType.VECTOR_FLOAT16_TYPE
+ Assertions.assertEquals(
+ 8, ((ByteBuffer)
seaTunnelRow.getField(7)).capacity() / 2);
+ // VectorType.VECTOR_BFLOAT16_TYPE
+ Assertions.assertEquals(
+ 8, ((ByteBuffer)
seaTunnelRow.getField(8)).capacity() / 2);
+ // VectorType.VECTOR_SPARSE_FLOAT_TYPE
+ Assertions.assertEquals(8, ((Map)
seaTunnelRow.getField(9)).size());
+ Assertions.assertEquals(
+ 268,
+ seaTunnelRow.getBytesSize(
+ new SeaTunnelRowType(
+ new String[] {
+ "field1", "field2", "field3",
"field4", "field5",
+ "field6", "field7", "field8",
"field9", "field10"
+ },
+ new SeaTunnelDataType<?>[] {
+ BasicType.STRING_TYPE,
+ BasicType.STRING_TYPE,
+ BasicType.FLOAT_TYPE,
+ BasicType.FLOAT_TYPE,
+ BasicType.DOUBLE_TYPE,
+ VectorType.VECTOR_FLOAT_TYPE,
+ VectorType.VECTOR_BINARY_TYPE,
+ VectorType.VECTOR_FLOAT16_TYPE,
+
VectorType.VECTOR_BFLOAT16_TYPE,
+
VectorType.VECTOR_SPARSE_FLOAT_TYPE
+ })));
+ });
+ }
+
private ReadonlyConfig getTestConfigFile(String configFile)
throws FileNotFoundException, URISyntaxException {
if (!configFile.startsWith("/")) {
diff --git
a/seatunnel-connectors-v2/connector-fake/src/test/resources/complex.schema.conf
b/seatunnel-connectors-v2/connector-fake/src/test/resources/complex.schema.conf
index 96e82ee41c..e3f0d7ee26 100644
---
a/seatunnel-connectors-v2/connector-fake/src/test/resources/complex.schema.conf
+++
b/seatunnel-connectors-v2/connector-fake/src/test/resources/complex.schema.conf
@@ -23,7 +23,7 @@ FakeSource {
string.length = 10
schema = {
fields {
- c_map = "map<string, array<int>>"
+ c_map = "map<string, map<string, string>>"
c_array = "array<int>"
c_string = string
c_boolean = boolean
diff --git
a/seatunnel-connectors-v2/connector-fake/src/test/resources/fake-data.column.conf
b/seatunnel-connectors-v2/connector-fake/src/test/resources/fake-data.column.conf
new file mode 100644
index 0000000000..9a1515264e
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-fake/src/test/resources/fake-data.column.conf
@@ -0,0 +1,97 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+ FakeSource {
+ row.num = 5
+ vector.float.max=1
+ vector.float.min=0
+ float.max = 2
+ float.min = 0
+ double.max = 4
+ double.min = 2
+
+ # low priority: column-level columnLength / columnScale override these
+ string.length = 4
+ vector.dimension= 4
+ binary.vector.dimension=8
+ # end
+
+ schema = {
+ columns = [
+ {
+ name = book_name
+ type = string
+ defaultValue = "Andersen's Fairy Tales"
+ comment = "book name"
+ },
+ {
+ name = book_reader_testimonials
+ type = string
+ columnLength = 100
+ comment = "book reader testimonials"
+ },
+ {
+ name = book_price
+ type = float
+ defaultValue = 10.1
+ comment = "book price"
+ },
+ {
+ name = book_percentage_popularity
+ type = float
+ columnScale = 4
+ comment = "book percentage popularity"
+ },
+ {
+ name = book_distribution_law
+ type = double
+ columnScale = 2
+ comment = "book distribution law"
+ },
+ {
+ name = book_intro_1
+ type = float_vector
+ columnScale =8
+ comment = "vector"
+ },
+ {
+ name = book_intro_2
+ type = binary_vector
+ columnScale = 16
+ comment = "vector"
+ },
+ {
+ name = book_intro_3
+ type = float16_vector
+ columnScale =8
+ comment = "vector"
+ },
+ {
+ name = book_intro_4
+ type = bfloat16_vector
+ columnScale =8
+ comment = "vector"
+ },
+ {
+ name = book_intro_5
+ type = sparse_float_vector
+ columnScale =8
+ comment = "vector"
+ }
+ ]
+ }
+ }
\ No newline at end of file