This is an automated email from the ASF dual-hosted git repository.
fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 243edfef3d [Improve] Remove useless ReadonlyConfig flatten feature
(#5612)
243edfef3d is described below
commit 243edfef3d362d7353cef1d0a3a0c7d3aed16008
Author: Jia Fan <[email protected]>
AuthorDate: Thu Oct 12 14:55:27 2023 +0800
[Improve] Remove useless ReadonlyConfig flatten feature (#5612)
---------
Co-authored-by: Wenjun Ruan <[email protected]>
---
.../seatunnel/api/configuration/ConfigAdapter.java | 5 +-
.../api/configuration/ReadonlyConfig.java | 72 +++----
.../api/configuration/util/ConfigUtil.java | 213 +--------------------
.../table/catalog/schema/ReadonlyConfigParser.java | 9 +-
.../api/configuration/ReadableConfigTest.java | 43 ++++-
.../api/configuration/util/ConfigUtilTest.java | 143 --------------
.../config_with_key_with_different_type_value.conf | 36 ----
.../conf/option-test-json-after-treemap.json | 103 ----------
.../src/test/resources/conf/option-test.conf | 8 +-
.../seatunnel/assertion/sink/AssertSink.java | 2 +-
.../seatunnel/jdbc/JdbcMySqlCreateTableIT.java | 3 +
.../seatunnel/jdbc/JdbcSqlServerCreateTableIT.java | 3 +
.../apache/seatunnel/e2e/transform/TestSQLIT.java | 8 +-
13 files changed, 79 insertions(+), 569 deletions(-)
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/configuration/ConfigAdapter.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/configuration/ConfigAdapter.java
index 220fe5d768..dba681fa33 100644
---
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/configuration/ConfigAdapter.java
+++
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/configuration/ConfigAdapter.java
@@ -31,11 +31,10 @@ public interface ConfigAdapter {
String[] extensionIdentifiers();
/**
- * Converter config file to path_key-value Map (FlattenedMap) in HOCON
+ * Converter config file to path_key-value Map in HOCON
*
- * @see
org.apache.seatunnel.api.configuration.util.ConfigUtil#flatteningMap(Map)
* @param configFilePath config file path.
- * @return FlattenedMap
+ * @return Map
*/
Map<String, Object> loadConfig(Path configFilePath);
}
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/configuration/ReadonlyConfig.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/configuration/ReadonlyConfig.java
index 71858bfad3..7c5a16aa06 100644
---
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/configuration/ReadonlyConfig.java
+++
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/configuration/ReadonlyConfig.java
@@ -34,8 +34,6 @@ import java.util.Optional;
import static
org.apache.seatunnel.api.configuration.util.ConfigUtil.convertToJsonString;
import static
org.apache.seatunnel.api.configuration.util.ConfigUtil.convertValue;
-import static
org.apache.seatunnel.api.configuration.util.ConfigUtil.flatteningMap;
-import static org.apache.seatunnel.api.configuration.util.ConfigUtil.treeMap;
@Slf4j
public class ReadonlyConfig implements Serializable {
@@ -50,7 +48,7 @@ public class ReadonlyConfig implements Serializable {
}
public static ReadonlyConfig fromMap(Map<String, Object> map) {
- return new ReadonlyConfig(treeMap(map));
+ return new ReadonlyConfig(map);
}
public static ReadonlyConfig fromConfig(Config config) {
@@ -65,19 +63,7 @@ public class ReadonlyConfig implements Serializable {
}
public <T> T get(Option<T> option) {
- return get(option, true);
- }
-
- public <T> T get(Option<T> option, boolean flatten) {
- return getOptional(option, flatten).orElseGet(option::defaultValue);
- }
-
- public Map<String, String> toMap() {
- return toMap(true);
- }
-
- public Config toConfig() {
- return toConfig(true);
+ return getOptional(option).orElseGet(option::defaultValue);
}
/**
@@ -85,48 +71,30 @@ public class ReadonlyConfig implements Serializable {
*
* @return Config
*/
- public Config toConfig(boolean flatten) {
- if (flatten) {
- return ConfigFactory.parseMap(flatteningMap(confData));
- }
+ public Config toConfig() {
return ConfigFactory.parseMap(confData);
}
- public Map<String, String> toMap(boolean flatten) {
+ public Map<String, String> toMap() {
if (confData.isEmpty()) {
return Collections.emptyMap();
}
Map<String, String> result = new LinkedHashMap<>();
- toMap(result, flatten);
+ toMap(result);
return result;
}
public void toMap(Map<String, String> result) {
- toMap(result, true);
- }
-
- public void toMap(Map<String, String> result, boolean flatten) {
if (confData.isEmpty()) {
return;
}
- Map<String, Object> map;
- if (flatten) {
- map = flatteningMap(confData);
- } else {
- map = confData;
- }
- for (Map.Entry<String, Object> entry : map.entrySet()) {
+ for (Map.Entry<String, Object> entry : confData.entrySet()) {
result.put(entry.getKey(), convertToJsonString(entry.getValue()));
}
}
public <T> Optional<T> getOptional(Option<T> option) {
- return getOptional(option, true);
- }
-
- @SuppressWarnings("unchecked")
- public <T> Optional<T> getOptional(Option<T> option, boolean flatten) {
if (option == null) {
throw new NullPointerException("Option not be null.");
}
@@ -146,24 +114,28 @@ public class ReadonlyConfig implements Serializable {
if (value == null) {
return Optional.empty();
}
- return Optional.of(convertValue(value, option, flatten));
+ return Optional.of(convertValue(value, option));
}
private Object getValue(String key) {
- String[] keys = key.split("\\.");
- Map<String, Object> data = this.confData;
- Object value = null;
- for (int i = 0; i < keys.length; i++) {
- value = data.get(keys[i]);
- if (i < keys.length - 1) {
- if (!(value instanceof Map)) {
- return null;
- } else {
- data = (Map<String, Object>) value;
+ if (this.confData.containsKey(key)) {
+ return this.confData.get(key);
+ } else {
+ String[] keys = key.split("\\.");
+ Map<String, Object> data = this.confData;
+ Object value = null;
+ for (int i = 0; i < keys.length; i++) {
+ value = data.get(keys[i]);
+ if (i < keys.length - 1) {
+ if (!(value instanceof Map)) {
+ return null;
+ } else {
+ data = (Map<String, Object>) value;
+ }
}
}
+ return value;
}
- return value;
}
@Override
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/configuration/util/ConfigUtil.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/configuration/util/ConfigUtil.java
index 4498928c93..e8f74f4dc2 100644
---
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/configuration/util/ConfigUtil.java
+++
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/configuration/util/ConfigUtil.java
@@ -20,233 +20,27 @@ package org.apache.seatunnel.api.configuration.util;
import
org.apache.seatunnel.shade.com.fasterxml.jackson.core.JsonProcessingException;
import
org.apache.seatunnel.shade.com.fasterxml.jackson.core.type.TypeReference;
import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.ObjectMapper;
-import
org.apache.seatunnel.shade.com.fasterxml.jackson.dataformat.javaprop.JavaPropsMapper;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
import org.apache.seatunnel.api.configuration.Option;
-import org.apache.seatunnel.api.table.catalog.schema.TableSchemaOptions;
-
-import org.apache.commons.lang3.StringEscapeUtils;
-import org.apache.commons.lang3.StringUtils;
import lombok.extern.slf4j.Slf4j;
import java.lang.reflect.ParameterizedType;
-import java.util.ArrayList;
import java.util.Arrays;
-import java.util.Collections;
-import java.util.LinkedHashMap;
import java.util.List;
import java.util.Locale;
-import java.util.Map;
import java.util.stream.Collectors;
@Slf4j
public class ConfigUtil {
- private static final JavaPropsMapper PROPERTIES_MAPPER = new
JavaPropsMapper();
- private static final ObjectMapper JACKSON_MAPPER = new ObjectMapper();
-
- /**
- *
- *
- * <pre>
- * poll.timeout = 1000
- * ==>> poll : {timeout = 1000, interval = 500}
- * poll.interval = 500
- * </pre>
- */
- public static Map<String, Object> treeMap(Map<String, Object> rawMap) {
- try {
- Map<List<String>, String> properties =
-
Arrays.stream(PROPERTIES_MAPPER.writeValueAsString(rawMap).split("\n"))
- .filter(StringUtils::isNoneEmpty)
- .map(line -> line.split("=", 2))
- .collect(
- Collectors.toMap(
- kv ->
Arrays.asList(kv[0].split("\\.")),
- kv -> kv[1],
- (o, n) -> o,
- LinkedHashMap::new));
- Map<String, Object> result = loadPropertiesStyleMap(properties);
- // Special case, we shouldn't change key in schema config.
- // TODO we should not hard code it, it should be as a config.
- if (rawMap.containsKey(TableSchemaOptions.SCHEMA.key())) {
- result.put(
- TableSchemaOptions.SCHEMA.key(),
- rawMap.get(TableSchemaOptions.SCHEMA.key()));
- }
- return result;
- } catch (JsonProcessingException e) {
- throw new IllegalArgumentException("Json parsing exception.");
- }
- }
- private static Map<String, Object> loadPropertiesStyleMap(
- Map<List<String>, String> properties) {
- Map<String, Object> propertiesMap = new LinkedHashMap<>();
- Map<List<String>, String> temp = new LinkedHashMap<>();
- String tempPrefix = null;
- for (Map.Entry<List<String>, String> entry : properties.entrySet()) {
- String key = entry.getKey().get(0);
- if (!key.equals(tempPrefix)) {
- putKeyValueToMapCheck(propertiesMap, temp, tempPrefix);
- tempPrefix = key;
- }
- if (entry.getKey().size() > 1) {
- temp.put(entry.getKey().subList(1, entry.getKey().size()),
entry.getValue());
- } else if (!temp.isEmpty()) {
- temp.put(Collections.singletonList(""), entry.getValue());
- } else {
- temp.put(null, entry.getValue());
- }
- }
- putKeyValueToMapCheck(propertiesMap, temp, tempPrefix);
- return propertiesMap;
- }
-
- private static void putKeyValueToMapCheck(
- Map<String, Object> propertiesMap, Map<List<String>, String> temp,
String tempPrefix) {
- if (!temp.isEmpty()) {
- if (propertiesMap.containsKey(tempPrefix)) {
- if (temp.containsKey(null)) {
- ((Map) propertiesMap.get(tempPrefix)).put("",
temp.get(null));
- } else if (propertiesMap.get(tempPrefix) instanceof String) {
- loadPropertiesStyleMap(temp).put("",
propertiesMap.get(tempPrefix));
- } else {
- mergeTwoMap((Map) propertiesMap.get(tempPrefix),
loadPropertiesStyleMap(temp));
- }
- } else {
- propertiesMap.put(tempPrefix, loadPropertiesStyleObject(temp));
- }
- temp.clear();
- }
- }
-
- private static void mergeTwoMap(Map<String, Object> base, Map<String,
Object> merged) {
- for (Map.Entry<String, Object> entry : merged.entrySet()) {
- if (base.containsKey(entry.getKey())) {
- if (base.get(entry.getKey()) instanceof Map &&
entry.getValue() instanceof Map) {
- mergeTwoMap((Map) base.get(entry.getKey()), (Map)
entry.getValue());
- } else if (base.get(entry.getKey()) instanceof Map) {
- ((Map) base.get(entry.getKey())).put("", entry.getValue());
- } else if (entry.getValue() instanceof Map) {
- Map<String, Object> child = new LinkedHashMap<>();
- child.put("", base.get(entry.getKey()));
- child.putAll((Map) entry.getValue());
- base.put(entry.getKey(), child);
- } else {
- throw new IllegalArgumentException(
- String.format(
- "Duplicate key '%s' in config file, value
'%s' and value '%s'",
- entry.getKey(), base.get(entry.getKey()),
entry.getValue()));
- }
- } else {
- base.put(entry.getKey(), entry.getValue());
- }
- }
- }
-
- private static List<Object> loadPropertiesStyleList(Map<List<String>,
String> properties) {
- List<Object> propertiesList = new ArrayList<>();
- Map<List<String>, String> temp = new LinkedHashMap<>();
- int tempIndex = -1;
- for (Map.Entry<List<String>, String> entry : properties.entrySet()) {
- int index = Integer.parseInt(entry.getKey().get(0));
- if (index != tempIndex) {
- if (!temp.isEmpty()) {
- propertiesList.add(loadPropertiesStyleObject(temp));
- temp.clear();
- }
- tempIndex = index;
- }
- if (entry.getKey().size() == 1) {
- temp.put(null, entry.getValue());
- } else {
- temp.put(entry.getKey().subList(1, entry.getKey().size()),
entry.getValue());
- }
- }
- if (!temp.isEmpty()) {
- propertiesList.add(loadPropertiesStyleObject(temp));
- }
- return propertiesList;
- }
-
- private static Object loadPropertiesStyleObject(Map<List<String>, String>
properties) {
- if (properties.containsKey(null) && properties.size() == 1) {
- return StringEscapeUtils.unescapeJava(properties.get(null));
- } else if (properties.containsKey(null)) {
- if (properties.containsKey(null)) {
- properties.put(Collections.singletonList(""),
properties.get(null));
- properties.remove(null);
- }
- return loadPropertiesStyleMap(properties);
- } else if (properties.entrySet().stream().anyMatch(kv ->
kv.getKey().get(0).equals("1"))) {
- return loadPropertiesStyleList(properties);
- } else {
- return loadPropertiesStyleMap(properties);
- }
- }
-
- @SuppressWarnings("unchecked")
- static Object flatteningMap(
- Object rawValue, Map<String, Object> newMap, List<String> keys,
boolean nestedMap) {
- if (rawValue == null) {
- return null;
- }
- if (!(rawValue instanceof List) && !(rawValue instanceof Map)) {
- if (newMap == null) {
- return rawValue;
- }
- newMap.put(String.join(".", keys), rawValue);
- return newMap;
- }
-
- if (rawValue instanceof List) {
- List<Object> rawList = (List<Object>) rawValue;
- rawList.replaceAll(value -> flatteningMap(value, null, null,
false));
- if (newMap != null) {
- newMap.put(String.join(".", keys), rawList);
- return newMap;
- }
- return rawList;
- } else {
- Map<String, Object> rawMap = (Map<String, Object>) rawValue;
- if (!nestedMap) {
- keys = new ArrayList<>();
- newMap = new LinkedHashMap<>(rawMap.size());
- }
- for (Map.Entry<String, Object> entry : rawMap.entrySet()) {
- keys.add(entry.getKey());
- flatteningMap(entry.getValue(), newMap, keys, true);
- keys.remove(keys.size() - 1);
- }
- return newMap;
- }
- }
-
- /**
- *
- *
- * <pre>
- * poll.timeout = 1000
- * poll : {timeout = 1000, interval = 500} ==>>
- * poll.interval = 500
- * </pre>
- */
- @SuppressWarnings("unchecked")
- public static Map<String, Object> flatteningMap(Map<String, Object>
treeMap) {
- return (Map<String, Object>) flatteningMapWithObject(treeMap);
- }
-
- static Object flatteningMapWithObject(Object rawValue) {
- return flatteningMap(rawValue, null, null, false);
- }
+ private static final ObjectMapper JACKSON_MAPPER = new ObjectMapper();
@SuppressWarnings("unchecked")
- public static <T> T convertValue(Object rawValue, Option<T> option,
boolean flatten) {
+ public static <T> T convertValue(Object rawValue, Option<T> option) {
TypeReference<T> typeReference = option.typeReference();
- rawValue = flatten ? flatteningMapWithObject(rawValue) : rawValue;
if (typeReference.getType() instanceof Class) {
// simple type
Class<T> clazz = (Class<T>) typeReference.getType();
@@ -404,6 +198,9 @@ public class ConfigUtil {
}
public static String convertToJsonString(Object o) {
+ if (o == null) {
+ return null;
+ }
if (o instanceof String) {
return (String) o;
}
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/schema/ReadonlyConfigParser.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/schema/ReadonlyConfigParser.java
index 5e9792c4af..8ed0a7a058 100644
---
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/schema/ReadonlyConfigParser.java
+++
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/schema/ReadonlyConfigParser.java
@@ -86,8 +86,7 @@ public class ReadonlyConfigParser implements
TableSchemaParser<ReadonlyConfig> {
@Override
public List<Column> parse(ReadonlyConfig schemaConfig) {
JsonNode jsonNode =
- JsonUtils.toJsonNode(
-
schemaConfig.get(TableSchemaOptions.FieldOptions.FIELDS, false));
+
JsonUtils.toJsonNode(schemaConfig.get(TableSchemaOptions.FieldOptions.FIELDS));
Map<String, String> fieldsMap = JsonUtils.toStringMap(jsonNode);
int fieldsNum = fieldsMap.size();
List<Column> columns = new ArrayList<>(fieldsNum);
@@ -107,7 +106,7 @@ public class ReadonlyConfigParser implements
TableSchemaParser<ReadonlyConfig> {
@Override
public List<Column> parse(ReadonlyConfig schemaConfig) {
- return schemaConfig.get(TableSchemaOptions.ColumnOptions.COLUMNS,
false).stream()
+ return
schemaConfig.get(TableSchemaOptions.ColumnOptions.COLUMNS).stream()
.map(ReadonlyConfig::fromMap)
.map(
columnConfig -> {
@@ -120,9 +119,7 @@ public class ReadonlyConfigParser implements
TableSchemaParser<ReadonlyConfig> {
"schema.columns.* config need option [name], please correct your config
first"));
SeaTunnelDataType<?> seaTunnelDataType =
columnConfig
- .getOptional(
-
TableSchemaOptions.ColumnOptions.TYPE,
- false)
+
.getOptional(TableSchemaOptions.ColumnOptions.TYPE)
.map(
SeaTunnelDataTypeConvertorUtil
::deserializeSeaTunnelDataType)
diff --git
a/seatunnel-api/src/test/java/org/apache/seatunnel/api/configuration/ReadableConfigTest.java
b/seatunnel-api/src/test/java/org/apache/seatunnel/api/configuration/ReadableConfigTest.java
index 9e731efab4..56272de408 100644
---
a/seatunnel-api/src/test/java/org/apache/seatunnel/api/configuration/ReadableConfigTest.java
+++
b/seatunnel-api/src/test/java/org/apache/seatunnel/api/configuration/ReadableConfigTest.java
@@ -38,7 +38,7 @@ import java.util.Map;
public class ReadableConfigTest {
private static final String CONFIG_PATH = "/conf/option-test.conf";
private static ReadonlyConfig config;
- private static Map<String, String> map;
+ private static Map<String, Object> map;
@BeforeAll
public static void prepare() throws URISyntaxException {
@@ -52,9 +52,11 @@ public class ReadableConfigTest {
ConfigResolveOptions.defaults().setAllowUnresolved(true));
config =
ReadonlyConfig.fromConfig(rawConfig.getConfigList("source").get(0));
map = new HashMap<>();
- map.put("inner.path", "mac");
- map.put("inner.name", "ashulin");
- map.put("inner.map", "{\"fantasy\":\"final\"}");
+ Map<String, String> inner = new HashMap<>();
+ inner.put("path", "mac");
+ inner.put("name", "ashulin");
+ inner.put("map", "{\"fantasy\":\"final\"}");
+ map.put("inner", inner);
map.put("type", "source");
map.put("patch.note", "hollow");
map.put("name", "saitou");
@@ -182,7 +184,11 @@ public class ReadableConfigTest {
@Test
public void testBasicMapOption() {
Assertions.assertEquals(
- map,
config.get(Options.key("option.map").mapType().noDefaultValue()));
+ map,
+ config.get(
+ Options.key("option.map")
+ .type(new TypeReference<Map<String, Object>>()
{})
+ .noDefaultValue()));
Map<String, String> newMap = new HashMap<>();
newMap.put("fantasy", "final");
Assertions.assertEquals(
@@ -222,24 +228,33 @@ public class ReadableConfigTest {
@Test
public void testComplexTypeOption() {
- List<Map<String, List<Map<String, String>>>> complexType =
+ List<Map<String, Map<String, List<Map<String, Object>>>>> complexType =
config.get(
Options.key("option.complex-type")
.type(
new TypeReference<
- List<Map<String,
List<Map<String, String>>>>>() {})
+ List<
+ Map<
+ String,
+ Map<
+ String,
+ List<
+
Map<
+
String,
+
Object>>>>>>() {})
.noDefaultValue());
Assertions.assertEquals(1, complexType.size());
- Assertions.assertEquals(2, complexType.get(0).size());
+ Assertions.assertEquals(2, complexType.get(0).get("inner").size());
complexType
.get(0)
+ .get("inner")
.values()
.forEach(
value -> {
Assertions.assertEquals(map, value.get(0));
});
- Assertions.assertEquals(complexType.get(0).get("inner.list").size(),
2);
- Assertions.assertEquals(complexType.get(0).get("inner.list-2").size(),
1);
+
Assertions.assertEquals(complexType.get(0).get("inner").get("list").size(), 2);
+
Assertions.assertEquals(complexType.get(0).get("inner").get("list-2").size(),
1);
}
@Test
@@ -298,4 +313,12 @@ public class ReadableConfigTest {
readonlyConfig = ReadonlyConfig.fromMap(map);
Assertions.assertEquals("ark", readonlyConfig.get(usernameOption));
}
+
+ @Test
+ public void testNullValue() {
+ Map<String, Object> map = new HashMap<>();
+ map.put("user", null);
+ ReadonlyConfig readonlyConfig = ReadonlyConfig.fromMap(map);
+ Assertions.assertNull(readonlyConfig.toMap().get("user"));
+ }
}
diff --git
a/seatunnel-api/src/test/java/org/apache/seatunnel/api/configuration/util/ConfigUtilTest.java
b/seatunnel-api/src/test/java/org/apache/seatunnel/api/configuration/util/ConfigUtilTest.java
index 357bda78b5..f15870c35f 100644
---
a/seatunnel-api/src/test/java/org/apache/seatunnel/api/configuration/util/ConfigUtilTest.java
+++
b/seatunnel-api/src/test/java/org/apache/seatunnel/api/configuration/util/ConfigUtilTest.java
@@ -17,39 +17,20 @@
package org.apache.seatunnel.api.configuration.util;
-import
org.apache.seatunnel.shade.com.fasterxml.jackson.core.type.TypeReference;
-import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
-import org.apache.seatunnel.shade.com.typesafe.config.ConfigRenderOptions;
-
-import org.apache.logging.log4j.core.util.IOUtils;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.condition.DisabledOnOs;
-import org.junit.jupiter.api.condition.OS;
-import java.io.File;
-import java.io.FileReader;
-import java.io.IOException;
import java.net.URISyntaxException;
import java.nio.file.Paths;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
public class ConfigUtilTest {
- private static final ObjectMapper JACKSON_MAPPER = new ObjectMapper();
-
private static Config config;
- private static Config differentValueConfig;
-
@BeforeAll
public static void init() throws URISyntaxException {
config =
@@ -59,15 +40,6 @@ public class ConfigUtilTest {
.getResource("/conf/option-test.conf")
.toURI())
.toFile());
-
- differentValueConfig =
- ConfigFactory.parseFile(
- Paths.get(
- ConfigUtilTest.class
- .getResource(
-
"/conf/config_with_key_with_different_type_value.conf")
- .toURI())
- .toFile());
}
@Test
@@ -76,119 +48,4 @@ public class ConfigUtilTest {
Config parsedConfig = ConfigUtil.convertToConfig(configJson);
Assertions.assertEquals(config.getConfig("env"),
parsedConfig.getConfig("env"));
}
-
- @Test
- @DisabledOnOs(OS.WINDOWS)
- public void treeMapFunctionTest() throws IOException, URISyntaxException {
- Map<String, Object> map =
- JACKSON_MAPPER.readValue(
- config.root().render(ConfigRenderOptions.concise()),
- new TypeReference<Map<String, Object>>() {});
- Map<String, Object> result = ConfigUtil.treeMap(map);
- String expectResult =
- IOUtils.toString(
- new FileReader(
- new File(
- ConfigUtilTest.class
- .getResource(
-
"/conf/option-test-json-after-treemap.json")
- .toURI())));
- Assertions.assertEquals(
-
JACKSON_MAPPER.writerWithDefaultPrettyPrinter().writeValueAsString(result),
- expectResult);
-
- // Check the order of option after treemap
- Iterator<String> resultIterator = result.keySet().iterator();
- for (String entry : map.keySet()) {
- String r = resultIterator.next();
- Assertions.assertEquals(entry, r);
- }
- String data =
- String.join(
- ",",
- ((Map<String, Object>)
- ((List<Map<String, Object>>)
result.get("source"))
- .get(0)
- .get("option"))
- .keySet());
- String data2 =
- String.join(
- ",",
- ((Map<String, Object>)
- ((List<Map<String, Object>>)
map.get("source"))
- .get(0)
- .get("option"))
- .keySet());
- String sameOrder =
"bool,bool-str,int,int-str,float,float-str,double,double-str,map";
- Assertions.assertEquals(
-
"string,enum,list-json,list-str,complex-type,long,list,numeric-list,long-str,enum-list,"
- + sameOrder,
- data);
- Assertions.assertEquals(sameOrder + ",map.name", data2);
- Map<String, Object> differentValueMap =
- JACKSON_MAPPER.readValue(
-
differentValueConfig.root().render(ConfigRenderOptions.concise()),
- new TypeReference<Map<String, Object>>() {});
-
- Map<String, Object> value = ConfigUtil.treeMap(differentValueMap);
- Assertions.assertEquals(value.size(), 2);
- Map<String, Object> expect = new HashMap<>();
- Map<String, Object> offsets = new HashMap<>();
- expect.put("", "specific_offsets");
- expect.put("offsets", offsets);
- offsets.put("test_topic_source-0", "50");
- Assertions.assertEquals(
- ((Map) ((List) value.get("source")).get(0)).get("start_mode"),
expect);
- }
-
- @Test
- public void testSamePrefixDifferentSuffixKey() {
- Map<String, Object> config = new LinkedHashMap<>();
- config.put(
- "fs.s3a.aws.credentials.provider",
- "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider");
- config.put("other", "value");
- config.put("fs.s3a.endpoint", "s3.cn-northwest-1.amazonaws.com.cn");
- Map<String, Object> result = ConfigUtil.treeMap(config);
- Map<String, Object> s3aMap =
- (Map<String, Object>) ((Map<String, Object>)
result.get("fs")).get("s3a");
- Assertions.assertTrue(s3aMap.containsKey("aws"));
- Assertions.assertTrue(s3aMap.containsKey("endpoint"));
- Assertions.assertEquals(2, s3aMap.size());
- }
-
- @Test
- public void testSamePrefixDifferentValueType() {
- Map<String, Object> config = new LinkedHashMap<>();
- config.put("start.mode", "CONSUME_FROM_TIMESTAMP");
- config.put("other", "value");
- config.put("start.mode.timestamp", "1667179890315");
- Map<String, Object> result = ConfigUtil.treeMap(config);
- Map<String, Object> s3aMap =
- (Map<String, Object>) ((Map<String, Object>)
result.get("start")).get("mode");
- Assertions.assertTrue(s3aMap.containsKey(""));
- Assertions.assertTrue(s3aMap.containsKey("timestamp"));
- Assertions.assertEquals(2, s3aMap.size());
-
- config.clear();
- config.put("start.mode", "CONSUME_FROM_TIMESTAMP");
- config.put("start.mode.timestamp", "1667179890315");
- Map<String, Object> result2 = ConfigUtil.treeMap(config);
- Map<String, Object> s3aMap2 =
- (Map<String, Object>) ((Map<String, Object>)
result2.get("start")).get("mode");
- Assertions.assertTrue(s3aMap2.containsKey(""));
- Assertions.assertTrue(s3aMap2.containsKey("timestamp"));
- Assertions.assertEquals(2, s3aMap2.size());
-
- config.clear();
- config.put("start.mode", "CONSUME_FROM_TIMESTAMP");
- config.put("start.mode.timestamp.test1", "1667179890315");
- config.put("start.mode.timestamp.test2", "1667179890315");
- Map<String, Object> result3 = ConfigUtil.treeMap(config);
- Map<String, Object> s3aMap3 =
- (Map<String, Object>) ((Map<String, Object>)
result3.get("start")).get("mode");
- Assertions.assertTrue(s3aMap3.containsKey(""));
- Assertions.assertTrue(s3aMap3.containsKey("timestamp"));
- Assertions.assertEquals(2, s3aMap3.size());
- }
}
diff --git
a/seatunnel-api/src/test/resources/conf/config_with_key_with_different_type_value.conf
b/seatunnel-api/src/test/resources/conf/config_with_key_with_different_type_value.conf
deleted file mode 100644
index 8601478ba5..0000000000
---
a/seatunnel-api/src/test/resources/conf/config_with_key_with_different_type_value.conf
+++ /dev/null
@@ -1,36 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-env {
- execution.parallelism = 1
- job.mode = "BATCH"
-}
-
-source {
- Kafka {
- start_mode = specific_offsets
- start_mode.offsets = {
- test_topic_source-0 = 50
- }
- }
-}
-
-transform {
-}
-
-sink {
-}
\ No newline at end of file
diff --git
a/seatunnel-api/src/test/resources/conf/option-test-json-after-treemap.json
b/seatunnel-api/src/test/resources/conf/option-test-json-after-treemap.json
deleted file mode 100644
index 95888d353e..0000000000
--- a/seatunnel-api/src/test/resources/conf/option-test-json-after-treemap.json
+++ /dev/null
@@ -1,103 +0,0 @@
-{
- "env" : {
- "execution" : {
- "parallelism" : "1",
- "checkpoint" : {
- "interval" : "5000"
- }
- },
- "job" : {
- "mode" : "STREAMING"
- }
- },
- "source" : [ {
- "option" : {
- "string" : "Hello, Apache SeaTunnel",
- "enum" : "LATEST",
- "list-json" : "[\"Hello\", \"Apache SeaTunnel\"]",
- "list-str" : "Silk,Song",
- "complex-type" : [ {
- "inner" : {
- "list" : [ {
- "inner" : {
- "path" : "mac",
- "name" : "ashulin",
- "map" : "{\"fantasy\":\"final\"}"
- },
- "type" : "source",
- "patch" : {
- "note" : "hollow"
- },
- "name" : "saitou"
- }, {
- "inner" : {
- "path" : "mac",
- "name" : "ashulin",
- "map" : "{\"fantasy\":\"final\"}"
- },
- "type" : "source",
- "patch" : {
- "note" : "hollow"
- },
- "name" : "saitou"
- } ],
- "list-2" : [ {
- "inner" : {
- "path" : "mac",
- "name" : "ashulin",
- "map" : "{\"fantasy\":\"final\"}"
- },
- "type" : "source",
- "patch" : {
- "note" : "hollow"
- },
- "name" : "saitou"
- } ]
- }
- } ],
- "long" : "21474836470",
- "list" : [ "final", "fantasy", "VII" ],
- "numeric-list" : [ "1", "2" ],
- "long-str" : "21474836470",
- "enum-list" : [ "EARLIEST", "LATEST" ],
- "bool" : "true",
- "bool-str" : "false",
- "int" : "2147483647",
- "int-str" : "100",
- "float" : "3.3333",
- "float-str" : "3.1415",
- "double" : "3.141592653589793",
- "double-str" : "3.1415926535897932384626433832795028841971",
- "map" : {
- "inner" : {
- "path" : "mac",
- "name" : "ashulin",
- "map" : "{\"fantasy\":\"final\"}"
- },
- "type" : "source",
- "patch" : {
- "note" : "hollow"
- },
- "name" : "saitou"
- }
- },
- "plugin_name" : "FakeSource"
- } ],
- "transform" : [ {
- "plugin_name" : "sql",
- "sql" : "select name,age from fake"
- } ],
- "sink" : [ {
- "file_name_expression" : "${transactionId}_${now}",
- "path" : "file:///tmp/hive/warehouse/test2",
- "extendsSQL" : "insert into sink (c_bit_1, c_bit_8, c_bit_16, c_bit_32,
c_bit_64, c_boolean, c_tinyint, c_tinyint_unsigned, c_smallint,
c_smallint_unsigned,\n
c_mediumint, c_mediumint_unsigned, c_int, c_integer, c_bigint,
c_bigint_unsigned,\n c_decimal,
c_decimal_unsigned, c_float, c_float_unsigned, c_double, c_double_unsigned,\n
c_char, c_tinyt [...]
- "row_delimiter" : "\n",
- "file_format_type" : "text",
- "partition_dir_expression" : "${k0}=${v0}",
- "plugin_name" : "File",
- "field_delimiter" : "\t",
- "partition_by" : [ "age" ],
- "is_partition_field_write_in_file" : "true",
- "sink_columns" : [ "name", "age" ]
- } ]
-}
\ No newline at end of file
diff --git a/seatunnel-api/src/test/resources/conf/option-test.conf
b/seatunnel-api/src/test/resources/conf/option-test.conf
index 9d22713f55..48af03c87f 100644
--- a/seatunnel-api/src/test/resources/conf/option-test.conf
+++ b/seatunnel-api/src/test/resources/conf/option-test.conf
@@ -41,8 +41,8 @@ source {
}
type = "source"
patch.note = "hollow"
+ name = "saitou"
}
- map.name = "saitou"
}
option.long = 21474836470
option.long-str = "21474836470"
@@ -81,8 +81,7 @@ source {
patch.note = "hollow"
name = "saitou"
}]
- }
- inner.list-2 = [{
+ list-2 = [{
inner {
path = "mac"
name = "ashulin"
@@ -91,7 +90,8 @@ source {
type = "source"
patch.note = "hollow"
name = "saitou"
- }]
+ }]
+ }
}]
}
}
diff --git
a/seatunnel-connectors-v2/connector-assert/src/main/java/org/apache/seatunnel/connectors/seatunnel/assertion/sink/AssertSink.java
b/seatunnel-connectors-v2/connector-assert/src/main/java/org/apache/seatunnel/connectors/seatunnel/assertion/sink/AssertSink.java
index 86ae7bca77..5bdf46cebe 100644
---
a/seatunnel-connectors-v2/connector-assert/src/main/java/org/apache/seatunnel/connectors/seatunnel/assertion/sink/AssertSink.java
+++
b/seatunnel-connectors-v2/connector-assert/src/main/java/org/apache/seatunnel/connectors/seatunnel/assertion/sink/AssertSink.java
@@ -63,7 +63,7 @@ public class AssertSink extends
AbstractSimpleSink<SeaTunnelRow, Void> {
if (!pluginConfig.getOptional(RULES).isPresent()) {
Throwables.propagateIfPossible(new
ConfigException.Missing(RULES.key()));
}
- Config ruleConfig = ConfigFactory.parseMap(pluginConfig.get(RULES,
false));
+ Config ruleConfig = ConfigFactory.parseMap(pluginConfig.get(RULES));
List<? extends Config> rowConfigList = null;
List<? extends Config> configList = null;
if (ruleConfig.hasPath(ROW_RULES)) {
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-4/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcMySqlCreateTableIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-4/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcMySqlCreateTableIT.java
index 6848f059f7..e5ca784d2e 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-4/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcMySqlCreateTableIT.java
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-4/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcMySqlCreateTableIT.java
@@ -309,6 +309,9 @@ public class JdbcMySqlCreateTableIT extends TestSuiteBase
implements TestResourc
sqlserver_container.close();
mysql_container.close();
POSTGRESQL_CONTAINER.close();
+
dockerClient.removeContainerCmd(sqlserver_container.getContainerId()).exec();
+
dockerClient.removeContainerCmd(mysql_container.getContainerId()).exec();
+
dockerClient.removeContainerCmd(POSTGRESQL_CONTAINER.getContainerId()).exec();
}
private Connection getJdbcSqlServerConnection() throws SQLException {
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-4/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcSqlServerCreateTableIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-4/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcSqlServerCreateTableIT.java
index a2153c7e31..5f051e6444 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-4/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcSqlServerCreateTableIT.java
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-4/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcSqlServerCreateTableIT.java
@@ -318,12 +318,15 @@ public class JdbcSqlServerCreateTableIT extends
TestSuiteBase implements TestRes
public void tearDown() throws Exception {
if (sqlserver_container != null) {
sqlserver_container.close();
+
dockerClient.removeContainerCmd(sqlserver_container.getContainerId()).exec();
}
if (mysql_container != null) {
mysql_container.close();
+
dockerClient.removeContainerCmd(mysql_container.getContainerId()).exec();
}
if (POSTGRESQL_CONTAINER != null) {
POSTGRESQL_CONTAINER.close();
+
dockerClient.removeContainerCmd(POSTGRESQL_CONTAINER.getContainerId()).exec();
}
}
diff --git
a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/java/org/apache/seatunnel/e2e/transform/TestSQLIT.java
b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/java/org/apache/seatunnel/e2e/transform/TestSQLIT.java
index 6e97daed0a..d54a2addaf 100644
---
a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/java/org/apache/seatunnel/e2e/transform/TestSQLIT.java
+++
b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/java/org/apache/seatunnel/e2e/transform/TestSQLIT.java
@@ -43,11 +43,9 @@ public class TestSQLIT extends TestSuiteBase {
Container.ExecResult sqlFuncDatetime =
container.executeJob("/sql_transform/func_datetime.conf");
Assertions.assertEquals(0, sqlFuncDatetime.getExitCode());
- // todo: wait for fix the readonly config point bug:
- // https://github.com/apache/seatunnel/issues/5597
- // Container.ExecResult sqlFuncSystem =
- //
container.executeJob("/sql_transform/func_system.conf");
- // Assertions.assertEquals(0, sqlFuncSystem.getExitCode());
+ Container.ExecResult sqlFuncSystem =
+ container.executeJob("/sql_transform/func_system.conf");
+ Assertions.assertEquals(0, sqlFuncSystem.getExitCode());
Container.ExecResult sqlCriteriaFilter =
container.executeJob("/sql_transform/criteria_filter.conf");
Assertions.assertEquals(0, sqlCriteriaFilter.getExitCode());