This is an automated email from the ASF dual-hosted git repository.

wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new a9bc5ca58d [Feature][API] Support hocon style declare row type in 
generic type (#6187)
a9bc5ca58d is described below

commit a9bc5ca58ddc0195806b323e2dd949b043db7d17
Author: Chengyu Yan <[email protected]>
AuthorDate: Wed Jan 17 22:55:31 2024 +0800

    [Feature][API] Support hocon style declare row type in generic type (#6187)
---
 docs/en/concept/schema-feature.md                  | 70 +++++++++++++------
 release-note.md                                    |  1 +
 .../catalog/SeaTunnelDataTypeConvertorUtil.java    | 79 +++++++++++++++-------
 .../api/table/catalog/CatalogTableUtilTest.java    | 37 ++++++++++
 .../SeaTunnelDataTypeConvertorUtilTest.java        | 22 ++++--
 .../test/resources/conf/generic_row.schema.conf    | 25 +++++++
 .../seatunnel/e2e/connector/fake/FakeIT.java       |  3 +
 .../resources/fake_generic_row_type_to_assert.conf | 66 ++++++++++++++++++
 8 files changed, 257 insertions(+), 46 deletions(-)

diff --git a/docs/en/concept/schema-feature.md 
b/docs/en/concept/schema-feature.md
index 4c95e79fef..cee8c1a5ff 100644
--- a/docs/en/concept/schema-feature.md
+++ b/docs/en/concept/schema-feature.md
@@ -69,25 +69,57 @@ columns = [
 
 #### What type supported at now
 
-| Data type | Value type in Java                                 | Description 
                                                                                
                                                                                
                                                                                
                                                                                
          |
-|:----------|:---------------------------------------------------|:------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
-| string    | `java.lang.String`                                 | string      
                                                                                
                                                                                
                                                                                
                                                                                
          |
-| boolean   | `java.lang.Boolean`                                | boolean     
                                                                                
                                                                                
                                                                                
                                                                                
          |
-| tinyint   | `java.lang.Byte`                                   | -128 to 127 
regular. 0 to 255 unsigned*. Specify the maximum number of digits in 
parentheses.                                                                    
                                                                                
                                                                                
                     |
-| smallint  | `java.lang.Short`                                  | -32768 to 
32767 General. 0 to 65535 unsigned*. Specify the maximum number of digits in 
parentheses.                                                                    
                                                                                
                                                                                
               |
-| int       | `java.lang.Integer`                                | All numbers 
from -2,147,483,648 to 2,147,483,647 are allowed.                               
                                                                                
                                                                                
                                                                                
          |
-| bigint    | `java.lang.Long`                                   | All numbers 
between -9,223,372,036,854,775,808 and 9,223,372,036,854,775,807 are allowed.   
                                                                                
                                                                                
                                                                                
          |
-| float     | `java.lang.Float`                                  | 
Float-precision numeric data from -1.79E+308 to 1.79E+308.                      
                                                                                
                                                                                
                                                                                
                      |
-| double    | `java.lang.Double`                                 | Double 
precision floating point. Handle most decimals.                                 
                                                                                
                                                                                
                                                                                
               |
-| decimal   | `java.math.BigDecimal`                             | DOUBLE type 
stored as a string, allowing a fixed decimal point.                             
                                                                                
                                                                                
                                                                                
          |
-| null      | `java.lang.Void`                                   | null        
                                                                                
                                                                                
                                                                                
                                                                                
          |
-| bytes     | `byte[]`                                           | bytes.      
                                                                                
                                                                                
                                                                                
                                                                                
          |
-| date      | `java.time.LocalDate`                              | Only the 
date is stored. From January 1, 0001 to December 31, 9999.                      
                                                                                
                                                                                
                                                                                
             |
-| time      | `java.time.LocalTime`                              | Only store 
time. Accuracy is 100 nanoseconds.                                              
                                                                                
                                                                                
                                                                                
           |
-| timestamp | `java.time.LocalDateTime`                          | Stores a 
unique number that is updated whenever a row is created or modified. timestamp 
is based on the internal clock and does not correspond to real time. There can 
only be one timestamp variable per table.                                       
                                                                                
               |
-| row       | `org.apache.seatunnel.api.table.type.SeaTunnelRow` | Row 
type,can be nested.                                                             
                                                                                
                                                                                
                                                                                
                  |
-| map       | `java.util.Map`                                    | A Map is an 
object that maps keys to values. The key type includes `int` `string` `boolean` 
`tinyint` `smallint` `bigint` `float` `double` `decimal` `date` `time` 
`timestamp` `null` , and the value type includes `int` `string` `boolean` 
`tinyint` `smallint` `bigint` `float` `double` `decimal` `date` `time` 
`timestamp` `null` `array` `map`. |
-| array     | `ValueType[]`                                      | A array is 
a data type that represents a collection of elements. The element type includes 
`int` `string` `boolean` `tinyint` `smallint` `bigint` `float` `double` `array` 
`map`.                                                                          
                                                                                
           |
+| Data type | Value type in Java                                 | Description 
                                                                                
                                                                                
                                                                                
                                                                                
                |
+|:----------|:---------------------------------------------------|:------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| string    | `java.lang.String`                                 | string      
                                                                                
                                                                                
                                                                                
                                                                                
                |
+| boolean   | `java.lang.Boolean`                                | boolean     
                                                                                
                                                                                
                                                                                
                                                                                
                |
+| tinyint   | `java.lang.Byte`                                   | -128 to 127 
regular. 0 to 255 unsigned*. Specify the maximum number of digits in 
parentheses.                                                                    
                                                                                
                                                                                
                           |
+| smallint  | `java.lang.Short`                                  | -32768 to 
32767 General. 0 to 65535 unsigned*. Specify the maximum number of digits in 
parentheses.                                                                    
                                                                                
                                                                                
                     |
+| int       | `java.lang.Integer`                                | All numbers 
from -2,147,483,648 to 2,147,483,647 are allowed.                               
                                                                                
                                                                                
                                                                                
                |
+| bigint    | `java.lang.Long`                                   | All numbers 
between -9,223,372,036,854,775,808 and 9,223,372,036,854,775,807 are allowed.   
                                                                                
                                                                                
                                                                                
                |
+| float     | `java.lang.Float`                                  | 
Float-precision numeric data from -1.79E+308 to 1.79E+308.                      
                                                                                
                                                                                
                                                                                
                            |
+| double    | `java.lang.Double`                                 | Double 
precision floating point. Handle most decimals.                                 
                                                                                
                                                                                
                                                                                
                     |
+| decimal   | `java.math.BigDecimal`                             | DOUBLE type 
stored as a string, allowing a fixed decimal point.                             
                                                                                
                                                                                
                                                                                
                |
+| null      | `java.lang.Void`                                   | null        
                                                                                
                                                                                
                                                                                
                                                                                
                |
+| bytes     | `byte[]`                                           | bytes.      
                                                                                
                                                                                
                                                                                
                                                                                
                |
+| date      | `java.time.LocalDate`                              | Only the 
date is stored. From January 1, 0001 to December 31, 9999.                      
                                                                                
                                                                                
                                                                                
                   |
+| time      | `java.time.LocalTime`                              | Only store 
time. Accuracy is 100 nanoseconds.                                              
                                                                                
                                                                                
                                                                                
                 |
+| timestamp | `java.time.LocalDateTime`                          | Stores a 
unique number that is updated whenever a row is created or modified. timestamp 
is based on the internal clock and does not correspond to real time. There can 
only be one timestamp variable per table.                                       
                                                                                
                     |
+| row       | `org.apache.seatunnel.api.table.type.SeaTunnelRow` | Row 
type,can be nested.                                                             
                                                                                
                                                                                
                                                                                
                        |
+| map       | `java.util.Map`                                    | A Map is an 
object that maps keys to values. The key type includes `int` `string` `boolean` 
`tinyint` `smallint` `bigint` `float` `double` `decimal` `date` `time` 
`timestamp` `null` , and the value type includes `int` `string` `boolean` 
`tinyint` `smallint` `bigint` `float` `double` `decimal` `date` `time` 
`timestamp` `null` `array` `map` `row`. |
+| array     | `ValueType[]`                                      | An array is 
a data type that represents a collection of elements. The element type includes 
`int` `string` `boolean` `tinyint` `smallint` `bigint` `float` `double` `array` 
`map` `row`.                                                                    
                                                                                
                 |
+
+#### How to declare supported types
+
+SeaTunnel provides a simple and direct way to declare basic types. The keyword 
names for basic types can be used directly as type declarations, and SeaTunnel 
is case-insensitive to type keywords. Basic type keywords include `string`, 
`boolean`, `tinyint`, `smallint`, `int`, `bigint`, `float`, `double`, `date`, 
`time`, `timestamp`, and `null`. For example, if you need to declare a field 
with integer type, you can simply define the field as `int` or `"int"`.
+
+When declaring complex types (such as **decimal**, **array**, **map**, and 
**row**), pay attention to specific considerations.
+- When declaring a decimal type, precision and scale settings are required, 
and the type definition follows the format `decimal(precision, scale)`. It's 
essential to emphasize that the declaration of the decimal type must be 
enclosed in `"`; you cannot use the type name directly, as with basic types. 
For example, when declaring a decimal field with precision 10 and scale 2, you 
specify the field type as `"decimal(10,2)"`.
+- When declaring an array type, you need to specify the element type, and the 
type definition follows the format `array<T>`, where `T` represents the element 
type and can be any type supported by SeaTunnel. Similar to the decimal type 
declaration, it must also be enclosed in `"`. For example, when declaring a field 
with an array of integers, you specify the field type as `"array<int>"`.
+- When declaring a map type, you need to specify the key and value types. The 
map type definition follows the format `map<K,V>`, where `K` represents the key 
type and `V` represents the value type. `K` can be any basic type, and `V` can 
be any type supported by SeaTunnel. Similar to previous type declarations, the 
map type declaration must be enclosed in double quotes. For example, when 
declaring a field with map type, where the key type is string and the value 
type is integer, you can d [...]
+- When declaring a row type, you need to define a 
[HOCON](https://github.com/lightbend/config/blob/main/HOCON.md) object to 
describe the fields and their types. The field types can be any type supported 
by SeaTunnel. For example, when declaring a row type containing an integer 
field `a` and a string field `b`, you can declare it as `{a = int, b = 
string}`. Enclosing the definition in `"` as a string is also acceptable, so 
`"{a = int, b = string}"` is equivalent to `{a = int, b = string}` [...]
+
+Here is an example of complex type declarations:
+
+```hocon
+schema {
+  fields {
+    c_decimal = "decimal(10, 2)"
+    c_array = "array<{c_0 = int, c_1 = string}>"
+    c_row = {
+        c_int = int
+        c_string = string
+        c_row = {
+            c_int = int
+        }
+    }
+    # Hocon style declare row type in generic type
+    map0 = "map<string, {c_int = int, c_string = string, c_row = {c_int = 
int}}>"
+    # Json style declare row type in generic type
+    map1 = "map<string, {\"c_int\":\"int\", \"c_string\":\"string\", 
\"c_row\":{\"c_int\":\"int\"}}>"
+  }
+}
+```
 
 ### PrimaryKey
 
diff --git a/release-note.md b/release-note.md
index 0ded29987d..831018d273 100644
--- a/release-note.md
+++ b/release-note.md
@@ -165,6 +165,7 @@
 - [Core] [Shade] Add guava shade module (#4358)
 - [Core] [Spark] Support SeaTunnel Time Type (#5188)
 - [Core] [Flink] Support Decimal Type with configurable precision and scale 
(#5419)
+- [Core] [API] Support hocon style declare row type in generic type (#6187)
 
 ### Connector-V2
 
diff --git 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/SeaTunnelDataTypeConvertorUtil.java
 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/SeaTunnelDataTypeConvertorUtil.java
index f15980f41c..ce71a46051 100644
--- 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/SeaTunnelDataTypeConvertorUtil.java
+++ 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/SeaTunnelDataTypeConvertorUtil.java
@@ -17,7 +17,10 @@
 
 package org.apache.seatunnel.api.table.catalog;
 
-import 
org.apache.seatunnel.shade.com.fasterxml.jackson.databind.node.ObjectNode;
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigObject;
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigValue;
 
 import org.apache.seatunnel.api.table.type.ArrayType;
 import org.apache.seatunnel.api.table.type.BasicType;
@@ -29,9 +32,6 @@ import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import org.apache.seatunnel.api.table.type.SqlType;
 import org.apache.seatunnel.common.exception.CommonError;
-import org.apache.seatunnel.common.utils.JsonUtils;
-
-import java.util.Map;
 
 public class SeaTunnelDataTypeConvertorUtil {
 
@@ -87,13 +87,13 @@ public class SeaTunnelDataTypeConvertorUtil {
     private static SeaTunnelDataType<?> parseComplexDataType(String field, 
String columnStr) {
         String column = columnStr.toUpperCase().replace(" ", "");
         if (column.startsWith(SqlType.MAP.name())) {
-            return parseMapType(field, column);
+            return parseMapType(field, columnStr);
         }
         if (column.startsWith(SqlType.ARRAY.name())) {
-            return parseArrayType(field, column);
+            return parseArrayType(field, columnStr);
         }
         if (column.startsWith(SqlType.DECIMAL.name())) {
-            return parseDecimalType(column);
+            return parseDecimalType(columnStr);
         }
         if (column.trim().startsWith("{")) {
             return parseRowType(columnStr);
@@ -102,31 +102,64 @@ public class SeaTunnelDataTypeConvertorUtil {
     }
 
     private static SeaTunnelDataType<?> parseRowType(String columnStr) {
-        ObjectNode jsonNodes = JsonUtils.parseObject(columnStr);
-        Map<String, String> fieldsMap = JsonUtils.toStringMap(jsonNodes);
-        String[] fieldsName = new String[fieldsMap.size()];
-        SeaTunnelDataType<?>[] seaTunnelDataTypes = new 
SeaTunnelDataType<?>[fieldsMap.size()];
-        int i = 0;
-        for (Map.Entry<String, String> entry : fieldsMap.entrySet()) {
-            fieldsName[i] = entry.getKey();
-            seaTunnelDataTypes[i] = 
deserializeSeaTunnelDataType(entry.getKey(), entry.getValue());
-            i++;
+        String confPayload = "{conf = " + columnStr + "}";
+        Config conf;
+        try {
+            conf = ConfigFactory.parseString(confPayload);
+        } catch (RuntimeException e) {
+            throw new IllegalArgumentException(
+                    String.format("HOCON Config parse from %s failed.", 
confPayload), e);
+        }
+        return parseRowType(conf.getObject("conf"));
+    }
+
+    private static SeaTunnelDataType<?> parseRowType(ConfigObject conf) {
+        String[] fieldNames = new String[conf.size()];
+        SeaTunnelDataType<?>[] fieldTypes = new SeaTunnelDataType[conf.size()];
+        conf.keySet().toArray(fieldNames);
+
+        for (int idx = 0; idx < fieldNames.length; idx++) {
+            String fieldName = fieldNames[idx];
+            ConfigValue typeVal = conf.get(fieldName);
+            switch (typeVal.valueType()) {
+                case STRING:
+                    {
+                        fieldTypes[idx] =
+                                deserializeSeaTunnelDataType(
+                                        fieldNames[idx], (String) 
typeVal.unwrapped());
+                    }
+                    break;
+                case OBJECT:
+                    {
+                        fieldTypes[idx] = parseRowType((ConfigObject) typeVal);
+                    }
+                    break;
+                case LIST:
+                case NUMBER:
+                case BOOLEAN:
+                case NULL:
+                default:
+                    throw new IllegalArgumentException(
+                            String.format(
+                                    "Unsupported parse SeaTunnel Type from 
'%s'.",
+                                    typeVal.unwrapped()));
+            }
         }
-        return new SeaTunnelRowType(fieldsName, seaTunnelDataTypes);
+        return new SeaTunnelRowType(fieldNames, fieldTypes);
     }
 
     private static SeaTunnelDataType<?> parseMapType(String field, String 
columnStr) {
-        String genericType = getGenericType(columnStr);
+        String genericType = getGenericType(columnStr).trim();
         int index =
-                genericType.startsWith(SqlType.DECIMAL.name())
+                genericType.toUpperCase().startsWith(SqlType.DECIMAL.name())
                         ?
                         // if map key is decimal, we should find the index of 
second ','
                         genericType.indexOf(",", genericType.indexOf(",") + 1)
                         :
                         // if map key is not decimal, we should find the index 
of first ','
                         genericType.indexOf(",");
-        String keyGenericType = genericType.substring(0, index);
-        String valueGenericType = genericType.substring(index + 1);
+        String keyGenericType = genericType.substring(0, index).trim();
+        String valueGenericType = genericType.substring(index + 1).trim();
         return new MapType<>(
                 deserializeSeaTunnelDataType(field, keyGenericType),
                 deserializeSeaTunnelDataType(field, valueGenericType));
@@ -138,7 +171,7 @@ public class SeaTunnelDataTypeConvertorUtil {
     }
 
     private static SeaTunnelDataType<?> parseArrayType(String field, String 
columnStr) {
-        String genericType = getGenericType(columnStr);
+        String genericType = getGenericType(columnStr).trim();
         SeaTunnelDataType<?> dataType = deserializeSeaTunnelDataType(field, 
genericType);
         switch (dataType.getSqlType()) {
             case STRING:
@@ -158,7 +191,7 @@ public class SeaTunnelDataTypeConvertorUtil {
             case DOUBLE:
                 return ArrayType.DOUBLE_ARRAY_TYPE;
             default:
-                throw CommonError.unsupportedDataType("SeaTunnel", columnStr, 
field);
+                throw CommonError.unsupportedDataType("SeaTunnel", 
genericType, field);
         }
     }
 
diff --git 
a/seatunnel-api/src/test/java/org/apache/seatunnel/api/table/catalog/CatalogTableUtilTest.java
 
b/seatunnel-api/src/test/java/org/apache/seatunnel/api/table/catalog/CatalogTableUtilTest.java
index d3e84205c4..9b579fbd90 100644
--- 
a/seatunnel-api/src/test/java/org/apache/seatunnel/api/table/catalog/CatalogTableUtilTest.java
+++ 
b/seatunnel-api/src/test/java/org/apache/seatunnel/api/table/catalog/CatalogTableUtilTest.java
@@ -27,6 +27,7 @@ import org.apache.seatunnel.api.table.type.BasicType;
 import org.apache.seatunnel.api.table.type.DecimalType;
 import org.apache.seatunnel.api.table.type.MapType;
 import org.apache.seatunnel.api.table.type.PrimitiveByteArrayType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import org.apache.seatunnel.api.table.type.SqlType;
 import org.apache.seatunnel.common.utils.SeaTunnelException;
@@ -137,6 +138,42 @@ public class CatalogTableUtilTest {
                                 
Thread.currentThread().getContextClassLoader()));
     }
 
+    @Test
+    public void testGenericRowSchemaTest() throws FileNotFoundException, 
URISyntaxException {
+        String path = getTestConfigFile("/conf/generic_row.schema.conf");
+        Config config = ConfigFactory.parseFile(new File(path));
+        SeaTunnelRowType seaTunnelRowType =
+                CatalogTableUtil.buildWithConfig(config).getSeaTunnelRowType();
+        Assertions.assertNotNull(seaTunnelRowType);
+        Assertions.assertArrayEquals(
+                new String[] {"map0", "map1"}, 
seaTunnelRowType.getFieldNames());
+
+        MapType<String, SeaTunnelRowType> mapType0 =
+                (MapType<String, SeaTunnelRowType>) 
seaTunnelRowType.getFieldType(0);
+        MapType<String, SeaTunnelRowType> mapType1 =
+                (MapType<String, SeaTunnelRowType>) 
seaTunnelRowType.getFieldType(1);
+        Assertions.assertNotNull(mapType0);
+        Assertions.assertNotNull(mapType1);
+        Assertions.assertEquals(BasicType.STRING_TYPE, mapType0.getKeyType());
+
+        SeaTunnelRowType expectedVal =
+                new SeaTunnelRowType(
+                        new String[] {"c_int", "c_string", "c_row"},
+                        new SeaTunnelDataType[] {
+                            BasicType.INT_TYPE,
+                            BasicType.STRING_TYPE,
+                            new SeaTunnelRowType(
+                                    new String[] {"c_int"},
+                                    new SeaTunnelDataType[] 
{BasicType.INT_TYPE})
+                        });
+        SeaTunnelRowType mapType0ValType =
+                (SeaTunnelRowType) ((SeaTunnelDataType<?>) 
mapType0.getValueType());
+        Assertions.assertEquals(expectedVal, mapType0ValType);
+        SeaTunnelRowType mapType1ValType =
+                (SeaTunnelRowType) ((SeaTunnelDataType<?>) 
mapType1.getValueType());
+        Assertions.assertEquals(expectedVal, mapType1ValType);
+    }
+
     public static String getTestConfigFile(String configFile)
             throws FileNotFoundException, URISyntaxException {
         URL resource = CatalogTableUtilTest.class.getResource(configFile);
diff --git 
a/seatunnel-api/src/test/java/org/apache/seatunnel/api/table/catalog/SeaTunnelDataTypeConvertorUtilTest.java
 
b/seatunnel-api/src/test/java/org/apache/seatunnel/api/table/catalog/SeaTunnelDataTypeConvertorUtilTest.java
index ac1717a3ab..f126722d83 100644
--- 
a/seatunnel-api/src/test/java/org/apache/seatunnel/api/table/catalog/SeaTunnelDataTypeConvertorUtilTest.java
+++ 
b/seatunnel-api/src/test/java/org/apache/seatunnel/api/table/catalog/SeaTunnelDataTypeConvertorUtilTest.java
@@ -66,13 +66,27 @@ public class SeaTunnelDataTypeConvertorUtilTest {
                 "ErrorCode:[COMMON-07], ErrorDescription:['SeaTunnel' unsupported data type 'uuid' of 'test']",
                 exception4.getMessage());
 
-        RuntimeException exception5 =
+        IllegalArgumentException exception5 =
                 Assertions.assertThrows(
-                        RuntimeException.class,
+                        IllegalArgumentException.class,
                         () ->
                                 SeaTunnelDataTypeConvertorUtil.deserializeSeaTunnelDataType(
                                         "test", "{uuid}"));
-        Assertions.assertEquals(
-                "String json deserialization exception.{uuid}", exception5.getMessage());
+        String expectedMsg5 =
+                String.format("HOCON Config parse from %s failed.", "{conf = {uuid}}");
+        Assertions.assertEquals(expectedMsg5, exception5.getMessage());
+
+        String invalidTypeDeclaration = "[e]";
+        IllegalArgumentException exception6 =
+                Assertions.assertThrows(
+                        IllegalArgumentException.class,
+                        () ->
+                                SeaTunnelDataTypeConvertorUtil.deserializeSeaTunnelDataType(
+                                        "test",
+                                        String.format("{c_0 = %s}", invalidTypeDeclaration)));
+        String expectedMsg6 =
+                String.format(
+                        "Unsupported parse SeaTunnel Type from '%s'.", invalidTypeDeclaration);
+        Assertions.assertEquals(expectedMsg6, exception6.getMessage());
     }
 }
diff --git a/seatunnel-api/src/test/resources/conf/generic_row.schema.conf b/seatunnel-api/src/test/resources/conf/generic_row.schema.conf
new file mode 100644
index 0000000000..168db5dff9
--- /dev/null
+++ b/seatunnel-api/src/test/resources/conf/generic_row.schema.conf
@@ -0,0 +1,25 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+schema {
+  fields {
+    # Hocon style declare row type in generic type
+    map0 = "map<string, {c_int = int, c_string = string, c_row = {c_int = int}}>"
+    # Json style declare row type in generic type
+    map1 = "map<string, {\"c_int\":\"int\", \"c_string\":\"string\", \"c_row\":{\"c_int\":\"int\"}}>"
+  }
+}
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-fake-e2e/src/test/java/org/apache/seatunnel/e2e/connector/fake/FakeIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-fake-e2e/src/test/java/org/apache/seatunnel/e2e/connector/fake/FakeIT.java
index baafb6e33c..d3d187b172 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-fake-e2e/src/test/java/org/apache/seatunnel/e2e/connector/fake/FakeIT.java
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-fake-e2e/src/test/java/org/apache/seatunnel/e2e/connector/fake/FakeIT.java
@@ -38,5 +38,8 @@ public class FakeIT extends TestSuiteBase {
         Container.ExecResult fakeWithTemplate =
                 container.executeJob("/fake_to_assert_with_template.conf");
         Assertions.assertEquals(0, fakeWithTemplate.getExitCode());
+        Container.ExecResult fakeComplex =
+                container.executeJob("/fake_generic_row_type_to_assert.conf");
+        Assertions.assertEquals(0, fakeComplex.getExitCode());
     }
 }
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-fake-e2e/src/test/resources/fake_generic_row_type_to_assert.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-fake-e2e/src/test/resources/fake_generic_row_type_to_assert.conf
new file mode 100644
index 0000000000..8bdf07ef47
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-fake-e2e/src/test/resources/fake_generic_row_type_to_assert.conf
@@ -0,0 +1,66 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+  parallelism = 1
+  job.mode = BATCH
+  # checkpoint.interval = 10000
+}
+
+source {
+  FakeSource {
+    row.num = 1
+    schema = {
+      fields {
+        c_0 = "map<string, {c_int=int\nc_string=string}>"
+        c_1 = "map<string, {c_int=int,c_string=string}>"
+        c_2 = "map<string, {c_int=int,c_string=string,c_row={c_int=int}}>"
+        c_3 = "map<string, {\"c_int\":\"int\",\"c_string\":\"string\"}>"
+      }
+    }
+    result_table_name = "fake"
+  }
+}
+
+sink{
+  Assert {
+    source_table_name = "fake"
+    rules =
+      {
+        catalog_table_rule {
+          column_rule = [
+            {
+              name = "c_0"
+              type = "map<string, {c_int=int\nc_string=string}>"
+            }
+            {
+              name = "c_1"
+              type = "map<string, {c_int=int,c_string=string}>"
+            }
+            {
+              name = "c_2"
+              type = "map<string, {c_int=int,c_string=string,c_row={c_int=int}}>"
+            }
+            {
+              name = "c_3"
+              type = "map<string, {\"c_int\":\"int\",\"c_string\":\"string\"}>"
+            }
+          ]
+        }
+      }
+  }
+}
\ No newline at end of file


Reply via email to