This is an automated email from the ASF dual-hosted git repository.

fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 19d8c442f [api] improve config options (#2179)
19d8c442f is described below

commit 19d8c442fcf6894780d202ca5220bd1dbe77fd27
Author: Zongwen Li <[email protected]>
AuthorDate: Wed Jul 27 10:48:32 2022 +0800

    [api] improve config options (#2179)
    
    * [api] improve config options
    
    * [api] option test & new config
    
    * [api] option check style
    
    * [api] support complex type option
    
    * [api] deprecated synchronized & simplified equals
    
    A read-only configuration doesn't require the 'synchronized' keyword.
    hashmap initialcapacity: https://cloud.tencent.com/developer/article/1913508
    
    * [api] suppress magic number warning
---
 seatunnel-api/pom.xml                              |   9 +
 .../apache/seatunnel/api/configuration/Option.java |  93 ++++++++++
 .../seatunnel/api/configuration/Options.java       | 190 ++++++++++++++++++++
 .../api/configuration/ReadonlyConfig.java          | 132 ++++++++++++++
 .../api/configuration/util/Condition.java          | 142 +++++++++++++++
 .../api/configuration/util/ConfigUtil.java         | 176 ++++++++++++++++++
 .../api/configuration/util/Expression.java         | 123 +++++++++++++
 .../api/configuration/util/OptionRule.java         | 200 +++++++++++++++++++++
 .../util/OptionValidationException.java}           |  22 ++-
 .../api/configuration/util/RequiredOption.java     | 162 +++++++++++++++++
 .../seatunnel/api/table/factory/Factory.java       |  10 ++
 .../seatunnel/api/configuration/OptionTest.java    |  46 +++++
 .../api/configuration/ReadableConfigTest.java      | 158 ++++++++++++++++
 .../api/configuration/util/ConditionTest.java      |  48 +++++
 .../api/configuration/util/ExpressionTest.java     |  38 ++++
 .../api/configuration/util/OptionRuleTest.java     | 114 ++++++++++++
 .../src/test/resources/conf/option-test.conf       |  98 ++++++++++
 tools/dependencies/known-dependencies.txt          |   1 +
 18 files changed, 1750 insertions(+), 12 deletions(-)

diff --git a/seatunnel-api/pom.xml b/seatunnel-api/pom.xml
index 7b6829a0f..058e240cf 100644
--- a/seatunnel-api/pom.xml
+++ b/seatunnel-api/pom.xml
@@ -38,5 +38,14 @@
             <artifactId>seatunnel-common</artifactId>
             <version>${project.version}</version>
         </dependency>
+        <dependency>
+            <groupId>org.junit.jupiter</groupId>
+            <artifactId>junit-jupiter-engine</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>com.fasterxml.jackson.dataformat</groupId>
+            <artifactId>jackson-dataformat-properties</artifactId>
+            <version>${jackson.version}</version>
+        </dependency>
     </dependencies>
 </project>
\ No newline at end of file
diff --git 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/configuration/Option.java
 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/configuration/Option.java
new file mode 100644
index 000000000..cfb10412e
--- /dev/null
+++ 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/configuration/Option.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.api.configuration;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+
+import java.util.Objects;
+
+/**
+ * Describes a single configuration entry: its key, the type of its value and
+ * an optional default value.
+ *
+ * <p>Instances are created through the builders in {@link Options}; the
+ * constructor is package-private.
+ *
+ * @param <T> type of the value this option describes
+ */
+public class Option<T> {
+    /**
+     * Key under which the value of this option is stored.
+     */
+    private final String key;
+
+    /**
+     * Jackson type token describing the value type of this option.
+     */
+    private final TypeReference<T> typeReference;
+
+    /**
+     * Default value of this option; {@code null} when no default was given.
+     */
+    private final T defaultValue;
+
+    /**
+     * Free-text description; stays empty until
+     * {@link #withDescription(String)} is called.
+     */
+    String description = "";
+
+    /**
+     * Creates an option.
+     *
+     * @param key           key of the option
+     * @param typeReference type token of the option value
+     * @param defaultValue  default value, may be {@code null}
+     */
+    Option(String key, TypeReference<T> typeReference, T defaultValue) {
+        this.key = key;
+        this.typeReference = typeReference;
+        this.defaultValue = defaultValue;
+    }
+
+    /**
+     * @return the key of this option
+     */
+    public String key() {
+        return this.key;
+    }
+
+    /**
+     * @return the type token of the option value
+     */
+    public TypeReference<T> typeReference() {
+        return this.typeReference;
+    }
+
+    /**
+     * @return the default value, or {@code null} if none was defined
+     */
+    public T defaultValue() {
+        return this.defaultValue;
+    }
+
+    /**
+     * @return the description of this option, empty by default
+     */
+    public String getDescription() {
+        return this.description;
+    }
+
+    /**
+     * Sets the description on this very instance (no copy is made).
+     *
+     * @param description the new description
+     * @return this option, to allow call chaining
+     */
+    public Option<T> withDescription(String description) {
+        this.description = description;
+        return this;
+    }
+
+    /**
+     * Two options are equal when both key and default value are equal; the
+     * type token and the description do not take part in the comparison.
+     */
+    @Override
+    public boolean equals(Object obj) {
+        if (obj == this) {
+            return true;
+        }
+        if (obj instanceof Option) {
+            Option<?> other = (Option<?>) obj;
+            return Objects.equals(this.key, other.key)
+                && Objects.equals(this.defaultValue, other.defaultValue);
+        }
+        return false;
+    }
+
+    /**
+     * Hash built from the same two fields that {@link #equals(Object)} compares.
+     */
+    @Override
+    public int hashCode() {
+        return Objects.hash(this.key, this.defaultValue);
+    }
+
+    /**
+     * @return a short text showing the key and the default value
+     */
+    @Override
+    public String toString() {
+        return String.format("Key: '%s', default: %s", key, defaultValue);
+    }
+}
diff --git 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/configuration/Options.java
 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/configuration/Options.java
new file mode 100644
index 000000000..b879da863
--- /dev/null
+++ 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/configuration/Options.java
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.api.configuration;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import org.apache.commons.lang3.StringUtils;
+
+import java.lang.reflect.Type;
+import java.time.Duration;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Entry point of the fluent API used to declare {@link Option}s:
+ * {@code Options.key("a.b").intType().defaultValue(1)}.
+ */
+public class Options {
+
+    /**
+     * Starts building a new {@link Option}.
+     *
+     * @param key The key for the config option; must contain at least one
+     *            non-whitespace character.
+     * @return The builder for the config option with the given key.
+     * @throws IllegalArgumentException if {@code key} is null, empty or blank
+     */
+    public static OptionBuilder key(String key) {
+        // isNotBlank rejects null, "" and whitespace-only keys, so the message talks about "blank".
+        checkArgument(StringUtils.isNotBlank(key), "Option's key must not be blank.");
+        return new OptionBuilder(key);
+    }
+
+    /**
+     * The option builder is used to create a {@link Option}. It is instantiated via {@link
+     * Options#key(String)}.
+     */
+    public static final class OptionBuilder {
+        private final String key;
+
+        /**
+         * Creates a new OptionBuilder.
+         *
+         * @param key The key for the config option
+         */
+        OptionBuilder(String key) {
+            this.key = key;
+        }
+
+        /**
+         * Defines that the value of the option should be of {@link Boolean} type.
+         */
+        public TypedOptionBuilder<Boolean> booleanType() {
+            return new TypedOptionBuilder<>(key, new TypeReference<Boolean>() {
+            });
+        }
+
+        /**
+         * Defines that the value of the option should be of {@link Integer} type.
+         */
+        public TypedOptionBuilder<Integer> intType() {
+            return new TypedOptionBuilder<>(key, new TypeReference<Integer>() {
+            });
+        }
+
+        /**
+         * Defines that the value of the option should be of {@link Long} type.
+         */
+        public TypedOptionBuilder<Long> longType() {
+            return new TypedOptionBuilder<>(key, new TypeReference<Long>() {
+            });
+        }
+
+        /**
+         * Defines that the value of the option should be of {@link Float} type.
+         */
+        public TypedOptionBuilder<Float> floatType() {
+            return new TypedOptionBuilder<>(key, new TypeReference<Float>() {
+            });
+        }
+
+        /**
+         * Defines that the value of the option should be of {@link Double} type.
+         */
+        public TypedOptionBuilder<Double> doubleType() {
+            return new TypedOptionBuilder<>(key, new TypeReference<Double>() {
+            });
+        }
+
+        /**
+         * Defines that the value of the option should be of {@link String} type.
+         */
+        public TypedOptionBuilder<String> stringType() {
+            return new TypedOptionBuilder<>(key, new TypeReference<String>() {
+            });
+        }
+
+        /**
+         * Defines that the value of the option should be of {@link Duration} type.
+         */
+        public TypedOptionBuilder<Duration> durationType() {
+            return new TypedOptionBuilder<>(key, new TypeReference<Duration>() {
+            });
+        }
+
+        /**
+         * Defines that the value of the option should be of {@link Enum} type.
+         *
+         * @param enumClass Concrete type of the expected enum.
+         */
+        public <T extends Enum<T>> TypedOptionBuilder<T> enumType(Class<T> enumClass) {
+            return new TypedOptionBuilder<>(key, new TypeReference<T>() {
+                // T is only a type variable here, so the concrete enum class is reported explicitly.
+                @Override
+                public Type getType() {
+                    return enumClass;
+                }
+            });
+        }
+
+        /**
+         * Defines that the value of the option should be a set of properties, which can be
+         * represented as {@code Map<String, String>}.
+         */
+        public TypedOptionBuilder<Map<String, String>> mapType() {
+            return new TypedOptionBuilder<>(key, new TypeReference<Map<String, String>>() {
+            });
+        }
+
+        /**
+         * Defines that the value of the option should be a list of properties, which can be
+         * represented as {@code List<String>}.
+         */
+        public TypedOptionBuilder<List<String>> listType() {
+            return new TypedOptionBuilder<>(key, new TypeReference<List<String>>() {
+            });
+        }
+
+        /**
+         * The value of the definition option should be represented as T.
+         *
+         * @param typeReference complex type reference
+         */
+        public <T> TypedOptionBuilder<T> type(TypeReference<T> typeReference) {
+            return new TypedOptionBuilder<T>(key, typeReference);
+        }
+    }
+
+    /**
+     * Builder for {@link Option} with a defined type.
+     *
+     * @param <T> type of the option value
+     */
+    public static class TypedOptionBuilder<T> {
+        private final String key;
+        private final TypeReference<T> typeReference;
+
+        /**
+         * Creates a builder for the given key and value type.
+         *
+         * @param key           The key for the config option
+         * @param typeReference type token of the option value
+         */
+        TypedOptionBuilder(String key, TypeReference<T> typeReference) {
+            this.key = key;
+            this.typeReference = typeReference;
+        }
+
+        /**
+         * Creates a Option with the given default value.
+         *
+         * @param value The default value for the config option
+         * @return The config option with the default value.
+         */
+        public Option<T> defaultValue(T value) {
+            return new Option<>(key, typeReference, value);
+        }
+
+        /**
+         * Creates a Option without a default value; its default is {@code null}.
+         *
+         * @return The config option without a default value.
+         */
+        public Option<T> noDefaultValue() {
+            return new Option<>(key, typeReference, null);
+        }
+    }
+}
diff --git 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/configuration/ReadonlyConfig.java
 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/configuration/ReadonlyConfig.java
new file mode 100644
index 000000000..beb9d08a9
--- /dev/null
+++ 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/configuration/ReadonlyConfig.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.api.configuration;
+
+import static 
org.apache.seatunnel.api.configuration.util.ConfigUtil.convertToJsonString;
+import static 
org.apache.seatunnel.api.configuration.util.ConfigUtil.convertValue;
+import static 
org.apache.seatunnel.api.configuration.util.ConfigUtil.flatteningMap;
+import static org.apache.seatunnel.api.configuration.util.ConfigUtil.treeMap;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigRenderOptions;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+
+/**
+ * Read-only view over a nested key/value configuration. Values are looked up
+ * through {@link Option}s, whose dotted keys are resolved level by level.
+ */
+public class ReadonlyConfig {
+
+    private static final ObjectMapper JACKSON_MAPPER = new ObjectMapper();
+
+    /**
+     * Key/value pairs of this configuration, in the nested form produced by
+     * {@code ConfigUtil.treeMap}.
+     */
+    protected final Map<String, Object> confData;
+
+    private ReadonlyConfig(Map<String, Object> confData) {
+        this.confData = confData;
+    }
+
+    /**
+     * Builds a configuration from a map; the map is first passed through
+     * {@code ConfigUtil.treeMap}.
+     *
+     * @param map source key/value pairs
+     * @return the new configuration
+     */
+    public static ReadonlyConfig fromMap(Map<String, Object> map) {
+        Map<String, Object> tree = treeMap(map);
+        return new ReadonlyConfig(tree);
+    }
+
+    /**
+     * Builds a configuration from a typesafe {@link Config} by rendering it
+     * concisely and parsing the rendered text as a JSON object.
+     *
+     * @param config source configuration
+     * @return the new configuration
+     * @throws IllegalArgumentException if the rendered text cannot be parsed
+     */
+    public static ReadonlyConfig fromConfig(Config config) {
+        final String rendered = config.root().render(ConfigRenderOptions.concise());
+        final Map<String, Object> parsed;
+        try {
+            parsed = JACKSON_MAPPER.readValue(rendered, new TypeReference<Map<String, Object>>() {
+            });
+        } catch (JsonProcessingException e) {
+            throw new IllegalArgumentException("Json parsing exception.", e);
+        }
+        return fromMap(parsed);
+    }
+
+    /**
+     * Returns the configured value of the option, or the option's default
+     * value when the configuration holds none.
+     *
+     * @param option option to read
+     * @param <T>    value type of the option
+     * @return configured or default value; may be {@code null}
+     */
+    public <T> T get(Option<T> option) {
+        Optional<T> configured = getOptional(option);
+        return configured.orElseGet(option::defaultValue);
+    }
+
+    /**
+     * Flattens the configuration into dotted keys and renders every value
+     * with {@code ConfigUtil.convertToJsonString}.
+     *
+     * @return an empty map for an empty configuration, otherwise a new map
+     */
+    @SuppressWarnings("MagicNumber")
+    public Map<String, String> toMap() {
+        if (confData.isEmpty()) {
+            return Collections.emptyMap();
+        }
+
+        Map<String, Object> flat = flatteningMap(confData);
+        // Pre-size the map from the number of flattened entries.
+        Map<String, String> rendered = new HashMap<>((flat.size() << 2) / 3 + 1);
+        flat.forEach((name, value) -> rendered.put(name, convertToJsonString(value)));
+        return rendered;
+    }
+
+    /**
+     * Looks up the value of the option by walking its dot-separated key
+     * through the nested maps.
+     *
+     * @param option option to read
+     * @param <T>    value type of the option
+     * @return the converted value, or empty when a path segment is missing,
+     *         an intermediate segment is not a map, or the value is null
+     * @throws NullPointerException if {@code option} is null
+     */
+    @SuppressWarnings("unchecked")
+    public <T> Optional<T> getOptional(Option<T> option) {
+        if (option == null) {
+            throw new NullPointerException("Option not be null.");
+        }
+        final String[] path = option.key().split("\\.");
+        if (path.length == 0) {
+            // A key made only of dots splits into nothing, so there is nothing to look up.
+            return Optional.empty();
+        }
+        final int leaf = path.length - 1;
+        Map<String, Object> node = this.confData;
+        for (int depth = 0; depth < leaf; depth++) {
+            Object child = node.get(path[depth]);
+            if (!(child instanceof Map)) {
+                return Optional.empty();
+            }
+            node = (Map<String, Object>) child;
+        }
+        Object raw = node.get(path[leaf]);
+        if (raw == null) {
+            return Optional.empty();
+        }
+        return Optional.of(convertValue(raw, option.typeReference()));
+    }
+
+    /**
+     * XOR of the hash codes of the top-level keys; values do not contribute.
+     */
+    @Override
+    public int hashCode() {
+        int combined = 0;
+        for (String name : this.confData.keySet()) {
+            combined ^= name.hashCode();
+        }
+        return combined;
+    }
+
+    /**
+     * Two configurations are equal when their underlying maps are equal.
+     */
+    @Override
+    public boolean equals(Object obj) {
+        if (obj == this) {
+            return true;
+        }
+        if (obj instanceof ReadonlyConfig) {
+            ReadonlyConfig other = (ReadonlyConfig) obj;
+            return this.confData.equals(other.confData);
+        }
+        return false;
+    }
+
+    /**
+     * @return the configuration data rendered by {@code ConfigUtil.convertToJsonString}
+     */
+    @Override
+    public String toString() {
+        return convertToJsonString(this.confData);
+    }
+}
diff --git 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/configuration/util/Condition.java
 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/configuration/util/Condition.java
new file mode 100644
index 000000000..74c08c1c7
--- /dev/null
+++ 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/configuration/util/Condition.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.api.configuration.util;
+
+import org.apache.seatunnel.api.configuration.Option;
+
+import java.util.Objects;
+
+/**
+ * One node of a singly linked chain of "option == expected value" checks.
+ * Each node stores how it is joined to the node after it: {@code true} for
+ * AND, {@code false} for OR, {@code null} while nothing follows.
+ *
+ * @param <T> value type of the option checked by this node
+ */
+public class Condition<T> {
+    private final Option<T> option;
+    private final T expectValue;
+    /** Connective towards {@link #next}; {@code null} until a successor is attached. */
+    private Boolean and = null;
+    /** Following node of the chain, or {@code null} for the last node. */
+    private Condition<?> next = null;
+
+    Condition(Option<T> option, T expectValue) {
+        this.option = option;
+        this.expectValue = expectValue;
+    }
+
+    /**
+     * Creates a stand-alone condition.
+     *
+     * @param option      option to check
+     * @param expectValue value the option is expected to have
+     * @return the new condition
+     */
+    public static <T> Condition<T> of(Option<T> option, T expectValue) {
+        return new Condition<>(option, expectValue);
+    }
+
+    /**
+     * Appends {@code option == expectValue} to the end of the chain, joined with AND.
+     *
+     * @return this condition (the head of the chain)
+     */
+    public <E> Condition<T> and(Option<E> option, E expectValue) {
+        Condition<E> appended = of(option, expectValue);
+        return and(appended);
+    }
+
+    /**
+     * Appends {@code option == expectValue} to the end of the chain, joined with OR.
+     *
+     * @return this condition (the head of the chain)
+     */
+    public <E> Condition<T> or(Option<E> option, E expectValue) {
+        Condition<E> appended = of(option, expectValue);
+        return or(appended);
+    }
+
+    /**
+     * Appends the given condition to the end of the chain, joined with AND.
+     *
+     * @return this condition (the head of the chain)
+     */
+    public Condition<T> and(Condition<?> next) {
+        addCondition(true, next);
+        return this;
+    }
+
+    /**
+     * Appends the given condition to the end of the chain, joined with OR.
+     *
+     * @return this condition (the head of the chain)
+     */
+    public Condition<T> or(Condition<?> next) {
+        addCondition(false, next);
+        return this;
+    }
+
+    /** Attaches {@code next} behind the current last node and records the connective there. */
+    private void addCondition(boolean and, Condition<?> next) {
+        Condition<?> last = getTailCondition();
+        last.and = and;
+        last.next = next;
+    }
+
+    /**
+     * @return number of nodes in the chain starting at this node (at least 1)
+     */
+    protected int getCount() {
+        int count = 1;
+        for (Condition<?> node = this; node.hasNext(); node = node.next) {
+            count++;
+        }
+        return count;
+    }
+
+    /**
+     * @return the last node of the chain starting at this node
+     */
+    Condition<?> getTailCondition() {
+        if (!hasNext()) {
+            return this;
+        }
+        return this.next.getTailCondition();
+    }
+
+    /**
+     * @return {@code true} when another condition follows this one
+     */
+    public boolean hasNext() {
+        return this.next != null;
+    }
+
+    /**
+     * @return the following condition, or {@code null} if this is the last one
+     */
+    public Condition<?> getNext() {
+        return this.next;
+    }
+
+    /**
+     * @return the option checked by this node
+     */
+    public Option<T> getOption() {
+        return this.option;
+    }
+
+    /**
+     * @return the value the option is expected to have
+     */
+    public T getExpectValue() {
+        return this.expectValue;
+    }
+
+    /**
+     * @return {@code true} for AND, {@code false} for OR towards the next
+     *         node; {@code null} while no connective has been set
+     */
+    public Boolean and() {
+        return this.and;
+    }
+
+    /**
+     * Compares option, expected value, connective and, recursively, the rest of the chain.
+     */
+    @Override
+    public boolean equals(Object obj) {
+        if (obj == this) {
+            return true;
+        }
+        if (obj instanceof Condition) {
+            Condition<?> other = (Condition<?>) obj;
+            return Objects.equals(this.option, other.option)
+                && Objects.equals(this.expectValue, other.expectValue)
+                && Objects.equals(this.and, other.and)
+                && Objects.equals(this.next, other.next);
+        }
+        return false;
+    }
+
+    /**
+     * Hash over the same four fields that {@link #equals(Object)} compares.
+     */
+    @Override
+    public int hashCode() {
+        return Objects.hash(this.option, this.expectValue, this.and, this.next);
+    }
+
+    /**
+     * Renders the chain from left to right as {@code 'key' == value}
+     * terms joined by {@code &&} / {@code ||}. Whenever the connective
+     * changes between two consecutive links, everything rendered up to and
+     * including the term after the first of those links is wrapped in
+     * parentheses.
+     */
+    @Override
+    public String toString() {
+        StringBuilder text = new StringBuilder();
+        // Set while the term rendered next must close a parenthesised group.
+        boolean wrapPending = false;
+        for (Condition<?> node = this; node != null; node = node.next) {
+            text.append("'")
+                .append(node.option.key())
+                // TODO: support another condition
+                .append("' == ")
+                .append(node.expectValue);
+            if (wrapPending) {
+                text.insert(0, '(').append(')');
+                wrapPending = false;
+            }
+            if (node.hasNext()) {
+                // The connective changes after the next term, so group what has been built by then.
+                wrapPending = node.next.hasNext() && !node.and.equals(node.next.and);
+                text.append(node.and ? " && " : " || ");
+            }
+        }
+        return text.toString();
+    }
+}
diff --git 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/configuration/util/ConfigUtil.java
 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/configuration/util/ConfigUtil.java
new file mode 100644
index 000000000..998e58bd7
--- /dev/null
+++ 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/configuration/util/ConfigUtil.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.api.configuration.util;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.dataformat.javaprop.JavaPropsMapper;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+
+/**
+ * Static helpers for converting configuration data between its nested ("tree")
+ * and dotted ("flat") form, and for converting raw values to typed values.
+ */
+public class ConfigUtil {
+    private static final JavaPropsMapper PROPERTIES_MAPPER = new JavaPropsMapper();
+    private static final ObjectMapper JACKSON_MAPPER = new ObjectMapper();
+
+    /**
+     * Serializes {@code rawMap} with the Java-properties mapper and reads the
+     * result back as a map, turning dotted keys into nested maps.
+     *
+     * <pre>
+     * poll.timeout = 1000
+     *                      ==>>  poll : {timeout = 1000, interval = 500}
+     * poll.interval = 500
+     * </pre>
+     *
+     * @param rawMap object to convert
+     * @return the nested map
+     * @throws IllegalArgumentException if the mapper cannot process the input
+     */
+    public static Map<String, Object> treeMap(Object rawMap) {
+        try {
+            return PROPERTIES_MAPPER.readValue(PROPERTIES_MAPPER.writeValueAsString(rawMap), new TypeReference<Map<String, Object>>() {
+            });
+        } catch (JsonProcessingException e) {
+            // Keep the cause so the failing key/value can be diagnosed.
+            throw new IllegalArgumentException("Json parsing exception.", e);
+        }
+    }
+
+    /**
+     * Recursive worker behind {@link #flatteningMap(Map)}.
+     *
+     * <p>Lists are never modified: a new list holding the flattened elements
+     * is produced, so the caller's data stays untouched and unmodifiable
+     * input lists are accepted.
+     *
+     * @param rawValue  value to flatten; scalars are returned or stored as they are
+     * @param newMap    map collecting the flattened entries, or {@code null}
+     *                  when the value is not (yet) inside a map
+     * @param keys      path segments leading to {@code rawValue}; only read
+     *                  when {@code newMap} is not {@code null}
+     * @param nestedMap {@code true} when {@code rawValue} is reached from an
+     *                  enclosing map whose {@code newMap}/{@code keys} are reused
+     * @return {@code null} for null input, {@code newMap} when one is in use,
+     *         otherwise the scalar itself or the new flattened list
+     */
+    @SuppressWarnings("unchecked")
+    static Object flatteningMap(Object rawValue, Map<String, Object> newMap, List<String> keys, boolean nestedMap) {
+        if (rawValue == null) {
+            return null;
+        }
+        if (!(rawValue instanceof List) && !(rawValue instanceof Map)) {
+            if (newMap == null) {
+                return rawValue;
+            }
+            newMap.put(String.join(".", keys), rawValue);
+            return newMap;
+        }
+
+        if (rawValue instanceof List) {
+            List<Object> rawList = (List<Object>) rawValue;
+            // Build a fresh list instead of rawList.set(...): the input may be
+            // the live data of a read-only configuration.
+            List<Object> flattenedList = new ArrayList<>(rawList.size());
+            for (Object element : rawList) {
+                // Every element starts a new, independent flattening scope.
+                flattenedList.add(flatteningMap(element, null, null, false));
+            }
+            if (newMap != null) {
+                newMap.put(String.join(".", keys), flattenedList);
+                return newMap;
+            }
+            return flattenedList;
+        } else {
+            Map<String, Object> rawMap = (Map<String, Object>) rawValue;
+            if (!nestedMap) {
+                // Top of a new scope: start with an empty path and result map.
+                keys = new ArrayList<>();
+                newMap = new HashMap<>(rawMap.size());
+            }
+            for (Map.Entry<String, Object> entry : rawMap.entrySet()) {
+                keys.add(entry.getKey());
+                flatteningMap(entry.getValue(), newMap, keys, true);
+                keys.remove(keys.size() - 1);
+            }
+            return newMap;
+        }
+    }
+
+    /**
+     * Flattens a nested map into a single-level map with dotted keys.
+     *
+     * <pre>
+     *                                                  poll.timeout = 1000
+     * poll : {timeout = 1000, interval = 500}  ==>>
+     *                                                  poll.interval = 500
+     * </pre>
+     *
+     * @param treeMap nested map; it is not modified
+     * @return a new flat map, or {@code null} if {@code treeMap} is {@code null}
+     */
+    @SuppressWarnings("unchecked")
+    public static Map<String, Object> flatteningMap(Map<String, Object> treeMap) {
+        return (Map<String, Object>) flatteningMapWithObject(treeMap);
+    }
+
+    /**
+     * Flattens an arbitrary value: maps become flat maps, lists become lists
+     * of flattened elements, anything else is returned as it is.
+     */
+    static Object flatteningMapWithObject(Object rawValue) {
+        return flatteningMap(rawValue, null, null, false);
+    }
+
+    /**
+     * Converts a raw configuration value to the type described by
+     * {@code typeReference}. Plain classes are first tried with the
+     * hand-written conversions (Boolean, enum, String); everything else, and
+     * every failed attempt, goes through Jackson.
+     *
+     * @param rawValue      value to convert; expected to be non-null
+     * @param typeReference target type
+     * @return the converted value
+     * @throws IllegalArgumentException if Jackson cannot produce the target type
+     */
+    @SuppressWarnings("unchecked")
+    public static <T> T convertValue(Object rawValue, TypeReference<T> typeReference) {
+        rawValue = flatteningMapWithObject(rawValue);
+        if (typeReference.getType() instanceof Class) {
+            // simple type
+            Class<T> clazz = (Class<T>) typeReference.getType();
+            if (clazz.equals(rawValue.getClass())) {
+                return (T) rawValue;
+            }
+            try {
+                return convertValue(rawValue, clazz);
+            } catch (IllegalArgumentException ignored) {
+                // Continue with Jackson parsing
+            }
+        }
+        try {
+            // complex type && untreated type
+            return JACKSON_MAPPER.readValue(convertToJsonString(rawValue), typeReference);
+        } catch (JsonProcessingException e) {
+            throw new IllegalArgumentException(String.format("Json parsing exception, value '%s', and expected type '%s'", rawValue, typeReference.getType().getTypeName()), e);
+        }
+    }
+
+    /**
+     * Hand-written conversions for Boolean, enum and String targets.
+     *
+     * @throws IllegalArgumentException for any other target class, or when
+     *                                  the value cannot be converted
+     */
+    @SuppressWarnings("unchecked")
+    static <T> T convertValue(Object rawValue, Class<T> clazz) {
+        if (Boolean.class.equals(clazz)) {
+            return (T) convertToBoolean(rawValue);
+        } else if (clazz.isEnum()) {
+            return (T) convertToEnum(rawValue, (Class<? extends Enum<?>>) clazz);
+        } else if (String.class.equals(clazz)) {
+            return (T) convertToJsonString(rawValue);
+        }
+        throw new IllegalArgumentException("Unsupported type: " + clazz);
+    }
+
+    /**
+     * Parses "true"/"false" in any letter case from the value's string form.
+     *
+     * @throws IllegalArgumentException for any other text
+     */
+    static Boolean convertToBoolean(Object o) {
+        // Locale.ROOT keeps the comparison independent of the default locale,
+        // the same way convertToEnum does.
+        switch (o.toString().toUpperCase(Locale.ROOT)) {
+            case "TRUE":
+                return true;
+            case "FALSE":
+                return false;
+            default:
+                throw new IllegalArgumentException(
+                    String.format(
+                        "Unrecognized option for boolean: %s. Expected either true or false(case insensitive)",
+                        o));
+        }
+    }
+
+    /**
+     * Finds the enum constant whose string form equals the value's string
+     * form, ignoring letter case.
+     *
+     * @throws IllegalArgumentException if no constant matches
+     */
+    static <E extends Enum<?>> E convertToEnum(Object o, Class<E> clazz) {
+        return Arrays.stream(clazz.getEnumConstants())
+            .filter(e -> e.toString()
+                .toUpperCase(Locale.ROOT)
+                .equals(o.toString().toUpperCase(Locale.ROOT)))
+            .findAny()
+            .orElseThrow(() -> new IllegalArgumentException(String.format(
+                "Could not parse value for enum %s. Expected one of: [%s]",
+                clazz, Arrays.toString(clazz.getEnumConstants()))));
+    }
+
+    /**
+     * Returns strings unchanged and serializes every other value to JSON.
+     *
+     * @throws IllegalArgumentException if the value cannot be serialized
+     */
+    public static String convertToJsonString(Object o) {
+        if (o instanceof String) {
+            return (String) o;
+        }
+        try {
+            return JACKSON_MAPPER.writeValueAsString(o);
+        } catch (JsonProcessingException e) {
+            // Keep the cause so the serialization problem is not lost.
+            throw new IllegalArgumentException(String.format("Could not parse json, value: %s", o), e);
+        }
+    }
+}
diff --git 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/configuration/util/Expression.java
 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/configuration/util/Expression.java
new file mode 100644
index 000000000..bceb86223
--- /dev/null
+++ 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/configuration/util/Expression.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.api.configuration.util;
+
+import org.apache.seatunnel.api.configuration.Option;
+
+import java.util.Objects;
+
+/**
+ * A singly linked chain of {@link Condition}s joined by logical operators.
+ *
+ * <p>Each node holds one condition, a link to the next node, and the operator ({@code true}
+ * for AND, {@code false} for OR) that joins it to that next node. {@link #and(Expression)} and
+ * {@link #or(Expression)} append to the tail of the chain and modify this instance in place.
+ */
+public class Expression {
+    private final Condition<?> condition;
+    /** Operator joining this node to {@link #next}; {@code null} until something is appended. */
+    private Boolean and = null;
+    private Expression next = null;
+
+    Expression(Condition<?> condition) {
+        this.condition = condition;
+    }
+
+    /**
+     * Creates a single-node expression from the condition {@code option == expectValue}.
+     */
+    public static <T> Expression of(Option<T> option, T expectValue) {
+        Condition<T> single = Condition.of(option, expectValue);
+        return new Expression(single);
+    }
+
+    /**
+     * Creates a single-node expression wrapping the given condition.
+     */
+    public static Expression of(Condition<?> condition) {
+        return new Expression(condition);
+    }
+
+    /**
+     * Appends {@code next} to the end of this chain, joined with AND.
+     *
+     * @return this expression
+     */
+    public Expression and(Expression next) {
+        link(true, next);
+        return this;
+    }
+
+    /**
+     * Appends {@code next} to the end of this chain, joined with OR.
+     *
+     * @return this expression
+     */
+    public Expression or(Expression next) {
+        link(false, next);
+        return this;
+    }
+
+    /** Attaches {@code successor} behind the current tail using the given operator. */
+    private void link(boolean useAnd, Expression successor) {
+        Expression last = findTail();
+        last.and = useAnd;
+        last.next = successor;
+    }
+
+    /** Walks the chain and returns the last node. */
+    private Expression findTail() {
+        Expression node = this;
+        while (node.hasNext()) {
+            node = node.next;
+        }
+        return node;
+    }
+
+    public Condition<?> getCondition() {
+        return condition;
+    }
+
+    public boolean hasNext() {
+        return this.next != null;
+    }
+
+    public Expression getNext() {
+        return this.next;
+    }
+
+    /**
+     * Returns the operator joining this node to the next one: {@code true} for AND,
+     * {@code false} for OR, {@code null} if nothing has been appended.
+     */
+    public Boolean and() {
+        return this.and;
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (obj == this) {
+            return true;
+        }
+        if (obj instanceof Expression) {
+            Expression other = (Expression) obj;
+            return Objects.equals(this.condition, other.condition)
+                && Objects.equals(this.and, other.and)
+                && Objects.equals(this.next, other.next);
+        }
+        return false;
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(this.condition, this.and, this.next);
+    }
+
+    /**
+     * Renders the chain left to right. A condition made of several parts is parenthesized, and
+     * the text accumulated so far is parenthesized whenever the operator changes between two
+     * consecutive links.
+     */
+    @Override
+    public String toString() {
+        StringBuilder text = new StringBuilder();
+        // Set when the next node closes a group that must be wrapped as a whole.
+        boolean wrapPending = false;
+        for (Expression node = this; node != null; node = node.next) {
+            boolean compound = node.condition.getCount() > 1;
+            if (compound) {
+                text.append("(");
+            }
+            text.append(node.condition);
+            if (compound) {
+                text.append(")");
+            }
+            if (wrapPending) {
+                text.insert(0, "(").append(")");
+                wrapPending = false;
+            }
+            if (!node.hasNext()) {
+                continue;
+            }
+            // The operator changes after the next node, so everything up to and including
+            // that node has to be grouped.
+            wrapPending = node.next.hasNext() && !node.and.equals(node.next.and);
+            text.append(node.and ? " && " : " || ");
+        }
+        return text.toString();
+    }
+}
diff --git 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/configuration/util/OptionRule.java
 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/configuration/util/OptionRule.java
new file mode 100644
index 000000000..c830709c2
--- /dev/null
+++ 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/configuration/util/OptionRule.java
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.api.configuration.util;
+
+import org.apache.seatunnel.api.configuration.Option;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Objects;
+import java.util.Set;
+
+/**
+ * Validation rule for {@link Option}.
+ * <p>
+ * The option rule is typically built in one of the following patterns:
+ *
+ * <pre>{@code
+ * // simple rule
+ * OptionRule simpleRule = OptionRule.builder()
+ *     .optional(POLL_TIMEOUT, POLL_INTERVAL)
+ *     .required(CLIENT_SERVICE_URL)
+ *     .build();
+ *
+ * // basic full rule
+ * OptionRule fullRule = OptionRule.builder()
+ *     .optional(POLL_TIMEOUT, POLL_INTERVAL, CURSOR_STARTUP_MODE)
+ *     .required(CLIENT_SERVICE_URL, ADMIN_SERVICE_URL)
+ *     .exclusive(TOPIC_PATTERN, TOPIC)
+ *     .conditional(CURSOR_STARTUP_MODE, StartMode.TIMESTAMP, CURSOR_STARTUP_TIMESTAMP)
+ *     .build();
+ *
+ * // complex conditional rule
+ * // moot expression
+ * Expression expression = Expression.of(TOPIC_DISCOVERY_INTERVAL, 200)
+ *     .and(Expression.of(Condition.of(CURSOR_STARTUP_MODE, StartMode.EARLIEST)
+ *         .or(CURSOR_STARTUP_MODE, StartMode.LATEST)))
+ *     .or(Expression.of(Condition.of(TOPIC_DISCOVERY_INTERVAL, 100)));
+ *
+ * OptionRule complexRule = OptionRule.builder()
+ *     .optional(POLL_TIMEOUT, POLL_INTERVAL, CURSOR_STARTUP_MODE)
+ *     .required(CLIENT_SERVICE_URL, ADMIN_SERVICE_URL)
+ *     .exclusive(TOPIC_PATTERN, TOPIC)
+ *     .conditional(expression, CURSOR_RESET_MODE)
+ *     .build();
+ * }</pre>
+ */
+public class OptionRule {
+
+    /**
+     * Optional options with default value.
+     *
+     * <p> These options will not be validated.
+     * <p> This is used by the web-UI to show what options are available.
+     */
+    private final Set<Option<?>> optionalOptions;
+
+    /**
+     * Required options with no default value.
+     *
+     * <p> Verify that the option is valid through the defined rules.
+     */
+    private final Set<RequiredOption> requiredOptions;
+
+    OptionRule(Set<Option<?>> optionalOptions, Set<RequiredOption> requiredOptions) {
+        this.optionalOptions = optionalOptions;
+        this.requiredOptions = requiredOptions;
+    }
+
+    public Set<Option<?>> getOptionalOptions() {
+        return optionalOptions;
+    }
+
+    public Set<RequiredOption> getRequiredOptions() {
+        return requiredOptions;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (!(o instanceof OptionRule)) {
+            return false;
+        }
+        OptionRule that = (OptionRule) o;
+        return Objects.equals(optionalOptions, that.optionalOptions)
+            && Objects.equals(requiredOptions, that.requiredOptions);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(optionalOptions, requiredOptions);
+    }
+
+    public static OptionRule.Builder builder() {
+        return new OptionRule.Builder();
+    }
+
+    /**
+     * Builder for {@link OptionRule}.
+     */
+    public static class Builder {
+        private final Set<Option<?>> optionalOptions = new HashSet<>();
+        private final Set<RequiredOption> requiredOptions = new HashSet<>();
+
+        private Builder() {
+        }
+
+        /**
+         * Optional options with default value.
+         *
+         * <p> These options will not be validated.
+         * <p> This is used by the web-UI to show what options are available.
+         *
+         * @throws OptionValidationException if any option has no default value; in that case
+         *                                   none of the given options is registered
+         */
+        public Builder optional(Option<?>... options) {
+            for (Option<?> option : options) {
+                if (option.defaultValue() == null) {
+                    throw new OptionValidationException(String.format("Optional option '%s' should have default value.", option.key()));
+                }
+            }
+            this.optionalOptions.addAll(Arrays.asList(options));
+            return this;
+        }
+
+        /**
+         * Absolutely required options without any constraints.
+         *
+         * @throws OptionValidationException if any option has a default value; in that case
+         *                                   none of the given options is registered
+         */
+        public Builder required(Option<?>... options) {
+            // Validate everything first so a failure leaves the builder untouched,
+            // as optional(...) and exclusive(...) already do.
+            for (Option<?> option : options) {
+                verifyRequiredOptionDefaultValue(option);
+            }
+            for (Option<?> option : options) {
+                this.requiredOptions.add(RequiredOption.AbsolutelyRequiredOption.of(option));
+            }
+            return this;
+        }
+
+        /**
+         * Exclusive options, only one of the options needs to be configured.
+         *
+         * @throws OptionValidationException if fewer than two options are given or any option
+         *                                   has a default value
+         */
+        public Builder exclusive(Option<?>... options) {
+            if (options.length <= 1) {
+                throw new OptionValidationException("The number of exclusive options must be greater than 1.");
+            }
+            for (Option<?> option : options) {
+                verifyRequiredOptionDefaultValue(option);
+            }
+            this.requiredOptions.add(RequiredOption.ExclusiveRequiredOptions.of(options));
+            return this;
+        }
+
+        /**
+         * Conditional options, These options are required if the {@link Option} == expectValue.
+         */
+        public <T> Builder conditional(Option<T> option, T expectValue, Option<?>... requiredOptions) {
+            return conditional(Condition.of(option, expectValue), requiredOptions);
+        }
+
+        /**
+         * Conditional options, These options are required if the {@link Condition} evaluates to true.
+         */
+        public Builder conditional(Condition<?> condition, Option<?>... requiredOptions) {
+            return conditional(Expression.of(condition), requiredOptions);
+        }
+
+        /**
+         * Conditional options, These options are required if the {@link Expression} evaluates to true.
+         *
+         * @throws OptionValidationException if any required option has a default value
+         */
+        public Builder conditional(Expression expression, Option<?>... requiredOptions) {
+            for (Option<?> o : requiredOptions) {
+                verifyRequiredOptionDefaultValue(o);
+            }
+            this.requiredOptions.add(RequiredOption.ConditionalRequiredOptions.of(expression, new HashSet<>(Arrays.asList(requiredOptions))));
+            return this;
+        }
+
+        /**
+         * Creates the rule from the options registered so far.
+         *
+         * <p>The rule receives copies of the builder's sets, so calling further builder
+         * methods afterwards does not change a rule that was already built.
+         */
+        public OptionRule build() {
+            return new OptionRule(new HashSet<>(optionalOptions), new HashSet<>(requiredOptions));
+        }
+
+        /** Rejects an option that declares a default value, since a required option must not have one. */
+        private void verifyRequiredOptionDefaultValue(Option<?> option) {
+            if (option.defaultValue() != null) {
+                throw new OptionValidationException(String.format("Required option '%s' should have no default value.", option.key()));
+            }
+        }
+    }
+}
diff --git 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/Factory.java
 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/configuration/util/OptionValidationException.java
similarity index 61%
copy from 
seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/Factory.java
copy to 
seatunnel-api/src/main/java/org/apache/seatunnel/api/configuration/util/OptionValidationException.java
index 098d3d291..a5928625a 100644
--- 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/Factory.java
+++ 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/configuration/util/OptionValidationException.java
@@ -15,20 +15,18 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.api.table.factory;
+package org.apache.seatunnel.api.configuration.util;
 
 /**
- * todo: use PluginIdentifier.
- * This is the SPI interface.
+ * Exception for all errors occurring during option validation phase.
  */
-public interface Factory {
+public class OptionValidationException extends RuntimeException {
 
-    /**
-     * Returns a unique identifier among same factory interfaces.
-     *
-     * <p>For consistency, an identifier should be declared as one lower case 
word (e.g. {@code
-     * kafka}). If multiple factories exist for different versions, a version 
should be appended
-     * using "-" (e.g. {@code elasticsearch-7}).
-     */
-    String factoryIdentifier();
+    public OptionValidationException(String message, Throwable cause) {
+        super(message, cause);
+    }
+
+    public OptionValidationException(String message) {
+        super(message);
+    }
 }
diff --git 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/configuration/util/RequiredOption.java
 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/configuration/util/RequiredOption.java
new file mode 100644
index 000000000..692e056f4
--- /dev/null
+++ 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/configuration/util/RequiredOption.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.api.configuration.util;
+
+import org.apache.seatunnel.api.configuration.Option;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Objects;
+import java.util.Set;
+
+/**
+ * Common type of the requirement kinds that an {@link OptionRule} can hold. The interface
+ * declares no methods; the three nested implementations carry the data.
+ */
+public interface RequiredOption {
+
+    /**
+     * A group of options held together as one exclusive requirement.
+     */
+    class ExclusiveRequiredOptions implements RequiredOption {
+        private final Set<Option<?>> exclusiveOptions;
+
+        ExclusiveRequiredOptions(Set<Option<?>> exclusiveOptions) {
+            this.exclusiveOptions = exclusiveOptions;
+        }
+
+        public static ExclusiveRequiredOptions of(Option<?>... exclusiveOptions) {
+            return new ExclusiveRequiredOptions(new HashSet<>(Arrays.asList(exclusiveOptions)));
+        }
+
+        public Set<Option<?>> getExclusiveOptions() {
+            return exclusiveOptions;
+        }
+
+        @Override
+        public boolean equals(Object obj) {
+            if (this == obj) {
+                return true;
+            }
+            if (!(obj instanceof ExclusiveRequiredOptions)) {
+                return false;
+            }
+            ExclusiveRequiredOptions that = (ExclusiveRequiredOptions) obj;
+            return Objects.equals(this.exclusiveOptions, that.exclusiveOptions);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(exclusiveOptions);
+        }
+
+        /**
+         * Formats the keys of the given options as {@code 'key1', 'key2', ...} in the set's
+         * iteration order.
+         */
+        static String getOptionKeys(Set<Option<?>> options) {
+            StringBuilder builder = new StringBuilder();
+            String separator = "";
+            for (Option<?> option : options) {
+                builder.append(separator)
+                    .append("'")
+                    .append(option.key())
+                    .append("'");
+                separator = ", ";
+            }
+            return builder.toString();
+        }
+
+        @Override
+        public String toString() {
+            return String.format("Exclusive required options: %s", getOptionKeys(exclusiveOptions));
+        }
+    }
+
+    /**
+     * A single option held as an unconditional requirement.
+     */
+    class AbsolutelyRequiredOption<T> implements RequiredOption {
+        private final Option<T> requiredOption;
+
+        AbsolutelyRequiredOption(Option<T> requiredOption) {
+            this.requiredOption = requiredOption;
+        }
+
+        public static <T> AbsolutelyRequiredOption<T> of(Option<T> requiredOption) {
+            return new AbsolutelyRequiredOption<>(requiredOption);
+        }
+
+        /**
+         * Returns the wrapped option, mirroring the accessors of the sibling requirement types.
+         */
+        public Option<T> getRequiredOption() {
+            return requiredOption;
+        }
+
+        @Override
+        public boolean equals(Object obj) {
+            if (this == obj) {
+                return true;
+            }
+            if (!(obj instanceof AbsolutelyRequiredOption)) {
+                return false;
+            }
+            AbsolutelyRequiredOption<?> that = (AbsolutelyRequiredOption<?>) obj;
+            return Objects.equals(this.requiredOption, that.requiredOption);
+        }
+
+        @Override
+        public int hashCode() {
+            return this.requiredOption.hashCode();
+        }
+
+        @Override
+        public String toString() {
+            return String.format("Absolutely required option: '%s'", requiredOption.key());
+        }
+    }
+
+    /**
+     * A set of options paired with the {@link Expression} under which they are required.
+     */
+    class ConditionalRequiredOptions implements RequiredOption {
+        private final Expression expression;
+        private final Set<Option<?>> requiredOption;
+
+        ConditionalRequiredOptions(Expression expression, Set<Option<?>> requiredOption) {
+            this.expression = expression;
+            this.requiredOption = requiredOption;
+        }
+
+        public static ConditionalRequiredOptions of(Expression expression, Set<Option<?>> requiredOption) {
+            return new ConditionalRequiredOptions(expression, requiredOption);
+        }
+
+        public static ConditionalRequiredOptions of(Condition<?> condition, Set<Option<?>> requiredOption) {
+            return new ConditionalRequiredOptions(Expression.of(condition), requiredOption);
+        }
+
+        public Expression getExpression() {
+            return expression;
+        }
+
+        public Set<Option<?>> getRequiredOption() {
+            return requiredOption;
+        }
+
+        @Override
+        public boolean equals(Object obj) {
+            if (this == obj) {
+                return true;
+            }
+            if (!(obj instanceof ConditionalRequiredOptions)) {
+                return false;
+            }
+            ConditionalRequiredOptions that = (ConditionalRequiredOptions) obj;
+            return Objects.equals(this.expression, that.expression) && Objects.equals(this.requiredOption, that.requiredOption);
+        }
+
+        /**
+         * Hashes the required options only. This stays consistent with {@link #equals(Object)}
+         * (equal instances have equal option sets) and does not depend on the expression.
+         */
+        @Override
+        public int hashCode() {
+            return this.requiredOption.hashCode();
+        }
+
+        @Override
+        public String toString() {
+            return String.format("Condition expression: %s, Required options: %s", expression, ExclusiveRequiredOptions.getOptionKeys(requiredOption));
+        }
+    }
+}
diff --git 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/Factory.java
 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/Factory.java
index 098d3d291..827d3f340 100644
--- 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/Factory.java
+++ 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/Factory.java
@@ -17,6 +17,8 @@
 
 package org.apache.seatunnel.api.table.factory;
 
+import org.apache.seatunnel.api.configuration.util.OptionRule;
+
 /**
  * todo: use PluginIdentifier.
  * This is the SPI interface.
@@ -31,4 +33,12 @@ public interface Factory {
      * using "-" (e.g. {@code elasticsearch-7}).
      */
     String factoryIdentifier();
+
+    /**
+     * Returns the rule for options.
+     *
+     * <p>1. Used to verify whether the parameters configured by the user 
conform to the rules of the options;
+     * <p>2. Used for Web-UI to prompt user to configure option value;
+     */
+    OptionRule optionRule();
 }
diff --git 
a/seatunnel-api/src/test/java/org/apache/seatunnel/api/configuration/OptionTest.java
 
b/seatunnel-api/src/test/java/org/apache/seatunnel/api/configuration/OptionTest.java
new file mode 100644
index 000000000..c3dc57c42
--- /dev/null
+++ 
b/seatunnel-api/src/test/java/org/apache/seatunnel/api/configuration/OptionTest.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.api.configuration;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+@SuppressWarnings("MagicNumber")
+public class OptionTest {
+
+    /** Enum used as the value type of {@link #TEST_MODE}. */
+    public enum TestMode {
+        EARLIEST,
+        LATEST,
+        TIMESTAMP,
+    }
+
+    /** Integer option fixture, also used by other tests in this module. */
+    public static final Option<Integer> TEST_NUM = Options.key("option.num")
+        .intType()
+        .defaultValue(100)
+        .withDescription("test int option");
+
+    /** Enum option fixture, also used by other tests in this module. */
+    public static final Option<TestMode> TEST_MODE = Options.key("option.mode")
+        .enumType(TestMode.class)
+        .defaultValue(TestMode.LATEST)
+        .withDescription("test enum option");
+
+    /**
+     * The fixtures must equal options rebuilt with the same key, type and default value but
+     * without a description.
+     */
+    @Test
+    public void testEquals() {
+        Option<Integer> sameNum = Options.key("option.num").intType().defaultValue(100);
+        Assertions.assertEquals(TEST_NUM, sameNum);
+
+        Option<TestMode> sameMode = Options.key("option.mode").enumType(TestMode.class).defaultValue(TestMode.LATEST);
+        Assertions.assertEquals(TEST_MODE, sameMode);
+    }
+}
diff --git 
a/seatunnel-api/src/test/java/org/apache/seatunnel/api/configuration/ReadableConfigTest.java
 
b/seatunnel-api/src/test/java/org/apache/seatunnel/api/configuration/ReadableConfigTest.java
new file mode 100644
index 000000000..62c45a01b
--- /dev/null
+++ 
b/seatunnel-api/src/test/java/org/apache/seatunnel/api/configuration/ReadableConfigTest.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.api.configuration;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigResolveOptions;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import org.apache.commons.lang3.StringUtils;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+
+import java.net.URISyntaxException;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Reads typed options from the {@code source} block of {@code /conf/option-test.conf} through
+ * {@link ReadonlyConfig} and checks the converted values.
+ */
+@SuppressWarnings({
+    "MagicNumber", "checkstyle:StaticVariableName"
+})
+public class ReadableConfigTest {
+    private static final String CONFIG_PATH = "/conf/option-test.conf";
+    private static ReadonlyConfig config;
+    /** Expected content of {@code option.map}, shared by the map and complex-type tests. */
+    private static Map<String, String> map;
+
+    @BeforeAll
+    public static void prepare() throws URISyntaxException {
+        Config rawConfig = ConfigFactory
+            .parseFile(Paths.get(ReadableConfigTest.class.getResource(CONFIG_PATH).toURI()).toFile())
+            .resolve(ConfigResolveOptions.defaults().setAllowUnresolved(true))
+            .resolveWith(ConfigFactory.systemProperties(),
+                ConfigResolveOptions.defaults().setAllowUnresolved(true));
+        config = ReadonlyConfig.fromConfig(rawConfig.getConfigList("source").get(0));
+        map = new HashMap<>();
+        map.put("inner.path", "mac");
+        map.put("inner.name", "ashulin");
+        map.put("inner.map", "{\"fantasy\":\"final\"}");
+        map.put("type", "source");
+        map.put("patch.note", "hollow");
+        map.put("name", "saitou");
+    }
+
+    @Test
+    public void testBooleanOption() {
+        Assertions.assertEquals(true, config.get(Options.key("option.bool").booleanType().noDefaultValue()));
+        Assertions.assertEquals(false, config.get(Options.key("option.bool-str").booleanType().noDefaultValue()));
+        Assertions.assertEquals(true, config.get(Options.key("option.int-str").booleanType().noDefaultValue()));
+        Assertions.assertNull(config.get(Options.key("option.not-exist").booleanType().noDefaultValue()));
+        Assertions.assertThrows(IllegalArgumentException.class, () -> config.get(Options.key("option.string").booleanType().noDefaultValue()));
+    }
+
+    @Test
+    public void testIntOption() {
+        Assertions.assertEquals(2147483647, config.get(Options.key("option.int").intType().noDefaultValue()));
+        Assertions.assertEquals(100, config.get(Options.key("option.int-str").intType().noDefaultValue()));
+        Assertions.assertEquals(2147483647, config.get(Options.key("option.not-exist").intType().defaultValue(2147483647)));
+        Assertions.assertNull(config.get(Options.key("option.not-exist").intType().noDefaultValue()));
+        Assertions.assertThrows(IllegalArgumentException.class, () -> config.get(Options.key("option.long").intType().noDefaultValue()));
+    }
+
+    @Test
+    public void testLongOption() {
+        Assertions.assertEquals(21474836470L, config.get(Options.key("option.long").longType().noDefaultValue()));
+        Assertions.assertEquals(21474836470L, config.get(Options.key("option.long-str").longType().noDefaultValue()));
+        Assertions.assertNull(config.get(Options.key("option.not-exist").longType().noDefaultValue()));
+        // Exercise the long conversion here; the int conversion is covered by testIntOption.
+        Assertions.assertThrows(IllegalArgumentException.class, () -> config.get(Options.key("option.bool").longType().noDefaultValue()));
+    }
+
+    @Test
+    public void testFloatOption() {
+        Assertions.assertEquals(3.3333F, config.get(Options.key("option.float").floatType().noDefaultValue()));
+        Assertions.assertEquals(21474836470F, config.get(Options.key("option.long-str").floatType().noDefaultValue()));
+        Assertions.assertEquals(3.1415F, config.get(Options.key("option.float-str").floatType().noDefaultValue()));
+        Assertions.assertNull(config.get(Options.key("option.not-exist").floatType().noDefaultValue()));
+        Assertions.assertThrows(IllegalArgumentException.class, () -> config.get(Options.key("option.bool-str").floatType().noDefaultValue()));
+    }
+
+    @Test
+    public void testDoubleOption() {
+        Assertions.assertEquals(3.1415926535897932384626433832795028841971D, config.get(Options.key("option.double").doubleType().noDefaultValue()));
+        Assertions.assertEquals(3.1415926535897932384626433832795028841971D, config.get(Options.key("option.double-str").doubleType().noDefaultValue()));
+        Assertions.assertEquals(21474836470D, config.get(Options.key("option.long-str").doubleType().noDefaultValue()));
+        Assertions.assertEquals(3.1415D, config.get(Options.key("option.float-str").doubleType().noDefaultValue()));
+        Assertions.assertNull(config.get(Options.key("option.not-exist").doubleType().noDefaultValue()));
+        Assertions.assertThrows(IllegalArgumentException.class, () -> config.get(Options.key("option.bool-str").doubleType().noDefaultValue()));
+    }
+
+    @Test
+    public void testStringOption() {
+        Assertions.assertEquals("Hello, Apache SeaTunnel", config.get(Options.key("option.string").stringType().noDefaultValue()));
+        // 'option.double' is not represented as a string and is expected to lose precision
+        Assertions.assertNotEquals("3.1415926535897932384626433832795028841971", config.get(Options.key("option.double").stringType().noDefaultValue()));
+        Assertions.assertEquals("3.1415926535897932384626433832795028841971", config.get(Options.key("option.double-str").stringType().noDefaultValue()));
+        Assertions.assertNull(config.get(Options.key("option.not-exist").stringType().noDefaultValue()));
+    }
+
+    @Test
+    public void testEnumOption() {
+        Assertions.assertEquals(OptionTest.TestMode.LATEST, config.get(Options.key("option.enum").enumType(OptionTest.TestMode.class).noDefaultValue()));
+        Assertions.assertThrows(IllegalArgumentException.class, () -> config.get(Options.key("option.string").enumType(OptionTest.TestMode.class).noDefaultValue()));
+        Assertions.assertNull(config.get(Options.key("option.not-exist").enumType(OptionTest.TestMode.class).noDefaultValue()));
+    }
+
+    @Test
+    public void testBasicMapOption() {
+        Assertions.assertEquals(map, config.get(Options.key("option.map").mapType().noDefaultValue()));
+        Map<String, String> newMap = new HashMap<>();
+        newMap.put("fantasy", "final");
+        Assertions.assertEquals(newMap, config.get(Options.key("option.map.inner.map").mapType().noDefaultValue()));
+        Assertions.assertTrue(StringUtils.isNotBlank(config.get(Options.key("option").stringType().noDefaultValue())));
+        Assertions.assertThrows(IllegalArgumentException.class, () -> config.get(Options.key("option.string").mapType().noDefaultValue()));
+        // Check the missing key through the map type; the enum variant is covered by testEnumOption.
+        Assertions.assertNull(config.get(Options.key("option.not-exist").mapType().noDefaultValue()));
+    }
+
+    @Test
+    public void testBasicListOption() {
+        List<String> list = new ArrayList<>();
+        list.add("Hello");
+        list.add("Apache SeaTunnel");
+        Assertions.assertEquals(list, config.get(Options.key("option.list-json").listType().noDefaultValue()));
+        list = new ArrayList<>();
+        list.add("final");
+        list.add("fantasy");
+        list.add("VII");
+        Assertions.assertEquals(list, config.get(Options.key("option.list").listType().noDefaultValue()));
+    }
+
+    @Test
+    public void testComplexTypeOption() {
+        List<Map<String, List<Map<String, String>>>> complexType = config.get(Options.key("option.complex-type").type(new TypeReference<List<Map<String, List<Map<String, String>>>>>() {
+        }).noDefaultValue());
+        Assertions.assertEquals(1, complexType.size());
+        Assertions.assertEquals(2, complexType.get(0).size());
+        complexType.get(0).values().forEach(value -> {
+            Assertions.assertEquals(1, value.size());
+            Assertions.assertEquals(map, value.get(0));
+        });
+    }
+}
diff --git 
a/seatunnel-api/src/test/java/org/apache/seatunnel/api/configuration/util/ConditionTest.java
 
b/seatunnel-api/src/test/java/org/apache/seatunnel/api/configuration/util/ConditionTest.java
new file mode 100644
index 000000000..105b83078
--- /dev/null
+++ 
b/seatunnel-api/src/test/java/org/apache/seatunnel/api/configuration/util/ConditionTest.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.api.configuration.util;
+
+import static org.apache.seatunnel.api.configuration.OptionTest.TEST_MODE;
+import static org.apache.seatunnel.api.configuration.OptionTest.TEST_NUM;
+
+import org.apache.seatunnel.api.configuration.OptionTest;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+@SuppressWarnings("MagicNumber")
+public class ConditionTest {
+    private static final Condition<OptionTest.TestMode> TEST_CONDITION = 
Condition.of(TEST_MODE, OptionTest.TestMode.EARLIEST)
+        .or(TEST_MODE, OptionTest.TestMode.LATEST)
+        .and(TEST_NUM, 1000);
+
+    @Test
+    public void testToString() {
+        Assertions.assertEquals("('option.mode' == EARLIEST || 'option.mode' 
== LATEST) && 'option.num' == 1000", TEST_CONDITION.toString());
+    }
+
+    @Test
+    public void testGetCount() {
+        Assertions.assertEquals(3, TEST_CONDITION.getCount());
+    }
+
+    @Test
+    public void testGetTailCondition() {
+        Assertions.assertEquals(Condition.of(TEST_NUM, 1000), 
TEST_CONDITION.getTailCondition());
+    }
+}
diff --git 
a/seatunnel-api/src/test/java/org/apache/seatunnel/api/configuration/util/ExpressionTest.java
 
b/seatunnel-api/src/test/java/org/apache/seatunnel/api/configuration/util/ExpressionTest.java
new file mode 100644
index 000000000..5d4416b47
--- /dev/null
+++ 
b/seatunnel-api/src/test/java/org/apache/seatunnel/api/configuration/util/ExpressionTest.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.api.configuration.util;
+
+import static org.apache.seatunnel.api.configuration.OptionTest.TEST_MODE;
+import static org.apache.seatunnel.api.configuration.OptionTest.TEST_NUM;
+
+import org.apache.seatunnel.api.configuration.OptionTest;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+@SuppressWarnings("MagicNumber")
+public class ExpressionTest {
+    @Test
+    public void testToString() {
+        Expression expression = Expression.of(TEST_NUM, 200)
+            .and(Expression.of(Condition.of(TEST_MODE, 
OptionTest.TestMode.EARLIEST)
+                .or(TEST_MODE, OptionTest.TestMode.LATEST)))
+            .or(Expression.of(Condition.of(TEST_NUM, 100)));
+        Assertions.assertEquals("('option.num' == 200 && ('option.mode' == 
EARLIEST || 'option.mode' == LATEST)) || 'option.num' == 100", 
expression.toString());
+    }
+}
diff --git 
a/seatunnel-api/src/test/java/org/apache/seatunnel/api/configuration/util/OptionRuleTest.java
 
