This is an automated email from the ASF dual-hosted git repository.

fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new a389f69d62 [Improve] Move get schema logic from Config to ReadonlyConfig (#5534)
a389f69d62 is described below

commit a389f69d628af1fca97c1c78ddbef97e3b52265a
Author: Jia Fan <[email protected]>
AuthorDate: Tue Sep 26 15:42:25 2023 +0800

    [Improve] Move get schema logic from Config to ReadonlyConfig (#5534)
---
 .../api/configuration/ReadonlyConfig.java          |  18 +++-
 .../api/configuration/util/ConfigUtil.java         | 109 +++++++++++++++++--
 .../api/table/catalog/CatalogTableUtil.java        |  42 +++-----
 .../api/configuration/ReadableConfigTest.java      |   3 +-
 .../api/configuration/util/ConfigUtilTest.java     | 119 ++++++++++++++++++---
 .../api/table/catalog/CatalogTableUtilTest.java    |  24 ++++-
 .../test/resources/conf/config_special_schema.conf |  34 ++++++
 .../config_with_key_with_different_type_value.conf |  36 +++++++
 .../conf/option-test-json-after-treemap.json       | 103 ++++++++++++++++++
 .../src/test/resources/conf/option-test.conf       |  17 +++
 10 files changed, 451 insertions(+), 54 deletions(-)

diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/configuration/ReadonlyConfig.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/configuration/ReadonlyConfig.java
index 64ff5a05d3..808109ea5c 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/configuration/ReadonlyConfig.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/configuration/ReadonlyConfig.java
@@ -27,7 +27,7 @@ import lombok.extern.slf4j.Slf4j;
 
 import java.io.Serializable;
 import java.util.Collections;
-import java.util.HashMap;
+import java.util.LinkedHashMap;
 import java.util.Map;
 import java.util.Optional;
 
@@ -64,7 +64,11 @@ public class ReadonlyConfig implements Serializable {
     }
 
     public <T> T get(Option<T> option) {
-        return getOptional(option).orElseGet(option::defaultValue);
+        return get(option, true);
+    }
+
+    public <T> T get(Option<T> option, boolean flatten) {
+        return getOptional(option, flatten).orElseGet(option::defaultValue);
     }
 
     public Map<String, String> toMap() {
@@ -72,7 +76,7 @@ public class ReadonlyConfig implements Serializable {
             return Collections.emptyMap();
         }
 
-        Map<String, String> result = new HashMap<>();
+        Map<String, String> result = new LinkedHashMap<>();
         toMap(result);
         return result;
     }
@@ -87,8 +91,12 @@ public class ReadonlyConfig implements Serializable {
         }
     }
 
-    @SuppressWarnings("unchecked")
     public <T> Optional<T> getOptional(Option<T> option) {
+        return getOptional(option, true);
+    }
+
+    @SuppressWarnings("unchecked")
+    public <T> Optional<T> getOptional(Option<T> option, boolean flatten) {
         if (option == null) {
             throw new NullPointerException("Option not be null.");
         }
@@ -108,7 +116,7 @@ public class ReadonlyConfig implements Serializable {
         if (value == null) {
             return Optional.empty();
         }
-        return Optional.of(convertValue(value, option));
+        return Optional.of(convertValue(value, option, flatten));
     }
 
     private Object getValue(String key) {
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/configuration/util/ConfigUtil.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/configuration/util/ConfigUtil.java
index c9bf4655e7..a172cddfc0 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/configuration/util/ConfigUtil.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/configuration/util/ConfigUtil.java
@@ -26,17 +26,23 @@ import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
 
 import org.apache.seatunnel.api.configuration.Option;
 
+import org.apache.commons.lang3.StringEscapeUtils;
+import org.apache.commons.lang3.StringUtils;
+
 import lombok.extern.slf4j.Slf4j;
 
 import java.lang.reflect.ParameterizedType;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
 import java.util.stream.Collectors;
 
+import static org.apache.seatunnel.api.table.catalog.CatalogTableUtil.SCHEMA;
+
 @Slf4j
 public class ConfigUtil {
     private static final JavaPropsMapper PROPERTIES_MAPPER = new 
JavaPropsMapper();
@@ -51,17 +57,106 @@ public class ConfigUtil {
      * poll.interval = 500
      * </pre>
      */
-    public static Map<String, Object> treeMap(Object rawMap) {
-        // TODO: Keeping the order of the values in the map
+    public static Map<String, Object> treeMap(Map<String, Object> rawMap) {
         try {
-            return PROPERTIES_MAPPER.readValue(
-                    PROPERTIES_MAPPER.writeValueAsString(rawMap),
-                    new TypeReference<Map<String, Object>>() {});
+            Map<List<String>, String> properties =
+                    
Arrays.stream(PROPERTIES_MAPPER.writeValueAsString(rawMap).split("\n"))
+                            .filter(StringUtils::isNoneEmpty)
+                            .map(line -> line.split("=", 2))
+                            .collect(
+                                    Collectors.toMap(
+                                            kv -> 
Arrays.asList(kv[0].split("\\.")),
+                                            kv -> kv[1],
+                                            (o, n) -> o,
+                                            LinkedHashMap::new));
+            Map<String, Object> result = loadPropertiesStyleMap(properties);
+            // Special case, we shouldn't change key in schema config.
+            // TODO we should not hard code it, it should be as a config.
+            if (rawMap.containsKey(SCHEMA.key())) {
+                result.put(SCHEMA.key(), rawMap.get(SCHEMA.key()));
+            }
+            return result;
         } catch (JsonProcessingException e) {
             throw new IllegalArgumentException("Json parsing exception.");
         }
     }
 
+    private static Map<String, Object> loadPropertiesStyleMap(
+            Map<List<String>, String> properties) {
+        Map<String, Object> propertiesMap = new LinkedHashMap<>();
+        Map<List<String>, String> temp = new LinkedHashMap<>();
+        String tempPrefix = null;
+        for (Map.Entry<List<String>, String> entry : properties.entrySet()) {
+            String key = entry.getKey().get(0);
+            if (!key.equals(tempPrefix)) {
+                putKeyValueToMapCheck(propertiesMap, temp, tempPrefix);
+                tempPrefix = key;
+            }
+            if (entry.getKey().size() > 1) {
+                temp.put(entry.getKey().subList(1, entry.getKey().size()), 
entry.getValue());
+            } else if (!temp.isEmpty()) {
+                temp.put(Collections.singletonList(""), entry.getValue());
+            } else {
+                temp.put(null, entry.getValue());
+            }
+        }
+        putKeyValueToMapCheck(propertiesMap, temp, tempPrefix);
+        return propertiesMap;
+    }
+
+    private static void putKeyValueToMapCheck(
+            Map<String, Object> propertiesMap, Map<List<String>, String> temp, 
String tempPrefix) {
+        if (!temp.isEmpty()) {
+            if (propertiesMap.containsKey(tempPrefix)) {
+                if (temp.containsKey(null)) {
+                    ((Map) propertiesMap.get(tempPrefix)).put("", 
temp.get(null));
+                } else if (propertiesMap.get(tempPrefix) instanceof String) {
+                    loadPropertiesStyleMap(temp).put("", 
propertiesMap.get(tempPrefix));
+                } else {
+                    ((Map) 
propertiesMap.get(tempPrefix)).putAll(loadPropertiesStyleMap(temp));
+                }
+            } else {
+                propertiesMap.put(tempPrefix, loadPropertiesStyleObject(temp));
+            }
+            temp.clear();
+        }
+    }
+
+    private static List<Object> loadPropertiesStyleList(Map<List<String>, 
String> properties) {
+        List<Object> propertiesList = new ArrayList<>();
+        Map<List<String>, String> temp = new LinkedHashMap<>();
+        int tempIndex = -1;
+        for (Map.Entry<List<String>, String> entry : properties.entrySet()) {
+            int index = Integer.parseInt(entry.getKey().get(0));
+            if (index != tempIndex) {
+                if (!temp.isEmpty()) {
+                    propertiesList.add(loadPropertiesStyleObject(temp));
+                    temp.clear();
+                }
+                tempIndex = index;
+            }
+            if (entry.getKey().size() == 1) {
+                temp.put(null, entry.getValue());
+            } else {
+                temp.put(entry.getKey().subList(1, entry.getKey().size()), 
entry.getValue());
+            }
+        }
+        if (!temp.isEmpty()) {
+            propertiesList.add(loadPropertiesStyleObject(temp));
+        }
+        return propertiesList;
+    }
+
+    private static Object loadPropertiesStyleObject(Map<List<String>, String> 
properties) {
+        if (properties.containsKey(null)) {
+            return StringEscapeUtils.unescapeJava(properties.get(null));
+        } else if (properties.entrySet().stream().anyMatch(kv -> 
kv.getKey().get(0).equals("1"))) {
+            return loadPropertiesStyleList(properties);
+        } else {
+            return loadPropertiesStyleMap(properties);
+        }
+    }
+
     @SuppressWarnings("unchecked")
     static Object flatteningMap(
             Object rawValue, Map<String, Object> newMap, List<String> keys, 
boolean nestedMap) {
@@ -118,9 +213,9 @@ public class ConfigUtil {
     }
 
     @SuppressWarnings("unchecked")
-    public static <T> T convertValue(Object rawValue, Option<T> option) {
+    public static <T> T convertValue(Object rawValue, Option<T> option, 
boolean flatten) {
         TypeReference<T> typeReference = option.typeReference();
-        rawValue = flatteningMapWithObject(rawValue);
+        rawValue = flatten ? flatteningMapWithObject(rawValue) : rawValue;
         if (typeReference.getType() instanceof Class) {
             // simple type
             Class<T> clazz = (Class<T>) typeReference.getType();
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/CatalogTableUtil.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/CatalogTableUtil.java
index 8439222b53..a9b921ce5b 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/CatalogTableUtil.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/CatalogTableUtil.java
@@ -17,11 +17,11 @@
 
 package org.apache.seatunnel.api.table.catalog;
 
+import 
org.apache.seatunnel.shade.com.fasterxml.jackson.core.type.TypeReference;
 import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.JsonNode;
 import 
org.apache.seatunnel.shade.com.fasterxml.jackson.databind.node.JsonNodeType;
 import 
org.apache.seatunnel.shade.com.fasterxml.jackson.databind.node.ObjectNode;
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
-import org.apache.seatunnel.shade.com.typesafe.config.ConfigRenderOptions;
 
 import org.apache.seatunnel.api.common.CommonOptions;
 import org.apache.seatunnel.api.configuration.Option;
@@ -37,8 +37,6 @@ import org.apache.seatunnel.api.table.type.PrimitiveByteArrayType;
 import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import org.apache.seatunnel.api.table.type.SqlType;
-import org.apache.seatunnel.common.config.CheckConfigUtil;
-import org.apache.seatunnel.common.config.CheckResult;
 import org.apache.seatunnel.common.utils.JsonUtils;
 import org.apache.seatunnel.common.utils.SeaTunnelException;
 
@@ -58,12 +56,11 @@ public class CatalogTableUtil implements Serializable {
     public static final Option<Map<String, String>> SCHEMA =
             
Options.key("schema").mapType().noDefaultValue().withDescription("SeaTunnel 
Schema");
 
-    public static final Option<String> FIELDS =
+    public static final Option<Map<String, Object>> FIELDS =
             Options.key("schema.fields")
-                    .stringType()
+                    .type(new TypeReference<Map<String, Object>>() {})
                     .noDefaultValue()
                     .withDescription("SeaTunnel Schema Fields");
-    private static final String FIELD_KEY = "fields";
 
     private static final SeaTunnelRowType SIMPLE_SCHEMA =
             new SeaTunnelRowType(
@@ -140,8 +137,7 @@ public class CatalogTableUtil implements Serializable {
      */
     @Deprecated
     public static List<CatalogTable> getCatalogTablesFromConfig(
-            Config config, ClassLoader classLoader) {
-        ReadonlyConfig readonlyConfig = ReadonlyConfig.fromConfig(config);
+            ReadonlyConfig readonlyConfig, ClassLoader classLoader) {
 
         // We use plugin_name as factoryId, so MySQL-CDC should be MySQL
         String factoryId = 
readonlyConfig.get(CommonOptions.PLUGIN_NAME).replace("-CDC", "");
@@ -151,7 +147,7 @@ public class CatalogTableUtil implements Serializable {
             if (schemaMap.isEmpty()) {
                 throw new SeaTunnelException("Schema config can not be empty");
             }
-            CatalogTable catalogTable = 
CatalogTableUtil.buildWithConfig(config);
+            CatalogTable catalogTable = 
CatalogTableUtil.buildWithConfig(readonlyConfig);
             return Collections.singletonList(catalogTable);
         }
 
@@ -188,12 +184,16 @@ public class CatalogTableUtil implements Serializable {
     }
 
     public static CatalogTable buildWithConfig(Config config) {
-        CheckResult checkResult = CheckConfigUtil.checkAllExists(config, 
"schema");
-        if (!checkResult.isSuccess()) {
+        ReadonlyConfig readonlyConfig = ReadonlyConfig.fromConfig(config);
+        return buildWithConfig(readonlyConfig);
+    }
+
+    public static CatalogTable buildWithConfig(ReadonlyConfig readonlyConfig) {
+        if (readonlyConfig.get(SCHEMA) == null) {
             throw new RuntimeException(
                     "Schema config need option [schema], please correct your 
config first");
         }
-        TableSchema tableSchema = parseTableSchema(config.getConfig("schema"));
+        TableSchema tableSchema = parseTableSchema(readonlyConfig);
         return CatalogTable.of(
                 // TODO: other table info
                 TableIdentifier.of("", "", ""),
@@ -265,7 +265,7 @@ public class CatalogTableUtil implements Serializable {
     }
 
     private static SeaTunnelDataType<?> parseRowType(String columnStr) {
-        Map<String, String> fieldsMap = convertJsonToMap(columnStr);
+        Map<String, String> fieldsMap = 
convertJsonToMap(JsonUtils.parseObject(columnStr));
         String[] fieldsName = new String[fieldsMap.size()];
         SeaTunnelDataType<?>[] seaTunnelDataTypes = new 
SeaTunnelDataType<?>[fieldsMap.size()];
         int i = 0;
@@ -335,16 +335,7 @@ public class CatalogTableUtil implements Serializable {
         return new DecimalType(precision, scale);
     }
 
-    private static Map<String, String> convertConfigToMap(Config config) {
-        // Because the entrySet in typesafe config couldn't keep key-value 
order
-        // So use jackson parsing schema information into a map to keep 
key-value order
-        ConfigRenderOptions options = ConfigRenderOptions.concise();
-        String schema = config.root().render(options);
-        return convertJsonToMap(schema);
-    }
-
-    private static Map<String, String> convertJsonToMap(String json) {
-        ObjectNode jsonNodes = JsonUtils.parseObject(json);
+    private static Map<String, String> convertJsonToMap(ObjectNode jsonNodes) {
         LinkedHashMap<String, String> fieldsMap = new LinkedHashMap<>();
         jsonNodes
                 .fields()
@@ -361,8 +352,9 @@ public class CatalogTableUtil implements Serializable {
         return fieldsMap;
     }
 
-    private static TableSchema parseTableSchema(Config config) {
-        Map<String, String> fieldsMap = 
convertConfigToMap(config.getConfig(FIELD_KEY));
+    private static TableSchema parseTableSchema(ReadonlyConfig config) {
+        Map<String, String> fieldsMap =
+                convertJsonToMap((ObjectNode) 
JsonUtils.toJsonNode(config.get(FIELDS, false)));
         int fieldsNum = fieldsMap.size();
         List<Column> columns = new ArrayList<>(fieldsNum);
         for (Map.Entry<String, String> entry : fieldsMap.entrySet()) {
diff --git a/seatunnel-api/src/test/java/org/apache/seatunnel/api/configuration/ReadableConfigTest.java b/seatunnel-api/src/test/java/org/apache/seatunnel/api/configuration/ReadableConfigTest.java
index ffaae72d0f..9e731efab4 100644
--- a/seatunnel-api/src/test/java/org/apache/seatunnel/api/configuration/ReadableConfigTest.java
+++ b/seatunnel-api/src/test/java/org/apache/seatunnel/api/configuration/ReadableConfigTest.java
@@ -236,9 +236,10 @@ public class ReadableConfigTest {
                 .values()
                 .forEach(
                         value -> {
-                            Assertions.assertEquals(1, value.size());
                             Assertions.assertEquals(map, value.get(0));
                         });
+        Assertions.assertEquals(complexType.get(0).get("inner.list").size(), 
2);
+        Assertions.assertEquals(complexType.get(0).get("inner.list-2").size(), 
1);
     }
 
     @Test
diff --git a/seatunnel-api/src/test/java/org/apache/seatunnel/api/configuration/util/ConfigUtilTest.java b/seatunnel-api/src/test/java/org/apache/seatunnel/api/configuration/util/ConfigUtilTest.java
index 2e8ebe58d0..a826842412 100644
--- a/seatunnel-api/src/test/java/org/apache/seatunnel/api/configuration/util/ConfigUtilTest.java
+++ b/seatunnel-api/src/test/java/org/apache/seatunnel/api/configuration/util/ConfigUtilTest.java
@@ -17,33 +17,126 @@
 
 package org.apache.seatunnel.api.configuration.util;
 
+import 
org.apache.seatunnel.shade.com.fasterxml.jackson.core.type.TypeReference;
+import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
 import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
-import org.apache.seatunnel.shade.com.typesafe.config.ConfigResolveOptions;
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigRenderOptions;
+
+import org.apache.logging.log4j.core.util.IOUtils;
 
 import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.condition.DisabledOnOs;
+import org.junit.jupiter.api.condition.OS;
 
+import java.io.File;
+import java.io.FileReader;
+import java.io.IOException;
 import java.net.URISyntaxException;
 import java.nio.file.Paths;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
 
 public class ConfigUtilTest {
 
-    @Test
-    public void convertToJsonString() throws URISyntaxException {
-        Config config =
+    private static final ObjectMapper JACKSON_MAPPER = new ObjectMapper();
+
+    private static Config config;
+
+    private static Config differentValueConfig;
+
+    @BeforeAll
+    public static void init() throws URISyntaxException {
+        config =
+                ConfigFactory.parseFile(
+                        Paths.get(
+                                        ConfigUtilTest.class
+                                                
.getResource("/conf/option-test.conf")
+                                                .toURI())
+                                .toFile());
+
+        differentValueConfig =
                 ConfigFactory.parseFile(
-                                Paths.get(
-                                                ConfigUtilTest.class
-                                                        
.getResource("/conf/option-test.conf")
-                                                        .toURI())
-                                        .toFile())
-                        
.resolve(ConfigResolveOptions.defaults().setAllowUnresolved(true))
-                        .resolveWith(
-                                ConfigFactory.systemProperties(),
-                                
ConfigResolveOptions.defaults().setAllowUnresolved(true));
+                        Paths.get(
+                                        ConfigUtilTest.class
+                                                .getResource(
+                                                        
"/conf/config_with_key_with_different_type_value.conf")
+                                                .toURI())
+                                .toFile());
+    }
+
+    @Test
+    public void convertToJsonString() {
         String configJson = ConfigUtil.convertToJsonString(config);
         Config parsedConfig = ConfigUtil.convertToConfig(configJson);
         Assertions.assertEquals(config.getConfig("env"), 
parsedConfig.getConfig("env"));
     }
+
+    @Test
+    @DisabledOnOs(OS.WINDOWS)
+    public void treeMapFunctionTest() throws IOException, URISyntaxException {
+        Map<String, Object> map =
+                JACKSON_MAPPER.readValue(
+                        config.root().render(ConfigRenderOptions.concise()),
+                        new TypeReference<Map<String, Object>>() {});
+        Map<String, Object> result = ConfigUtil.treeMap(map);
+        String expectResult =
+                IOUtils.toString(
+                        new FileReader(
+                                new File(
+                                        ConfigUtilTest.class
+                                                .getResource(
+                                                        
"/conf/option-test-json-after-treemap.json")
+                                                .toURI())));
+        Assertions.assertEquals(
+                
JACKSON_MAPPER.writerWithDefaultPrettyPrinter().writeValueAsString(result),
+                expectResult);
+
+        // Check the order of option after treemap
+        Iterator<String> resultIterator = result.keySet().iterator();
+        for (String entry : map.keySet()) {
+            String r = resultIterator.next();
+            Assertions.assertEquals(entry, r);
+        }
+        String data =
+                String.join(
+                        ",",
+                        ((Map<String, Object>)
+                                        ((List<Map<String, Object>>) 
result.get("source"))
+                                                .get(0)
+                                                .get("option"))
+                                .keySet());
+        String data2 =
+                String.join(
+                        ",",
+                        ((Map<String, Object>)
+                                        ((List<Map<String, Object>>) 
map.get("source"))
+                                                .get(0)
+                                                .get("option"))
+                                .keySet());
+        String sameOrder = 
"bool,bool-str,int,int-str,float,float-str,double,double-str,map";
+        Assertions.assertEquals(
+                
"string,enum,list-json,list-str,complex-type,long,list,numeric-list,long-str,enum-list,"
+                        + sameOrder,
+                data);
+        Assertions.assertEquals(sameOrder + ",map.name", data2);
+        Map<String, Object> differentValueMap =
+                JACKSON_MAPPER.readValue(
+                        
differentValueConfig.root().render(ConfigRenderOptions.concise()),
+                        new TypeReference<Map<String, Object>>() {});
+
+        Map<String, Object> value = ConfigUtil.treeMap(differentValueMap);
+        Assertions.assertEquals(value.size(), 2);
+        Map<String, Object> expect = new HashMap<>();
+        Map<String, Object> offsets = new HashMap<>();
+        expect.put("", "specific_offsets");
+        expect.put("offsets", offsets);
+        offsets.put("test_topic_source-0", "50");
+        Assertions.assertEquals(
+                ((Map) ((List) value.get("source")).get(0)).get("start_mode"), 
expect);
+    }
 }
diff --git a/seatunnel-api/src/test/java/org/apache/seatunnel/api/table/catalog/CatalogTableUtilTest.java b/seatunnel-api/src/test/java/org/apache/seatunnel/api/table/catalog/CatalogTableUtilTest.java
index 97df47f555..0149effab1 100644
--- a/seatunnel-api/src/test/java/org/apache/seatunnel/api/table/catalog/CatalogTableUtilTest.java
+++ b/seatunnel-api/src/test/java/org/apache/seatunnel/api/table/catalog/CatalogTableUtilTest.java
@@ -21,6 +21,7 @@ import org.apache.seatunnel.shade.com.typesafe.config.Config;
 import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
 import org.apache.seatunnel.shade.com.typesafe.config.ConfigValueFactory;
 
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
 import org.apache.seatunnel.api.table.type.ArrayType;
 import org.apache.seatunnel.api.table.type.BasicType;
 import org.apache.seatunnel.api.table.type.DecimalType;
@@ -76,6 +77,7 @@ public class CatalogTableUtilTest {
                 new MapType<>(
                         BasicType.STRING_TYPE,
                         new MapType<>(BasicType.STRING_TYPE, 
ArrayType.INT_ARRAY_TYPE)));
+        Assertions.assertEquals(seaTunnelRowType.getTotalFields(), 18);
         
Assertions.assertEquals(seaTunnelRowType.getFieldType(17).getSqlType(), 
SqlType.ROW);
         SeaTunnelRowType nestedRowFieldType = (SeaTunnelRowType) 
seaTunnelRowType.getFieldType(17);
         Assertions.assertEquals(
@@ -84,14 +86,26 @@ public class CatalogTableUtilTest {
                 "row", 
nestedRowFieldType.getFieldName(nestedRowFieldType.indexOf("row")));
     }
 
+    @Test
+    public void testSpecialSchemaParse() throws FileNotFoundException, 
URISyntaxException {
+        String path = getTestConfigFile("/conf/config_special_schema.conf");
+        Config config = ConfigFactory.parseFile(new File(path));
+        SeaTunnelRowType seaTunnelRowType =
+                CatalogTableUtil.buildWithConfig(config).getSeaTunnelRowType();
+        Assertions.assertEquals(seaTunnelRowType.getTotalFields(), 12);
+        Assertions.assertEquals(seaTunnelRowType.getFieldType(5).getSqlType(), 
SqlType.BYTES);
+        Assertions.assertEquals(seaTunnelRowType.getFieldName(6), "t.date");
+    }
+
     @Test
     public void testCatalogUtilGetCatalogTable() throws FileNotFoundException, 
URISyntaxException {
         String path = getTestConfigFile("/conf/getCatalogTable.conf");
         Config config = ConfigFactory.parseFile(new File(path));
         Config source = config.getConfigList("source").get(0);
+        ReadonlyConfig sourceReadonlyConfig = 
ReadonlyConfig.fromConfig(source);
         List<CatalogTable> catalogTables =
                 CatalogTableUtil.getCatalogTablesFromConfig(
-                        source, 
Thread.currentThread().getContextClassLoader());
+                        sourceReadonlyConfig, 
Thread.currentThread().getContextClassLoader());
         Assertions.assertEquals(2, catalogTables.size());
         Assertions.assertEquals(
                 TableIdentifier.of("InMemory", 
TablePath.of("st.public.table1")),
@@ -103,19 +117,23 @@ public class CatalogTableUtilTest {
         Config emptyTableSource =
                 source.withValue(
                         TABLE_NAMES.key(), ConfigValueFactory.fromIterable(new 
ArrayList<>()));
+        ReadonlyConfig emptyReadonlyConfig = 
ReadonlyConfig.fromConfig(emptyTableSource);
         Assertions.assertThrows(
                 SeaTunnelException.class,
                 () ->
                         CatalogTableUtil.getCatalogTablesFromConfig(
-                                emptyTableSource, 
Thread.currentThread().getContextClassLoader()));
+                                emptyReadonlyConfig,
+                                
Thread.currentThread().getContextClassLoader()));
         // test unknown catalog
         Config cannotFindCatalogSource =
                 source.withValue(PLUGIN_NAME, 
ConfigValueFactory.fromAnyRef("unknownCatalog"));
+        ReadonlyConfig cannotFindCatalogReadonlyConfig =
+                ReadonlyConfig.fromConfig(cannotFindCatalogSource);
         Assertions.assertThrows(
                 SeaTunnelException.class,
                 () ->
                         CatalogTableUtil.getCatalogTablesFromConfig(
-                                cannotFindCatalogSource,
+                                cannotFindCatalogReadonlyConfig,
                                 
Thread.currentThread().getContextClassLoader()));
     }
 
diff --git a/seatunnel-api/src/test/resources/conf/config_special_schema.conf b/seatunnel-api/src/test/resources/conf/config_special_schema.conf
new file mode 100644
index 0000000000..5cd4a31fb9
--- /dev/null
+++ b/seatunnel-api/src/test/resources/conf/config_special_schema.conf
@@ -0,0 +1,34 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+// Special schema: the field names contain dots (X.X). They must stay literal keys and must not be parsed as nested fields of an object `t`.
+schema {
+  fields {
+    t.string = STRING
+    t.boolean = BOOLEAN
+    t.long = BIGINT
+    t.double = DOUBLE
+    t.null = NULL
+    t.byteArray = BYTES
+    t.date = DATE
+    t.localDateTime = TIMESTAMP
+    _map = "MAP<STRING, INT>"
+    t.list = "ARRAY<INT>"
+    t.int = INT
+    t.float = FLOAT
+  }
+}
\ No newline at end of file
diff --git a/seatunnel-api/src/test/resources/conf/config_with_key_with_different_type_value.conf b/seatunnel-api/src/test/resources/conf/config_with_key_with_different_type_value.conf
new file mode 100644
index 0000000000..8601478ba5
--- /dev/null
+++ b/seatunnel-api/src/test/resources/conf/config_with_key_with_different_type_value.conf
@@ -0,0 +1,36 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+  execution.parallelism = 1
+  job.mode = "BATCH"
+}
+
+source {
+  Kafka {
+    start_mode = specific_offsets
+    start_mode.offsets = {
+      test_topic_source-0 = 50
+    }
+  }
+}
+
+transform {
+}
+
+sink {
+}
\ No newline at end of file
diff --git a/seatunnel-api/src/test/resources/conf/option-test-json-after-treemap.json b/seatunnel-api/src/test/resources/conf/option-test-json-after-treemap.json
new file mode 100644
index 0000000000..95888d353e
--- /dev/null
+++ b/seatunnel-api/src/test/resources/conf/option-test-json-after-treemap.json
@@ -0,0 +1,103 @@
+{
+  "env" : {
+    "execution" : {
+      "parallelism" : "1",
+      "checkpoint" : {
+        "interval" : "5000"
+      }
+    },
+    "job" : {
+      "mode" : "STREAMING"
+    }
+  },
+  "source" : [ {
+    "option" : {
+      "string" : "Hello, Apache SeaTunnel",
+      "enum" : "LATEST",
+      "list-json" : "[\"Hello\", \"Apache SeaTunnel\"]",
+      "list-str" : "Silk,Song",
+      "complex-type" : [ {
+        "inner" : {
+          "list" : [ {
+            "inner" : {
+              "path" : "mac",
+              "name" : "ashulin",
+              "map" : "{\"fantasy\":\"final\"}"
+            },
+            "type" : "source",
+            "patch" : {
+              "note" : "hollow"
+            },
+            "name" : "saitou"
+          }, {
+            "inner" : {
+              "path" : "mac",
+              "name" : "ashulin",
+              "map" : "{\"fantasy\":\"final\"}"
+            },
+            "type" : "source",
+            "patch" : {
+              "note" : "hollow"
+            },
+            "name" : "saitou"
+          } ],
+          "list-2" : [ {
+            "inner" : {
+              "path" : "mac",
+              "name" : "ashulin",
+              "map" : "{\"fantasy\":\"final\"}"
+            },
+            "type" : "source",
+            "patch" : {
+              "note" : "hollow"
+            },
+            "name" : "saitou"
+          } ]
+        }
+      } ],
+      "long" : "21474836470",
+      "list" : [ "final", "fantasy", "VII" ],
+      "numeric-list" : [ "1", "2" ],
+      "long-str" : "21474836470",
+      "enum-list" : [ "EARLIEST", "LATEST" ],
+      "bool" : "true",
+      "bool-str" : "false",
+      "int" : "2147483647",
+      "int-str" : "100",
+      "float" : "3.3333",
+      "float-str" : "3.1415",
+      "double" : "3.141592653589793",
+      "double-str" : "3.1415926535897932384626433832795028841971",
+      "map" : {
+        "inner" : {
+          "path" : "mac",
+          "name" : "ashulin",
+          "map" : "{\"fantasy\":\"final\"}"
+        },
+        "type" : "source",
+        "patch" : {
+          "note" : "hollow"
+        },
+        "name" : "saitou"
+      }
+    },
+    "plugin_name" : "FakeSource"
+  } ],
+  "transform" : [ {
+    "plugin_name" : "sql",
+    "sql" : "select name,age from fake"
+  } ],
+  "sink" : [ {
+    "file_name_expression" : "${transactionId}_${now}",
+    "path" : "file:///tmp/hive/warehouse/test2",
+    "extendsSQL" : "insert into sink (c_bit_1, c_bit_8, c_bit_16, c_bit_32, 
c_bit_64, c_boolean, c_tinyint, c_tinyint_unsigned, c_smallint, 
c_smallint_unsigned,\n                                                
c_mediumint, c_mediumint_unsigned, c_int, c_integer, c_bigint, 
c_bigint_unsigned,\n                                                c_decimal, 
c_decimal_unsigned, c_float, c_float_unsigned, c_double, c_double_unsigned,\n   
                                             c_char, c_tinyt [...]
+    "row_delimiter" : "\n",
+    "file_format_type" : "text",
+    "partition_dir_expression" : "${k0}=${v0}",
+    "plugin_name" : "File",
+    "field_delimiter" : "\t",
+    "partition_by" : [ "age" ],
+    "is_partition_field_write_in_file" : "true",
+    "sink_columns" : [ "name", "age" ]
+  } ]
+}
\ No newline at end of file
diff --git a/seatunnel-api/src/test/resources/conf/option-test.conf b/seatunnel-api/src/test/resources/conf/option-test.conf
index 9461e5298b..9d22713f55 100644
--- a/seatunnel-api/src/test/resources/conf/option-test.conf
+++ b/seatunnel-api/src/test/resources/conf/option-test.conf
@@ -70,6 +70,16 @@ source {
                     type = "source"
                     patch.note = "hollow"
                     name = "saitou"
+                },
+                {
+                    inner {
+                        path = "mac"
+                        name = "ashulin"
+                        map = """{"fantasy":"final"}"""
+                    }
+                    type = "source"
+                    patch.note = "hollow"
+                    name = "saitou"
                 }]
             }
             inner.list-2 = [{
@@ -103,5 +113,12 @@ sink {
         file_name_expression = "${transactionId}_${now}"
         file_format_type = "text"
         sink_columns = ["name","age"]
+        extendsSQL = """insert into sink (c_bit_1, c_bit_8, c_bit_16, c_bit_32, c_bit_64, c_boolean, c_tinyint, c_tinyint_unsigned, c_smallint, c_smallint_unsigned,
+                                                c_mediumint, c_mediumint_unsigned, c_int, c_integer, c_bigint, c_bigint_unsigned,
+                                                c_decimal, c_decimal_unsigned, c_float, c_float_unsigned, c_double, c_double_unsigned,
+                                                c_char, c_tinytext, c_mediumtext, c_text, c_varchar, c_json, c_longtext, c_date,
+                                                c_datetime, c_timestamp, c_tinyblob, c_mediumblob, c_blob, c_longblob, c_varbinary,
+                                                c_binary, c_year, c_int_unsigned, c_integer_unsigned,c_bigint_30,c_decimal_unsigned_30,c_decimal_30)
+                   values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?);"""
     }
 }
\ No newline at end of file


Reply via email to