This is an automated email from the ASF dual-hosted git repository.
fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 19d8c442f [api] improve config options (#2179)
19d8c442f is described below
commit 19d8c442fcf6894780d202ca5220bd1dbe77fd27
Author: Zongwen Li <[email protected]>
AuthorDate: Wed Jul 27 10:48:32 2022 +0800
[api] improve config options (#2179)
* [api] improve config options
* [api] option test & new config
* [api] option check style
* [api] support complex type option
* [api] deprecated synchronized & simplified equals
A read-only configuration doesn't require the 'synchronized' keyword.
hashmap initialcapacity: https://cloud.tencent.com/developer/article/1913508
* [api] suppress magic number warning
---
seatunnel-api/pom.xml | 9 +
.../apache/seatunnel/api/configuration/Option.java | 93 ++++++++++
.../seatunnel/api/configuration/Options.java | 190 ++++++++++++++++++++
.../api/configuration/ReadonlyConfig.java | 132 ++++++++++++++
.../api/configuration/util/Condition.java | 142 +++++++++++++++
.../api/configuration/util/ConfigUtil.java | 176 ++++++++++++++++++
.../api/configuration/util/Expression.java | 123 +++++++++++++
.../api/configuration/util/OptionRule.java | 200 +++++++++++++++++++++
.../util/OptionValidationException.java} | 22 ++-
.../api/configuration/util/RequiredOption.java | 162 +++++++++++++++++
.../seatunnel/api/table/factory/Factory.java | 10 ++
.../seatunnel/api/configuration/OptionTest.java | 46 +++++
.../api/configuration/ReadableConfigTest.java | 158 ++++++++++++++++
.../api/configuration/util/ConditionTest.java | 48 +++++
.../api/configuration/util/ExpressionTest.java | 38 ++++
.../api/configuration/util/OptionRuleTest.java | 114 ++++++++++++
.../src/test/resources/conf/option-test.conf | 98 ++++++++++
tools/dependencies/known-dependencies.txt | 1 +
18 files changed, 1750 insertions(+), 12 deletions(-)
diff --git a/seatunnel-api/pom.xml b/seatunnel-api/pom.xml
index 7b6829a0f..058e240cf 100644
--- a/seatunnel-api/pom.xml
+++ b/seatunnel-api/pom.xml
@@ -38,5 +38,14 @@
<artifactId>seatunnel-common</artifactId>
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.junit.jupiter</groupId>
+ <artifactId>junit-jupiter-engine</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>com.fasterxml.jackson.dataformat</groupId>
+ <artifactId>jackson-dataformat-properties</artifactId>
+ <version>${jackson.version}</version>
+ </dependency>
</dependencies>
</project>
\ No newline at end of file
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/configuration/Option.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/configuration/Option.java
new file mode 100644
index 000000000..cfb10412e
--- /dev/null
+++
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/configuration/Option.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.api.configuration;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+
+import java.util.Objects;
+
+public class Option<T> {
+    /**
+     * Key under which the value of this option is looked up.
+     */
+    private final String key;
+
+    /**
+     * Jackson type token describing the value type of this option.
+     */
+    private final TypeReference<T> typeReference;
+
+    /**
+     * Value reported by {@link #defaultValue()}; may be {@code null}.
+     */
+    private final T defaultValue;
+
+    /**
+     * Human-readable description; an empty string until {@link #withDescription(String)} is called.
+     */
+    String description = "";
+
+    /**
+     * Creates an option. The constructor is package-private, so instances are created from
+     * within this package only.
+     *
+     * @param key           key of the option
+     * @param typeReference type token for the option value
+     * @param defaultValue  default value, possibly {@code null}
+     */
+    Option(String key, TypeReference<T> typeReference, T defaultValue) {
+        this.defaultValue = defaultValue;
+        this.typeReference = typeReference;
+        this.key = key;
+    }
+
+    /**
+     * @return the key of this option
+     */
+    public String key() {
+        return this.key;
+    }
+
+    /**
+     * @return the type token of the option value
+     */
+    public TypeReference<T> typeReference() {
+        return this.typeReference;
+    }
+
+    /**
+     * @return the default value handed to the constructor
+     */
+    public T defaultValue() {
+        return this.defaultValue;
+    }
+
+    /**
+     * @return the current description text
+     */
+    public String getDescription() {
+        return this.description;
+    }
+
+    /**
+     * Replaces the description of this very instance (no copy is made).
+     *
+     * @param description new description text
+     * @return this option, to allow call chaining
+     */
+    public Option<T> withDescription(String description) {
+        this.description = description;
+        return this;
+    }
+
+    /**
+     * Two options are equal when their keys and default values are equal; the type reference
+     * and the description do not take part in the comparison.
+     */
+    @Override
+    public boolean equals(Object obj) {
+        if (obj == this) {
+            return true;
+        }
+        if (obj instanceof Option) {
+            Option<?> other = (Option<?>) obj;
+            boolean sameKey = Objects.equals(this.key, other.key);
+            return sameKey && Objects.equals(this.defaultValue, other.defaultValue);
+        }
+        return false;
+    }
+
+    /**
+     * Hash over the same two fields that {@link #equals(Object)} compares.
+     */
+    @Override
+    public int hashCode() {
+        return Objects.hash(key, defaultValue);
+    }
+
+    @Override
+    public String toString() {
+        final String pattern = "Key: '%s', default: %s";
+        return String.format(pattern, this.key, this.defaultValue);
+    }
+}
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/configuration/Options.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/configuration/Options.java
new file mode 100644
index 000000000..b879da863
--- /dev/null
+++
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/configuration/Options.java
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.api.configuration;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import org.apache.commons.lang3.StringUtils;
+
+import java.lang.reflect.Type;
+import java.time.Duration;
+import java.util.List;
+import java.util.Map;
+
+public class Options {
+
+    /**
+     * Starts building a new {@link Option}.
+     *
+     * @param key The key for the config option; it must contain at least one
+     *            non-whitespace character.
+     * @return The builder for the config option with the given key.
+     * @throws IllegalArgumentException if {@code key} is null, empty or whitespace only.
+     */
+    public static OptionBuilder key(String key) {
+        // isNotBlank rejects null, "" and whitespace-only keys, so the message names all of them.
+        checkArgument(StringUtils.isNotBlank(key), "Option's key must not be null or blank.");
+        return new OptionBuilder(key);
+    }
+
+    /**
+     * First stage of the fluent API for declaring an {@link Option}: it remembers the key and
+     * lets the caller choose the value type. It is instantiated via {@link Options#key(String)}.
+     */
+    public static final class OptionBuilder {
+        private final String key;
+
+        /**
+         * Creates a new OptionBuilder.
+         *
+         * @param key The key for the config option
+         */
+        OptionBuilder(String key) {
+            this.key = key;
+        }
+
+        /**
+         * Pairs the remembered key with the given type token.
+         */
+        private <V> TypedOptionBuilder<V> typed(TypeReference<V> reference) {
+            return new TypedOptionBuilder<>(this.key, reference);
+        }
+
+        /**
+         * Declares the option value as {@link Boolean}.
+         */
+        public TypedOptionBuilder<Boolean> booleanType() {
+            return typed(new TypeReference<Boolean>() {
+            });
+        }
+
+        /**
+         * Declares the option value as {@link Integer}.
+         */
+        public TypedOptionBuilder<Integer> intType() {
+            return typed(new TypeReference<Integer>() {
+            });
+        }
+
+        /**
+         * Declares the option value as {@link Long}.
+         */
+        public TypedOptionBuilder<Long> longType() {
+            return typed(new TypeReference<Long>() {
+            });
+        }
+
+        /**
+         * Declares the option value as {@link Float}.
+         */
+        public TypedOptionBuilder<Float> floatType() {
+            return typed(new TypeReference<Float>() {
+            });
+        }
+
+        /**
+         * Declares the option value as {@link Double}.
+         */
+        public TypedOptionBuilder<Double> doubleType() {
+            return typed(new TypeReference<Double>() {
+            });
+        }
+
+        /**
+         * Declares the option value as {@link String}.
+         */
+        public TypedOptionBuilder<String> stringType() {
+            return typed(new TypeReference<String>() {
+            });
+        }
+
+        /**
+         * Declares the option value as {@link Duration}.
+         */
+        public TypedOptionBuilder<Duration> durationType() {
+            return typed(new TypeReference<Duration>() {
+            });
+        }
+
+        /**
+         * Declares the option value as an {@link Enum}.
+         *
+         * @param enumClass Concrete type of the expected enum.
+         */
+        public <T extends Enum<T>> TypedOptionBuilder<T> enumType(Class<T> enumClass) {
+            // The type token reports the concrete enum class passed in by the caller.
+            TypeReference<T> enumReference = new TypeReference<T>() {
+                @Override
+                public Type getType() {
+                    return enumClass;
+                }
+            };
+            return typed(enumReference);
+        }
+
+        /**
+         * Declares the option value as a set of properties, represented as
+         * {@code Map<String, String>}.
+         */
+        public TypedOptionBuilder<Map<String, String>> mapType() {
+            return typed(new TypeReference<Map<String, String>>() {
+            });
+        }
+
+        /**
+         * Declares the option value as a list of properties, represented as
+         * {@code List<String>}.
+         */
+        public TypedOptionBuilder<List<String>> listType() {
+            return typed(new TypeReference<List<String>>() {
+            });
+        }
+
+        /**
+         * Declares the option value as the type described by the supplied reference.
+         *
+         * @param typeReference complex type reference
+         */
+        public <T> TypedOptionBuilder<T> type(TypeReference<T> typeReference) {
+            return typed(typeReference);
+        }
+    }
+
+    /**
+     * Second stage of the fluent API: key and value type are fixed, and the caller decides
+     * whether the resulting {@link Option} carries a default value.
+     *
+     * @param <T> value type of the option being built
+     */
+    public static class TypedOptionBuilder<T> {
+        private final String key;
+        private final TypeReference<T> typeReference;
+
+        TypedOptionBuilder(String key, TypeReference<T> typeReference) {
+            this.typeReference = typeReference;
+            this.key = key;
+        }
+
+        /**
+         * Finishes the option using {@code value} as its default.
+         *
+         * @param value The default value for the config option
+         * @return The config option with the default value.
+         */
+        public Option<T> defaultValue(T value) {
+            final Option<T> option = new Option<>(this.key, this.typeReference, value);
+            return option;
+        }
+
+        /**
+         * Finishes the option with {@code null} as its default.
+         *
+         * @return The config option without a default value.
+         */
+        public Option<T> noDefaultValue() {
+            final T absent = null;
+            return new Option<>(this.key, this.typeReference, absent);
+        }
+    }
+}
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/configuration/ReadonlyConfig.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/configuration/ReadonlyConfig.java
new file mode 100644
index 000000000..beb9d08a9
--- /dev/null
+++
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/configuration/ReadonlyConfig.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.api.configuration;
+
+import static
org.apache.seatunnel.api.configuration.util.ConfigUtil.convertToJsonString;
+import static
org.apache.seatunnel.api.configuration.util.ConfigUtil.convertValue;
+import static
org.apache.seatunnel.api.configuration.util.ConfigUtil.flatteningMap;
+import static org.apache.seatunnel.api.configuration.util.ConfigUtil.treeMap;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigRenderOptions;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+
+public class ReadonlyConfig {
+
+    private static final ObjectMapper JACKSON_MAPPER = new ObjectMapper();
+
+    /**
+     * Key/value tree backing this configuration object.
+     */
+    protected final Map<String, Object> confData;
+
+    private ReadonlyConfig(Map<String, Object> confData) {
+        this.confData = confData;
+    }
+
+    /**
+     * Builds a configuration from a map after passing it through {@code ConfigUtil.treeMap}.
+     *
+     * @param map source key/value pairs
+     * @return a configuration wrapping the converted map
+     */
+    public static ReadonlyConfig fromMap(Map<String, Object> map) {
+        final Map<String, Object> tree = treeMap(map);
+        return new ReadonlyConfig(tree);
+    }
+
+    /**
+     * Builds a configuration from a {@link Config} by rendering its root concisely and parsing
+     * the rendered text as a JSON object.
+     *
+     * @param config source configuration
+     * @return a configuration holding the parsed content
+     * @throws IllegalArgumentException if the rendered text cannot be parsed
+     */
+    public static ReadonlyConfig fromConfig(Config config) {
+        final String rendered = config.root().render(ConfigRenderOptions.concise());
+        final TypeReference<Map<String, Object>> mapType = new TypeReference<Map<String, Object>>() {
+        };
+        try {
+            return fromMap(JACKSON_MAPPER.readValue(rendered, mapType));
+        } catch (JsonProcessingException e) {
+            throw new IllegalArgumentException("Json parsing exception.", e);
+        }
+    }
+
+    /**
+     * Returns the configured value of {@code option}, or the option's default value when
+     * {@link #getOptional(Option)} finds nothing.
+     */
+    public <T> T get(Option<T> option) {
+        final Optional<T> configured = getOptional(option);
+        if (configured.isPresent()) {
+            return configured.get();
+        }
+        return option.defaultValue();
+    }
+
+    /**
+     * Flattens the configuration and renders every value through
+     * {@code ConfigUtil.convertToJsonString}.
+     *
+     * @return an immutable empty map when there is no data, otherwise a new {@link HashMap}
+     */
+    @SuppressWarnings("MagicNumber")
+    public Map<String, String> toMap() {
+        if (confData.isEmpty()) {
+            return Collections.emptyMap();
+        }
+
+        final Map<String, Object> flattened = flatteningMap(confData);
+        // Initial capacity is derived from the entry count: size * 4 / 3 + 1.
+        final Map<String, String> result = new HashMap<>((flattened.size() << 2) / 3 + 1);
+        flattened.forEach((name, value) -> result.put(name, convertToJsonString(value)));
+        return result;
+    }
+
+    /**
+     * Looks the option up by splitting its key on dots and descending one map level per segment.
+     *
+     * @param option option to resolve, must not be {@code null}
+     * @return the converted value, or empty when a parent segment is not a map or the leaf
+     *         value is {@code null}
+     * @throws NullPointerException if {@code option} is {@code null}
+     */
+    @SuppressWarnings("unchecked")
+    public <T> Optional<T> getOptional(Option<T> option) {
+        if (option == null) {
+            throw new NullPointerException("Option not be null.");
+        }
+        final String[] path = option.key().split("\\.");
+        if (path.length == 0) {
+            // A key made of dots only splits into nothing, so there is no value to find.
+            return Optional.empty();
+        }
+        final int last = path.length - 1;
+        Map<String, Object> level = this.confData;
+        for (int depth = 0; depth < last; depth++) {
+            final Object child = level.get(path[depth]);
+            if (!(child instanceof Map)) {
+                return Optional.empty();
+            }
+            level = (Map<String, Object>) child;
+        }
+        final Object leaf = level.get(path[last]);
+        if (leaf == null) {
+            return Optional.empty();
+        }
+        return Optional.of(convertValue(leaf, option.typeReference()));
+    }
+
+    /**
+     * XOR of the hash codes of the top-level keys.
+     */
+    @Override
+    public int hashCode() {
+        return this.confData.keySet().stream()
+            .mapToInt(String::hashCode)
+            .reduce(0, (left, right) -> left ^ right);
+    }
+
+    /**
+     * Two configurations are equal when their backing maps are equal.
+     */
+    @Override
+    public boolean equals(Object obj) {
+        if (obj == this) {
+            return true;
+        }
+        if (obj instanceof ReadonlyConfig) {
+            final ReadonlyConfig other = (ReadonlyConfig) obj;
+            return this.confData.equals(other.confData);
+        }
+        return false;
+    }
+
+    @Override
+    public String toString() {
+        return convertToJsonString(confData);
+    }
+}
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/configuration/util/Condition.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/configuration/util/Condition.java
new file mode 100644
index 000000000..74c08c1c7
--- /dev/null
+++
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/configuration/util/Condition.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.api.configuration.util;
+
+import org.apache.seatunnel.api.configuration.Option;
+
+import java.util.Objects;
+
+public class Condition<T> {
+    private final Option<T> option;
+    private final T expectValue;
+    /**
+     * How this node is joined to {@link #next}: {@code true} for "&&", {@code false} for "||";
+     * {@code null} until a successor has been attached.
+     */
+    private Boolean and = null;
+    private Condition<?> next = null;
+
+    Condition(Option<T> option, T expectValue) {
+        this.expectValue = expectValue;
+        this.option = option;
+    }
+
+    /**
+     * Creates a single condition "option == expectValue".
+     */
+    public static <T> Condition<T> of(Option<T> option, T expectValue) {
+        return new Condition<>(option, expectValue);
+    }
+
+    /**
+     * Appends "&& option == expectValue" to the end of this chain.
+     *
+     * @return this condition (the head of the chain)
+     */
+    public <E> Condition<T> and(Option<E> option, E expectValue) {
+        final Condition<E> appended = of(option, expectValue);
+        return and(appended);
+    }
+
+    /**
+     * Appends "|| option == expectValue" to the end of this chain.
+     *
+     * @return this condition (the head of the chain)
+     */
+    public <E> Condition<T> or(Option<E> option, E expectValue) {
+        final Condition<E> appended = of(option, expectValue);
+        return or(appended);
+    }
+
+    /**
+     * Attaches {@code next} to the end of this chain with an "&&" link.
+     */
+    public Condition<T> and(Condition<?> next) {
+        link(next, true);
+        return this;
+    }
+
+    /**
+     * Attaches {@code next} to the end of this chain with an "||" link.
+     */
+    public Condition<T> or(Condition<?> next) {
+        link(next, false);
+        return this;
+    }
+
+    /**
+     * Hooks {@code successor} onto the current tail and records the operator on that tail.
+     */
+    private void link(Condition<?> successor, boolean conjunction) {
+        final Condition<?> last = getTailCondition();
+        last.and = conjunction;
+        last.next = successor;
+    }
+
+    /**
+     * @return number of conditions in the chain starting at this node
+     */
+    protected int getCount() {
+        int count = 0;
+        for (Condition<?> node = this; node != null; node = node.next) {
+            count++;
+        }
+        return count;
+    }
+
+    /**
+     * @return the last condition of the chain starting at this node
+     */
+    Condition<?> getTailCondition() {
+        if (this.next == null) {
+            return this;
+        }
+        return this.next.getTailCondition();
+    }
+
+    public boolean hasNext() {
+        return next != null;
+    }
+
+    public Condition<?> getNext() {
+        return next;
+    }
+
+    public Option<T> getOption() {
+        return this.option;
+    }
+
+    public T getExpectValue() {
+        return this.expectValue;
+    }
+
+    /**
+     * @return the operator linking this node to its successor, see the field of the same name
+     */
+    public Boolean and() {
+        return and;
+    }
+
+    /**
+     * Conditions are equal when option, expected value, operator and the rest of the chain
+     * are all equal.
+     */
+    @Override
+    public boolean equals(Object obj) {
+        if (obj == this) {
+            return true;
+        }
+        if (obj instanceof Condition) {
+            final Condition<?> other = (Condition<?>) obj;
+            return Objects.equals(option, other.option)
+                && Objects.equals(expectValue, other.expectValue)
+                && Objects.equals(and, other.and)
+                && Objects.equals(next, other.next);
+        }
+        return false;
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(option, expectValue, and, next);
+    }
+
+    /**
+     * Renders the chain from left to right. Whenever the operator changes between two
+     * consecutive links, everything rendered so far is wrapped in parentheses.
+     */
+    @Override
+    public String toString() {
+        final StringBuilder text = new StringBuilder();
+        boolean wrapPending = false;
+        for (Condition<?> node = this; node != null; node = node.next) {
+            text.append("'")
+                .append(node.option.key())
+                // TODO: support another condition
+                .append("' == ")
+                .append(node.expectValue);
+            if (wrapPending) {
+                text.insert(0, "(").append(")");
+                wrapPending = false;
+            }
+            if (!node.hasNext()) {
+                continue;
+            }
+            // The operator after the next node differs, so close the group once it is rendered.
+            wrapPending = node.next.hasNext() && !node.and.equals(node.next.and);
+            text.append(node.and ? " && " : " || ");
+        }
+        return text.toString();
+    }
+}
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/configuration/util/ConfigUtil.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/configuration/util/ConfigUtil.java
new file mode 100644
index 000000000..998e58bd7
--- /dev/null
+++
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/configuration/util/ConfigUtil.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.api.configuration.util;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.dataformat.javaprop.JavaPropsMapper;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+
+public class ConfigUtil {
+ private static final JavaPropsMapper PROPERTIES_MAPPER = new
JavaPropsMapper();
+ private static final ObjectMapper JACKSON_MAPPER = new ObjectMapper();
+
+    /**
+     * Converts a map with dotted keys into a nested map.
+     * <pre>
+     * poll.timeout = 1000
+     *                      ==>>  poll : {timeout = 1000, interval = 500}
+     * poll.interval = 500
+     * </pre>
+     * The conversion writes {@code rawMap} out with the properties mapper and reads the
+     * result back as a {@code Map<String, Object>}.
+     *
+     * @param rawMap object to convert
+     * @return the map read back from the properties representation
+     * @throws IllegalArgumentException if the round trip fails; the Jackson exception is
+     *                                  attached as the cause
+     */
+    public static Map<String, Object> treeMap(Object rawMap) {
+        try {
+            String properties = PROPERTIES_MAPPER.writeValueAsString(rawMap);
+            return PROPERTIES_MAPPER.readValue(properties, new TypeReference<Map<String, Object>>() {
+            });
+        } catch (JsonProcessingException e) {
+            // Keep the cause so the offending key/value can be found from the stack trace.
+            throw new IllegalArgumentException("Json parsing exception.", e);
+        }
+    }
+
+ /**
+ * Recursive worker that collapses nested maps into one map with dot-joined keys.
+ * <p>
+ * Lists passed in are modified in place: every element is replaced by its own
+ * flattened form.
+ *
+ * @param rawValue value to flatten; may be a map, a list or any other value
+ * @param newMap target map for the flattened entries, or {@code null} when the
+ * value is not being collected into an enclosing map
+ * @param keys key segments leading to {@code rawValue}; used as a stack during recursion
+ * @param nestedMap {@code true} when called for an entry of an enclosing map, so that
+ * {@code newMap} and {@code keys} are reused instead of recreated
+ * @return {@code null} for {@code null}; the target map when one is in use; otherwise
+ * the value itself (for lists, the same list instance)
+ */
+ @SuppressWarnings("unchecked")
+ static Object flatteningMap(Object rawValue, Map<String, Object> newMap,
List<String> keys, boolean nestedMap) {
+ if (rawValue == null) {
+ return null;
+ }
+ // Leaf value: hand it back as is, or record it under the dot-joined key path.
+ if (!(rawValue instanceof List) && !(rawValue instanceof Map)) {
+ if (newMap == null) {
+ return rawValue;
+ }
+ newMap.put(String.join(".", keys), rawValue);
+ return newMap;
+ }
+
+ if (rawValue instanceof List) {
+ List<Object> rawList = (List<Object>) rawValue;
+ // Each element is flattened on its own (no target map, no key prefix) and
+ // written back into the same list.
+ for (int i = 0; i < rawList.size(); i++) {
+ rawList.set(i, flatteningMap(rawList.get(i), null, null,
false));
+ }
+ if (newMap != null) {
+ newMap.put(String.join(".", keys), rawList);
+ return newMap;
+ }
+ return rawList;
+ } else {
+ Map<String, Object> rawMap = (Map<String, Object>) rawValue;
+ // A map that is not an entry of an enclosing map starts a fresh result and key path.
+ if (!nestedMap) {
+ keys = new ArrayList<>();
+ newMap = new HashMap<>(rawMap.size());
+ }
+ for (Map.Entry<String, Object> entry : rawMap.entrySet()) {
+ // Push the segment, flatten the entry into newMap, then pop the segment again.
+ keys.add(entry.getKey());
+ flatteningMap(entry.getValue(), newMap, keys, true);
+ keys.remove(keys.size() - 1);
+ }
+ return newMap;
+ }
+ }
+
+    /**
+     * Collapses a nested map into a single-level map with dot-joined keys.
+     * <pre>
+     *                                                poll.timeout = 1000
+     * poll : {timeout = 1000, interval = 500}  ==>>
+     *                                                poll.interval = 500
+     * </pre>
+     *
+     * @param treeMap nested map to flatten
+     * @return the result of {@code flatteningMapWithObject}, cast to a map
+     */
+    @SuppressWarnings("unchecked")
+    public static Map<String, Object> flatteningMap(Map<String, Object> treeMap) {
+        final Object flattened = flatteningMapWithObject(treeMap);
+        return (Map<String, Object>) flattened;
+    }
+
+    /**
+     * Flattens an arbitrary value, starting without a target map and without a key prefix.
+     */
+    static Object flatteningMapWithObject(Object rawValue) {
+        final Map<String, Object> noTarget = null;
+        final List<String> noPrefix = null;
+        return flatteningMap(rawValue, noTarget, noPrefix, false);
+    }
+
+    /**
+     * Converts a raw configuration value to the type described by {@code typeReference}.
+     * <p>
+     * The value is flattened first. When the target is a plain class, the value is returned
+     * unchanged if it already has exactly that class, otherwise the class-based conversion is
+     * tried. Everything else, including targets that conversion rejects, is parsed by Jackson
+     * from the value's JSON string form.
+     *
+     * @param rawValue      value to convert
+     * @param typeReference description of the target type
+     * @return the converted value
+     * @throws IllegalArgumentException if Jackson cannot parse the value as the target type
+     */
+    @SuppressWarnings("unchecked")
+    public static <T> T convertValue(Object rawValue, TypeReference<T> typeReference) {
+        final Object flattened = flatteningMapWithObject(rawValue);
+        if (typeReference.getType() instanceof Class) {
+            // simple type
+            final Class<T> simpleType = (Class<T>) typeReference.getType();
+            if (simpleType.equals(flattened.getClass())) {
+                return (T) flattened;
+            }
+            try {
+                return convertValue(flattened, simpleType);
+            } catch (IllegalArgumentException e) {
+                // Continue with Jackson parsing
+            }
+        }
+        // complex type && untreated type
+        final String json = convertToJsonString(flattened);
+        try {
+            return JACKSON_MAPPER.readValue(json, typeReference);
+        } catch (JsonProcessingException e) {
+            final String expected = typeReference.getType().getTypeName();
+            throw new IllegalArgumentException(
+                String.format("Json parsing exception, value '%s', and expected type '%s'", flattened, expected), e);
+        }
+    }
+
+    /**
+     * Converts a raw value to one of the directly supported simple types: {@link Boolean},
+     * an enum type or {@link String}.
+     *
+     * @param rawValue value to convert
+     * @param clazz    target class
+     * @return the converted value
+     * @throws IllegalArgumentException if {@code clazz} is none of the supported types
+     */
+    @SuppressWarnings("unchecked")
+    static <T> T convertValue(Object rawValue, Class<T> clazz) {
+        if (Boolean.class.equals(clazz)) {
+            return (T) convertToBoolean(rawValue);
+        }
+        if (clazz.isEnum()) {
+            final Class<? extends Enum<?>> enumClass = (Class<? extends Enum<?>>) clazz;
+            return (T) convertToEnum(rawValue, enumClass);
+        }
+        if (String.class.equals(clazz)) {
+            return (T) convertToJsonString(rawValue);
+        }
+        throw new IllegalArgumentException("Unsupported type: " + clazz);
+    }
+
+    /**
+     * Parses the string form of {@code o} as a boolean, ignoring case.
+     *
+     * @param o value whose {@code toString()} is expected to be "true" or "false"
+     * @return the parsed boolean
+     * @throws IllegalArgumentException if the text is neither "true" nor "false"
+     */
+    static Boolean convertToBoolean(Object o) {
+        // Locale.ROOT keeps the case mapping independent of the JVM default locale,
+        // matching the enum conversion in this class.
+        switch (o.toString().toUpperCase(Locale.ROOT)) {
+            case "TRUE":
+                return true;
+            case "FALSE":
+                return false;
+            default:
+                throw new IllegalArgumentException(
+                    String.format(
+                        "Unrecognized option for boolean: %s. Expected either true or false(case insensitive)",
+                        o));
+        }
+    }
+
+    /**
+     * Finds the enum constant whose {@code toString()} matches the string form of {@code o},
+     * ignoring case.
+     *
+     * @param o     value naming the wanted constant
+     * @param clazz enum class to search
+     * @return a matching constant
+     * @throws IllegalArgumentException if no constant matches; the message names the rejected
+     *                                  value and lists the accepted constants
+     */
+    static <E extends Enum<?>> E convertToEnum(Object o, Class<E> clazz) {
+        // Upper-case the input once instead of once per enum constant.
+        final String wanted = o.toString().toUpperCase(Locale.ROOT);
+        return Arrays.stream(clazz.getEnumConstants())
+            .filter(e -> e.toString().toUpperCase(Locale.ROOT).equals(wanted))
+            .findAny()
+            // Arrays.toString already adds the surrounding brackets.
+            .orElseThrow(() -> new IllegalArgumentException(String.format(
+                "Could not parse value '%s' for enum %s. Expected one of: %s",
+                o, clazz, Arrays.toString(clazz.getEnumConstants()))));
+    }
+
+    /**
+     * Renders a value as text: strings are returned unchanged, everything else is serialized
+     * to JSON by Jackson.
+     *
+     * @param o value to render
+     * @return {@code o} itself when it is a string, otherwise its JSON form
+     * @throws IllegalArgumentException if serialization fails; the Jackson exception is
+     *                                  attached as the cause
+     */
+    public static String convertToJsonString(Object o) {
+        if (o instanceof String) {
+            return (String) o;
+        }
+        try {
+            return JACKSON_MAPPER.writeValueAsString(o);
+        } catch (JsonProcessingException e) {
+            // Keep the cause so the reason for the serialization failure is not lost.
+            throw new IllegalArgumentException(String.format("Could not parse json, value: %s", o), e);
+        }
+    }
+}
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/configuration/util/Expression.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/configuration/util/Expression.java
new file mode 100644
index 000000000..bceb86223
--- /dev/null
+++
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/configuration/util/Expression.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.api.configuration.util;
+
+import org.apache.seatunnel.api.configuration.Option;
+
+import java.util.Objects;
+
+public class Expression {
+    private final Condition<?> condition;
+    /**
+     * How this node is joined to {@link #next}: {@code true} for "&&", {@code false} for "||";
+     * {@code null} until a successor has been attached.
+     */
+    private Boolean and = null;
+    private Expression next = null;
+
+    Expression(Condition<?> condition) {
+        this.condition = condition;
+    }
+
+    /**
+     * Creates an expression holding the single condition "option == expectValue".
+     */
+    public static <T> Expression of(Option<T> option, T expectValue) {
+        final Condition<T> single = Condition.of(option, expectValue);
+        return new Expression(single);
+    }
+
+    /**
+     * Creates an expression wrapping an existing condition (chain).
+     */
+    public static Expression of(Condition<?> condition) {
+        return new Expression(condition);
+    }
+
+    /**
+     * Attaches {@code next} to the end of this chain with an "&&" link.
+     *
+     * @return this expression (the head of the chain)
+     */
+    public Expression and(Expression next) {
+        link(next, true);
+        return this;
+    }
+
+    /**
+     * Attaches {@code next} to the end of this chain with an "||" link.
+     *
+     * @return this expression (the head of the chain)
+     */
+    public Expression or(Expression next) {
+        link(next, false);
+        return this;
+    }
+
+    /**
+     * Hooks {@code successor} onto the current tail and records the operator on that tail.
+     */
+    private void link(Expression successor, boolean conjunction) {
+        final Expression last = tail();
+        last.and = conjunction;
+        last.next = successor;
+    }
+
+    /**
+     * @return the last expression of the chain starting at this node
+     */
+    private Expression tail() {
+        if (this.next == null) {
+            return this;
+        }
+        return this.next.tail();
+    }
+
+    public Condition<?> getCondition() {
+        return this.condition;
+    }
+
+    public boolean hasNext() {
+        return next != null;
+    }
+
+    public Expression getNext() {
+        return next;
+    }
+
+    /**
+     * @return the operator linking this node to its successor, see the field of the same name
+     */
+    public Boolean and() {
+        return and;
+    }
+
+    /**
+     * Expressions are equal when condition, operator and the rest of the chain are all equal.
+     */
+    @Override
+    public boolean equals(Object obj) {
+        if (obj == this) {
+            return true;
+        }
+        if (obj instanceof Expression) {
+            final Expression other = (Expression) obj;
+            return Objects.equals(condition, other.condition)
+                && Objects.equals(and, other.and)
+                && Objects.equals(next, other.next);
+        }
+        return false;
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(condition, and, next);
+    }
+
+    /**
+     * Renders the chain from left to right. A condition chain with more than one element is
+     * put in parentheses, and whenever the operator changes between two consecutive links,
+     * everything rendered so far is wrapped in parentheses.
+     */
+    @Override
+    public String toString() {
+        final StringBuilder text = new StringBuilder();
+        boolean wrapPending = false;
+        for (Expression node = this; node != null; node = node.next) {
+            final boolean compound = node.condition.getCount() > 1;
+            if (compound) {
+                text.append("(");
+            }
+            text.append(node.condition);
+            if (compound) {
+                text.append(")");
+            }
+            if (wrapPending) {
+                text.insert(0, "(").append(")");
+                wrapPending = false;
+            }
+            if (!node.hasNext()) {
+                continue;
+            }
+            // The operator after the next node differs, so close the group once it is rendered.
+            wrapPending = node.next.hasNext() && !node.and.equals(node.next.and);
+            text.append(node.and ? " && " : " || ");
+        }
+        return text.toString();
+    }
+}
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/configuration/util/OptionRule.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/configuration/util/OptionRule.java
new file mode 100644
index 000000000..c830709c2
--- /dev/null
+++
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/configuration/util/OptionRule.java
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.api.configuration.util;
+
+import org.apache.seatunnel.api.configuration.Option;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Objects;
+import java.util.Set;
+
+/**
+ * Validation rule for {@link Option}.
+ * <p>
+ * The option rule is typically built in one of the following patterns:
+ *
+ * <pre>{@code
+ * // simple rule
+ * OptionRule simpleRule = OptionRule.builder()
+ * .optional(POLL_TIMEOUT, POLL_INTERVAL)
+ * .required(CLIENT_SERVICE_URL)
+ * .build();
+ *
+ * // basic full rule
+ * OptionRule fullRule = OptionRule.builder()
+ * .optional(POLL_TIMEOUT, POLL_INTERVAL, CURSOR_STARTUP_MODE)
+ * .required(CLIENT_SERVICE_URL, ADMIN_SERVICE_URL)
+ * .exclusive(TOPIC_PATTERN, TOPIC)
+ * .conditional(CURSOR_STARTUP_MODE, StartMode.TIMESTAMP, CURSOR_STARTUP_TIMESTAMP)
+ * .build();
+ *
+ * // complex conditional rule
+ * // moot expression
+ * Expression expression = Expression.of(TOPIC_DISCOVERY_INTERVAL, 200)
+ * .and(Expression.of(Condition.of(CURSOR_STARTUP_MODE, StartMode.EARLIEST)
+ * .or(CURSOR_STARTUP_MODE, StartMode.LATEST)))
+ * .or(Expression.of(Condition.of(TOPIC_DISCOVERY_INTERVAL, 100)));
+ *
+ * OptionRule complexRule = OptionRule.builder()
+ * .optional(POLL_TIMEOUT, POLL_INTERVAL, CURSOR_STARTUP_MODE)
+ * .required(CLIENT_SERVICE_URL, ADMIN_SERVICE_URL)
+ * .exclusive(TOPIC_PATTERN, TOPIC)
+ * .conditional(expression, CURSOR_RESET_MODE)
+ * .build();
+ * }</pre>
+ */
+public class OptionRule {
+
+ /**
+ * Optional options with default value.
+ *
+ * <p> These options will not be validated.
+ * <p> This is used by the web-UI to show what options are available.
+ */
+ private final Set<Option<?>> optionalOptions;
+
+ /**
+ * Required options with no default value.
+ *
+ * <p> Verify that the option is valid through the defined rules.
+ */
+ private final Set<RequiredOption> requiredOptions;
+
+ OptionRule(Set<Option<?>> optionalOptions, Set<RequiredOption>
requiredOptions) {
+ this.optionalOptions = optionalOptions;
+ this.requiredOptions = requiredOptions;
+ }
+
+ public Set<Option<?>> getOptionalOptions() {
+ return optionalOptions;
+ }
+
+ public Set<RequiredOption> getRequiredOptions() {
+ return requiredOptions;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (!(o instanceof OptionRule)) {
+ return false;
+ }
+ OptionRule that = (OptionRule) o;
+ return Objects.equals(optionalOptions, that.optionalOptions)
+ && Objects.equals(requiredOptions, that.requiredOptions);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(optionalOptions, requiredOptions);
+ }
+
+ public static OptionRule.Builder builder() {
+ return new OptionRule.Builder();
+ }
+
+ /**
+ * Builder for {@link OptionRule}.
+ */
+ public static class Builder {
+ private final Set<Option<?>> optionalOptions = new HashSet<>();
+ private final Set<RequiredOption> requiredOptions = new HashSet<>();
+
+ private Builder() {
+ }
+
+ /**
+ * Optional options with default value.
+ *
+ * <p> These options will not be validated.
+ * <p> This is used by the web-UI to show what options are available.
+ */
+ public Builder optional(Option<?>... options) {
+ for (Option<?> option : options) {
+ if (option.defaultValue() == null) {
+ throw new
OptionValidationException(String.format("Optional option '%s' should have
default value.", option.key()));
+ }
+ }
+ this.optionalOptions.addAll(Arrays.asList(options));
+ return this;
+ }
+
+ /**
+ * Absolutely required options without any constraints.
+ */
+ public Builder required(Option<?>... options) {
+ for (Option<?> option : options) {
+ verifyRequiredOptionDefaultValue(option);
+
this.requiredOptions.add(RequiredOption.AbsolutelyRequiredOption.of(option));
+ }
+ return this;
+ }
+
+ /**
+ * Exclusive options, only one of the options needs to be configured.
+ */
+ public Builder exclusive(Option<?>... options) {
+ if (options.length <= 1) {
+ throw new OptionValidationException("The number of exclusive
options must be greater than 1.");
+ }
+ for (Option<?> option : options) {
+ verifyRequiredOptionDefaultValue(option);
+ }
+
this.requiredOptions.add(RequiredOption.ExclusiveRequiredOptions.of(options));
+ return this;
+ }
+
+ /**
+ * Conditional options, These options are required if the {@link
Option} == expectValue.
+ */
+ public <T> Builder conditional(Option<T> option, T expectValue,
Option<?>... requiredOptions) {
+ return conditional(Condition.of(option, expectValue),
requiredOptions);
+ }
+
+ /**
+ * Conditional options, These options are required if the {@link
Condition} evaluates to true.
+ */
+ public Builder conditional(Condition<?> condition, Option<?>...
requiredOptions) {
+ return conditional(Expression.of(condition), requiredOptions);
+ }
+
+ /**
+ * Conditional options, These options are required if the {@link
Expression} evaluates to true.
+ */
+ public Builder conditional(Expression expression, Option<?>...
requiredOptions) {
+ for (Option<?> o : requiredOptions) {
+ verifyRequiredOptionDefaultValue(o);
+ }
+
this.requiredOptions.add(RequiredOption.ConditionalRequiredOptions.of(expression,
new HashSet<>(Arrays.asList(requiredOptions))));
+ return this;
+ }
+
+        /**
+         * Builds the {@link OptionRule} from the options collected so far.
+         *
+         * <p> The rule receives its own copies of the option sets, so calling further builder
+         * methods after {@code build()} does not alter a rule that was already built.
+         *
+         * @return a new rule containing the optional and required options added to this builder
+         */
+        public OptionRule build() {
+            return new OptionRule(new HashSet<>(optionalOptions), new HashSet<>(requiredOptions));
+        }
+
+ private void verifyRequiredOptionDefaultValue(Option<?> option) {
+ if (option.defaultValue() != null) {
+ throw new OptionValidationException(String.format("Required
option '%s' should have no default value.", option.key()));
+ }
+ }
+ }
+}
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/Factory.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/configuration/util/OptionValidationException.java
similarity index 61%
copy from
seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/Factory.java
copy to
seatunnel-api/src/main/java/org/apache/seatunnel/api/configuration/util/OptionValidationException.java
index 098d3d291..a5928625a 100644
---
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/Factory.java
+++
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/configuration/util/OptionValidationException.java
@@ -15,20 +15,18 @@
* limitations under the License.
*/
-package org.apache.seatunnel.api.table.factory;
+package org.apache.seatunnel.api.configuration.util;
/**
- * todo: use PluginIdentifier.
- * This is the SPI interface.
+ * Exception for all errors occurring during option validation phase.
*/
-public interface Factory {
+public class OptionValidationException extends RuntimeException {
- /**
- * Returns a unique identifier among same factory interfaces.
- *
- * <p>For consistency, an identifier should be declared as one lower case
word (e.g. {@code
- * kafka}). If multiple factories exist for different versions, a version
should be appended
- * using "-" (e.g. {@code elasticsearch-7}).
- */
- String factoryIdentifier();
+ public OptionValidationException(String message, Throwable cause) {
+ super(message, cause);
+ }
+
+ public OptionValidationException(String message) {
+ super(message);
+ }
}
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/configuration/util/RequiredOption.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/configuration/util/RequiredOption.java
new file mode 100644
index 000000000..692e056f4
--- /dev/null
+++
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/configuration/util/RequiredOption.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.api.configuration.util;
+
+import org.apache.seatunnel.api.configuration.Option;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Objects;
+import java.util.Set;
+
+public interface RequiredOption {
+ class ExclusiveRequiredOptions implements RequiredOption {
+ private final Set<Option<?>> exclusiveOptions;
+
+ ExclusiveRequiredOptions(Set<Option<?>> exclusiveOptions) {
+ this.exclusiveOptions = exclusiveOptions;
+ }
+
+ public static ExclusiveRequiredOptions of(Option<?>...
exclusiveOptions) {
+ return new ExclusiveRequiredOptions(new
HashSet<>(Arrays.asList(exclusiveOptions)));
+ }
+
+ public Set<Option<?>> getExclusiveOptions() {
+ return exclusiveOptions;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj) {
+ return true;
+ }
+ if (!(obj instanceof ExclusiveRequiredOptions)) {
+ return false;
+ }
+ ExclusiveRequiredOptions that = (ExclusiveRequiredOptions) obj;
+ return Objects.equals(this.exclusiveOptions,
that.exclusiveOptions);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(exclusiveOptions);
+ }
+
+        /**
+         * Formats the keys of the given options as a comma separated list in which every key
+         * is wrapped in single quotes, following the iteration order of the set.
+         *
+         * @param options the options whose keys are rendered
+         * @return the quoted keys joined with {@code ", "}; an empty string for an empty set
+         */
+        static String getOptionKeys(Set<Option<?>> options) {
+            StringBuilder keys = new StringBuilder();
+            String separator = "";
+            for (Option<?> option : options) {
+                keys.append(separator)
+                    .append("'")
+                    .append(option.key())
+                    .append("'");
+                // every key after the first one is preceded by the separator
+                separator = ", ";
+            }
+            return keys.toString();
+        }
+
+ @Override
+ public String toString() {
+ return String.format("Exclusive required options: %s",
getOptionKeys(exclusiveOptions));
+ }
+ }
+
+    /**
+     * A required option that is not tied to any other option or condition.
+     *
+     * @param <T> value type of the wrapped option
+     */
+    class AbsolutelyRequiredOption<T> implements RequiredOption {
+        private final Option<T> requiredOption;
+
+        AbsolutelyRequiredOption(Option<T> requiredOption) {
+            this.requiredOption = requiredOption;
+        }
+
+        /**
+         * Wraps the given option as an absolutely required option.
+         */
+        public static <T> AbsolutelyRequiredOption<T> of(Option<T> requiredOption) {
+            return new AbsolutelyRequiredOption<>(requiredOption);
+        }
+
+        /**
+         * Returns the wrapped option, mirroring the accessors offered by the other
+         * {@link RequiredOption} implementations.
+         */
+        public Option<T> getRequiredOption() {
+            return requiredOption;
+        }
+
+        @Override
+        public boolean equals(Object obj) {
+            if (this == obj) {
+                return true;
+            }
+            if (!(obj instanceof AbsolutelyRequiredOption)) {
+                return false;
+            }
+            AbsolutelyRequiredOption<?> that = (AbsolutelyRequiredOption<?>) obj;
+            return Objects.equals(this.requiredOption, that.requiredOption);
+        }
+
+        @Override
+        public int hashCode() {
+            // null-safe, like the Objects.equals comparison in equals()
+            return Objects.hashCode(this.requiredOption);
+        }
+
+        @Override
+        public String toString() {
+            return String.format("Absolutely required option: '%s'", requiredOption.key());
+        }
+    }
+
+ class ConditionalRequiredOptions implements RequiredOption {
+ private final Expression expression;
+ private final Set<Option<?>> requiredOption;
+
+ ConditionalRequiredOptions(Expression expression, Set<Option<?>>
requiredOption) {
+ this.expression = expression;
+ this.requiredOption = requiredOption;
+ }
+
+ public static ConditionalRequiredOptions of(Expression expression,
Set<Option<?>> requiredOption) {
+ return new ConditionalRequiredOptions(expression, requiredOption);
+ }
+
+ public static ConditionalRequiredOptions of(Condition<?> condition,
Set<Option<?>> requiredOption) {
+ return new ConditionalRequiredOptions(Expression.of(condition),
requiredOption);
+ }
+
+ public Expression getExpression() {
+ return expression;
+ }
+
+ public Set<Option<?>> getRequiredOption() {
+ return requiredOption;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj) {
+ return true;
+ }
+ if (!(obj instanceof ConditionalRequiredOptions)) {
+ return false;
+ }
+ ConditionalRequiredOptions that = (ConditionalRequiredOptions) obj;
+ return Objects.equals(this.expression, that.expression) &&
Objects.equals(this.requiredOption, that.requiredOption);
+ }
+
+        /**
+         * Hashes both the expression and the required options, the two fields compared by
+         * {@link #equals(Object)}, so that rules differing only by their expression do not
+         * always collide.
+         */
+        @Override
+        public int hashCode() {
+            return Objects.hash(this.expression, this.requiredOption);
+        }
+
+ @Override
+ public String toString() {
+ return String.format("Condition expression: %s, Required options:
%s", expression, ExclusiveRequiredOptions.getOptionKeys(requiredOption));
+ }
+ }
+}
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/Factory.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/Factory.java
index 098d3d291..827d3f340 100644
---
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/Factory.java
+++
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/Factory.java
@@ -17,6 +17,8 @@
package org.apache.seatunnel.api.table.factory;
+import org.apache.seatunnel.api.configuration.util.OptionRule;
+
/**
* todo: use PluginIdentifier.
* This is the SPI interface.
@@ -31,4 +33,12 @@ public interface Factory {
* using "-" (e.g. {@code elasticsearch-7}).
*/
String factoryIdentifier();
+
+ /**
+ * Returns the rule for options.
+ *
+ * <p>1. Used to verify whether the parameters configured by the user
conform to the rules of the options;
+ * <p>2. Used for Web-UI to prompt user to configure option value;
+ */
+ OptionRule optionRule();
}
diff --git
a/seatunnel-api/src/test/java/org/apache/seatunnel/api/configuration/OptionTest.java
b/seatunnel-api/src/test/java/org/apache/seatunnel/api/configuration/OptionTest.java
new file mode 100644
index 000000000..c3dc57c42
--- /dev/null
+++
b/seatunnel-api/src/test/java/org/apache/seatunnel/api/configuration/OptionTest.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.api.configuration;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+@SuppressWarnings("MagicNumber")
+public class OptionTest {
+ public static final Option<Integer> TEST_NUM = Options.key("option.num")
+ .intType()
+ .defaultValue(100)
+ .withDescription("test int option");
+
+ public static final Option<TestMode> TEST_MODE = Options.key("option.mode")
+ .enumType(TestMode.class)
+ .defaultValue(TestMode.LATEST)
+ .withDescription("test enum option");
+
+ public enum TestMode {
+ EARLIEST,
+ LATEST,
+ TIMESTAMP,
+ }
+
+ @Test
+ public void testEquals() {
+ Assertions.assertEquals(TEST_NUM,
Options.key("option.num").intType().defaultValue(100));
+ Assertions.assertEquals(TEST_MODE,
Options.key("option.mode").enumType(TestMode.class).defaultValue(TestMode.LATEST));
+ }
+}
diff --git
a/seatunnel-api/src/test/java/org/apache/seatunnel/api/configuration/ReadableConfigTest.java
b/seatunnel-api/src/test/java/org/apache/seatunnel/api/configuration/ReadableConfigTest.java
new file mode 100644
index 000000000..62c45a01b
--- /dev/null
+++
b/seatunnel-api/src/test/java/org/apache/seatunnel/api/configuration/ReadableConfigTest.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.api.configuration;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigResolveOptions;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import org.apache.commons.lang3.StringUtils;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+
+import java.net.URISyntaxException;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+@SuppressWarnings({
+ "MagicNumber", "checkstyle:StaticVariableName"
+})
+public class ReadableConfigTest {
+ private static final String CONFIG_PATH = "/conf/option-test.conf";
+ private static ReadonlyConfig config;
+ private static Map<String, String> map;
+
+ @BeforeAll
+ public static void prepare() throws URISyntaxException {
+ Config rawConfig = ConfigFactory
+
.parseFile(Paths.get(ReadableConfigTest.class.getResource(CONFIG_PATH).toURI()).toFile())
+ .resolve(ConfigResolveOptions.defaults().setAllowUnresolved(true))
+ .resolveWith(ConfigFactory.systemProperties(),
+ ConfigResolveOptions.defaults().setAllowUnresolved(true));
+ config =
ReadonlyConfig.fromConfig(rawConfig.getConfigList("source").get(0));
+ map = new HashMap<>();
+ map.put("inner.path", "mac");
+ map.put("inner.name", "ashulin");
+ map.put("inner.map", "{\"fantasy\":\"final\"}");
+ map.put("type", "source");
+ map.put("patch.note", "hollow");
+ map.put("name", "saitou");
+ }
+
+ @Test
+ public void testBooleanOption() {
+ Assertions.assertEquals(true,
config.get(Options.key("option.bool").booleanType().noDefaultValue()));
+ Assertions.assertEquals(false,
config.get(Options.key("option.bool-str").booleanType().noDefaultValue()));
+ Assertions.assertEquals(true,
config.get(Options.key("option.int-str").booleanType().noDefaultValue()));
+
Assertions.assertNull(config.get(Options.key("option.not-exist").booleanType().noDefaultValue()));
+ Assertions.assertThrows(IllegalArgumentException.class, () ->
config.get(Options.key("option.string").booleanType().noDefaultValue()));
+ }
+
+ @Test
+ public void testIntOption() {
+ Assertions.assertEquals(2147483647,
config.get(Options.key("option.int").intType().noDefaultValue()));
+ Assertions.assertEquals(100,
config.get(Options.key("option.int-str").intType().noDefaultValue()));
+ Assertions.assertEquals(2147483647,
config.get(Options.key("option.not-exist").intType().defaultValue(2147483647)));
+
Assertions.assertNull(config.get(Options.key("option.not-exist").intType().noDefaultValue()));
+ Assertions.assertThrows(IllegalArgumentException.class, () ->
config.get(Options.key("option.long").intType().noDefaultValue()));
+ }
+
+ // Reads long values from a numeric entry and from a string entry, and checks the missing-key case.
+ @Test
+ public void testLongOption() {
+ Assertions.assertEquals(21474836470L,
config.get(Options.key("option.long").longType().noDefaultValue()));
+ Assertions.assertEquals(21474836470L,
config.get(Options.key("option.long-str").longType().noDefaultValue()));
+
Assertions.assertNull(config.get(Options.key("option.not-exist").longType().noDefaultValue()));
+ // NOTE(review): this negative case reads the value with intType() although the test
+ // targets long options; longType() was presumably intended - confirm.
+ Assertions.assertThrows(IllegalArgumentException.class, () ->
config.get(Options.key("option.bool").intType().noDefaultValue()));
+ }
+
+ @Test
+ public void testFloatOption() {
+ Assertions.assertEquals(3.3333F,
config.get(Options.key("option.float").floatType().noDefaultValue()));
+ Assertions.assertEquals(21474836470F,
config.get(Options.key("option.long-str").floatType().noDefaultValue()));
+ Assertions.assertEquals(3.1415F,
config.get(Options.key("option.float-str").floatType().noDefaultValue()));
+
Assertions.assertNull(config.get(Options.key("option.not-exist").floatType().noDefaultValue()));
+ Assertions.assertThrows(IllegalArgumentException.class, () ->
config.get(Options.key("option.bool-str").floatType().noDefaultValue()));
+ }
+
+ @Test
+ public void testDoubleOption() {
+ Assertions.assertEquals(3.1415926535897932384626433832795028841971D,
config.get(Options.key("option.double").doubleType().noDefaultValue()));
+ Assertions.assertEquals(3.1415926535897932384626433832795028841971D,
config.get(Options.key("option.double-str").doubleType().noDefaultValue()));
+ Assertions.assertEquals(21474836470D,
config.get(Options.key("option.long-str").doubleType().noDefaultValue()));
+ Assertions.assertEquals(3.1415D,
config.get(Options.key("option.float-str").doubleType().noDefaultValue()));
+
Assertions.assertNull(config.get(Options.key("option.not-exist").doubleType().noDefaultValue()));
+ Assertions.assertThrows(IllegalArgumentException.class, () ->
config.get(Options.key("option.bool-str").doubleType().noDefaultValue()));
+ }
+
+ @Test
+ public void testStringOption() {
+ Assertions.assertEquals("Hello, Apache SeaTunnel",
config.get(Options.key("option.string").stringType().noDefaultValue()));
+ // 'option.double' is not represented as a string and is expected to
lose precision
+
Assertions.assertNotEquals("3.1415926535897932384626433832795028841971",
config.get(Options.key("option.double").stringType().noDefaultValue()));
+ Assertions.assertEquals("3.1415926535897932384626433832795028841971",
config.get(Options.key("option.double-str").stringType().noDefaultValue()));
+
Assertions.assertNull(config.get(Options.key("option.not-exist").stringType().noDefaultValue()));
+ }
+
+ @Test
+ public void testEnumOption() {
+ Assertions.assertEquals(OptionTest.TestMode.LATEST,
config.get(Options.key("option.enum").enumType(OptionTest.TestMode.class).noDefaultValue()));
+ Assertions.assertThrows(IllegalArgumentException.class, () ->
config.get(Options.key("option.string").enumType(OptionTest.TestMode.class).noDefaultValue()));
+
Assertions.assertNull(config.get(Options.key("option.not-exist").enumType(OptionTest.TestMode.class).noDefaultValue()));
+ }
+
+ // Reads map options: a nested block, a JSON string value, and the failure / missing-key cases.
+ @Test
+ public void testBasicMapOption() {
+ // compared against the flat, dotted-key map assembled in prepare()
+ Assertions.assertEquals(map,
config.get(Options.key("option.map").mapType().noDefaultValue()));
+ Map<String, String> newMap = new HashMap<>();
+ newMap.put("fantasy", "final");
+ Assertions.assertEquals(newMap,
config.get(Options.key("option.map.inner.map").mapType().noDefaultValue()));
+
Assertions.assertTrue(StringUtils.isNotBlank(config.get(Options.key("option").stringType().noDefaultValue())));
+ Assertions.assertThrows(IllegalArgumentException.class, () ->
config.get(Options.key("option.string").mapType().noDefaultValue()));
+ // NOTE(review): the missing-key case below uses enumType(...) although this test targets
+ // map options; mapType() was presumably intended - confirm.
+
Assertions.assertNull(config.get(Options.key("option.not-exist").enumType(OptionTest.TestMode.class).noDefaultValue()));
+ }
+
+ @Test
+ public void testBasicListOption() {
+ List<String> list = new ArrayList<>();
+ list.add("Hello");
+ list.add("Apache SeaTunnel");
+ Assertions.assertEquals(list,
config.get(Options.key("option.list-json").listType().noDefaultValue()));
+ list = new ArrayList<>();
+ list.add("final");
+ list.add("fantasy");
+ list.add("VII");
+ Assertions.assertEquals(list,
config.get(Options.key("option.list").listType().noDefaultValue()));
+ }
+
+ @Test
+ public void testComplexTypeOption() {
+ List<Map<String, List<Map<String, String>>>> complexType =
config.get(Options.key("option.complex-type").type(new
TypeReference<List<Map<String, List<Map<String, String>>>>>() {
+ }).noDefaultValue());
+ Assertions.assertEquals(1, complexType.size());
+ Assertions.assertEquals(2, complexType.get(0).size());
+ complexType.get(0).values().forEach(value -> {
+ Assertions.assertEquals(1, value.size());
+ Assertions.assertEquals(map, value.get(0));
+ });
+ }
+}
diff --git
a/seatunnel-api/src/test/java/org/apache/seatunnel/api/configuration/util/ConditionTest.java
b/seatunnel-api/src/test/java/org/apache/seatunnel/api/configuration/util/ConditionTest.java
new file mode 100644
index 000000000..105b83078
--- /dev/null
+++
b/seatunnel-api/src/test/java/org/apache/seatunnel/api/configuration/util/ConditionTest.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.api.configuration.util;
+
+import static org.apache.seatunnel.api.configuration.OptionTest.TEST_MODE;
+import static org.apache.seatunnel.api.configuration.OptionTest.TEST_NUM;
+
+import org.apache.seatunnel.api.configuration.OptionTest;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+@SuppressWarnings("MagicNumber")
+public class ConditionTest {
+ private static final Condition<OptionTest.TestMode> TEST_CONDITION =
Condition.of(TEST_MODE, OptionTest.TestMode.EARLIEST)
+ .or(TEST_MODE, OptionTest.TestMode.LATEST)
+ .and(TEST_NUM, 1000);
+
+ @Test
+ public void testToString() {
+ Assertions.assertEquals("('option.mode' == EARLIEST || 'option.mode'
== LATEST) && 'option.num' == 1000", TEST_CONDITION.toString());
+ }
+
+ @Test
+ public void testGetCount() {
+ Assertions.assertEquals(3, TEST_CONDITION.getCount());
+ }
+
+ @Test
+ public void testGetTailCondition() {
+ Assertions.assertEquals(Condition.of(TEST_NUM, 1000),
TEST_CONDITION.getTailCondition());
+ }
+}
diff --git
a/seatunnel-api/src/test/java/org/apache/seatunnel/api/configuration/util/ExpressionTest.java
b/seatunnel-api/src/test/java/org/apache/seatunnel/api/configuration/util/ExpressionTest.java
new file mode 100644
index 000000000..5d4416b47
--- /dev/null
+++
b/seatunnel-api/src/test/java/org/apache/seatunnel/api/configuration/util/ExpressionTest.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.api.configuration.util;
+
+import static org.apache.seatunnel.api.configuration.OptionTest.TEST_MODE;
+import static org.apache.seatunnel.api.configuration.OptionTest.TEST_NUM;
+
+import org.apache.seatunnel.api.configuration.OptionTest;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+@SuppressWarnings("MagicNumber")
+public class ExpressionTest {
+ @Test
+ public void testToString() {
+ Expression expression = Expression.of(TEST_NUM, 200)
+ .and(Expression.of(Condition.of(TEST_MODE,
OptionTest.TestMode.EARLIEST)
+ .or(TEST_MODE, OptionTest.TestMode.LATEST)))
+ .or(Expression.of(Condition.of(TEST_NUM, 100)));
+ Assertions.assertEquals("('option.num' == 200 && ('option.mode' ==
EARLIEST || 'option.mode' == LATEST)) || 'option.num' == 100",
expression.toString());
+ }
+}
diff --git
a/seatunnel-api/src/test/java/org/apache/seatunnel/api/configuration/util/OptionRuleTest.java
b/seatunnel-api/src/test/java/org/apache/seatunnel/api/configuration/util/OptionRuleTest.java
new file mode 100644
index 000000000..5aca2bef7
--- /dev/null
+++
b/seatunnel-api/src/test/java/org/apache/seatunnel/api/configuration/util/OptionRuleTest.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.api.configuration.util;
+
+import static org.apache.seatunnel.api.configuration.OptionTest.TEST_MODE;
+import static org.apache.seatunnel.api.configuration.OptionTest.TEST_NUM;
+
+import org.apache.seatunnel.api.configuration.Option;
+import org.apache.seatunnel.api.configuration.OptionTest;
+import org.apache.seatunnel.api.configuration.Options;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.List;
+
+public class OptionRuleTest {
+ public static final Option<Long> TEST_TIMESTAMP =
Options.key("option.timestamp")
+ .longType()
+ .noDefaultValue()
+ .withDescription("test long timestamp");
+
+ public static final Option<String> TEST_TOPIC_PATTERN =
Options.key("option.topic-pattern")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("test string type");
+
+ public static final Option<List<String>> TEST_TOPIC =
Options.key("option.topic")
+ .listType()
+ .noDefaultValue()
+ .withDescription("test list string type");
+
+ public static final Option<List<Integer>> TEST_PORTS =
Options.key("option.ports")
+ .type(new TypeReference<List<Integer>>() {
+ })
+ .noDefaultValue()
+ .withDescription("test list int type");
+
+ @Test
+ public void testBuildSuccess() {
+ OptionRule rule = OptionRule.builder()
+ .optional(TEST_NUM, TEST_MODE)
+ .required(TEST_PORTS)
+ .exclusive(TEST_TOPIC_PATTERN, TEST_TOPIC)
+ .conditional(TEST_MODE, OptionTest.TestMode.TIMESTAMP,
TEST_TIMESTAMP)
+ .build();
+ Assertions.assertNotNull(rule);
+ }
+
+ @Test
+ public void testOptionalException() {
+ Assertions.assertThrows(OptionValidationException.class,
+ () -> OptionRule.builder().optional(TEST_NUM, TEST_MODE,
TEST_PORTS).build(),
+ "Optional option 'option.ports' should have default value.");
+ }
+
+ @Test
+ public void testRequiredException() {
+ Assertions.assertThrows(OptionValidationException.class,
+ () -> OptionRule.builder().required(TEST_NUM, TEST_MODE,
TEST_PORTS).build(),
+ "Required option 'option.num' should have no default value.");
+ }
+
+ @Test
+ public void testExclusiveException() {
+ Assertions.assertThrows(OptionValidationException.class,
+ () -> OptionRule.builder().exclusive(TEST_TOPIC_PATTERN,
TEST_TOPIC, TEST_MODE, TEST_PORTS).build(),
+ "Required option 'option.mode' should have no default value.");
+ Assertions.assertThrows(OptionValidationException.class,
+ () -> OptionRule.builder().exclusive(TEST_TOPIC_PATTERN).build(),
+ "The number of exclusive options must be greater than 1.");
+ }
+
+ @Test
+ public void testConditionalException() {
+ Assertions.assertThrows(OptionValidationException.class,
+ () -> OptionRule.builder().conditional(TEST_MODE,
OptionTest.TestMode.TIMESTAMP, TEST_NUM).build(),
+ "Required option 'option.num' should have no default value.");
+ }
+
+ @Test
+ public void testEquals() {
+ OptionRule rule1 = OptionRule.builder()
+ .optional(TEST_NUM, TEST_MODE)
+ .required(TEST_PORTS)
+ .exclusive(TEST_TOPIC_PATTERN, TEST_TOPIC)
+ .conditional(TEST_MODE, OptionTest.TestMode.TIMESTAMP,
TEST_TIMESTAMP)
+ .build();
+ OptionRule rule2 = OptionRule.builder()
+ .optional(TEST_NUM)
+ .optional(TEST_MODE)
+ .required(TEST_PORTS)
+ .exclusive(TEST_TOPIC_PATTERN, TEST_TOPIC)
+ .conditional(TEST_MODE, OptionTest.TestMode.TIMESTAMP,
TEST_TIMESTAMP)
+ .build();
+ Assertions.assertEquals(rule1, rule2);
+ }
+}
diff --git a/seatunnel-api/src/test/resources/conf/option-test.conf
b/seatunnel-api/src/test/resources/conf/option-test.conf
new file mode 100644
index 000000000..dc3b6b6de
--- /dev/null
+++ b/seatunnel-api/src/test/resources/conf/option-test.conf
@@ -0,0 +1,98 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+ execution.parallelism = 1
+ job.mode = "STREAMING"
+ execution.checkpoint.interval = 5000
+}
+
+source {
+ FakeSource {
+ option {
+ bool = true
+ bool-str = "false"
+ int = 2147483647
+ int-str = "100"
+ float = 3.3333
+ float-str = "3.1415"
+ double = 3.1415926535897932384626433832795028841971
+ double-str = "3.1415926535897932384626433832795028841971"
+ map {
+ inner {
+ path = "mac"
+ name = "ashulin"
+ # A nested Map (Map<Map<?,?>>) value is supported only when written as a JSON string
+ map = """{"fantasy":"final"}"""
+ }
+ type = "source"
+ patch.note = "hollow"
+ }
+ map.name = "saitou"
+ }
+ option.long = 21474836470
+ option.long-str = "21474836470"
+ option.string = "Hello, Apache SeaTunnel"
+ option.enum = "LATEST"
+ option.list-json = """["Hello", "Apache SeaTunnel"]"""
+ option.list = ["final", "fantasy", "VII"]
+ option.complex-type = [{
+ inner {
+ list = [{
+ inner {
+ path = "mac"
+ name = "ashulin"
+ map = """{"fantasy":"final"}"""
+ }
+ type = "source"
+ patch.note = "hollow"
+ name = "saitou"
+ }]
+ }
+ inner.list-2 = [{
+ inner {
+ path = "mac"
+ name = "ashulin"
+ map = """{"fantasy":"final"}"""
+ }
+ type = "source"
+ patch.note = "hollow"
+ name = "saitou"
+ }]
+ }]
+ }
+}
+
+transform {
+ sql {
+ sql = "select name,age from fake"
+ }
+}
+
+sink {
+ File {
+ path = "file:///tmp/hive/warehouse/test2"
+ field_delimiter = "\t"
+ row_delimiter = "\n"
+ partition_by = ["age"]
+ partition_dir_expression = "${k0}=${v0}"
+ is_partition_field_write_in_file = true
+ file_name_expression = "${transactionId}_${now}"
+ file_format = "text"
+ sink_columns = ["name","age"]
+ }
+}
\ No newline at end of file
diff --git a/tools/dependencies/known-dependencies.txt
b/tools/dependencies/known-dependencies.txt
index f2693cd70..6d80f5ad1 100755
--- a/tools/dependencies/known-dependencies.txt
+++ b/tools/dependencies/known-dependencies.txt
@@ -289,6 +289,7 @@ jackson-databind-2.12.6.jar
jackson-dataformat-cbor-2.12.3.jar
jackson-dataformat-cbor-2.8.10.jar
jackson-dataformat-cbor-2.8.11.jar
+jackson-dataformat-properties-2.12.6.jar
jackson-dataformat-smile-2.10.5.jar
jackson-dataformat-smile-2.8.10.jar
jackson-dataformat-smile-2.8.11.jar