b/seatunnel-api/src/test/java/org/apache/seatunnel/api/configuration/util/OptionRuleTest.java
new file mode 100644
index 000000000..5aca2bef7
--- /dev/null
+++ 
b/seatunnel-api/src/test/java/org/apache/seatunnel/api/configuration/util/OptionRuleTest.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.api.configuration.util;
+
+import static org.apache.seatunnel.api.configuration.OptionTest.TEST_MODE;
+import static org.apache.seatunnel.api.configuration.OptionTest.TEST_NUM;
+
+import org.apache.seatunnel.api.configuration.Option;
+import org.apache.seatunnel.api.configuration.OptionTest;
+import org.apache.seatunnel.api.configuration.Options;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.List;
+
+public class OptionRuleTest {
+    public static final Option<Long> TEST_TIMESTAMP = 
Options.key("option.timestamp")
+        .longType()
+        .noDefaultValue()
+        .withDescription("test long timestamp");
+
+    public static final Option<String> TEST_TOPIC_PATTERN = 
Options.key("option.topic-pattern")
+        .stringType()
+        .noDefaultValue()
+        .withDescription("test string type");
+
+    public static final Option<List<String>> TEST_TOPIC = 
Options.key("option.topic")
+        .listType()
+        .noDefaultValue()
+        .withDescription("test list string type");
+
+    public static final Option<List<Integer>> TEST_PORTS = 
Options.key("option.ports")
+        .type(new TypeReference<List<Integer>>() {
+        })
+        .noDefaultValue()
+        .withDescription("test list int type");
+
+    @Test
+    public void testBuildSuccess() {
+        OptionRule rule = OptionRule.builder()
+            .optional(TEST_NUM, TEST_MODE)
+            .required(TEST_PORTS)
+            .exclusive(TEST_TOPIC_PATTERN, TEST_TOPIC)
+            .conditional(TEST_MODE, OptionTest.TestMode.TIMESTAMP, 
TEST_TIMESTAMP)
+            .build();
+        Assertions.assertNotNull(rule);
+    }
+
+    @Test
+    public void testOptionalException() {
+        Assertions.assertThrows(OptionValidationException.class,
+            () -> OptionRule.builder().optional(TEST_NUM, TEST_MODE, 
TEST_PORTS).build(),
+            "Optional option 'option.ports' should have default value.");
+    }
+
+    @Test
+    public void testRequiredException() {
+        Assertions.assertThrows(OptionValidationException.class,
+            () -> OptionRule.builder().required(TEST_NUM, TEST_MODE, 
TEST_PORTS).build(),
+            "Required option 'option.num' should have no default value.");
+    }
+
+    @Test
+    public void testExclusiveException() {
+        Assertions.assertThrows(OptionValidationException.class,
+            () -> OptionRule.builder().exclusive(TEST_TOPIC_PATTERN, 
TEST_TOPIC, TEST_MODE, TEST_PORTS).build(),
+            "Required option 'option.mode' should have no default value.");
+        Assertions.assertThrows(OptionValidationException.class,
+            () -> OptionRule.builder().exclusive(TEST_TOPIC_PATTERN).build(),
+            "The number of exclusive options must be greater than 1.");
+    }
+
+    @Test
+    public void testConditionalException() {
+        Assertions.assertThrows(OptionValidationException.class,
+            () -> OptionRule.builder().conditional(TEST_MODE, 
OptionTest.TestMode.TIMESTAMP, TEST_NUM).build(),
+            "Required option 'option.num' should have no default value.");
+    }
+
+    @Test
+    public void testEquals() {
+        OptionRule rule1 = OptionRule.builder()
+            .optional(TEST_NUM, TEST_MODE)
+            .required(TEST_PORTS)
+            .exclusive(TEST_TOPIC_PATTERN, TEST_TOPIC)
+            .conditional(TEST_MODE, OptionTest.TestMode.TIMESTAMP, 
TEST_TIMESTAMP)
+            .build();
+        OptionRule rule2 = OptionRule.builder()
+            .optional(TEST_NUM)
+            .optional(TEST_MODE)
+            .required(TEST_PORTS)
+            .exclusive(TEST_TOPIC_PATTERN, TEST_TOPIC)
+            .conditional(TEST_MODE, OptionTest.TestMode.TIMESTAMP, 
TEST_TIMESTAMP)
+            .build();
+        Assertions.assertEquals(rule1, rule2);
+    }
+}
diff --git a/seatunnel-api/src/test/resources/conf/option-test.conf 
b/seatunnel-api/src/test/resources/conf/option-test.conf
new file mode 100644
index 000000000..dc3b6b6de
--- /dev/null
+++ b/seatunnel-api/src/test/resources/conf/option-test.conf
@@ -0,0 +1,98 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+    execution.parallelism = 1
+    job.mode = "STREAMING"
+    execution.checkpoint.interval = 5000
+}
+
+source {
+    FakeSource {
+        option {
+            bool = true
+            bool-str = "false"
+            int = 2147483647
+            int-str = "100"
+            float = 3.3333
+            float-str = "3.1415"
+            double = 3.1415926535897932384626433832795028841971
+            double-str = "3.1415926535897932384626433832795028841971"
+            map {
+                inner {
+                    path = "mac"
+                    name = "ashulin"
+                    # A map nested inside another map option is supported only as a JSON string
+                    map = """{"fantasy":"final"}"""
+                }
+                type = "source"
+                patch.note = "hollow"
+            }
+            map.name = "saitou"
+        }
+        option.long = 21474836470
+        option.long-str = "21474836470"
+        option.string = "Hello, Apache SeaTunnel"
+        option.enum = "LATEST"
+        option.list-json = """["Hello", "Apache SeaTunnel"]"""
+        option.list = ["final", "fantasy", "VII"]
+        option.complex-type = [{
+            inner {
+                list = [{
+                    inner {
+                        path = "mac"
+                        name = "ashulin"
+                        map = """{"fantasy":"final"}"""
+                    }
+                    type = "source"
+                    patch.note = "hollow"
+                    name = "saitou"
+                }]
+            }
+            inner.list-2 = [{
+                inner {
+                    path = "mac"
+                    name = "ashulin"
+                    map = """{"fantasy":"final"}"""
+                }
+                type = "source"
+                patch.note = "hollow"
+                name = "saitou"
+            }]
+        }]
+    }
+}
+
+transform {
+    sql {
+        sql = "select name,age from fake"
+    }
+}
+
+sink {
+    File {
+        path = "file:///tmp/hive/warehouse/test2"
+        field_delimiter = "\t"
+        row_delimiter = "\n"
+        partition_by = ["age"]
+        partition_dir_expression = "${k0}=${v0}"
+        is_partition_field_write_in_file = true
+        file_name_expression = "${transactionId}_${now}"
+        file_format = "text"
+        sink_columns = ["name","age"]
+    }
+}
\ No newline at end of file
diff --git a/tools/dependencies/known-dependencies.txt 
b/tools/dependencies/known-dependencies.txt
index f2693cd70..6d80f5ad1 100755
--- a/tools/dependencies/known-dependencies.txt
+++ b/tools/dependencies/known-dependencies.txt
@@ -289,6 +289,7 @@ jackson-databind-2.12.6.jar
 jackson-dataformat-cbor-2.12.3.jar
 jackson-dataformat-cbor-2.8.10.jar
 jackson-dataformat-cbor-2.8.11.jar
+jackson-dataformat-properties-2.12.6.jar
 jackson-dataformat-smile-2.10.5.jar
 jackson-dataformat-smile-2.8.10.jar
 jackson-dataformat-smile-2.8.11.jar

Reply via email to