This is an automated email from the ASF dual-hosted git repository.

wuchunfu pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 39162a4e0b [Improve][Connector-V2] Fake supports column configuration 
(#7503)
39162a4e0b is described below

commit 39162a4e0bb9f64309b6a7824829719f74f7c8ea
Author: corgy-w <[email protected]>
AuthorDate: Wed Aug 28 15:18:59 2024 +0800

    [Improve][Connector-V2] Fake supports column configuration (#7503)
    
    * [Improve][Connector-V2] Fake supports column configuration
    
    * [Improve][Connector-V2] optimized code
---
 .../table/catalog/schema/ReadonlyConfigParser.java |  3 +-
 .../seatunnel/fake/source/FakeDataGenerator.java   | 72 +++++++++-------
 .../seatunnel/fake/utils/FakeDataRandomUtils.java  | 93 +++++++++++++--------
 .../fake/source/FakeDataGeneratorTest.java         | 55 ++++++++++++
 .../src/test/resources/complex.schema.conf         |  2 +-
 .../src/test/resources/fake-data.column.conf       | 97 ++++++++++++++++++++++
 6 files changed, 253 insertions(+), 69 deletions(-)

diff --git 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/schema/ReadonlyConfigParser.java
 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/schema/ReadonlyConfigParser.java
index e043c0ecd7..ab85455b34 100644
--- 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/schema/ReadonlyConfigParser.java
+++ 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/schema/ReadonlyConfigParser.java
@@ -95,7 +95,8 @@ public class ReadonlyConfigParser implements 
TableSchemaParser<ReadonlyConfig> {
                 String value = entry.getValue();
                 SeaTunnelDataType<?> dataType =
                         
SeaTunnelDataTypeConvertorUtil.deserializeSeaTunnelDataType(key, value);
-                PhysicalColumn column = PhysicalColumn.of(key, dataType, 0, 
true, null, null);
+                PhysicalColumn column =
+                        PhysicalColumn.of(key, dataType, null, null, true, 
null, null);
                 columns.add(column);
             }
             return columns;
diff --git 
a/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeDataGenerator.java
 
b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeDataGenerator.java
index 9ac392b6a7..524d231063 100644
--- 
a/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeDataGenerator.java
+++ 
b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeDataGenerator.java
@@ -18,8 +18,8 @@
 package org.apache.seatunnel.connectors.seatunnel.fake.source;
 
 import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.Column;
 import org.apache.seatunnel.api.table.type.ArrayType;
-import org.apache.seatunnel.api.table.type.DecimalType;
 import org.apache.seatunnel.api.table.type.MapType;
 import org.apache.seatunnel.api.table.type.RowKind;
 import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
@@ -34,9 +34,11 @@ import 
org.apache.seatunnel.format.json.JsonDeserializationSchema;
 
 import java.io.IOException;
 import java.lang.reflect.Array;
+import java.math.BigDecimal;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
+import java.util.function.Function;
 
 public class FakeDataGenerator {
     private final CatalogTable catalogTable;
@@ -71,12 +73,11 @@ public class FakeDataGenerator {
     }
 
     private SeaTunnelRow randomRow() {
-        SeaTunnelRowType rowType = catalogTable.getSeaTunnelRowType();
-        String[] fieldNames = rowType.getFieldNames();
-        SeaTunnelDataType<?>[] fieldTypes = rowType.getFieldTypes();
-        List<Object> randomRow = new ArrayList<>(fieldNames.length);
-        for (SeaTunnelDataType<?> fieldType : fieldTypes) {
-            randomRow.add(randomColumnValue(fieldType));
+        // Generate random data according to the data type and data colum of 
the table
+        List<Column> physicalColumns = 
catalogTable.getTableSchema().getColumns();
+        List<Object> randomRow = new ArrayList<>(physicalColumns.size());
+        for (Column column : physicalColumns) {
+            randomRow.add(randomColumnValue(column));
         }
         SeaTunnelRow seaTunnelRow = new SeaTunnelRow(randomRow.toArray());
         seaTunnelRow.setTableId(tableId);
@@ -103,7 +104,8 @@ public class FakeDataGenerator {
     }
 
     @SuppressWarnings("magicnumber")
-    private Object randomColumnValue(SeaTunnelDataType<?> fieldType) {
+    private Object randomColumnValue(Column column) {
+        SeaTunnelDataType<?> fieldType = column.getDataType();
         switch (fieldType.getSqlType()) {
             case ARRAY:
                 ArrayType<?, ?> arrayType = (ArrayType<?, ?>) fieldType;
@@ -111,7 +113,7 @@ public class FakeDataGenerator {
                 int length = fakeConfig.getArraySize();
                 Object array = Array.newInstance(elementType.getTypeClass(), 
length);
                 for (int i = 0; i < length; i++) {
-                    Object value = randomColumnValue(elementType);
+                    Object value = randomColumnValue(column.copy(elementType));
                     Array.set(array, i, value);
                 }
                 return array;
@@ -122,59 +124,57 @@ public class FakeDataGenerator {
                 HashMap<Object, Object> objectMap = new HashMap<>();
                 int mapSize = fakeConfig.getMapSize();
                 for (int i = 0; i < mapSize; i++) {
-                    Object key = randomColumnValue(keyType);
-                    Object value = randomColumnValue(valueType);
+                    Object key = randomColumnValue(column.copy(keyType));
+                    Object value = randomColumnValue(column.copy(valueType));
                     objectMap.put(key, value);
                 }
                 return objectMap;
             case STRING:
-                return fakeDataRandomUtils.randomString();
+                return value(column, String::toString, 
fakeDataRandomUtils::randomString);
             case BOOLEAN:
-                return fakeDataRandomUtils.randomBoolean();
+                return value(column, Boolean::parseBoolean, 
fakeDataRandomUtils::randomBoolean);
             case TINYINT:
-                return fakeDataRandomUtils.randomTinyint();
+                return value(column, Byte::parseByte, 
fakeDataRandomUtils::randomTinyint);
             case SMALLINT:
-                return fakeDataRandomUtils.randomSmallint();
+                return value(column, Short::parseShort, 
fakeDataRandomUtils::randomSmallint);
             case INT:
-                return fakeDataRandomUtils.randomInt();
+                return value(column, Integer::parseInt, 
fakeDataRandomUtils::randomInt);
             case BIGINT:
-                return fakeDataRandomUtils.randomBigint();
+                return value(column, Long::parseLong, 
fakeDataRandomUtils::randomBigint);
             case FLOAT:
-                return fakeDataRandomUtils.randomFloat();
+                return value(column, Float::parseFloat, 
fakeDataRandomUtils::randomFloat);
             case DOUBLE:
-                return fakeDataRandomUtils.randomDouble();
+                return value(column, Double::parseDouble, 
fakeDataRandomUtils::randomDouble);
             case DECIMAL:
-                DecimalType decimalType = (DecimalType) fieldType;
-                return fakeDataRandomUtils.randomBigDecimal(
-                        decimalType.getPrecision(), decimalType.getScale());
+                return value(column, BigDecimal::new, 
fakeDataRandomUtils::randomBigDecimal);
             case NULL:
                 return null;
             case BYTES:
-                return fakeDataRandomUtils.randomBytes();
+                return value(column, String::getBytes, 
fakeDataRandomUtils::randomBytes);
             case DATE:
-                return fakeDataRandomUtils.randomLocalDate();
+                return value(column, String::toString, 
fakeDataRandomUtils::randomLocalDate);
             case TIME:
-                return fakeDataRandomUtils.randomLocalTime();
+                return value(column, String::toString, 
fakeDataRandomUtils::randomLocalTime);
             case TIMESTAMP:
-                return fakeDataRandomUtils.randomLocalDateTime();
+                return value(column, String::toString, 
fakeDataRandomUtils::randomLocalDateTime);
             case ROW:
                 SeaTunnelDataType<?>[] fieldTypes = ((SeaTunnelRowType) 
fieldType).getFieldTypes();
                 Object[] objects = new Object[fieldTypes.length];
                 for (int i = 0; i < fieldTypes.length; i++) {
-                    Object object = randomColumnValue(fieldTypes[i]);
+                    Object object = 
randomColumnValue(column.copy(fieldTypes[i]));
                     objects[i] = object;
                 }
                 return new SeaTunnelRow(objects);
             case BINARY_VECTOR:
-                return fakeDataRandomUtils.randomBinaryVector();
+                return fakeDataRandomUtils.randomBinaryVector(column);
             case FLOAT_VECTOR:
-                return fakeDataRandomUtils.randomFloatVector();
+                return fakeDataRandomUtils.randomFloatVector(column);
             case FLOAT16_VECTOR:
-                return fakeDataRandomUtils.randomFloat16Vector();
+                return fakeDataRandomUtils.randomFloat16Vector(column);
             case BFLOAT16_VECTOR:
-                return fakeDataRandomUtils.randomBFloat16Vector();
+                return fakeDataRandomUtils.randomBFloat16Vector(column);
             case SPARSE_FLOAT_VECTOR:
-                return fakeDataRandomUtils.randomSparseFloatVector();
+                return fakeDataRandomUtils.randomSparseFloatVector(column);
             default:
                 // never got in there
                 throw new FakeConnectorException(
@@ -182,4 +182,12 @@ public class FakeDataGenerator {
                         "SeaTunnel Fake source connector not support this data 
type");
         }
     }
+
+    private static <T> T value(
+            Column column, Function<String, T> convert, Function<Column, T> 
generate) {
+        if (column.getDefaultValue() != null) {
+            return convert.apply(column.getDefaultValue().toString());
+        }
+        return generate.apply(column);
+    }
 }
diff --git 
a/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/utils/FakeDataRandomUtils.java
 
b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/utils/FakeDataRandomUtils.java
index 8a8a14dc70..c4a038ff1a 100644
--- 
a/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/utils/FakeDataRandomUtils.java
+++ 
b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/utils/FakeDataRandomUtils.java
@@ -17,6 +17,8 @@
 
 package org.apache.seatunnel.connectors.seatunnel.fake.utils;
 
+import org.apache.seatunnel.api.table.catalog.Column;
+import org.apache.seatunnel.api.table.type.DecimalType;
 import org.apache.seatunnel.common.utils.BufferUtils;
 import org.apache.seatunnel.connectors.seatunnel.fake.config.FakeConfig;
 
@@ -25,6 +27,7 @@ import org.apache.commons.lang3.RandomStringUtils;
 import org.apache.commons.lang3.RandomUtils;
 
 import java.math.BigDecimal;
+import java.math.RoundingMode;
 import java.nio.ByteBuffer;
 import java.time.LocalDate;
 import java.time.LocalDateTime;
@@ -45,30 +48,34 @@ public class FakeDataRandomUtils {
         return list.get(index);
     }
 
-    public Boolean randomBoolean() {
+    public Boolean randomBoolean(Column column) {
         return RandomUtils.nextInt(0, 2) == 1;
     }
 
-    public BigDecimal randomBigDecimal(int precision, int scale) {
+    public BigDecimal randomBigDecimal(Column column) {
+        DecimalType dataType = (DecimalType) column.getDataType();
         return new BigDecimal(
-                RandomStringUtils.randomNumeric(precision - scale)
+                RandomStringUtils.randomNumeric(dataType.getPrecision() - 
dataType.getScale())
                         + "."
-                        + RandomStringUtils.randomNumeric(scale));
+                        + 
RandomStringUtils.randomNumeric(dataType.getScale()));
     }
 
-    public byte[] randomBytes() {
+    public byte[] randomBytes(Column column) {
         return 
RandomStringUtils.randomAlphabetic(fakeConfig.getBytesLength()).getBytes();
     }
 
-    public String randomString() {
+    public String randomString(Column column) {
         List<String> stringTemplate = fakeConfig.getStringTemplate();
         if (!CollectionUtils.isEmpty(stringTemplate)) {
             return randomFromList(stringTemplate);
         }
-        return 
RandomStringUtils.randomAlphabetic(fakeConfig.getStringLength());
+        return RandomStringUtils.randomAlphabetic(
+                column.getColumnLength() != null
+                        ? column.getColumnLength().intValue()
+                        : fakeConfig.getStringLength());
     }
 
-    public Byte randomTinyint() {
+    public Byte randomTinyint(Column column) {
         List<Integer> tinyintTemplate = fakeConfig.getTinyintTemplate();
         if (!CollectionUtils.isEmpty(tinyintTemplate)) {
             return randomFromList(tinyintTemplate).byteValue();
@@ -76,7 +83,7 @@ public class FakeDataRandomUtils {
         return (byte) RandomUtils.nextInt(fakeConfig.getTinyintMin(), 
fakeConfig.getTinyintMax());
     }
 
-    public Short randomSmallint() {
+    public Short randomSmallint(Column column) {
         List<Integer> smallintTemplate = fakeConfig.getSmallintTemplate();
         if (!CollectionUtils.isEmpty(smallintTemplate)) {
             return randomFromList(smallintTemplate).shortValue();
@@ -85,7 +92,7 @@ public class FakeDataRandomUtils {
                 RandomUtils.nextInt(fakeConfig.getSmallintMin(), 
fakeConfig.getSmallintMax());
     }
 
-    public Integer randomInt() {
+    public Integer randomInt(Column column) {
         List<Integer> intTemplate = fakeConfig.getIntTemplate();
         if (!CollectionUtils.isEmpty(intTemplate)) {
             return randomFromList(intTemplate);
@@ -93,7 +100,7 @@ public class FakeDataRandomUtils {
         return RandomUtils.nextInt(fakeConfig.getIntMin(), 
fakeConfig.getIntMax());
     }
 
-    public Long randomBigint() {
+    public Long randomBigint(Column column) {
         List<Long> bigTemplate = fakeConfig.getBigTemplate();
         if (!CollectionUtils.isEmpty(bigTemplate)) {
             return randomFromList(bigTemplate);
@@ -101,32 +108,39 @@ public class FakeDataRandomUtils {
         return RandomUtils.nextLong(fakeConfig.getBigintMin(), 
fakeConfig.getBigintMax());
     }
 
-    public Float randomFloat() {
+    public Float randomFloat(Column column) {
         List<Double> floatTemplate = fakeConfig.getFloatTemplate();
         if (!CollectionUtils.isEmpty(floatTemplate)) {
             return randomFromList(floatTemplate).floatValue();
         }
-        return RandomUtils.nextFloat(
-                (float) fakeConfig.getFloatMin(), (float) 
fakeConfig.getFloatMax());
+        float v =
+                RandomUtils.nextFloat(
+                        (float) fakeConfig.getFloatMin(), (float) 
fakeConfig.getFloatMax());
+        return column.getScale() == null
+                ? v
+                : new BigDecimal(v).setScale(column.getScale(), 
RoundingMode.HALF_UP).floatValue();
     }
 
-    public Double randomDouble() {
+    public Double randomDouble(Column column) {
         List<Double> doubleTemplate = fakeConfig.getDoubleTemplate();
         if (!CollectionUtils.isEmpty(doubleTemplate)) {
             return randomFromList(doubleTemplate);
         }
-        return RandomUtils.nextDouble(fakeConfig.getDoubleMin(), 
fakeConfig.getDoubleMax());
+        double v = RandomUtils.nextDouble(fakeConfig.getDoubleMin(), 
fakeConfig.getDoubleMax());
+        return column.getScale() == null
+                ? v
+                : new BigDecimal(v).setScale(column.getScale(), 
RoundingMode.HALF_UP).floatValue();
     }
 
-    public LocalDate randomLocalDate() {
-        return randomLocalDateTime().toLocalDate();
+    public LocalDate randomLocalDate(Column column) {
+        return randomLocalDateTime(column).toLocalDate();
     }
 
-    public LocalTime randomLocalTime() {
-        return randomLocalDateTime().toLocalTime();
+    public LocalTime randomLocalTime(Column column) {
+        return randomLocalDateTime(column).toLocalTime();
     }
 
-    public LocalDateTime randomLocalDateTime() {
+    public LocalDateTime randomLocalDateTime(Column column) {
         int year;
         int month;
         int day;
@@ -172,15 +186,20 @@ public class FakeDataRandomUtils {
         return LocalDateTime.of(year, month, day, hour, minute, second);
     }
 
-    public ByteBuffer randomBinaryVector() {
-        int byteCount = fakeConfig.getBinaryVectorDimension() / 8;
+    public ByteBuffer randomBinaryVector(Column column) {
+        int byteCount =
+                (column.getScale() != null)
+                        ? column.getScale() / 8
+                        : fakeConfig.getBinaryVectorDimension() / 8;
         // binary vector doesn't care endian since each byte is independent
         return ByteBuffer.wrap(RandomUtils.nextBytes(byteCount));
     }
 
-    public ByteBuffer randomFloatVector() {
-        Float[] floatVector = new Float[fakeConfig.getVectorDimension()];
-        for (int i = 0; i < fakeConfig.getVectorDimension(); i++) {
+    public ByteBuffer randomFloatVector(Column column) {
+        int count =
+                (column.getScale() != null) ? column.getScale() : 
fakeConfig.getVectorDimension();
+        Float[] floatVector = new Float[count];
+        for (int i = 0; i < count; i++) {
             floatVector[i] =
                     RandomUtils.nextFloat(
                             fakeConfig.getVectorFloatMin(), 
fakeConfig.getVectorFloatMax());
@@ -188,9 +207,11 @@ public class FakeDataRandomUtils {
         return BufferUtils.toByteBuffer(floatVector);
     }
 
-    public ByteBuffer randomFloat16Vector() {
-        Short[] float16Vector = new Short[fakeConfig.getVectorDimension()];
-        for (int i = 0; i < fakeConfig.getVectorDimension(); i++) {
+    public ByteBuffer randomFloat16Vector(Column column) {
+        int count =
+                (column.getScale() != null) ? column.getScale() : 
fakeConfig.getVectorDimension();
+        Short[] float16Vector = new Short[count];
+        for (int i = 0; i < count; i++) {
             float value =
                     RandomUtils.nextFloat(
                             fakeConfig.getVectorFloatMin(), 
fakeConfig.getVectorFloatMax());
@@ -199,9 +220,11 @@ public class FakeDataRandomUtils {
         return BufferUtils.toByteBuffer(float16Vector);
     }
 
-    public ByteBuffer randomBFloat16Vector() {
-        Short[] bfloat16Vector = new Short[fakeConfig.getVectorDimension()];
-        for (int i = 0; i < fakeConfig.getVectorDimension(); i++) {
+    public ByteBuffer randomBFloat16Vector(Column column) {
+        int count =
+                (column.getScale() != null) ? column.getScale() : 
fakeConfig.getVectorDimension();
+        Short[] bfloat16Vector = new Short[count];
+        for (int i = 0; i < count; i++) {
             float value =
                     RandomUtils.nextFloat(
                             fakeConfig.getVectorFloatMin(), 
fakeConfig.getVectorFloatMax());
@@ -210,10 +233,10 @@ public class FakeDataRandomUtils {
         return BufferUtils.toByteBuffer(bfloat16Vector);
     }
 
-    public Map<Integer, Float> randomSparseFloatVector() {
+    public Map<Integer, Float> randomSparseFloatVector(Column column) {
         Map<Integer, Float> sparseVector = new HashMap<>();
-
-        Integer nonZeroElements = fakeConfig.getVectorDimension();
+        int nonZeroElements =
+                (column.getScale() != null) ? column.getScale() : 
fakeConfig.getVectorDimension();
         while (nonZeroElements > 0) {
             Integer index = RandomUtils.nextInt();
             Float value =
diff --git 
a/seatunnel-connectors-v2/connector-fake/src/test/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeDataGeneratorTest.java
 
b/seatunnel-connectors-v2/connector-fake/src/test/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeDataGeneratorTest.java
index c1cd826cb0..e33883f554 100644
--- 
a/seatunnel-connectors-v2/connector-fake/src/test/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeDataGeneratorTest.java
+++ 
b/seatunnel-connectors-v2/connector-fake/src/test/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeDataGeneratorTest.java
@@ -23,6 +23,7 @@ import 
org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
 import org.apache.seatunnel.api.configuration.ReadonlyConfig;
 import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
 import org.apache.seatunnel.api.table.catalog.TablePath;
+import org.apache.seatunnel.api.table.type.BasicType;
 import org.apache.seatunnel.api.table.type.RowKind;
 import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
@@ -38,6 +39,7 @@ import java.io.File;
 import java.io.FileNotFoundException;
 import java.net.URISyntaxException;
 import java.net.URL;
+import java.nio.ByteBuffer;
 import java.nio.file.Paths;
 import java.util.Arrays;
 import java.util.List;
@@ -141,6 +143,59 @@ public class FakeDataGeneratorTest {
         Assertions.assertNotNull(seaTunnelRows);
     }
 
+    @ParameterizedTest
+    @ValueSource(strings = {"fake-data.column.conf"})
+    public void testColumnDataParse(String conf) throws FileNotFoundException, 
URISyntaxException {
+        ReadonlyConfig testConfig = getTestConfigFile(conf);
+        FakeConfig fakeConfig = FakeConfig.buildWithConfig(testConfig);
+        FakeDataGenerator fakeDataGenerator = new 
FakeDataGenerator(fakeConfig);
+        List<SeaTunnelRow> seaTunnelRows =
+                fakeDataGenerator.generateFakedRows(fakeConfig.getRowNum());
+        seaTunnelRows.forEach(
+                seaTunnelRow -> {
+                    Assertions.assertEquals(
+                            seaTunnelRow.getField(0).toString(), "Andersen's 
Fairy Tales");
+                    
Assertions.assertEquals(seaTunnelRow.getField(1).toString().length(), 100);
+                    
Assertions.assertEquals(seaTunnelRow.getField(2).toString(), "10.1");
+                    
Assertions.assertNotNull(seaTunnelRow.getField(3).toString());
+                    
Assertions.assertNotNull(seaTunnelRow.getField(4).toString());
+                    //  VectorType.VECTOR_FLOAT_TYPE
+                    Assertions.assertEquals(
+                            8, ((ByteBuffer) 
seaTunnelRow.getField(5)).capacity() / 4);
+                    // VectorType.VECTOR_BINARY_TYPE
+                    Assertions.assertEquals(
+                            16, ((ByteBuffer) 
seaTunnelRow.getField(6)).capacity() * 8);
+                    // VectorType.VECTOR_FLOAT16_TYPE
+                    Assertions.assertEquals(
+                            8, ((ByteBuffer) 
seaTunnelRow.getField(7)).capacity() / 2);
+                    // VectorType.VECTOR_BFLOAT16_TYPE
+                    Assertions.assertEquals(
+                            8, ((ByteBuffer) 
seaTunnelRow.getField(8)).capacity() / 2);
+                    // VectorType.VECTOR_SPARSE_FLOAT_TYPE
+                    Assertions.assertEquals(8, ((Map) 
seaTunnelRow.getField(9)).size());
+                    Assertions.assertEquals(
+                            268,
+                            seaTunnelRow.getBytesSize(
+                                    new SeaTunnelRowType(
+                                            new String[] {
+                                                "field1", "field2", "field3", 
"field4", "field5",
+                                                "field6", "field7", "field8", 
"field9", "field10"
+                                            },
+                                            new SeaTunnelDataType<?>[] {
+                                                BasicType.STRING_TYPE,
+                                                BasicType.STRING_TYPE,
+                                                BasicType.FLOAT_TYPE,
+                                                BasicType.FLOAT_TYPE,
+                                                BasicType.DOUBLE_TYPE,
+                                                VectorType.VECTOR_FLOAT_TYPE,
+                                                VectorType.VECTOR_BINARY_TYPE,
+                                                VectorType.VECTOR_FLOAT16_TYPE,
+                                                
VectorType.VECTOR_BFLOAT16_TYPE,
+                                                
VectorType.VECTOR_SPARSE_FLOAT_TYPE
+                                            })));
+                });
+    }
+
     private ReadonlyConfig getTestConfigFile(String configFile)
             throws FileNotFoundException, URISyntaxException {
         if (!configFile.startsWith("/")) {
diff --git 
a/seatunnel-connectors-v2/connector-fake/src/test/resources/complex.schema.conf 
b/seatunnel-connectors-v2/connector-fake/src/test/resources/complex.schema.conf
index 96e82ee41c..e3f0d7ee26 100644
--- 
a/seatunnel-connectors-v2/connector-fake/src/test/resources/complex.schema.conf
+++ 
b/seatunnel-connectors-v2/connector-fake/src/test/resources/complex.schema.conf
@@ -23,7 +23,7 @@ FakeSource {
   string.length = 10
   schema = {
     fields {
-      c_map = "map<string, array<int>>"
+      c_map = "map<string, map<string, string>>"
       c_array = "array<int>"
       c_string = string
       c_boolean = boolean
diff --git 
a/seatunnel-connectors-v2/connector-fake/src/test/resources/fake-data.column.conf
 
b/seatunnel-connectors-v2/connector-fake/src/test/resources/fake-data.column.conf
new file mode 100644
index 0000000000..9a1515264e
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-fake/src/test/resources/fake-data.column.conf
@@ -0,0 +1,97 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+ FakeSource {
+      row.num = 5
+      vector.float.max=1
+      vector.float.min=0
+      float.max = 2
+      float.min = 0
+      double.max = 4
+      double.min = 2
+
+      # low weight
+      string.length = 4
+      vector.dimension= 4
+      binary.vector.dimension=8
+      # end
+
+      schema = {
+           columns = [
+           {
+              name = book_name
+              type = string
+              defaultValue = "Andersen's Fairy Tales"
+              comment = "book name"
+           },
+           {
+              name  = book_reader_testimonials
+              type = string
+              columnLength = 100
+              comment = "book reader testimonials"
+           },
+           {
+              name = book_price
+              type = float
+              defaultValue = 10.1
+              comment = "book price"
+           },
+           {
+              name = book_percentage_popularity
+              type = float
+              columnScale = 4
+              comment = "book percentage popularity"
+           },
+           {
+              name = book_distribution_law
+              type = double
+              columnScale = 2
+              comment = "book distribution law"
+           },
+           {
+              name = book_intro_1
+              type = float_vector
+              columnScale =8
+              comment = "vector"
+           },
+           {
+              name = book_intro_2
+              type = binary_vector
+              columnScale = 16
+              comment = "vector"
+           },
+           {
+              name = book_intro_3
+              type = float16_vector
+              columnScale =8
+              comment = "vector"
+           },
+           {
+              name = book_intro_4
+              type = bfloat16_vector
+              columnScale =8
+              comment = "vector"
+           },
+           {
+              name = book_intro_5
+              type = sparse_float_vector
+              columnScale =8
+              comment = "vector"
+           }
+       ]
+      }
+  }
\ No newline at end of file

Reply via email to