This is an automated email from the ASF dual-hosted git repository.
tyrantlucifer pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 9e4a12838 [Improve][Connector-V2] Add Clickhouse and Assert
Source/Sink Factory (#3306)
9e4a12838 is described below
commit 9e4a128381f3df57a73161d2149982fc0997a422
Author: Hisoka <[email protected]>
AuthorDate: Tue Nov 8 15:27:26 2022 +0800
[Improve][Connector-V2] Add Clickhouse and Assert Source/Sink Factory
(#3306)
* [Config] [Connector-V2] Add Clickhouse Source/Sink factory to support
OptionRule
* [Config] [Connector] Add clickhouse and assert factory
* [Config] [Connector] Change filed name
* [Config] [Connector] adjust ClickhouseFileSinkFactory
* [Config] [Connector] Add Option Annotation
* [Config] [Connector] Add Option Annotation
* [Config] [Connector] fix ci problems
* [Connector-V2] [Config] Change Option to OptionMark
* [Connector-V2] [Config] Change Factory
---
docs/en/connector-v2/source/Clickhouse.md | 9 +-
.../apache/seatunnel/api/configuration/Option.java | 2 +-
.../seatunnel/api/configuration/Options.java | 16 ++-
.../api/configuration/util/OptionMark.java | 40 ++++++++
.../api/configuration/util/OptionRule.java | 12 +--
.../api/configuration/util/OptionUtil.java | 60 ++++++++++++
.../api/configuration/util/RequiredOption.java | 41 ++++++++
.../api/configuration/util/OptionRuleTest.java | 2 +-
.../api/configuration/util/OptionUtilTest.java | 68 +++++++++++++
.../api/configuration/util/TestOptionConfig.java | 67 +++++++++++++
.../configuration/util/TestOptionConfigEnum.java | 23 +++++
.../seatunnel/assertion/rule/AssertRuleParser.java | 24 +++--
.../seatunnel/assertion/sink/AssertConfig.java | 43 ++++++++
.../seatunnel/assertion/sink/AssertSink.java | 15 +--
.../assertion/sink/AssertSinkFactory.java | 40 ++++++++
.../seatunnel/assertion/sink/FieldRule.java | 38 +++++++
.../seatunnel/assertion/sink/RowRule.java | 34 +++++++
.../connectors/seatunnel/assertion/sink/Rules.java | 35 +++++++
.../clickhouse/config/ClickhouseConfig.java | 109 +++++++++++++++++++++
.../seatunnel/clickhouse/config/Config.java | 96 ------------------
.../clickhouse/config/NodePassConfig.java | 33 +++++++
.../clickhouse/sink/client/ClickhouseSink.java | 92 ++++++++---------
.../sink/client/ClickhouseSinkFactory.java | 50 ++++++++++
.../clickhouse/sink/file/ClickhouseFileSink.java | 60 ++++++------
.../sink/file/ClickhouseFileSinkFactory.java | 49 +++++++++
.../clickhouse/source/ClickhouseSource.java | 20 ++--
.../clickhouse/source/ClickhouseSourceFactory.java | 43 ++++++++
27 files changed, 906 insertions(+), 215 deletions(-)
diff --git a/docs/en/connector-v2/source/Clickhouse.md
b/docs/en/connector-v2/source/Clickhouse.md
index 1f048f1cd..8df3d7c3d 100644
--- a/docs/en/connector-v2/source/Clickhouse.md
+++ b/docs/en/connector-v2/source/Clickhouse.md
@@ -27,13 +27,12 @@ Reading data from Clickhouse can also be done using JDBC
## Options
| name | type | required | default value |
-| -------------- | ------ | -------- | ------------- |
+|----------------|--------|----------|---------------|
| host | string | yes | - |
| database | string | yes | - |
| sql | string | yes | - |
| username | string | yes | - |
| password | string | yes | - |
-| schema | config | No | - |
| common-options | | no | - |
### host [string]
@@ -56,12 +55,6 @@ The query sql used to search data though Clickhouse server
`ClickHouse` user password
-### schema [Config]
-
-#### fields [Config]
-
-the schema fields of upstream data
-
### common options
Source plugin common parameters, please refer to [Source Common
Options](common-options.md) for details
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/configuration/Option.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/configuration/Option.java
index cfb10412e..cfd46673a 100644
---
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/configuration/Option.java
+++
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/configuration/Option.java
@@ -42,7 +42,7 @@ public class Option<T> {
*/
String description = "";
- Option(String key, TypeReference<T> typeReference, T defaultValue) {
+ public Option(String key, TypeReference<T> typeReference, T defaultValue) {
this.key = key;
this.typeReference = typeReference;
this.defaultValue = defaultValue;
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/configuration/Options.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/configuration/Options.java
index b879da863..f67d88a7c 100644
---
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/configuration/Options.java
+++
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/configuration/Options.java
@@ -144,13 +144,27 @@ public class Options {
});
}
+ /**
+ * Defines that the value of the option should be a list of
properties, which can be
+ * represented as {@code List<T>}.
+ */
+ public <T> TypedOptionBuilder<List<T>> listType(Class<T> option) {
+ return new TypedOptionBuilder<>(key, new TypeReference<List<T>>() {
+ });
+ }
+
+ public <T> TypedOptionBuilder<T> objectType(Class<T> option) {
+ return new TypedOptionBuilder<>(key, new TypeReference<T>() {
+ });
+ }
+
/**
* The value of the definition option should be represented as T.
*
* @param typeReference complex type reference
*/
public <T> TypedOptionBuilder<T> type(TypeReference<T> typeReference) {
- return new TypedOptionBuilder<T>(key, typeReference);
+ return new TypedOptionBuilder<>(key, typeReference);
}
}
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/configuration/util/OptionMark.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/configuration/util/OptionMark.java
new file mode 100644
index 000000000..4823128d6
--- /dev/null
+++
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/configuration/util/OptionMark.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.api.configuration.util;
+
+import java.lang.annotation.Documented;
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+@Retention(RetentionPolicy.RUNTIME)
+@Documented
+@Target(ElementType.FIELD)
+public @interface OptionMark {
+
+ /**
+ * The key of the option, if not configured, we will default convert
`lowerCamelCase` to `under_score_case` and provide it to users
+ */
+ String name() default "";
+
+ /**
+ * The description of the option
+ */
+ String description() default "";
+}
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/configuration/util/OptionRule.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/configuration/util/OptionRule.java
index c830709c2..861859b23 100644
---
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/configuration/util/OptionRule.java
+++
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/configuration/util/OptionRule.java
@@ -122,17 +122,12 @@ public class OptionRule {
}
/**
- * Optional options with default value.
+ * Optional options
*
* <p> This options will not be validated.
* <p> This is used by the web-UI to show what options are available.
*/
public Builder optional(Option<?>... options) {
- for (Option<?> option : options) {
- if (option.defaultValue() == null) {
- throw new
OptionValidationException(String.format("Optional option '%s' should have
default value.", option.key()));
- }
- }
this.optionalOptions.addAll(Arrays.asList(options));
return this;
}
@@ -187,6 +182,11 @@ public class OptionRule {
return this;
}
+ public Builder bundledRequired(Option<?>... requiredOptions) {
+
this.requiredOptions.add(RequiredOption.BundledRequiredOptions.of(new
HashSet<>(Arrays.asList(requiredOptions))));
+ return this;
+ }
+
public OptionRule build() {
return new OptionRule(optionalOptions, requiredOptions);
}
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/configuration/util/OptionUtil.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/configuration/util/OptionUtil.java
new file mode 100644
index 000000000..7c6835efc
--- /dev/null
+++
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/configuration/util/OptionUtil.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.api.configuration.util;
+
+import org.apache.seatunnel.api.configuration.Option;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import org.apache.commons.lang3.StringUtils;
+
+import java.lang.reflect.Field;
+import java.lang.reflect.Type;
+import java.util.ArrayList;
+import java.util.List;
+
+public class OptionUtil {
+
+ public static List<Option<?>> getOptions(Class<?> clazz) throws
InstantiationException, IllegalAccessException {
+ Field[] fields = clazz.getDeclaredFields();
+ List<Option<?>> options = new ArrayList<>();
+ Object object = clazz.newInstance();
+ for (Field field : fields) {
+ field.setAccessible(true);
+ OptionMark option = field.getAnnotation(OptionMark.class);
+ if (option != null) {
+ options.add(new
Option<>(!StringUtils.isNotBlank(option.name()) ?
formatUnderScoreCase(field.getName()) : option.name(),
+ new TypeReference<Object>() {
+ @Override
+ public Type getType() {
+ return field.getType();
+ }
+ },
field.get(object)).withDescription(option.description()));
+ }
+ }
+ return options;
+ }
+
+ private static String formatUnderScoreCase(String camel) {
+ StringBuilder underScore = new
StringBuilder(String.valueOf(Character.toLowerCase(camel.charAt(0))));
+ for (int i = 1; i < camel.length(); i++) {
+ char c = camel.charAt(i);
+ underScore.append(Character.isLowerCase(c) ? c : "_" +
Character.toLowerCase(c));
+ }
+ return underScore.toString();
+ }
+}
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/configuration/util/RequiredOption.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/configuration/util/RequiredOption.java
index 692e056f4..acafaaa99 100644
---
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/configuration/util/RequiredOption.java
+++
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/configuration/util/RequiredOption.java
@@ -159,4 +159,45 @@ public interface RequiredOption {
return String.format("Condition expression: %s, Required options:
%s", expression, ExclusiveRequiredOptions.getOptionKeys(requiredOption));
}
}
+
+ /**
+ * All options must exist or not exist at the same time
+ */
+ class BundledRequiredOptions implements RequiredOption {
+ private final Set<Option<?>> requiredOption;
+
+ BundledRequiredOptions(Set<Option<?>> requiredOption) {
+ this.requiredOption = requiredOption;
+ }
+
+ public static BundledRequiredOptions of(Set<Option<?>> requiredOption)
{
+ return new BundledRequiredOptions(requiredOption);
+ }
+
+ public Set<Option<?>> getRequiredOption() {
+ return requiredOption;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj) {
+ return true;
+ }
+ if (!(obj instanceof BundledRequiredOptions)) {
+ return false;
+ }
+ BundledRequiredOptions that = (BundledRequiredOptions) obj;
+ return Objects.equals(this.requiredOption, that.requiredOption);
+ }
+
+ @Override
+ public int hashCode() {
+ return this.requiredOption.hashCode();
+ }
+
+ @Override
+ public String toString() {
+ return String.format("Both Required options: %s",
ExclusiveRequiredOptions.getOptionKeys(requiredOption));
+ }
+ }
}
diff --git
a/seatunnel-api/src/test/java/org/apache/seatunnel/api/configuration/util/OptionRuleTest.java
b/seatunnel-api/src/test/java/org/apache/seatunnel/api/configuration/util/OptionRuleTest.java
index 5aca2bef7..4535fab3b 100644
---
a/seatunnel-api/src/test/java/org/apache/seatunnel/api/configuration/util/OptionRuleTest.java
+++
b/seatunnel-api/src/test/java/org/apache/seatunnel/api/configuration/util/OptionRuleTest.java
@@ -66,7 +66,7 @@ public class OptionRuleTest {
@Test
public void testOptionalException() {
Assertions.assertThrows(OptionValidationException.class,
- () -> OptionRule.builder().optional(TEST_NUM, TEST_MODE,
TEST_PORTS).build(),
+ () -> OptionRule.builder().required(TEST_NUM, TEST_MODE,
TEST_PORTS).build(),
"Optional option 'option.ports' should have default value.");
}
diff --git
a/seatunnel-api/src/test/java/org/apache/seatunnel/api/configuration/util/OptionUtilTest.java
b/seatunnel-api/src/test/java/org/apache/seatunnel/api/configuration/util/OptionUtilTest.java
new file mode 100644
index 000000000..439e30ab2
--- /dev/null
+++
b/seatunnel-api/src/test/java/org/apache/seatunnel/api/configuration/util/OptionUtilTest.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.api.configuration.util;
+
+import org.apache.seatunnel.api.configuration.Option;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.List;
+import java.util.Map;
+
+public class OptionUtilTest {
+
+ @Test
+ public void test() throws InstantiationException, IllegalAccessException {
+ List<Option<?>> options =
OptionUtil.getOptions(TestOptionConfig.class);
+ Assertions.assertEquals(options.get(0).key(), "short-value");
+ Assertions.assertEquals(options.get(0).getDescription(), "shortValue");
+ Assertions.assertEquals(options.get(0).typeReference().getType(),
Short.class);
+
+ Assertions.assertEquals(options.get(1).typeReference().getType(),
Integer.class);
+ Assertions.assertEquals(options.get(1).key(), "int_value");
+ Assertions.assertEquals(options.get(1).getDescription(), "");
+ Assertions.assertNull(options.get(1).defaultValue());
+
+ Assertions.assertEquals(options.get(2).typeReference().getType(),
Long.class);
+
+ Assertions.assertEquals(options.get(3).typeReference().getType(),
Float.class);
+
+ Assertions.assertEquals(options.get(4).typeReference().getType(),
Double.class);
+
+ Assertions.assertEquals(options.get(5).typeReference().getType(),
String.class);
+ Assertions.assertEquals(options.get(5).defaultValue(), "default
string");
+
+ Assertions.assertEquals(options.get(6).typeReference().getType(),
Boolean.class);
+ Assertions.assertEquals(options.get(6).defaultValue(), true);
+
+ Assertions.assertEquals(options.get(7).typeReference().getType(),
Byte.class);
+
+ Assertions.assertEquals(options.get(8).typeReference().getType(),
Character.class);
+ Assertions.assertEquals(options.get(9).typeReference().getType(),
TestOptionConfigEnum.class);
+ Assertions.assertEquals(options.get(9).defaultValue(),
TestOptionConfigEnum.KEY2);
+
+ Assertions.assertEquals(options.get(10).typeReference().getType(),
TestOptionConfig.class);
+
+ Assertions.assertEquals(options.get(11).typeReference().getType(),
List.class);
+
+ Assertions.assertEquals(options.get(12).typeReference().getType(),
Map.class);
+
+ }
+
+}
diff --git
a/seatunnel-api/src/test/java/org/apache/seatunnel/api/configuration/util/TestOptionConfig.java
b/seatunnel-api/src/test/java/org/apache/seatunnel/api/configuration/util/TestOptionConfig.java
new file mode 100644
index 000000000..3fadf0d25
--- /dev/null
+++
b/seatunnel-api/src/test/java/org/apache/seatunnel/api/configuration/util/TestOptionConfig.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.api.configuration.util;
+
+import lombok.Data;
+
+import java.util.List;
+import java.util.Map;
+
+@Data
+public class TestOptionConfig {
+
+ @OptionMark(name = "short-value", description = "shortValue")
+ private Short shortValue;
+
+ @OptionMark
+ private Integer intValue;
+
+ @OptionMark(description = "longValue")
+ private Long longValue;
+
+ @OptionMark(description = "floatValue")
+ private Float floatValue;
+
+ @OptionMark(description = "doubleValue")
+ private Double doubleValue;
+
+ @OptionMark(description = "stringValue")
+ private String stringValue = "default string";
+
+ @OptionMark(description = "booleanValue")
+ private Boolean booleanValue = true;
+
+ @OptionMark(description = "byteValue")
+ private Byte byteValue;
+
+ @OptionMark(description = "charValue")
+ private Character charValue;
+
+ @OptionMark(description = "enumValue")
+ private TestOptionConfigEnum enumValue = TestOptionConfigEnum.KEY2;
+
+ @OptionMark(description = "objectValue")
+ private TestOptionConfig objectValue;
+
+ @OptionMark(description = "listValue")
+ private List<TestOptionConfig> listValue;
+
+ @OptionMark(description = "mapValue")
+ private Map<String, String> mapValue;
+
+}
diff --git
a/seatunnel-api/src/test/java/org/apache/seatunnel/api/configuration/util/TestOptionConfigEnum.java
b/seatunnel-api/src/test/java/org/apache/seatunnel/api/configuration/util/TestOptionConfigEnum.java
new file mode 100644
index 000000000..66c8a07c9
--- /dev/null
+++
b/seatunnel-api/src/test/java/org/apache/seatunnel/api/configuration/util/TestOptionConfigEnum.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.api.configuration.util;
+
+public enum TestOptionConfigEnum {
+ KEY1,
+ KEY2
+}
diff --git
a/seatunnel-connectors-v2/connector-assert/src/main/java/org/apache/seatunnel/connectors/seatunnel/assertion/rule/AssertRuleParser.java
b/seatunnel-connectors-v2/connector-assert/src/main/java/org/apache/seatunnel/connectors/seatunnel/assertion/rule/AssertRuleParser.java
index c69baf03b..dfc646a86 100644
---
a/seatunnel-connectors-v2/connector-assert/src/main/java/org/apache/seatunnel/connectors/seatunnel/assertion/rule/AssertRuleParser.java
+++
b/seatunnel-connectors-v2/connector-assert/src/main/java/org/apache/seatunnel/connectors/seatunnel/assertion/rule/AssertRuleParser.java
@@ -17,6 +17,12 @@
package org.apache.seatunnel.connectors.seatunnel.assertion.rule;
+import static
org.apache.seatunnel.connectors.seatunnel.assertion.sink.AssertConfig.FIELD_NAME;
+import static
org.apache.seatunnel.connectors.seatunnel.assertion.sink.AssertConfig.FIELD_TYPE;
+import static
org.apache.seatunnel.connectors.seatunnel.assertion.sink.AssertConfig.FIELD_VALUE;
+import static
org.apache.seatunnel.connectors.seatunnel.assertion.sink.AssertConfig.RULE_TYPE;
+import static
org.apache.seatunnel.connectors.seatunnel.assertion.sink.AssertConfig.RULE_VALUE;
+
import org.apache.seatunnel.api.table.type.BasicType;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
@@ -39,13 +45,13 @@ public class AssertRuleParser {
return ruleConfigList.stream()
.map(config -> {
AssertFieldRule fieldRule = new AssertFieldRule();
- fieldRule.setFieldName(config.getString("field_name"));
- if (config.hasPath("field_type")) {
-
fieldRule.setFieldType(getFieldType(config.getString("field_type")));
+ fieldRule.setFieldName(config.getString(FIELD_NAME));
+ if (config.hasPath(FIELD_TYPE)) {
+
fieldRule.setFieldType(getFieldType(config.getString(FIELD_TYPE)));
}
- if (config.hasPath("field_value")) {
- List<AssertFieldRule.AssertRule> fieldValueRules =
assembleFieldValueRules(config.getConfigList("field_value"));
+ if (config.hasPath(FIELD_VALUE)) {
+ List<AssertFieldRule.AssertRule> fieldValueRules =
assembleFieldValueRules(config.getConfigList(FIELD_VALUE));
fieldRule.setFieldRules(fieldValueRules);
}
return fieldRule;
@@ -57,11 +63,11 @@ public class AssertRuleParser {
return fieldValueConfigList.stream()
.map(config -> {
AssertFieldRule.AssertRule valueRule = new
AssertFieldRule.AssertRule();
- if (config.hasPath("rule_type")) {
-
valueRule.setRuleType(AssertFieldRule.AssertRuleType.valueOf(config.getString("rule_type")));
+ if (config.hasPath(RULE_TYPE)) {
+
valueRule.setRuleType(AssertFieldRule.AssertRuleType.valueOf(config.getString(RULE_TYPE)));
}
- if (config.hasPath("rule_value")) {
- valueRule.setRuleValue(config.getDouble("rule_value"));
+ if (config.hasPath(RULE_VALUE)) {
+ valueRule.setRuleValue(config.getDouble(RULE_VALUE));
}
return valueRule;
})
diff --git
a/seatunnel-connectors-v2/connector-assert/src/main/java/org/apache/seatunnel/connectors/seatunnel/assertion/sink/AssertConfig.java
b/seatunnel-connectors-v2/connector-assert/src/main/java/org/apache/seatunnel/connectors/seatunnel/assertion/sink/AssertConfig.java
new file mode 100644
index 000000000..7b9fa76b4
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-assert/src/main/java/org/apache/seatunnel/connectors/seatunnel/assertion/sink/AssertConfig.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.assertion.sink;
+
+import org.apache.seatunnel.api.configuration.Option;
+import org.apache.seatunnel.api.configuration.Options;
+
+public class AssertConfig {
+
+ public static final String RULE_TYPE = "rule_type";
+
+ public static final String RULE_VALUE = "rule_value";
+
+ public static final String ROW_RULES = "row_rules";
+
+ public static final String FIELD_NAME = "field_name";
+
+ public static final String FIELD_TYPE = "field_type";
+
+ public static final String FIELD_VALUE = "field_value";
+
+ public static final String FIELD_RULES = "field_rules";
+
+ public static final Option<Rules> RULES =
Options.key("rules").objectType(Rules.class)
+ .noDefaultValue().withDescription("Rule definition of user's available
data. Each rule represents one field validation or row num validation.");
+
+
+}
diff --git
a/seatunnel-connectors-v2/connector-assert/src/main/java/org/apache/seatunnel/connectors/seatunnel/assertion/sink/AssertSink.java
b/seatunnel-connectors-v2/connector-assert/src/main/java/org/apache/seatunnel/connectors/seatunnel/assertion/sink/AssertSink.java
index 5c611f760..77d682d63 100644
---
a/seatunnel-connectors-v2/connector-assert/src/main/java/org/apache/seatunnel/connectors/seatunnel/assertion/sink/AssertSink.java
+++
b/seatunnel-connectors-v2/connector-assert/src/main/java/org/apache/seatunnel/connectors/seatunnel/assertion/sink/AssertSink.java
@@ -17,6 +17,10 @@
package org.apache.seatunnel.connectors.seatunnel.assertion.sink;
+import static
org.apache.seatunnel.connectors.seatunnel.assertion.sink.AssertConfig.FIELD_RULES;
+import static
org.apache.seatunnel.connectors.seatunnel.assertion.sink.AssertConfig.ROW_RULES;
+import static
org.apache.seatunnel.connectors.seatunnel.assertion.sink.AssertConfig.RULES;
+
import org.apache.seatunnel.api.sink.SeaTunnelSink;
import org.apache.seatunnel.api.sink.SinkWriter;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
@@ -39,9 +43,6 @@ import java.util.List;
@AutoService(SeaTunnelSink.class)
public class AssertSink extends AbstractSimpleSink<SeaTunnelRow, Void> {
- private static final String RULES = "rules";
- private static final String ROW_RULES = "row_rules";
- private static final String FIELD_RULES = "field_rules";
private SeaTunnelRowType seaTunnelRowType;
private List<AssertFieldRule> assertFieldRules;
private List<AssertFieldRule.AssertRule> assertRowRules;
@@ -63,10 +64,10 @@ public class AssertSink extends
AbstractSimpleSink<SeaTunnelRow, Void> {
@Override
public void prepare(Config pluginConfig) {
- if (!pluginConfig.hasPath(RULES)) {
- Throwables.propagateIfPossible(new ConfigException.Missing(RULES));
+ if (!pluginConfig.hasPath(RULES.key())) {
+ Throwables.propagateIfPossible(new
ConfigException.Missing(RULES.key()));
}
- Config ruleConfig = pluginConfig.getConfig(RULES);
+ Config ruleConfig = pluginConfig.getConfig(RULES.key());
List<? extends Config> rowConfigList = null;
List<? extends Config> configList = null;
if (ruleConfig.hasPath(ROW_RULES)) {
@@ -79,7 +80,7 @@ public class AssertSink extends
AbstractSimpleSink<SeaTunnelRow, Void> {
}
if (CollectionUtils.isEmpty(configList) &&
CollectionUtils.isEmpty(rowConfigList)) {
- Throwables.propagateIfPossible(new ConfigException.BadValue(RULES,
"Assert rule config is empty, please add rule config."));
+ Throwables.propagateIfPossible(new
ConfigException.BadValue(RULES.key(), "Assert rule config is empty, please add
rule config."));
}
}
diff --git
a/seatunnel-connectors-v2/connector-assert/src/main/java/org/apache/seatunnel/connectors/seatunnel/assertion/sink/AssertSinkFactory.java
b/seatunnel-connectors-v2/connector-assert/src/main/java/org/apache/seatunnel/connectors/seatunnel/assertion/sink/AssertSinkFactory.java
new file mode 100644
index 000000000..c3c6ad553
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-assert/src/main/java/org/apache/seatunnel/connectors/seatunnel/assertion/sink/AssertSinkFactory.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.assertion.sink;
+
+import static
org.apache.seatunnel.connectors.seatunnel.assertion.sink.AssertConfig.RULES;
+
+import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.table.factory.Factory;
+import org.apache.seatunnel.api.table.factory.TableSinkFactory;
+
+import com.google.auto.service.AutoService;
+
+@AutoService(Factory.class)
+public class AssertSinkFactory implements TableSinkFactory {
+
+ @Override
+ public String factoryIdentifier() {
+ return "AssertSink";
+ }
+
+ @Override
+ public OptionRule optionRule() {
+ return OptionRule.builder().required(RULES).build();
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-assert/src/main/java/org/apache/seatunnel/connectors/seatunnel/assertion/sink/FieldRule.java
b/seatunnel-connectors-v2/connector-assert/src/main/java/org/apache/seatunnel/connectors/seatunnel/assertion/sink/FieldRule.java
new file mode 100644
index 000000000..ff1eeefd5
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-assert/src/main/java/org/apache/seatunnel/connectors/seatunnel/assertion/sink/FieldRule.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.assertion.sink;
+
+import org.apache.seatunnel.api.configuration.util.OptionMark;
+
+import lombok.Data;
+
+import java.util.List;
+
+@Data
+public class FieldRule {
+
+ @OptionMark(description = "field name")
+ private String fieldName;
+
+ @OptionMark(description = "field type")
+ private String fieldType;
+
+ @OptionMark(description = "A list value rule define the data value
validation")
+ private List<RowRule> fieldValue;
+
+}
diff --git
a/seatunnel-connectors-v2/connector-assert/src/main/java/org/apache/seatunnel/connectors/seatunnel/assertion/sink/RowRule.java
b/seatunnel-connectors-v2/connector-assert/src/main/java/org/apache/seatunnel/connectors/seatunnel/assertion/sink/RowRule.java
new file mode 100644
index 000000000..274ed88ed
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-assert/src/main/java/org/apache/seatunnel/connectors/seatunnel/assertion/sink/RowRule.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.assertion.sink;
+
+import org.apache.seatunnel.api.configuration.util.OptionMark;
+import
org.apache.seatunnel.connectors.seatunnel.assertion.rule.AssertFieldRule;
+
+import lombok.Data;
+
+@Data
+public class RowRule {
+
+ @OptionMark(description = "The rule type of the rule")
+ private AssertFieldRule.AssertRuleType ruleType;
+
+ @OptionMark(description = "The value related to rule type")
+ private Double ruleValue;
+
+}
diff --git
a/seatunnel-connectors-v2/connector-assert/src/main/java/org/apache/seatunnel/connectors/seatunnel/assertion/sink/Rules.java
b/seatunnel-connectors-v2/connector-assert/src/main/java/org/apache/seatunnel/connectors/seatunnel/assertion/sink/Rules.java
new file mode 100644
index 000000000..c51e9fd77
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-assert/src/main/java/org/apache/seatunnel/connectors/seatunnel/assertion/sink/Rules.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.assertion.sink;
+
+import org.apache.seatunnel.api.configuration.util.OptionMark;
+
+import lombok.Data;
+
+import java.util.List;
+
+@Data
+public class Rules {
+
+ @OptionMark(description = "row rules for row validation")
+ private List<RowRule> rowRules;
+
+ @OptionMark(description = "field rules for field validation")
+ private List<FieldRule> fieldRules;
+
+}
diff --git
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/config/ClickhouseConfig.java
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/config/ClickhouseConfig.java
new file mode 100644
index 000000000..62f7bd398
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/config/ClickhouseConfig.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.clickhouse.config;
+
+import org.apache.seatunnel.api.configuration.Option;
+import org.apache.seatunnel.api.configuration.Options;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+@SuppressWarnings("checkstyle:MagicNumber")
+public class ClickhouseConfig {
+
+ /**
+ * Bulk size of clickhouse jdbc
+ */
+ public static final Option<Integer> BULK_SIZE =
Options.key("bulk_size").intType()
+ .defaultValue(20000).withDescription("Bulk size of clickhouse jdbc");
+
+ /**
+ * Clickhouse fields
+ */
+ public static final Option<String> FIELDS =
Options.key("fields").stringType()
+ .noDefaultValue().withDescription("Clickhouse fields");
+
+ public static final Option<String> SQL = Options.key("sql").stringType()
+ .noDefaultValue().withDescription("Clickhouse sql used to query data");
+
+ /**
+ * Clickhouse server host
+ */
+ public static final Option<String> HOST = Options.key("host").stringType()
+ .noDefaultValue().withDescription("Clickhouse server host");
+
+ /**
+ * Clickhouse table name
+ */
+ public static final Option<String> TABLE =
Options.key("table").stringType()
+ .noDefaultValue().withDescription("Clickhouse table name");
+
+ /**
+ * Clickhouse database name
+ */
+ public static final Option<String> DATABASE =
Options.key("database").stringType()
+ .noDefaultValue().withDescription("Clickhouse database name");
+
+ /**
+ * Clickhouse server username
+ */
+ public static final Option<String> USERNAME =
Options.key("username").stringType()
+ .noDefaultValue().withDescription("Clickhouse server username");
+
+ /**
+ * Clickhouse server password
+ */
+ public static final Option<String> PASSWORD =
Options.key("password").stringType()
+ .noDefaultValue().withDescription("Clickhouse server password");
+
+ /**
+ * Split mode when table is distributed engine
+ */
+ public static final Option<Boolean> SPLIT_MODE =
Options.key("split_mode").booleanType()
+ .defaultValue(false).withDescription("Split mode when table is
distributed engine");
+
+ /**
+ * When split_mode is true, the sharding_key use for split
+ */
+ public static final Option<String> SHARDING_KEY =
Options.key("sharding_key").stringType()
+ .noDefaultValue().withDescription("When split_mode is true, the
sharding_key use for split");
+
+ /**
+ * ClickhouseFile sink connector used clickhouse-local program's path
+ */
+ public static final Option<String> CLICKHOUSE_LOCAL_PATH =
Options.key("clickhouse_local_path").stringType()
+ .noDefaultValue().withDescription("ClickhouseFile sink connector used
clickhouse-local program's path");
+
+ /**
+ * The method of copy Clickhouse file
+ */
+ public static final Option<ClickhouseFileCopyMethod> COPY_METHOD =
Options.key("copy_method").enumType(ClickhouseFileCopyMethod.class)
+ .defaultValue(ClickhouseFileCopyMethod.SCP).withDescription("The
method of copy Clickhouse file");
+
+ public static final String NODE_ADDRESS = "node_address";
+ /**
+ * The password of Clickhouse server node
+ */
+ public static final Option<List<NodePassConfig>> NODE_PASS =
Options.key("node_pass").listType(NodePassConfig.class)
+ .noDefaultValue().withDescription("The password of Clickhouse server
node");
+
+ public static final Option<Map<String, String>> CLICKHOUSE_PREFIX =
Options.key("clickhouse").mapType()
+ .defaultValue(Collections.emptyMap()).withDescription("Clickhouse
custom config");
+
+}
diff --git
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/config/Config.java
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/config/Config.java
deleted file mode 100644
index 6563274ba..000000000
---
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/config/Config.java
+++ /dev/null
@@ -1,96 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.connectors.seatunnel.clickhouse.config;
-
-public class Config {
-
- /**
- * Bulk size of clickhouse jdbc
- */
- public static final String BULK_SIZE = "bulk_size";
-
- /**
- * Clickhouse fields
- */
- public static final String FIELDS = "fields";
-
- public static final String SQL = "sql";
-
- /**
- * Clickhouse server host
- */
- public static final String HOST = "host";
-
- /**
- * Clickhouse table name
- */
- public static final String TABLE = "table";
-
- /**
- * Clickhouse database name
- */
- public static final String DATABASE = "database";
-
- /**
- * Clickhouse server username
- */
- public static final String USERNAME = "username";
-
- /**
- * Clickhouse server password
- */
- public static final String PASSWORD = "password";
-
- /**
- * Split mode when table is distributed engine
- */
- public static final String SPLIT_MODE = "split_mode";
-
- /**
- * When split_mode is true, the sharding_key use for split
- */
- public static final String SHARDING_KEY = "sharding_key";
-
- /**
- * ClickhouseFile sink connector used clickhouse-local program's path
- */
- public static final String CLICKHOUSE_LOCAL_PATH = "clickhouse_local_path";
-
- /**
- * The method of copy Clickhouse file
- */
- public static final String COPY_METHOD = "copy_method";
-
- /**
- * The size of each batch read temporary data into local file.
- */
- public static final String TMP_BATCH_CACHE_LINE = "tmp_batch_cache_line";
-
- /**
- * The password of Clickhouse server node
- */
- public static final String NODE_PASS = "node_pass";
-
- /**
- * The address of Clickhouse server node
- */
- public static final String NODE_ADDRESS = "node_address";
-
- public static final String CLICKHOUSE_PREFIX = "clickhouse.";
-
-}
diff --git
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/config/NodePassConfig.java
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/config/NodePassConfig.java
new file mode 100644
index 000000000..7340a9176
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/config/NodePassConfig.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.clickhouse.config;
+
+import org.apache.seatunnel.api.configuration.util.OptionMark;
+
+import lombok.Data;
+
+@Data
+public class NodePassConfig {
+
+ @OptionMark(description = "The address of Clickhouse server node")
+ private String nodeAddress;
+
+ @OptionMark(description = "Clickhouse server linux password")
+ private String password;
+
+}
diff --git
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseSink.java
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseSink.java
index 3c2c1104d..028fe4a5c 100644
---
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseSink.java
+++
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseSink.java
@@ -17,16 +17,16 @@
package org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.client;
-import static
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.Config.BULK_SIZE;
-import static
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.Config.CLICKHOUSE_PREFIX;
-import static
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.Config.DATABASE;
-import static
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.Config.FIELDS;
-import static
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.Config.HOST;
-import static
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.Config.PASSWORD;
-import static
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.Config.SHARDING_KEY;
-import static
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.Config.SPLIT_MODE;
-import static
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.Config.TABLE;
-import static
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.Config.USERNAME;
+import static
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.BULK_SIZE;
+import static
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.CLICKHOUSE_PREFIX;
+import static
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.DATABASE;
+import static
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.FIELDS;
+import static
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.HOST;
+import static
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.PASSWORD;
+import static
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.SHARDING_KEY;
+import static
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.SPLIT_MODE;
+import static
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.TABLE;
+import static
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.USERNAME;
import org.apache.seatunnel.api.common.PrepareFailException;
import org.apache.seatunnel.api.serialization.DefaultSerializer;
@@ -76,58 +76,58 @@ public class ClickhouseSink implements
SeaTunnelSink<SeaTunnelRow, ClickhouseSin
@SuppressWarnings("checkstyle:MagicNumber")
@Override
public void prepare(Config config) throws PrepareFailException {
- CheckResult result = CheckConfigUtil.checkAllExists(config, HOST,
DATABASE, TABLE);
+ CheckResult result = CheckConfigUtil.checkAllExists(config,
HOST.key(), DATABASE.key(), TABLE.key());
- boolean isCredential = config.hasPath(USERNAME) ||
config.hasPath(PASSWORD);
+ boolean isCredential = config.hasPath(USERNAME.key()) ||
config.hasPath(PASSWORD.key());
if (isCredential) {
- result = CheckConfigUtil.checkAllExists(config, USERNAME,
PASSWORD);
+ result = CheckConfigUtil.checkAllExists(config, USERNAME.key(),
PASSWORD.key());
}
if (!result.isSuccess()) {
throw new PrepareFailException(getPluginName(), PluginType.SINK,
result.getMsg());
}
Map<String, Object> defaultConfig = ImmutableMap.<String,
Object>builder()
- .put(BULK_SIZE, 20_000)
- .put(SPLIT_MODE, false)
+ .put(BULK_SIZE.key(), BULK_SIZE.defaultValue())
+ .put(SPLIT_MODE.key(), SPLIT_MODE.defaultValue())
.build();
config = config.withFallback(ConfigFactory.parseMap(defaultConfig));
List<ClickHouseNode> nodes;
if (!isCredential) {
- nodes = ClickhouseUtil.createNodes(config.getString(HOST),
config.getString(DATABASE),
- null, null);
+ nodes = ClickhouseUtil.createNodes(config.getString(HOST.key()),
config.getString(DATABASE.key()),
+ null, null);
} else {
- nodes = ClickhouseUtil.createNodes(config.getString(HOST),
- config.getString(DATABASE), config.getString(USERNAME),
config.getString(PASSWORD));
+ nodes = ClickhouseUtil.createNodes(config.getString(HOST.key()),
+ config.getString(DATABASE.key()),
config.getString(USERNAME.key()), config.getString(PASSWORD.key()));
}
Properties clickhouseProperties = new Properties();
- if (TypesafeConfigUtils.hasSubConfig(config, CLICKHOUSE_PREFIX)) {
- TypesafeConfigUtils.extractSubConfig(config, CLICKHOUSE_PREFIX,
false).entrySet().forEach(e -> {
+ if (TypesafeConfigUtils.hasSubConfig(config, CLICKHOUSE_PREFIX.key() +
".")) {
+ TypesafeConfigUtils.extractSubConfig(config,
CLICKHOUSE_PREFIX.key() + ".", false).entrySet().forEach(e -> {
clickhouseProperties.put(e.getKey(),
String.valueOf(e.getValue().unwrapped()));
});
}
if (isCredential) {
- clickhouseProperties.put("user", config.getString(USERNAME));
- clickhouseProperties.put("password", config.getString(PASSWORD));
+ clickhouseProperties.put("user", config.getString(USERNAME.key()));
+ clickhouseProperties.put("password",
config.getString(PASSWORD.key()));
}
ClickhouseProxy proxy = new ClickhouseProxy(nodes.get(0));
- Map<String, String> tableSchema =
proxy.getClickhouseTableSchema(config.getString(TABLE));
+ Map<String, String> tableSchema =
proxy.getClickhouseTableSchema(config.getString(TABLE.key()));
String shardKey = null;
String shardKeyType = null;
- if (config.getBoolean(SPLIT_MODE)) {
- ClickhouseTable table =
proxy.getClickhouseTable(config.getString(DATABASE),
- config.getString(TABLE));
+ if (config.getBoolean(SPLIT_MODE.key())) {
+ ClickhouseTable table =
proxy.getClickhouseTable(config.getString(DATABASE.key()),
+ config.getString(TABLE.key()));
if (!"Distributed".equals(table.getEngine())) {
throw new IllegalArgumentException("split mode only support
table which engine is " +
- "'Distributed' engine at now");
+ "'Distributed' engine at now");
}
- if (config.hasPath(SHARDING_KEY)) {
- shardKey = config.getString(SHARDING_KEY);
+ if (config.hasPath(SHARDING_KEY.key())) {
+ shardKey = config.getString(SHARDING_KEY.key());
shardKeyType = tableSchema.get(shardKey);
}
}
@@ -135,36 +135,36 @@ public class ClickhouseSink implements
SeaTunnelSink<SeaTunnelRow, ClickhouseSin
if (isCredential) {
metadata = new ShardMetadata(
- shardKey,
- shardKeyType,
- config.getString(DATABASE),
- config.getString(TABLE),
- config.getBoolean(SPLIT_MODE),
- new Shard(1, 1, nodes.get(0)), config.getString(USERNAME),
config.getString(PASSWORD));
+ shardKey,
+ shardKeyType,
+ config.getString(DATABASE.key()),
+ config.getString(TABLE.key()),
+ config.getBoolean(SPLIT_MODE.key()),
+ new Shard(1, 1, nodes.get(0)),
config.getString(USERNAME.key()), config.getString(PASSWORD.key()));
} else {
metadata = new ShardMetadata(
- shardKey,
- shardKeyType,
- config.getString(DATABASE),
- config.getString(TABLE),
- config.getBoolean(SPLIT_MODE),
- new Shard(1, 1, nodes.get(0)));
+ shardKey,
+ shardKeyType,
+ config.getString(DATABASE.key()),
+ config.getString(TABLE.key()),
+ config.getBoolean(SPLIT_MODE.key()),
+ new Shard(1, 1, nodes.get(0)));
}
List<String> fields = new ArrayList<>();
- if (config.hasPath(FIELDS)) {
- fields.addAll(config.getStringList(FIELDS));
+ if (config.hasPath(FIELDS.key())) {
+ fields.addAll(config.getStringList(FIELDS.key()));
// check if the fields exist in schema
for (String field : fields) {
if (!tableSchema.containsKey(field)) {
- throw new RuntimeException("Field " + field + " does not
exist in table " + config.getString(TABLE));
+ throw new RuntimeException("Field " + field + " does not
exist in table " + config.getString(TABLE.key()));
}
}
} else {
fields.addAll(tableSchema.keySet());
}
proxy.close();
- this.option = new ReaderOption(metadata, clickhouseProperties, fields,
tableSchema, config.getInt(BULK_SIZE));
+ this.option = new ReaderOption(metadata, clickhouseProperties, fields,
tableSchema, config.getInt(BULK_SIZE.key()));
}
@Override
diff --git
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseSinkFactory.java
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseSinkFactory.java
new file mode 100644
index 000000000..a804400f9
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseSinkFactory.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.client;
+
+import static
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.BULK_SIZE;
+import static
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.CLICKHOUSE_PREFIX;
+import static
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.DATABASE;
+import static
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.FIELDS;
+import static
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.HOST;
+import static
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.PASSWORD;
+import static
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.SHARDING_KEY;
+import static
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.SPLIT_MODE;
+import static
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.TABLE;
+import static
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.USERNAME;
+
+import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.table.factory.Factory;
+import org.apache.seatunnel.api.table.factory.TableSinkFactory;
+
+import com.google.auto.service.AutoService;
+
+@AutoService(Factory.class)
+public class ClickhouseSinkFactory implements TableSinkFactory {
+ @Override
+ public String factoryIdentifier() {
+ return "ClickhouseSink";
+ }
+
+ @Override
+ public OptionRule optionRule() {
+ return OptionRule.builder().required(HOST, DATABASE, TABLE)
+ .optional(CLICKHOUSE_PREFIX, BULK_SIZE, SPLIT_MODE, FIELDS,
SHARDING_KEY)
+ .bundledRequired(USERNAME, PASSWORD).build();
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSink.java
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSink.java
index 319937ac1..e32ecd2ef 100644
---
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSink.java
+++
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSink.java
@@ -17,17 +17,17 @@
package org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.file;
-import static
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.Config.CLICKHOUSE_LOCAL_PATH;
-import static
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.Config.COPY_METHOD;
-import static
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.Config.DATABASE;
-import static
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.Config.FIELDS;
-import static
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.Config.HOST;
-import static
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.Config.NODE_ADDRESS;
-import static
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.Config.NODE_PASS;
-import static
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.Config.PASSWORD;
-import static
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.Config.SHARDING_KEY;
-import static
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.Config.TABLE;
-import static
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.Config.USERNAME;
+import static
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.CLICKHOUSE_LOCAL_PATH;
+import static
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.COPY_METHOD;
+import static
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.DATABASE;
+import static
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.FIELDS;
+import static
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.HOST;
+import static
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.NODE_ADDRESS;
+import static
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.NODE_PASS;
+import static
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.PASSWORD;
+import static
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.SHARDING_KEY;
+import static
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.TABLE;
+import static
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.USERNAME;
import org.apache.seatunnel.api.common.PrepareFailException;
import org.apache.seatunnel.api.sink.SeaTunnelSink;
@@ -73,55 +73,55 @@ public class ClickhouseFileSink implements
SeaTunnelSink<SeaTunnelRow, Clickhous
@Override
public void prepare(Config config) throws PrepareFailException {
- CheckResult checkResult = CheckConfigUtil.checkAllExists(config, HOST,
TABLE, DATABASE, USERNAME, PASSWORD, CLICKHOUSE_LOCAL_PATH);
+ CheckResult checkResult = CheckConfigUtil.checkAllExists(config,
HOST.key(), TABLE.key(), DATABASE.key(), USERNAME.key(), PASSWORD.key(),
CLICKHOUSE_LOCAL_PATH.key());
if (!checkResult.isSuccess()) {
throw new PrepareFailException(getPluginName(), PluginType.SINK,
checkResult.getMsg());
}
Map<String, Object> defaultConfigs = ImmutableMap.<String,
Object>builder()
- .put(COPY_METHOD, ClickhouseFileCopyMethod.SCP.getName())
+ .put(COPY_METHOD.key(), COPY_METHOD.defaultValue().getName())
.build();
config = config.withFallback(ConfigFactory.parseMap(defaultConfigs));
- List<ClickHouseNode> nodes =
ClickhouseUtil.createNodes(config.getString(HOST),
- config.getString(DATABASE), config.getString(USERNAME),
config.getString(PASSWORD));
+ List<ClickHouseNode> nodes =
ClickhouseUtil.createNodes(config.getString(HOST.key()),
+ config.getString(DATABASE.key()),
config.getString(USERNAME.key()), config.getString(PASSWORD.key()));
ClickhouseProxy proxy = new ClickhouseProxy(nodes.get(0));
- Map<String, String> tableSchema =
proxy.getClickhouseTableSchema(config.getString(TABLE));
+ Map<String, String> tableSchema =
proxy.getClickhouseTableSchema(config.getString(TABLE.key()));
String shardKey = null;
String shardKeyType = null;
- if (config.hasPath(SHARDING_KEY)) {
- shardKey = config.getString(SHARDING_KEY);
+ if (config.hasPath(SHARDING_KEY.key())) {
+ shardKey = config.getString(SHARDING_KEY.key());
shardKeyType = tableSchema.get(shardKey);
}
ShardMetadata shardMetadata = new ShardMetadata(
shardKey,
shardKeyType,
- config.getString(DATABASE),
- config.getString(TABLE),
+ config.getString(DATABASE.key()),
+ config.getString(TABLE.key()),
false, // we don't need to set splitMode in clickhouse file
mode.
- new Shard(1, 1, nodes.get(0)), config.getString(USERNAME),
config.getString(PASSWORD));
+ new Shard(1, 1, nodes.get(0)),
config.getString(USERNAME.key()), config.getString(PASSWORD.key()));
List<String> fields;
- if (config.hasPath(FIELDS)) {
- fields = config.getStringList(FIELDS);
+ if (config.hasPath(FIELDS.key())) {
+ fields = config.getStringList(FIELDS.key());
// check if the fields exist in schema
for (String field : fields) {
if (!tableSchema.containsKey(field)) {
- throw new RuntimeException("Field " + field + " does not
exist in table " + config.getString(TABLE));
+ throw new RuntimeException("Field " + field + " does not
exist in table " + config.getString(TABLE.key()));
}
}
} else {
fields = new ArrayList<>(tableSchema.keySet());
}
- Map<String, String> nodeUser = config.getObjectList(NODE_PASS).stream()
+ Map<String, String> nodeUser =
config.getObjectList(NODE_PASS.key()).stream()
.collect(Collectors.toMap(configObject ->
configObject.toConfig().getString(NODE_ADDRESS),
- configObject -> configObject.toConfig().hasPath(USERNAME)
? configObject.toConfig().getString(USERNAME) : "root"));
- Map<String, String> nodePassword =
config.getObjectList(NODE_PASS).stream()
+ configObject ->
configObject.toConfig().hasPath(USERNAME.key()) ?
configObject.toConfig().getString(USERNAME.key()) : "root"));
+ Map<String, String> nodePassword =
config.getObjectList(NODE_PASS.key()).stream()
.collect(Collectors.toMap(configObject ->
configObject.toConfig().getString(NODE_ADDRESS),
- configObject ->
configObject.toConfig().getString(PASSWORD)));
+ configObject ->
configObject.toConfig().getString(PASSWORD.key())));
proxy.close();
- this.readerOption = new FileReaderOption(shardMetadata, tableSchema,
fields, config.getString(CLICKHOUSE_LOCAL_PATH),
- ClickhouseFileCopyMethod.from(config.getString(COPY_METHOD)),
nodeUser, nodePassword);
+ this.readerOption = new FileReaderOption(shardMetadata, tableSchema,
fields, config.getString(CLICKHOUSE_LOCAL_PATH.key()),
+
ClickhouseFileCopyMethod.from(config.getString(COPY_METHOD.key())), nodeUser,
nodePassword);
}
@Override
diff --git
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSinkFactory.java
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSinkFactory.java
new file mode 100644
index 000000000..c08f39484
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSinkFactory.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.file;
+
+import static
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.CLICKHOUSE_LOCAL_PATH;
+import static
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.COPY_METHOD;
+import static
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.DATABASE;
+import static
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.FIELDS;
+import static
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.HOST;
+import static
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.NODE_PASS;
+import static
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.PASSWORD;
+import static
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.SHARDING_KEY;
+import static
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.TABLE;
+import static
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.USERNAME;
+
+import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.table.factory.Factory;
+import org.apache.seatunnel.api.table.factory.TableSinkFactory;
+
+import com.google.auto.service.AutoService;
+
+@AutoService(Factory.class)
+public class ClickhouseFileSinkFactory implements TableSinkFactory {
+ @Override
+ public String factoryIdentifier() {
+ return "ClickhouseFileSink";
+ }
+
+ @Override
+ public OptionRule optionRule() {
+ return OptionRule.builder().required(HOST, TABLE, DATABASE, USERNAME,
PASSWORD, CLICKHOUSE_LOCAL_PATH)
+ .optional(COPY_METHOD, SHARDING_KEY, FIELDS, NODE_PASS).build();
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseSource.java
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseSource.java
index 8706fe059..59a24cac9 100644
---
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseSource.java
+++
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseSource.java
@@ -17,11 +17,11 @@
package org.apache.seatunnel.connectors.seatunnel.clickhouse.source;
-import static
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.Config.DATABASE;
-import static
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.Config.HOST;
-import static
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.Config.PASSWORD;
-import static
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.Config.SQL;
-import static
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.Config.USERNAME;
+import static
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.DATABASE;
+import static
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.HOST;
+import static
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.PASSWORD;
+import static
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.SQL;
+import static
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.USERNAME;
import org.apache.seatunnel.api.common.PrepareFailException;
import org.apache.seatunnel.api.source.Boundedness;
@@ -64,19 +64,19 @@ public class ClickhouseSource implements
SeaTunnelSource<SeaTunnelRow, Clickhous
@Override
public void prepare(Config config) throws PrepareFailException {
- CheckResult result = CheckConfigUtil.checkAllExists(config, HOST,
DATABASE, SQL, USERNAME, PASSWORD);
+ CheckResult result = CheckConfigUtil.checkAllExists(config,
HOST.key(), DATABASE.key(), SQL.key(), USERNAME.key(), PASSWORD.key());
if (!result.isSuccess()) {
throw new PrepareFailException(getPluginName(), PluginType.SOURCE,
result.getMsg());
}
- servers = ClickhouseUtil.createNodes(config.getString(HOST),
config.getString(DATABASE),
- config.getString(USERNAME), config.getString(PASSWORD));
+ servers = ClickhouseUtil.createNodes(config.getString(HOST.key()),
config.getString(DATABASE.key()),
+ config.getString(USERNAME.key()),
config.getString(PASSWORD.key()));
- sql = config.getString(SQL);
+ sql = config.getString(SQL.key());
ClickHouseNode currentServer =
servers.get(ThreadLocalRandom.current().nextInt(servers.size()));
try (ClickHouseClient client =
ClickHouseClient.newInstance(currentServer.getProtocol());
ClickHouseResponse response =
client.connect(currentServer).format(ClickHouseFormat.RowBinaryWithNamesAndTypes)
-
.query(modifySQLToLimit1(config.getString(SQL))).executeAndWait()) {
+
.query(modifySQLToLimit1(config.getString(SQL.key()))).executeAndWait()) {
int columnSize = response.getColumns().size();
String[] fieldNames = new String[columnSize];
diff --git
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseSourceFactory.java
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseSourceFactory.java
new file mode 100644
index 000000000..0f1c8e285
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseSourceFactory.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.clickhouse.source;
+
+import static
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.DATABASE;
+import static
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.HOST;
+import static
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.PASSWORD;
+import static
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.SQL;
+import static
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.USERNAME;
+
+import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.table.factory.Factory;
+import org.apache.seatunnel.api.table.factory.TableSourceFactory;
+
+import com.google.auto.service.AutoService;
+
+@AutoService(Factory.class)
+public class ClickhouseSourceFactory implements TableSourceFactory {
+ @Override
+ public String factoryIdentifier() {
+ return "ClickhouseSource";
+ }
+
+ @Override
+ public OptionRule optionRule() {
+ return OptionRule.builder().required(HOST, DATABASE, SQL, USERNAME,
PASSWORD).build();
+ }
+}