This is an automated email from the ASF dual-hosted git repository.

tyrantlucifer pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 9e4a12838 [Improve][Connector-V2] Add Clickhouse and Assert 
Source/Sink Factory (#3306)
9e4a12838 is described below

commit 9e4a128381f3df57a73161d2149982fc0997a422
Author: Hisoka <[email protected]>
AuthorDate: Tue Nov 8 15:27:26 2022 +0800

    [Improve][Connector-V2] Add Clickhouse and Assert Source/Sink Factory 
(#3306)
    
    * [Config] [Connector-V2] Add Clickhouse Source/Sink factory to support 
OptionRule
    
    * [Config] [Connector] Add clickhouse and assert factory
    
    * [Config] [Connector] Change field name
    
    * [Config] [Connector] adjust ClickhouseFileSinkFactory
    
    * [Config] [Connector] Add Option Annotation
    
    * [Config] [Connector] Add Option Annotation
    
    * [Config] [Connector] fix ci problems
    
    * [Connector-V2] [Config] Change Option to OptionMark
    
    * [Connector-V2] [Config] Change Factory
---
 docs/en/connector-v2/source/Clickhouse.md          |   9 +-
 .../apache/seatunnel/api/configuration/Option.java |   2 +-
 .../seatunnel/api/configuration/Options.java       |  16 ++-
 .../api/configuration/util/OptionMark.java         |  40 ++++++++
 .../api/configuration/util/OptionRule.java         |  12 +--
 .../api/configuration/util/OptionUtil.java         |  60 ++++++++++++
 .../api/configuration/util/RequiredOption.java     |  41 ++++++++
 .../api/configuration/util/OptionRuleTest.java     |   2 +-
 .../api/configuration/util/OptionUtilTest.java     |  68 +++++++++++++
 .../api/configuration/util/TestOptionConfig.java   |  67 +++++++++++++
 .../configuration/util/TestOptionConfigEnum.java   |  23 +++++
 .../seatunnel/assertion/rule/AssertRuleParser.java |  24 +++--
 .../seatunnel/assertion/sink/AssertConfig.java     |  43 ++++++++
 .../seatunnel/assertion/sink/AssertSink.java       |  15 +--
 .../assertion/sink/AssertSinkFactory.java          |  40 ++++++++
 .../seatunnel/assertion/sink/FieldRule.java        |  38 +++++++
 .../seatunnel/assertion/sink/RowRule.java          |  34 +++++++
 .../connectors/seatunnel/assertion/sink/Rules.java |  35 +++++++
 .../clickhouse/config/ClickhouseConfig.java        | 109 +++++++++++++++++++++
 .../seatunnel/clickhouse/config/Config.java        |  96 ------------------
 .../clickhouse/config/NodePassConfig.java          |  33 +++++++
 .../clickhouse/sink/client/ClickhouseSink.java     |  92 ++++++++---------
 .../sink/client/ClickhouseSinkFactory.java         |  50 ++++++++++
 .../clickhouse/sink/file/ClickhouseFileSink.java   |  60 ++++++------
 .../sink/file/ClickhouseFileSinkFactory.java       |  49 +++++++++
 .../clickhouse/source/ClickhouseSource.java        |  20 ++--
 .../clickhouse/source/ClickhouseSourceFactory.java |  43 ++++++++
 27 files changed, 906 insertions(+), 215 deletions(-)

diff --git a/docs/en/connector-v2/source/Clickhouse.md 
b/docs/en/connector-v2/source/Clickhouse.md
index 1f048f1cd..8df3d7c3d 100644
--- a/docs/en/connector-v2/source/Clickhouse.md
+++ b/docs/en/connector-v2/source/Clickhouse.md
@@ -27,13 +27,12 @@ Reading data from Clickhouse can also be done using JDBC
 ## Options
 
 | name           | type   | required | default value |
-| -------------- | ------ | -------- | ------------- |
+|----------------|--------|----------|---------------|
 | host           | string | yes      | -             |
 | database       | string | yes      | -             |
 | sql            | string | yes      | -             |
 | username       | string | yes      | -             |
 | password       | string | yes      | -             |
-| schema         | config | No       | -             |
 | common-options |        | no       | -             |
 
 ### host [string]
@@ -56,12 +55,6 @@ The query sql used to search data though Clickhouse server
 
 `ClickHouse` user password
 
-### schema [Config]
-
-#### fields [Config]
-
-the schema fields of upstream data
-
 ### common options 
 
 Source plugin common parameters, please refer to [Source Common 
Options](common-options.md) for details
diff --git 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/configuration/Option.java
 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/configuration/Option.java
index cfb10412e..cfd46673a 100644
--- 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/configuration/Option.java
+++ 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/configuration/Option.java
@@ -42,7 +42,7 @@ public class Option<T> {
      */
     String description = "";
 
-    Option(String key, TypeReference<T> typeReference, T defaultValue) {
+    public Option(String key, TypeReference<T> typeReference, T defaultValue) {
         this.key = key;
         this.typeReference = typeReference;
         this.defaultValue = defaultValue;
diff --git 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/configuration/Options.java
 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/configuration/Options.java
index b879da863..f67d88a7c 100644
--- 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/configuration/Options.java
+++ 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/configuration/Options.java
@@ -144,13 +144,27 @@ public class Options {
             });
         }
 
+        /**
+         * Defines that the value of the option should be a list of 
properties, which can be
+         * represented as {@code List<T>}.
+         */
+        public <T> TypedOptionBuilder<List<T>> listType(Class<T> option) {
+            return new TypedOptionBuilder<>(key, new TypeReference<List<T>>() {
+            });
+        }
+
+        public <T> TypedOptionBuilder<T> objectType(Class<T> option) {
+            return new TypedOptionBuilder<>(key, new TypeReference<T>() {
+            });
+        }
+
         /**
          * The value of the definition option should be represented as T.
          *
          * @param typeReference complex type reference
          */
         public <T> TypedOptionBuilder<T> type(TypeReference<T> typeReference) {
-            return new TypedOptionBuilder<T>(key, typeReference);
+            return new TypedOptionBuilder<>(key, typeReference);
         }
     }
 
diff --git 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/configuration/util/OptionMark.java
 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/configuration/util/OptionMark.java
new file mode 100644
index 000000000..4823128d6
--- /dev/null
+++ 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/configuration/util/OptionMark.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.api.configuration.util;
+
+import java.lang.annotation.Documented;
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+@Retention(RetentionPolicy.RUNTIME)
+@Documented
+@Target(ElementType.FIELD)
+public @interface OptionMark {
+
+    /**
+     * The key of the option, if not configured, we will default convert 
`lowerCamelCase` to `under_score_case` and provide it to users
+     */
+    String name() default "";
+
+    /**
+     * The description of the option
+     */
+    String description() default "";
+}
diff --git 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/configuration/util/OptionRule.java
 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/configuration/util/OptionRule.java
index c830709c2..861859b23 100644
--- 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/configuration/util/OptionRule.java
+++ 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/configuration/util/OptionRule.java
@@ -122,17 +122,12 @@ public class OptionRule {
         }
 
         /**
-         * Optional options with default value.
+         * Optional options
          *
          * <p> This options will not be validated.
          * <p> This is used by the web-UI to show what options are available.
          */
         public Builder optional(Option<?>... options) {
-            for (Option<?> option : options) {
-                if (option.defaultValue() == null) {
-                    throw new 
OptionValidationException(String.format("Optional option '%s' should have 
default value.", option.key()));
-                }
-            }
             this.optionalOptions.addAll(Arrays.asList(options));
             return this;
         }
@@ -187,6 +182,11 @@ public class OptionRule {
             return this;
         }
 
+        public Builder bundledRequired(Option<?>... requiredOptions) {
+            
this.requiredOptions.add(RequiredOption.BundledRequiredOptions.of(new 
HashSet<>(Arrays.asList(requiredOptions))));
+            return this;
+        }
+
         public OptionRule build() {
             return new OptionRule(optionalOptions, requiredOptions);
         }
diff --git 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/configuration/util/OptionUtil.java
 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/configuration/util/OptionUtil.java
new file mode 100644
index 000000000..7c6835efc
--- /dev/null
+++ 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/configuration/util/OptionUtil.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.api.configuration.util;
+
+import org.apache.seatunnel.api.configuration.Option;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import org.apache.commons.lang3.StringUtils;
+
+import java.lang.reflect.Field;
+import java.lang.reflect.Type;
+import java.util.ArrayList;
+import java.util.List;
+
+public class OptionUtil {
+
+    public static List<Option<?>> getOptions(Class<?> clazz) throws 
InstantiationException, IllegalAccessException {
+        Field[] fields = clazz.getDeclaredFields();
+        List<Option<?>> options = new ArrayList<>();
+        Object object = clazz.newInstance();
+        for (Field field : fields) {
+            field.setAccessible(true);
+            OptionMark option = field.getAnnotation(OptionMark.class);
+            if (option != null) {
+                options.add(new 
Option<>(!StringUtils.isNotBlank(option.name()) ? 
formatUnderScoreCase(field.getName()) : option.name(),
+                    new TypeReference<Object>() {
+                        @Override
+                        public Type getType() {
+                            return field.getType();
+                        }
+                    }, 
field.get(object)).withDescription(option.description()));
+            }
+        }
+        return options;
+    }
+
+    private static String formatUnderScoreCase(String camel) {
+        StringBuilder underScore = new 
StringBuilder(String.valueOf(Character.toLowerCase(camel.charAt(0))));
+        for (int i = 1; i < camel.length(); i++) {
+            char c = camel.charAt(i);
+            underScore.append(Character.isLowerCase(c) ? c : "_" + 
Character.toLowerCase(c));
+        }
+        return underScore.toString();
+    }
+}
diff --git 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/configuration/util/RequiredOption.java
 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/configuration/util/RequiredOption.java
index 692e056f4..acafaaa99 100644
--- 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/configuration/util/RequiredOption.java
+++ 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/configuration/util/RequiredOption.java
@@ -159,4 +159,45 @@ public interface RequiredOption {
             return String.format("Condition expression: %s, Required options: 
%s", expression, ExclusiveRequiredOptions.getOptionKeys(requiredOption));
         }
     }
+
+    /**
+     * All options must exist or not exist at the same time
+     */
+    class BundledRequiredOptions implements RequiredOption {
+        private final Set<Option<?>> requiredOption;
+
+        BundledRequiredOptions(Set<Option<?>> requiredOption) {
+            this.requiredOption = requiredOption;
+        }
+
+        public static BundledRequiredOptions of(Set<Option<?>> requiredOption) 
{
+            return new BundledRequiredOptions(requiredOption);
+        }
+
+        public Set<Option<?>> getRequiredOption() {
+            return requiredOption;
+        }
+
+        @Override
+        public boolean equals(Object obj) {
+            if (this == obj) {
+                return true;
+            }
+            if (!(obj instanceof BundledRequiredOptions)) {
+                return false;
+            }
+            BundledRequiredOptions that = (BundledRequiredOptions) obj;
+            return Objects.equals(this.requiredOption, that.requiredOption);
+        }
+
+        @Override
+        public int hashCode() {
+            return this.requiredOption.hashCode();
+        }
+
+        @Override
+        public String toString() {
+            return String.format("Both Required options: %s", 
ExclusiveRequiredOptions.getOptionKeys(requiredOption));
+        }
+    }
 }
diff --git 
a/seatunnel-api/src/test/java/org/apache/seatunnel/api/configuration/util/OptionRuleTest.java
 
b/seatunnel-api/src/test/java/org/apache/seatunnel/api/configuration/util/OptionRuleTest.java
index 5aca2bef7..4535fab3b 100644
--- 
a/seatunnel-api/src/test/java/org/apache/seatunnel/api/configuration/util/OptionRuleTest.java
+++ 
b/seatunnel-api/src/test/java/org/apache/seatunnel/api/configuration/util/OptionRuleTest.java
@@ -66,7 +66,7 @@ public class OptionRuleTest {
     @Test
     public void testOptionalException() {
         Assertions.assertThrows(OptionValidationException.class,
-            () -> OptionRule.builder().optional(TEST_NUM, TEST_MODE, 
TEST_PORTS).build(),
+            () -> OptionRule.builder().required(TEST_NUM, TEST_MODE, 
TEST_PORTS).build(),
             "Optional option 'option.ports' should have default value.");
     }
 
diff --git 
a/seatunnel-api/src/test/java/org/apache/seatunnel/api/configuration/util/OptionUtilTest.java
 
b/seatunnel-api/src/test/java/org/apache/seatunnel/api/configuration/util/OptionUtilTest.java
new file mode 100644
index 000000000..439e30ab2
--- /dev/null
+++ 
b/seatunnel-api/src/test/java/org/apache/seatunnel/api/configuration/util/OptionUtilTest.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.api.configuration.util;
+
+import org.apache.seatunnel.api.configuration.Option;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.List;
+import java.util.Map;
+
+public class OptionUtilTest {
+
+    @Test
+    public void test() throws InstantiationException, IllegalAccessException {
+        List<Option<?>> options = 
OptionUtil.getOptions(TestOptionConfig.class);
+        Assertions.assertEquals(options.get(0).key(), "short-value");
+        Assertions.assertEquals(options.get(0).getDescription(), "shortValue");
+        Assertions.assertEquals(options.get(0).typeReference().getType(), 
Short.class);
+
+        Assertions.assertEquals(options.get(1).typeReference().getType(), 
Integer.class);
+        Assertions.assertEquals(options.get(1).key(), "int_value");
+        Assertions.assertEquals(options.get(1).getDescription(), "");
+        Assertions.assertNull(options.get(1).defaultValue());
+
+        Assertions.assertEquals(options.get(2).typeReference().getType(), 
Long.class);
+
+        Assertions.assertEquals(options.get(3).typeReference().getType(), 
Float.class);
+
+        Assertions.assertEquals(options.get(4).typeReference().getType(), 
Double.class);
+
+        Assertions.assertEquals(options.get(5).typeReference().getType(), 
String.class);
+        Assertions.assertEquals(options.get(5).defaultValue(), "default 
string");
+
+        Assertions.assertEquals(options.get(6).typeReference().getType(), 
Boolean.class);
+        Assertions.assertEquals(options.get(6).defaultValue(), true);
+
+        Assertions.assertEquals(options.get(7).typeReference().getType(), 
Byte.class);
+
+        Assertions.assertEquals(options.get(8).typeReference().getType(), 
Character.class);
+        Assertions.assertEquals(options.get(9).typeReference().getType(), 
TestOptionConfigEnum.class);
+        Assertions.assertEquals(options.get(9).defaultValue(), 
TestOptionConfigEnum.KEY2);
+
+        Assertions.assertEquals(options.get(10).typeReference().getType(), 
TestOptionConfig.class);
+
+        Assertions.assertEquals(options.get(11).typeReference().getType(), 
List.class);
+
+        Assertions.assertEquals(options.get(12).typeReference().getType(), 
Map.class);
+
+    }
+
+}
diff --git 
a/seatunnel-api/src/test/java/org/apache/seatunnel/api/configuration/util/TestOptionConfig.java
 
b/seatunnel-api/src/test/java/org/apache/seatunnel/api/configuration/util/TestOptionConfig.java
new file mode 100644
index 000000000..3fadf0d25
--- /dev/null
+++ 
b/seatunnel-api/src/test/java/org/apache/seatunnel/api/configuration/util/TestOptionConfig.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.api.configuration.util;
+
+import lombok.Data;
+
+import java.util.List;
+import java.util.Map;
+
+@Data
+public class TestOptionConfig {
+
+    @OptionMark(name = "short-value", description = "shortValue")
+    private Short shortValue;
+
+    @OptionMark
+    private Integer intValue;
+
+    @OptionMark(description = "longValue")
+    private Long longValue;
+
+    @OptionMark(description = "floatValue")
+    private Float floatValue;
+
+    @OptionMark(description = "doubleValue")
+    private Double doubleValue;
+
+    @OptionMark(description = "stringValue")
+    private String stringValue = "default string";
+
+    @OptionMark(description = "booleanValue")
+    private Boolean booleanValue = true;
+
+    @OptionMark(description = "byteValue")
+    private Byte byteValue;
+
+    @OptionMark(description = "charValue")
+    private Character charValue;
+
+    @OptionMark(description = "enumValue")
+    private TestOptionConfigEnum enumValue = TestOptionConfigEnum.KEY2;
+
+    @OptionMark(description = "objectValue")
+    private TestOptionConfig objectValue;
+
+    @OptionMark(description = "listValue")
+    private List<TestOptionConfig> listValue;
+
+    @OptionMark(description = "mapValue")
+    private Map<String, String> mapValue;
+
+}
diff --git 
a/seatunnel-api/src/test/java/org/apache/seatunnel/api/configuration/util/TestOptionConfigEnum.java
 
b/seatunnel-api/src/test/java/org/apache/seatunnel/api/configuration/util/TestOptionConfigEnum.java
new file mode 100644
index 000000000..66c8a07c9
--- /dev/null
+++ 
b/seatunnel-api/src/test/java/org/apache/seatunnel/api/configuration/util/TestOptionConfigEnum.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.api.configuration.util;
+
+public enum TestOptionConfigEnum {
+    KEY1,
+    KEY2
+}
diff --git 
a/seatunnel-connectors-v2/connector-assert/src/main/java/org/apache/seatunnel/connectors/seatunnel/assertion/rule/AssertRuleParser.java
 
b/seatunnel-connectors-v2/connector-assert/src/main/java/org/apache/seatunnel/connectors/seatunnel/assertion/rule/AssertRuleParser.java
index c69baf03b..dfc646a86 100644
--- 
a/seatunnel-connectors-v2/connector-assert/src/main/java/org/apache/seatunnel/connectors/seatunnel/assertion/rule/AssertRuleParser.java
+++ 
b/seatunnel-connectors-v2/connector-assert/src/main/java/org/apache/seatunnel/connectors/seatunnel/assertion/rule/AssertRuleParser.java
@@ -17,6 +17,12 @@
 
 package org.apache.seatunnel.connectors.seatunnel.assertion.rule;
 
+import static 
org.apache.seatunnel.connectors.seatunnel.assertion.sink.AssertConfig.FIELD_NAME;
+import static 
org.apache.seatunnel.connectors.seatunnel.assertion.sink.AssertConfig.FIELD_TYPE;
+import static 
org.apache.seatunnel.connectors.seatunnel.assertion.sink.AssertConfig.FIELD_VALUE;
+import static 
org.apache.seatunnel.connectors.seatunnel.assertion.sink.AssertConfig.RULE_TYPE;
+import static 
org.apache.seatunnel.connectors.seatunnel.assertion.sink.AssertConfig.RULE_VALUE;
+
 import org.apache.seatunnel.api.table.type.BasicType;
 import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 
@@ -39,13 +45,13 @@ public class AssertRuleParser {
         return ruleConfigList.stream()
             .map(config -> {
                 AssertFieldRule fieldRule = new AssertFieldRule();
-                fieldRule.setFieldName(config.getString("field_name"));
-                if (config.hasPath("field_type")) {
-                    
fieldRule.setFieldType(getFieldType(config.getString("field_type")));
+                fieldRule.setFieldName(config.getString(FIELD_NAME));
+                if (config.hasPath(FIELD_TYPE)) {
+                    
fieldRule.setFieldType(getFieldType(config.getString(FIELD_TYPE)));
                 }
 
-                if (config.hasPath("field_value")) {
-                    List<AssertFieldRule.AssertRule> fieldValueRules = 
assembleFieldValueRules(config.getConfigList("field_value"));
+                if (config.hasPath(FIELD_VALUE)) {
+                    List<AssertFieldRule.AssertRule> fieldValueRules = 
assembleFieldValueRules(config.getConfigList(FIELD_VALUE));
                     fieldRule.setFieldRules(fieldValueRules);
                 }
                 return fieldRule;
@@ -57,11 +63,11 @@ public class AssertRuleParser {
         return fieldValueConfigList.stream()
             .map(config -> {
                 AssertFieldRule.AssertRule valueRule = new 
AssertFieldRule.AssertRule();
-                if (config.hasPath("rule_type")) {
-                    
valueRule.setRuleType(AssertFieldRule.AssertRuleType.valueOf(config.getString("rule_type")));
+                if (config.hasPath(RULE_TYPE)) {
+                    
valueRule.setRuleType(AssertFieldRule.AssertRuleType.valueOf(config.getString(RULE_TYPE)));
                 }
-                if (config.hasPath("rule_value")) {
-                    valueRule.setRuleValue(config.getDouble("rule_value"));
+                if (config.hasPath(RULE_VALUE)) {
+                    valueRule.setRuleValue(config.getDouble(RULE_VALUE));
                 }
                 return valueRule;
             })
diff --git 
a/seatunnel-connectors-v2/connector-assert/src/main/java/org/apache/seatunnel/connectors/seatunnel/assertion/sink/AssertConfig.java
 
b/seatunnel-connectors-v2/connector-assert/src/main/java/org/apache/seatunnel/connectors/seatunnel/assertion/sink/AssertConfig.java
new file mode 100644
index 000000000..7b9fa76b4
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-assert/src/main/java/org/apache/seatunnel/connectors/seatunnel/assertion/sink/AssertConfig.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.assertion.sink;
+
+import org.apache.seatunnel.api.configuration.Option;
+import org.apache.seatunnel.api.configuration.Options;
+
+public class AssertConfig {
+
+    public static final String RULE_TYPE = "rule_type";
+
+    public static final String RULE_VALUE = "rule_value";
+
+    public static final String ROW_RULES = "row_rules";
+
+    public static final String FIELD_NAME = "field_name";
+
+    public static final String FIELD_TYPE = "field_type";
+
+    public static final String FIELD_VALUE = "field_value";
+
+    public static final String FIELD_RULES = "field_rules";
+
+    public static final Option<Rules> RULES = 
Options.key("rules").objectType(Rules.class)
+        .noDefaultValue().withDescription("Rule definition of user's available 
data. Each rule represents one field validation or row num validation.");
+
+
+}
diff --git 
a/seatunnel-connectors-v2/connector-assert/src/main/java/org/apache/seatunnel/connectors/seatunnel/assertion/sink/AssertSink.java
 
b/seatunnel-connectors-v2/connector-assert/src/main/java/org/apache/seatunnel/connectors/seatunnel/assertion/sink/AssertSink.java
index 5c611f760..77d682d63 100644
--- 
a/seatunnel-connectors-v2/connector-assert/src/main/java/org/apache/seatunnel/connectors/seatunnel/assertion/sink/AssertSink.java
+++ 
b/seatunnel-connectors-v2/connector-assert/src/main/java/org/apache/seatunnel/connectors/seatunnel/assertion/sink/AssertSink.java
@@ -17,6 +17,10 @@
 
 package org.apache.seatunnel.connectors.seatunnel.assertion.sink;
 
+import static 
org.apache.seatunnel.connectors.seatunnel.assertion.sink.AssertConfig.FIELD_RULES;
+import static 
org.apache.seatunnel.connectors.seatunnel.assertion.sink.AssertConfig.ROW_RULES;
+import static 
org.apache.seatunnel.connectors.seatunnel.assertion.sink.AssertConfig.RULES;
+
 import org.apache.seatunnel.api.sink.SeaTunnelSink;
 import org.apache.seatunnel.api.sink.SinkWriter;
 import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
@@ -39,9 +43,6 @@ import java.util.List;
 
 @AutoService(SeaTunnelSink.class)
 public class AssertSink extends AbstractSimpleSink<SeaTunnelRow, Void> {
-    private static final String RULES = "rules";
-    private static final String ROW_RULES = "row_rules";
-    private static final String FIELD_RULES = "field_rules";
     private SeaTunnelRowType seaTunnelRowType;
     private List<AssertFieldRule> assertFieldRules;
     private List<AssertFieldRule.AssertRule> assertRowRules;
@@ -63,10 +64,10 @@ public class AssertSink extends 
AbstractSimpleSink<SeaTunnelRow, Void> {
 
     @Override
     public void prepare(Config pluginConfig) {
-        if (!pluginConfig.hasPath(RULES)) {
-            Throwables.propagateIfPossible(new ConfigException.Missing(RULES));
+        if (!pluginConfig.hasPath(RULES.key())) {
+            Throwables.propagateIfPossible(new 
ConfigException.Missing(RULES.key()));
         }
-        Config ruleConfig = pluginConfig.getConfig(RULES);
+        Config ruleConfig = pluginConfig.getConfig(RULES.key());
         List<? extends Config> rowConfigList = null;
         List<? extends Config> configList = null;
         if (ruleConfig.hasPath(ROW_RULES)) {
@@ -79,7 +80,7 @@ public class AssertSink extends 
AbstractSimpleSink<SeaTunnelRow, Void> {
         }
 
         if (CollectionUtils.isEmpty(configList) && 
CollectionUtils.isEmpty(rowConfigList)) {
-            Throwables.propagateIfPossible(new ConfigException.BadValue(RULES, 
"Assert rule config is empty, please add rule config."));
+            Throwables.propagateIfPossible(new 
ConfigException.BadValue(RULES.key(), "Assert rule config is empty, please add 
rule config."));
         }
     }
 
diff --git 
a/seatunnel-connectors-v2/connector-assert/src/main/java/org/apache/seatunnel/connectors/seatunnel/assertion/sink/AssertSinkFactory.java
 
b/seatunnel-connectors-v2/connector-assert/src/main/java/org/apache/seatunnel/connectors/seatunnel/assertion/sink/AssertSinkFactory.java
new file mode 100644
index 000000000..c3c6ad553
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-assert/src/main/java/org/apache/seatunnel/connectors/seatunnel/assertion/sink/AssertSinkFactory.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.assertion.sink;
+
+import static 
org.apache.seatunnel.connectors.seatunnel.assertion.sink.AssertConfig.RULES;
+
+import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.table.factory.Factory;
+import org.apache.seatunnel.api.table.factory.TableSinkFactory;
+
+import com.google.auto.service.AutoService;
+
+/**
+ * Factory for the Assert sink connector.
+ *
+ * <p>Registered through {@link AutoService} under {@link Factory}. It supplies only the factory
+ * identifier and the option rule; no other methods are overridden here.
+ */
+@AutoService(Factory.class)
+public class AssertSinkFactory implements TableSinkFactory {
+
+    /**
+     * @return the identifier of this factory, always {@code "AssertSink"}
+     */
+    @Override
+    public String factoryIdentifier() {
+        return "AssertSink";
+    }
+
+    /**
+     * @return an option rule whose only declared option is {@link AssertConfig#RULES}, marked required
+     */
+    @Override
+    public OptionRule optionRule() {
+        return OptionRule.builder().required(RULES).build();
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-assert/src/main/java/org/apache/seatunnel/connectors/seatunnel/assertion/sink/FieldRule.java
 
b/seatunnel-connectors-v2/connector-assert/src/main/java/org/apache/seatunnel/connectors/seatunnel/assertion/sink/FieldRule.java
new file mode 100644
index 000000000..ff1eeefd5
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-assert/src/main/java/org/apache/seatunnel/connectors/seatunnel/assertion/sink/FieldRule.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.assertion.sink;
+
+import org.apache.seatunnel.api.configuration.util.OptionMark;
+
+import lombok.Data;
+
+import java.util.List;
+
+/**
+ * Option model for the assertion rules of one field: the field's name, its type and the list of
+ * value rules. Accessors are generated by Lombok's {@link Data}; the meaning of each member is
+ * given by its {@link OptionMark} description.
+ */
+@Data
+public class FieldRule {
+
+    @OptionMark(description = "field name")
+    private String fieldName;
+
+    @OptionMark(description = "field type")
+    private String fieldType;
+
+    // Value rules reuse RowRule, i.e. each entry is a (ruleType, ruleValue) pair.
+    @OptionMark(description = "A list value rule define the data value 
validation")
+    private List<RowRule> fieldValue;
+
+}
diff --git 
a/seatunnel-connectors-v2/connector-assert/src/main/java/org/apache/seatunnel/connectors/seatunnel/assertion/sink/RowRule.java
 
b/seatunnel-connectors-v2/connector-assert/src/main/java/org/apache/seatunnel/connectors/seatunnel/assertion/sink/RowRule.java
new file mode 100644
index 000000000..274ed88ed
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-assert/src/main/java/org/apache/seatunnel/connectors/seatunnel/assertion/sink/RowRule.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.assertion.sink;
+
+import org.apache.seatunnel.api.configuration.util.OptionMark;
+import 
org.apache.seatunnel.connectors.seatunnel.assertion.rule.AssertFieldRule;
+
+import lombok.Data;
+
+/**
+ * Option model for a single assertion rule: the kind of check to apply and the numeric value
+ * associated with that check. Accessors are generated by Lombok's {@link Data}.
+ */
+@Data
+public class RowRule {
+
+    @OptionMark(description = "The rule type of the rule")
+    private AssertFieldRule.AssertRuleType ruleType;
+
+    // Boxed Double, so the value may be left unset (null).
+    @OptionMark(description = "The value related to rule type")
+    private Double ruleValue;
+
+}
diff --git 
a/seatunnel-connectors-v2/connector-assert/src/main/java/org/apache/seatunnel/connectors/seatunnel/assertion/sink/Rules.java
 
b/seatunnel-connectors-v2/connector-assert/src/main/java/org/apache/seatunnel/connectors/seatunnel/assertion/sink/Rules.java
new file mode 100644
index 000000000..c51e9fd77
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-assert/src/main/java/org/apache/seatunnel/connectors/seatunnel/assertion/sink/Rules.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.assertion.sink;
+
+import org.apache.seatunnel.api.configuration.util.OptionMark;
+
+import lombok.Data;
+
+import java.util.List;
+
+/**
+ * Option model grouping the two kinds of assertion rules: rules applied to rows and rules applied
+ * to individual fields. Accessors are generated by Lombok's {@link Data}.
+ *
+ * <p>NOTE(review): presumably the value type of the Assert sink's {@code rules} option; confirm
+ * against AssertConfig.
+ */
+@Data
+public class Rules {
+
+    @OptionMark(description = "row rules for row validation")
+    private List<RowRule> rowRules;
+
+    @OptionMark(description = "field rules for field validation")
+    private List<FieldRule> fieldRules;
+
+}
diff --git 
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/config/ClickhouseConfig.java
 
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/config/ClickhouseConfig.java
new file mode 100644
index 000000000..62f7bd398
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/config/ClickhouseConfig.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.clickhouse.config;
+
+import org.apache.seatunnel.api.configuration.Option;
+import org.apache.seatunnel.api.configuration.Options;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Option definitions (key, value type, default value and description) for the Clickhouse
+ * connector. Each {@link Option} exposes its configuration key through {@code key()}; callers
+ * that still work on raw config paths use that key.
+ */
+// MagicNumber is suppressed presumably because of the literal default of BULK_SIZE below.
+@SuppressWarnings("checkstyle:MagicNumber")
+public class ClickhouseConfig {
+
+    /**
+     * Bulk size of clickhouse jdbc
+     */
+    public static final Option<Integer> BULK_SIZE = 
Options.key("bulk_size").intType()
+        .defaultValue(20000).withDescription("Bulk size of clickhouse jdbc");
+
+    /**
+     * Clickhouse fields
+     *
+     * <p>NOTE(review): declared with {@code stringType()}, but ClickhouseSink reads this key via
+     * {@code config.getStringList(FIELDS.key())}; confirm whether a list type is intended.
+     */
+    public static final Option<String> FIELDS = 
Options.key("fields").stringType()
+        .noDefaultValue().withDescription("Clickhouse fields");
+
+    /**
+     * Clickhouse sql used to query data
+     */
+    public static final Option<String> SQL = Options.key("sql").stringType()
+        .noDefaultValue().withDescription("Clickhouse sql used to query data");
+
+    /**
+     * Clickhouse server host
+     */
+    public static final Option<String> HOST = Options.key("host").stringType()
+        .noDefaultValue().withDescription("Clickhouse server host");
+
+    /**
+     * Clickhouse table name
+     */
+    public static final Option<String> TABLE = 
Options.key("table").stringType()
+        .noDefaultValue().withDescription("Clickhouse table name");
+
+    /**
+     * Clickhouse database name
+     */
+    public static final Option<String> DATABASE = 
Options.key("database").stringType()
+        .noDefaultValue().withDescription("Clickhouse database name");
+
+    /**
+     * Clickhouse server username
+     */
+    public static final Option<String> USERNAME = 
Options.key("username").stringType()
+        .noDefaultValue().withDescription("Clickhouse server username");
+
+    /**
+     * Clickhouse server password
+     */
+    public static final Option<String> PASSWORD = 
Options.key("password").stringType()
+        .noDefaultValue().withDescription("Clickhouse server password");
+
+    /**
+     * Split mode when table is distributed engine
+     */
+    public static final Option<Boolean> SPLIT_MODE = 
Options.key("split_mode").booleanType()
+        .defaultValue(false).withDescription("Split mode when table is 
distributed engine");
+
+    /**
+     * When split_mode is true, the sharding_key use for split
+     */
+    public static final Option<String> SHARDING_KEY = 
Options.key("sharding_key").stringType()
+        .noDefaultValue().withDescription("When split_mode is true, the 
sharding_key use for split");
+
+    /**
+     * ClickhouseFile sink connector used clickhouse-local program's path
+     */
+    public static final Option<String> CLICKHOUSE_LOCAL_PATH = 
Options.key("clickhouse_local_path").stringType()
+        .noDefaultValue().withDescription("ClickhouseFile sink connector used 
clickhouse-local program's path");
+
+    /**
+     * The method of copy Clickhouse file
+     */
+    public static final Option<ClickhouseFileCopyMethod> COPY_METHOD = 
Options.key("copy_method").enumType(ClickhouseFileCopyMethod.class)
+        .defaultValue(ClickhouseFileCopyMethod.SCP).withDescription("The 
method of copy Clickhouse file");
+
+    /**
+     * The address of Clickhouse server node. Unlike the other entries this is a plain key name,
+     * not an {@link Option}.
+     */
+    public static final String NODE_ADDRESS = "node_address";
+    /**
+     * The password of Clickhouse server node
+     */
+    public static final Option<List<NodePassConfig>> NODE_PASS = 
Options.key("node_pass").listType(NodePassConfig.class)
+        .noDefaultValue().withDescription("The password of Clickhouse server 
node");
+
+    /**
+     * Clickhouse custom config. The key carries no trailing dot; ClickhouseSink appends
+     * {@code "."} to it before checking for and extracting the sub-config.
+     */
+    public static final Option<Map<String, String>> CLICKHOUSE_PREFIX = 
Options.key("clickhouse").mapType()
+        .defaultValue(Collections.emptyMap()).withDescription("Clickhouse 
custom config");
+
+}
diff --git 
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/config/Config.java
 
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/config/Config.java
deleted file mode 100644
index 6563274ba..000000000
--- 
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/config/Config.java
+++ /dev/null
@@ -1,96 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.connectors.seatunnel.clickhouse.config;
-
-public class Config {
-
-    /**
-     * Bulk size of clickhouse jdbc
-     */
-    public static final String BULK_SIZE = "bulk_size";
-
-    /**
-     * Clickhouse fields
-     */
-    public static final String FIELDS = "fields";
-
-    public static final String SQL = "sql";
-
-    /**
-     * Clickhouse server host
-     */
-    public static final String HOST = "host";
-
-    /**
-     * Clickhouse table name
-     */
-    public static final String TABLE = "table";
-
-    /**
-     * Clickhouse database name
-     */
-    public static final String DATABASE = "database";
-
-    /**
-     * Clickhouse server username
-     */
-    public static final String USERNAME = "username";
-
-    /**
-     * Clickhouse server password
-     */
-    public static final String PASSWORD = "password";
-
-    /**
-     * Split mode when table is distributed engine
-     */
-    public static final String SPLIT_MODE = "split_mode";
-
-    /**
-     * When split_mode is true, the sharding_key use for split
-     */
-    public static final String SHARDING_KEY = "sharding_key";
-
-    /**
-     * ClickhouseFile sink connector used clickhouse-local program's path
-     */
-    public static final String CLICKHOUSE_LOCAL_PATH = "clickhouse_local_path";
-
-    /**
-     * The method of copy Clickhouse file
-     */
-    public static final String COPY_METHOD = "copy_method";
-
-    /**
-     * The size of each batch read temporary data into local file.
-     */
-    public static final String TMP_BATCH_CACHE_LINE = "tmp_batch_cache_line";
-
-    /**
-     * The password of Clickhouse server node
-     */
-    public static final String NODE_PASS = "node_pass";
-
-    /**
-     * The address of Clickhouse server node
-     */
-    public static final String NODE_ADDRESS = "node_address";
-
-    public static final String CLICKHOUSE_PREFIX = "clickhouse.";
-
-}
diff --git 
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/config/NodePassConfig.java
 
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/config/NodePassConfig.java
new file mode 100644
index 000000000..7340a9176
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/config/NodePassConfig.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.clickhouse.config;
+
+import org.apache.seatunnel.api.configuration.util.OptionMark;
+
+import lombok.Data;
+
+/**
+ * Option model pairing the address of a Clickhouse server node with a password for it. This is
+ * the element type of {@link ClickhouseConfig#NODE_PASS}. Accessors are generated by Lombok's
+ * {@link Data}.
+ */
+@Data
+public class NodePassConfig {
+
+    @OptionMark(description = "The address of Clickhouse server node")
+    private String nodeAddress;
+
+    @OptionMark(description = "Clickhouse server linux password")
+    private String password;
+
+}
diff --git 
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseSink.java
 
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseSink.java
index 3c2c1104d..028fe4a5c 100644
--- 
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseSink.java
+++ 
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseSink.java
@@ -17,16 +17,16 @@
 
 package org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.client;
 
-import static 
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.Config.BULK_SIZE;
-import static 
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.Config.CLICKHOUSE_PREFIX;
-import static 
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.Config.DATABASE;
-import static 
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.Config.FIELDS;
-import static 
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.Config.HOST;
-import static 
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.Config.PASSWORD;
-import static 
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.Config.SHARDING_KEY;
-import static 
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.Config.SPLIT_MODE;
-import static 
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.Config.TABLE;
-import static 
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.Config.USERNAME;
+import static 
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.BULK_SIZE;
+import static 
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.CLICKHOUSE_PREFIX;
+import static 
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.DATABASE;
+import static 
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.FIELDS;
+import static 
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.HOST;
+import static 
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.PASSWORD;
+import static 
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.SHARDING_KEY;
+import static 
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.SPLIT_MODE;
+import static 
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.TABLE;
+import static 
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.USERNAME;
 
 import org.apache.seatunnel.api.common.PrepareFailException;
 import org.apache.seatunnel.api.serialization.DefaultSerializer;
@@ -76,58 +76,58 @@ public class ClickhouseSink implements 
SeaTunnelSink<SeaTunnelRow, ClickhouseSin
     @SuppressWarnings("checkstyle:MagicNumber")
     @Override
     public void prepare(Config config) throws PrepareFailException {
-        CheckResult result = CheckConfigUtil.checkAllExists(config, HOST, 
DATABASE, TABLE);
+        CheckResult result = CheckConfigUtil.checkAllExists(config, 
HOST.key(), DATABASE.key(), TABLE.key());
 
-        boolean isCredential = config.hasPath(USERNAME) || 
config.hasPath(PASSWORD);
+        boolean isCredential = config.hasPath(USERNAME.key()) || 
config.hasPath(PASSWORD.key());
 
         if (isCredential) {
-            result = CheckConfigUtil.checkAllExists(config, USERNAME, 
PASSWORD);
+            result = CheckConfigUtil.checkAllExists(config, USERNAME.key(), 
PASSWORD.key());
         }
 
         if (!result.isSuccess()) {
             throw new PrepareFailException(getPluginName(), PluginType.SINK, 
result.getMsg());
         }
         Map<String, Object> defaultConfig = ImmutableMap.<String, 
Object>builder()
-                .put(BULK_SIZE, 20_000)
-                .put(SPLIT_MODE, false)
+            .put(BULK_SIZE.key(), BULK_SIZE.defaultValue())
+            .put(SPLIT_MODE.key(), SPLIT_MODE.defaultValue())
                 .build();
 
         config = config.withFallback(ConfigFactory.parseMap(defaultConfig));
 
         List<ClickHouseNode> nodes;
         if (!isCredential) {
-            nodes = ClickhouseUtil.createNodes(config.getString(HOST), 
config.getString(DATABASE),
-                    null, null);
+            nodes = ClickhouseUtil.createNodes(config.getString(HOST.key()), 
config.getString(DATABASE.key()),
+                null, null);
         } else {
-            nodes = ClickhouseUtil.createNodes(config.getString(HOST),
-                    config.getString(DATABASE), config.getString(USERNAME), 
config.getString(PASSWORD));
+            nodes = ClickhouseUtil.createNodes(config.getString(HOST.key()),
+                config.getString(DATABASE.key()), 
config.getString(USERNAME.key()), config.getString(PASSWORD.key()));
         }
 
         Properties clickhouseProperties = new Properties();
-        if (TypesafeConfigUtils.hasSubConfig(config, CLICKHOUSE_PREFIX)) {
-            TypesafeConfigUtils.extractSubConfig(config, CLICKHOUSE_PREFIX, 
false).entrySet().forEach(e -> {
+        if (TypesafeConfigUtils.hasSubConfig(config, CLICKHOUSE_PREFIX.key() + 
".")) {
+            TypesafeConfigUtils.extractSubConfig(config, 
CLICKHOUSE_PREFIX.key() + ".", false).entrySet().forEach(e -> {
                 clickhouseProperties.put(e.getKey(), 
String.valueOf(e.getValue().unwrapped()));
             });
         }
 
         if (isCredential) {
-            clickhouseProperties.put("user", config.getString(USERNAME));
-            clickhouseProperties.put("password", config.getString(PASSWORD));
+            clickhouseProperties.put("user", config.getString(USERNAME.key()));
+            clickhouseProperties.put("password", 
config.getString(PASSWORD.key()));
         }
 
         ClickhouseProxy proxy = new ClickhouseProxy(nodes.get(0));
-        Map<String, String> tableSchema = 
proxy.getClickhouseTableSchema(config.getString(TABLE));
+        Map<String, String> tableSchema = 
proxy.getClickhouseTableSchema(config.getString(TABLE.key()));
         String shardKey = null;
         String shardKeyType = null;
-        if (config.getBoolean(SPLIT_MODE)) {
-            ClickhouseTable table = 
proxy.getClickhouseTable(config.getString(DATABASE),
-                    config.getString(TABLE));
+        if (config.getBoolean(SPLIT_MODE.key())) {
+            ClickhouseTable table = 
proxy.getClickhouseTable(config.getString(DATABASE.key()),
+                config.getString(TABLE.key()));
             if (!"Distributed".equals(table.getEngine())) {
                 throw new IllegalArgumentException("split mode only support 
table which engine is " +
-                        "'Distributed' engine at now");
+                    "'Distributed' engine at now");
             }
-            if (config.hasPath(SHARDING_KEY)) {
-                shardKey = config.getString(SHARDING_KEY);
+            if (config.hasPath(SHARDING_KEY.key())) {
+                shardKey = config.getString(SHARDING_KEY.key());
                 shardKeyType = tableSchema.get(shardKey);
             }
         }
@@ -135,36 +135,36 @@ public class ClickhouseSink implements 
SeaTunnelSink<SeaTunnelRow, ClickhouseSin
 
         if (isCredential) {
             metadata = new ShardMetadata(
-                    shardKey,
-                    shardKeyType,
-                    config.getString(DATABASE),
-                    config.getString(TABLE),
-                    config.getBoolean(SPLIT_MODE),
-                    new Shard(1, 1, nodes.get(0)), config.getString(USERNAME), 
config.getString(PASSWORD));
+                shardKey,
+                shardKeyType,
+                config.getString(DATABASE.key()),
+                config.getString(TABLE.key()),
+                config.getBoolean(SPLIT_MODE.key()),
+                new Shard(1, 1, nodes.get(0)), 
config.getString(USERNAME.key()), config.getString(PASSWORD.key()));
         } else {
             metadata = new ShardMetadata(
-                    shardKey,
-                    shardKeyType,
-                    config.getString(DATABASE),
-                    config.getString(TABLE),
-                    config.getBoolean(SPLIT_MODE),
-                    new Shard(1, 1, nodes.get(0)));
+                shardKey,
+                shardKeyType,
+                config.getString(DATABASE.key()),
+                config.getString(TABLE.key()),
+                config.getBoolean(SPLIT_MODE.key()),
+                new Shard(1, 1, nodes.get(0)));
         }
 
         List<String> fields = new ArrayList<>();
-        if (config.hasPath(FIELDS)) {
-            fields.addAll(config.getStringList(FIELDS));
+        if (config.hasPath(FIELDS.key())) {
+            fields.addAll(config.getStringList(FIELDS.key()));
             // check if the fields exist in schema
             for (String field : fields) {
                 if (!tableSchema.containsKey(field)) {
-                    throw new RuntimeException("Field " + field + " does not 
exist in table " + config.getString(TABLE));
+                    throw new RuntimeException("Field " + field + " does not 
exist in table " + config.getString(TABLE.key()));
                 }
             }
         } else {
             fields.addAll(tableSchema.keySet());
         }
         proxy.close();
-        this.option = new ReaderOption(metadata, clickhouseProperties, fields, 
tableSchema, config.getInt(BULK_SIZE));
+        this.option = new ReaderOption(metadata, clickhouseProperties, fields, 
tableSchema, config.getInt(BULK_SIZE.key()));
     }
 
     @Override
diff --git 
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseSinkFactory.java
 
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseSinkFactory.java
new file mode 100644
index 000000000..a804400f9
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseSinkFactory.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.client;
+
+import static 
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.BULK_SIZE;
+import static 
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.CLICKHOUSE_PREFIX;
+import static 
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.DATABASE;
+import static 
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.FIELDS;
+import static 
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.HOST;
+import static 
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.PASSWORD;
+import static 
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.SHARDING_KEY;
+import static 
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.SPLIT_MODE;
+import static 
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.TABLE;
+import static 
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.USERNAME;
+
+import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.table.factory.Factory;
+import org.apache.seatunnel.api.table.factory.TableSinkFactory;
+
+import com.google.auto.service.AutoService;
+
+/**
+ * Factory for the Clickhouse sink connector, registered through {@link AutoService} under
+ * {@link Factory}. It supplies the factory identifier and the option rule only.
+ */
+@AutoService(Factory.class)
+public class ClickhouseSinkFactory implements TableSinkFactory {
+    /**
+     * @return the identifier of this factory, always {@code "ClickhouseSink"}
+     */
+    @Override
+    public String factoryIdentifier() {
+        return "ClickhouseSink";
+    }
+
+    /**
+     * Declares the options of the Clickhouse sink: host, database and table are required; the
+     * custom-config map, bulk size, split mode, fields and sharding key are optional; username
+     * and password are declared together as one bundle.
+     *
+     * <p>NOTE(review): the bundle presumably mirrors ClickhouseSink's prepare step, which demands
+     * both credentials as soon as either one is configured; confirm the exact semantics of
+     * {@code bundledRequired} in OptionRule.
+     *
+     * @return the option rule described above
+     */
+    @Override
+    public OptionRule optionRule() {
+        return OptionRule.builder().required(HOST, DATABASE, TABLE)
+            .optional(CLICKHOUSE_PREFIX, BULK_SIZE, SPLIT_MODE, FIELDS, 
SHARDING_KEY)
+            .bundledRequired(USERNAME, PASSWORD).build();
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSink.java
 
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSink.java
index 319937ac1..e32ecd2ef 100644
--- 
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSink.java
+++ 
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSink.java
@@ -17,17 +17,17 @@
 
 package org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.file;
 
-import static 
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.Config.CLICKHOUSE_LOCAL_PATH;
-import static 
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.Config.COPY_METHOD;
-import static 
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.Config.DATABASE;
-import static 
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.Config.FIELDS;
-import static 
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.Config.HOST;
-import static 
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.Config.NODE_ADDRESS;
-import static 
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.Config.NODE_PASS;
-import static 
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.Config.PASSWORD;
-import static 
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.Config.SHARDING_KEY;
-import static 
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.Config.TABLE;
-import static 
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.Config.USERNAME;
+import static 
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.CLICKHOUSE_LOCAL_PATH;
+import static 
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.COPY_METHOD;
+import static 
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.DATABASE;
+import static 
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.FIELDS;
+import static 
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.HOST;
+import static 
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.NODE_ADDRESS;
+import static 
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.NODE_PASS;
+import static 
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.PASSWORD;
+import static 
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.SHARDING_KEY;
+import static 
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.TABLE;
+import static 
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.USERNAME;
 
 import org.apache.seatunnel.api.common.PrepareFailException;
 import org.apache.seatunnel.api.sink.SeaTunnelSink;
@@ -73,55 +73,55 @@ public class ClickhouseFileSink implements 
SeaTunnelSink<SeaTunnelRow, Clickhous
 
     @Override
     public void prepare(Config config) throws PrepareFailException {
+        // Validates the configuration, applies the default copy method, reads the target table's
+        // schema through a ClickhouseProxy on the first node and stores the resulting
+        // FileReaderOption in readerOption.
+        // Throws PrepareFailException when the checkAllExists validation below does not succeed.
-        CheckResult checkResult = CheckConfigUtil.checkAllExists(config, HOST, 
TABLE, DATABASE, USERNAME, PASSWORD, CLICKHOUSE_LOCAL_PATH);
+        CheckResult checkResult = CheckConfigUtil.checkAllExists(config, 
HOST.key(), TABLE.key(), DATABASE.key(), USERNAME.key(), PASSWORD.key(), 
CLICKHOUSE_LOCAL_PATH.key());
         if (!checkResult.isSuccess()) {
             throw new PrepareFailException(getPluginName(), PluginType.SINK, 
checkResult.getMsg());
         }
+        // The copy method falls back to the default declared on the COPY_METHOD option when the
+        // user configuration does not set it.
         Map<String, Object> defaultConfigs = ImmutableMap.<String, 
Object>builder()
-                .put(COPY_METHOD, ClickhouseFileCopyMethod.SCP.getName())
+                .put(COPY_METHOD.key(), COPY_METHOD.defaultValue().getName())
                 .build();
 
         config = config.withFallback(ConfigFactory.parseMap(defaultConfigs));
-        List<ClickHouseNode> nodes = 
ClickhouseUtil.createNodes(config.getString(HOST),
-                config.getString(DATABASE), config.getString(USERNAME), 
config.getString(PASSWORD));
+        List<ClickHouseNode> nodes = 
ClickhouseUtil.createNodes(config.getString(HOST.key()),
+                config.getString(DATABASE.key()), 
config.getString(USERNAME.key()), config.getString(PASSWORD.key()));
 
+        // Only the first node is used to look up the table schema.
         ClickhouseProxy proxy = new ClickhouseProxy(nodes.get(0));
-        Map<String, String> tableSchema = 
proxy.getClickhouseTableSchema(config.getString(TABLE));
+        Map<String, String> tableSchema = 
proxy.getClickhouseTableSchema(config.getString(TABLE.key()));
         String shardKey = null;
         String shardKeyType = null;
-        if (config.hasPath(SHARDING_KEY)) {
-            shardKey = config.getString(SHARDING_KEY);
+        if (config.hasPath(SHARDING_KEY.key())) {
+            shardKey = config.getString(SHARDING_KEY.key());
             shardKeyType = tableSchema.get(shardKey);
         }
         ShardMetadata shardMetadata = new ShardMetadata(
                 shardKey,
                 shardKeyType,
-                config.getString(DATABASE),
-                config.getString(TABLE),
+                config.getString(DATABASE.key()),
+                config.getString(TABLE.key()),
                 false, // we don't need to set splitMode in clickhouse file 
mode.
-                new Shard(1, 1, nodes.get(0)), config.getString(USERNAME), 
config.getString(PASSWORD));
+                new Shard(1, 1, nodes.get(0)), 
config.getString(USERNAME.key()), config.getString(PASSWORD.key()));
+        // Use the configured field list when present (every entry must exist in the table
+        // schema); otherwise use all columns of the table schema.
         List<String> fields;
-        if (config.hasPath(FIELDS)) {
-            fields = config.getStringList(FIELDS);
+        if (config.hasPath(FIELDS.key())) {
+            fields = config.getStringList(FIELDS.key());
             // check if the fields exist in schema
             for (String field : fields) {
                 if (!tableSchema.containsKey(field)) {
-                    throw new RuntimeException("Field " + field + " does not 
exist in table " + config.getString(TABLE));
+                    throw new RuntimeException("Field " + field + " does not 
exist in table " + config.getString(TABLE.key()));
                 }
             }
         } else {
             fields = new ArrayList<>(tableSchema.keySet());
         }
+        // Per-node credentials keyed by node address; the user name defaults to "root" when a
+        // node entry does not define one.
+        // NOTE(review): node_pass is read unconditionally here, while ClickhouseFileSinkFactory
+        // lists NODE_PASS as optional; presumably getObjectList fails when the key is absent
+        // -- TODO confirm and guard with hasPath if so.
-        Map<String, String> nodeUser = config.getObjectList(NODE_PASS).stream()
+        Map<String, String> nodeUser = 
config.getObjectList(NODE_PASS.key()).stream()
                 .collect(Collectors.toMap(configObject -> 
configObject.toConfig().getString(NODE_ADDRESS),
-                    configObject -> configObject.toConfig().hasPath(USERNAME) 
? configObject.toConfig().getString(USERNAME) : "root"));
-        Map<String, String> nodePassword = 
config.getObjectList(NODE_PASS).stream()
+                    configObject -> 
configObject.toConfig().hasPath(USERNAME.key()) ? 
configObject.toConfig().getString(USERNAME.key()) : "root"));
+        Map<String, String> nodePassword = 
config.getObjectList(NODE_PASS.key()).stream()
                 .collect(Collectors.toMap(configObject -> 
configObject.toConfig().getString(NODE_ADDRESS),
-                    configObject -> 
configObject.toConfig().getString(PASSWORD)));
+                    configObject -> 
configObject.toConfig().getString(PASSWORD.key())));
 
+        // NOTE(review): close() is only reached on the success path; the RuntimeException thrown
+        // above for an unknown field skips it. Consider try/finally around the proxy usage.
         proxy.close();
-        this.readerOption = new FileReaderOption(shardMetadata, tableSchema, 
fields, config.getString(CLICKHOUSE_LOCAL_PATH),
-                ClickhouseFileCopyMethod.from(config.getString(COPY_METHOD)), 
nodeUser, nodePassword);
+        this.readerOption = new FileReaderOption(shardMetadata, tableSchema, 
fields, config.getString(CLICKHOUSE_LOCAL_PATH.key()),
+                
ClickhouseFileCopyMethod.from(config.getString(COPY_METHOD.key())), nodeUser, 
nodePassword);
     }
 
     @Override
diff --git 
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSinkFactory.java
 
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSinkFactory.java
new file mode 100644
index 000000000..c08f39484
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSinkFactory.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.file;
+
+import static 
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.CLICKHOUSE_LOCAL_PATH;
+import static 
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.COPY_METHOD;
+import static 
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.DATABASE;
+import static 
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.FIELDS;
+import static 
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.HOST;
+import static 
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.NODE_PASS;
+import static 
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.PASSWORD;
+import static 
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.SHARDING_KEY;
+import static 
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.TABLE;
+import static 
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.USERNAME;
+
+import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.table.factory.Factory;
+import org.apache.seatunnel.api.table.factory.TableSinkFactory;
+
+import com.google.auto.service.AutoService;
+
+/**
+ * {@link TableSinkFactory} for the Clickhouse file sink.
+ *
+ * <p>Annotated with {@code @AutoService(Factory.class)} so that it is published as a
+ * {@link Factory} service implementation.
+ */
+@AutoService(Factory.class)
+public class ClickhouseFileSinkFactory implements TableSinkFactory {
+    /**
+     * Returns the identifier of this factory.
+     *
+     * @return the constant {@code "ClickhouseFileSink"}
+     */
+    @Override
+    public String factoryIdentifier() {
+        return "ClickhouseFileSink";
+    }
+
+    /**
+     * Declares the options of this sink.
+     *
+     * <p>Required: HOST, TABLE, DATABASE, USERNAME, PASSWORD, CLICKHOUSE_LOCAL_PATH.
+     * Optional: COPY_METHOD, SHARDING_KEY, FIELDS, NODE_PASS.
+     *
+     * @return the option rule built from the lists above
+     */
+    @Override
+    public OptionRule optionRule() {
+        return OptionRule.builder().required(HOST, TABLE, DATABASE, USERNAME, 
PASSWORD, CLICKHOUSE_LOCAL_PATH)
+            .optional(COPY_METHOD, SHARDING_KEY, FIELDS, NODE_PASS).build();
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseSource.java
 
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseSource.java
index 8706fe059..59a24cac9 100644
--- 
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseSource.java
+++ 
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseSource.java
@@ -17,11 +17,11 @@
 
 package org.apache.seatunnel.connectors.seatunnel.clickhouse.source;
 
-import static 
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.Config.DATABASE;
-import static 
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.Config.HOST;
-import static 
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.Config.PASSWORD;
-import static 
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.Config.SQL;
-import static 
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.Config.USERNAME;
+import static 
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.DATABASE;
+import static 
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.HOST;
+import static 
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.PASSWORD;
+import static 
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.SQL;
+import static 
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.USERNAME;
 
 import org.apache.seatunnel.api.common.PrepareFailException;
 import org.apache.seatunnel.api.source.Boundedness;
@@ -64,19 +64,19 @@ public class ClickhouseSource implements 
SeaTunnelSource<SeaTunnelRow, Clickhous
 
     @Override
     public void prepare(Config config) throws PrepareFailException {
-        CheckResult result = CheckConfigUtil.checkAllExists(config, HOST, 
DATABASE, SQL, USERNAME, PASSWORD);
+        CheckResult result = CheckConfigUtil.checkAllExists(config, 
HOST.key(), DATABASE.key(), SQL.key(), USERNAME.key(), PASSWORD.key());
         if (!result.isSuccess()) {
             throw new PrepareFailException(getPluginName(), PluginType.SOURCE, 
result.getMsg());
         }
-        servers = ClickhouseUtil.createNodes(config.getString(HOST), 
config.getString(DATABASE),
-                config.getString(USERNAME), config.getString(PASSWORD));
+        servers = ClickhouseUtil.createNodes(config.getString(HOST.key()), 
config.getString(DATABASE.key()),
+                config.getString(USERNAME.key()), 
config.getString(PASSWORD.key()));
 
-        sql = config.getString(SQL);
+        sql = config.getString(SQL.key());
         ClickHouseNode currentServer = 
servers.get(ThreadLocalRandom.current().nextInt(servers.size()));
         try (ClickHouseClient client = 
ClickHouseClient.newInstance(currentServer.getProtocol());
              ClickHouseResponse response =
                      
client.connect(currentServer).format(ClickHouseFormat.RowBinaryWithNamesAndTypes)
-                             
.query(modifySQLToLimit1(config.getString(SQL))).executeAndWait()) {
+                             
.query(modifySQLToLimit1(config.getString(SQL.key()))).executeAndWait()) {
 
             int columnSize = response.getColumns().size();
             String[] fieldNames = new String[columnSize];
diff --git 
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseSourceFactory.java
 
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseSourceFactory.java
new file mode 100644
index 000000000..0f1c8e285
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseSourceFactory.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.clickhouse.source;
+
+import static 
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.DATABASE;
+import static 
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.HOST;
+import static 
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.PASSWORD;
+import static 
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.SQL;
+import static 
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.USERNAME;
+
+import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.table.factory.Factory;
+import org.apache.seatunnel.api.table.factory.TableSourceFactory;
+
+import com.google.auto.service.AutoService;
+
+/**
+ * {@link TableSourceFactory} for the Clickhouse source.
+ *
+ * <p>Annotated with {@code @AutoService(Factory.class)} so that it is published as a
+ * {@link Factory} service implementation.
+ */
+@AutoService(Factory.class)
+public class ClickhouseSourceFactory implements TableSourceFactory {
+    /**
+     * Returns the identifier of this factory.
+     *
+     * @return the constant {@code "ClickhouseSource"}
+     */
+    @Override
+    public String factoryIdentifier() {
+        return "ClickhouseSource";
+    }
+
+    /**
+     * Declares the options of this source; all of them are required:
+     * HOST, DATABASE, SQL, USERNAME, PASSWORD.
+     *
+     * @return the option rule containing only the required options above
+     */
+    @Override
+    public OptionRule optionRule() {
+        return OptionRule.builder().required(HOST, DATABASE, SQL, USERNAME, 
PASSWORD).build();
+    }
+}

Reply via email to