This is an automated email from the ASF dual-hosted git repository.

gaojun2048 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new cc8bb60c8 [Feature][Connector-V2][Http] Add option rules && Improve Myhours sink connector (#3351)
cc8bb60c8 is described below

commit cc8bb60c83f8d780e718d72a54e9fb5604cbeaac
Author: Tyrantlucifer <[email protected]>
AuthorDate: Wed Nov 9 22:04:50 2022 +0800

    [Feature][Connector-V2][Http] Add option rules && Improve Myhours sink connector (#3351)
    
    * [Feature][Connector-V2][Http] Add option rules && Improve Myhours Connector
    
    * [Feature][Connector-V2][Http] Improve codes
    
    * [Feature][Connector-V2][Http] Fix code style
---
 .../seatunnel/http/config/HttpConfig.java          | 58 ++++++++++++++++++----
 .../seatunnel/http/config/HttpParameter.java       | 45 ++++++++++-------
 .../connectors/seatunnel/http/sink/HttpSink.java   | 12 ++---
 .../seatunnel/http/sink/HttpSinkFactory.java       | 45 +++++++++++++++++
 .../seatunnel/http/source/HttpSource.java          | 10 ++--
 .../seatunnel/http/source/HttpSourceFactory.java   | 53 ++++++++++++++++++++
 .../seatunnel/feishu/sink/FeishuSinkFactory.java}  | 25 +++++-----
 .../seatunnel/myhours/source/MyHoursSource.java    | 21 ++++----
 .../myhours/source/MyHoursSourceFactory.java       | 45 +++++++++++++++++
 .../myhours/source/config/MyHoursSourceConfig.java | 25 +++++++---
 .../source/config/MyHoursSourceParameter.java      | 30 ++++-------
 .../seatunnel/wechat/sink/WeChatSinkFactory.java   | 45 +++++++++++++++++
 .../wechat/sink/config/WeChatSinkConfig.java       | 28 +++++++----
 13 files changed, 339 insertions(+), 103 deletions(-)

diff --git 
a/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/config/HttpConfig.java
 
b/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/config/HttpConfig.java
index 8ee089aec..b7df0a25d 100644
--- 
a/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/config/HttpConfig.java
+++ 
b/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/config/HttpConfig.java
@@ -17,18 +17,54 @@
 
 package org.apache.seatunnel.connectors.seatunnel.http.config;
 
+import org.apache.seatunnel.api.configuration.Option;
+import org.apache.seatunnel.api.configuration.Options;
+
+import java.util.Map;
+
 public class HttpConfig {
-    public static final String URL = "url";
-    public static final String METHOD = "method";
     public static final String METHOD_DEFAULT_VALUE = "GET";
-    public static final String HEADERS = "headers";
-    public static final String PARAMS = "params";
-    public static final String BODY = "body";
-    public static final String SCHEMA = "schema";
-    public static final String FORMAT = "format";
     public static final String DEFAULT_FORMAT = "json";
-    public static final String POLL_INTERVAL_MILLS = "poll_interval_ms";
-    public static final String RETRY = "retry";
-    public static final String RETRY_BACKOFF_MULTIPLIER_MS = 
"retry_backoff_multiplier_ms";
-    public static final String RETRY_BACKOFF_MAX_MS = "retry_backoff_max_ms";
+    public static final int DEFAULT_RETRY_BACKOFF_MULTIPLIER_MS = 100;
+    public static final int DEFAULT_RETRY_BACKOFF_MAX_MS = 10000;
+    public static final Option<String> URL = Options.key("url")
+            .stringType()
+            .noDefaultValue()
+            .withDescription("Http request url");
+    public static final Option<String> METHOD = Options.key("method")
+            .stringType()
+            .defaultValue(METHOD_DEFAULT_VALUE)
+            .withDescription("Http request method");
+    public static final Option<Map<String, String>> HEADERS = 
Options.key("headers")
+            .mapType()
+            .noDefaultValue()
+            .withDescription("Http request headers");
+    public static final Option<Map<String, String>> PARAMS = 
Options.key("params")
+            .mapType()
+            .noDefaultValue()
+            .withDescription("Http request params");
+    public static final Option<String> BODY = Options.key("body")
+            .stringType()
+            .noDefaultValue()
+            .withDescription("Http request body");
+    public static final Option<String> FORMAT = Options.key("format")
+            .stringType()
+            .defaultValue(DEFAULT_FORMAT)
+            .withDescription("Http response format");
+    public static final Option<Integer> POLL_INTERVAL_MILLS = 
Options.key("poll_interval_millis")
+            .intType()
+            .noDefaultValue()
+            .withDescription("Request http api interval(millis) in stream 
mode");
+    public static final Option<Integer> RETRY = Options.key("retry")
+            .intType()
+            .noDefaultValue()
+            .withDescription("The max retry times if request http return to 
IOException");
+    public static final Option<Integer> RETRY_BACKOFF_MULTIPLIER_MS = 
Options.key("retry_backoff_multiplier_ms")
+            .intType()
+            .defaultValue(DEFAULT_RETRY_BACKOFF_MULTIPLIER_MS)
+            .withDescription("The retry-backoff times(millis) multiplier if 
request http failed");
+    public static final Option<Integer> RETRY_BACKOFF_MAX_MS = 
Options.key("retry_backoff_max_ms")
+            .intType()
+            .defaultValue(DEFAULT_RETRY_BACKOFF_MAX_MS)
+            .withDescription("The maximum retry-backoff times(millis) if 
request http failed");
 }
diff --git 
a/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/config/HttpParameter.java
 
b/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/config/HttpParameter.java
index aaccb57ac..8db67572d 100644
--- 
a/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/config/HttpParameter.java
+++ 
b/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/config/HttpParameter.java
@@ -35,40 +35,47 @@ public class HttpParameter implements Serializable {
     protected String body;
     protected int pollIntervalMillis;
     protected int retry;
-    protected int retryBackoffMultiplierMillis = 100;
-    protected int retryBackoffMaxMillis = 10000;
+    protected int retryBackoffMultiplierMillis = 
HttpConfig.DEFAULT_RETRY_BACKOFF_MULTIPLIER_MS;
+    protected int retryBackoffMaxMillis = 
HttpConfig.DEFAULT_RETRY_BACKOFF_MAX_MS;
 
     public void buildWithConfig(Config pluginConfig) {
         // set url
-        this.setUrl(pluginConfig.getString(HttpConfig.URL));
+        this.setUrl(pluginConfig.getString(HttpConfig.URL.key()));
         // set method
-        if (pluginConfig.hasPath(HttpConfig.METHOD)) {
-            this.setMethod(pluginConfig.getString(HttpConfig.METHOD));
+        if (pluginConfig.hasPath(HttpConfig.METHOD.key())) {
+            this.setMethod(pluginConfig.getString(HttpConfig.METHOD.key()));
         } else {
             this.setMethod(HttpConfig.METHOD_DEFAULT_VALUE);
         }
         // set headers
-        if (pluginConfig.hasPath(HttpConfig.HEADERS)) {
-            
this.setHeaders(pluginConfig.getConfig(HttpConfig.HEADERS).entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey,
 entry -> String.valueOf(entry.getValue().unwrapped()), (v1, v2) -> v2)));
+        if (pluginConfig.hasPath(HttpConfig.HEADERS.key())) {
+            
this.setHeaders(pluginConfig.getConfig(HttpConfig.HEADERS.key()).entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey,
 entry -> String.valueOf(entry.getValue().unwrapped()), (v1, v2) -> v2)));
         }
         // set params
-        if (pluginConfig.hasPath(HttpConfig.PARAMS)) {
-            
this.setParams(pluginConfig.getConfig(HttpConfig.PARAMS).entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey,
 entry -> String.valueOf(entry.getValue().unwrapped()), (v1, v2) -> v2)));
+        if (pluginConfig.hasPath(HttpConfig.PARAMS.key())) {
+            this.setParams(pluginConfig.getConfig(HttpConfig.PARAMS.key())
+                    .entrySet()
+                    .stream()
+                    .collect(Collectors.toMap(Map.Entry::getKey, entry -> 
String.valueOf(entry.getValue().unwrapped()), (v1, v2) -> v2)));
         }
         // set body
-        if (pluginConfig.hasPath(HttpConfig.BODY)) {
-            this.setBody(pluginConfig.getString(HttpConfig.BODY));
+        if (pluginConfig.hasPath(HttpConfig.BODY.key())) {
+            this.setBody(pluginConfig.getString(HttpConfig.BODY.key()));
         }
-        if (pluginConfig.hasPath(HttpConfig.POLL_INTERVAL_MILLS)) {
-            
this.setPollIntervalMillis(pluginConfig.getInt(HttpConfig.POLL_INTERVAL_MILLS));
+        if (pluginConfig.hasPath(HttpConfig.POLL_INTERVAL_MILLS.key())) {
+            
this.setPollIntervalMillis(pluginConfig.getInt(HttpConfig.POLL_INTERVAL_MILLS.key()));
         }
-        if (pluginConfig.hasPath(HttpConfig.RETRY)) {
-            this.setRetry(pluginConfig.getInt(HttpConfig.RETRY));
-            if (pluginConfig.hasPath(HttpConfig.RETRY_BACKOFF_MULTIPLIER_MS)) {
-                
this.setRetryBackoffMultiplierMillis(pluginConfig.getInt(HttpConfig.RETRY_BACKOFF_MULTIPLIER_MS));
+        this.setRetryParameters(pluginConfig);
+    }
+
+    public void setRetryParameters(Config pluginConfig) {
+        if (pluginConfig.hasPath(HttpConfig.RETRY.key())) {
+            this.setRetry(pluginConfig.getInt(HttpConfig.RETRY.key()));
+            if 
(pluginConfig.hasPath(HttpConfig.RETRY_BACKOFF_MULTIPLIER_MS.key())) {
+                
this.setRetryBackoffMultiplierMillis(pluginConfig.getInt(HttpConfig.RETRY_BACKOFF_MULTIPLIER_MS.key()));
             }
-            if (pluginConfig.hasPath(HttpConfig.RETRY_BACKOFF_MAX_MS)) {
-                
this.setRetryBackoffMaxMillis(pluginConfig.getInt(HttpConfig.RETRY_BACKOFF_MAX_MS));
+            if (pluginConfig.hasPath(HttpConfig.RETRY_BACKOFF_MAX_MS.key())) {
+                
this.setRetryBackoffMaxMillis(pluginConfig.getInt(HttpConfig.RETRY_BACKOFF_MAX_MS.key()));
             }
         }
     }
diff --git 
a/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/sink/HttpSink.java
 
b/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/sink/HttpSink.java
index 10bbe46a6..af44593ab 100644
--- 
a/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/sink/HttpSink.java
+++ 
b/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/sink/HttpSink.java
@@ -53,16 +53,16 @@ public class HttpSink extends 
AbstractSimpleSink<SeaTunnelRow, Void> {
     @Override
     public void prepare(Config pluginConfig) throws PrepareFailException {
         this.pluginConfig = pluginConfig;
-        CheckResult result = CheckConfigUtil.checkAllExists(pluginConfig, 
HttpConfig.URL);
+        CheckResult result = CheckConfigUtil.checkAllExists(pluginConfig, 
HttpConfig.URL.key());
         if (!result.isSuccess()) {
             throw new PrepareFailException(getPluginName(), PluginType.SINK, 
result.getMsg());
         }
-        httpParameter.setUrl(pluginConfig.getString(HttpConfig.URL));
-        if (pluginConfig.hasPath(HttpConfig.HEADERS)) {
-            
httpParameter.setHeaders(pluginConfig.getConfig(HttpConfig.HEADERS).entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey,
 entry -> String.valueOf(entry.getValue().unwrapped()), (v1, v2) -> v2)));
+        httpParameter.setUrl(pluginConfig.getString(HttpConfig.URL.key()));
+        if (pluginConfig.hasPath(HttpConfig.HEADERS.key())) {
+            
httpParameter.setHeaders(pluginConfig.getConfig(HttpConfig.HEADERS.key()).entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey,
 entry -> String.valueOf(entry.getValue().unwrapped()), (v1, v2) -> v2)));
         }
-        if (pluginConfig.hasPath(HttpConfig.PARAMS)) {
-            
httpParameter.setHeaders(pluginConfig.getConfig(HttpConfig.PARAMS).entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey,
 entry -> String.valueOf(entry.getValue().unwrapped()), (v1, v2) -> v2)));
+        if (pluginConfig.hasPath(HttpConfig.PARAMS.key())) {
+            
httpParameter.setHeaders(pluginConfig.getConfig(HttpConfig.PARAMS.key()).entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey,
 entry -> String.valueOf(entry.getValue().unwrapped()), (v1, v2) -> v2)));
         }
     }
 
diff --git 
a/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/sink/HttpSinkFactory.java
 
b/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/sink/HttpSinkFactory.java
new file mode 100644
index 000000000..0d961f711
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/sink/HttpSinkFactory.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.http.sink;
+
+import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.table.factory.Factory;
+import org.apache.seatunnel.api.table.factory.TableSinkFactory;
+import org.apache.seatunnel.connectors.seatunnel.http.config.HttpConfig;
+
+import com.google.auto.service.AutoService;
+
+@AutoService(Factory.class)
+public class HttpSinkFactory implements TableSinkFactory {
+    @Override
+    public String factoryIdentifier() {
+        return "Http";
+    }
+
+    @Override
+    public OptionRule optionRule() {
+        return OptionRule.builder()
+                .required(HttpConfig.URL)
+                .optional(HttpConfig.HEADERS)
+                .optional(HttpConfig.PARAMS)
+                .optional(HttpConfig.RETRY)
+                .optional(HttpConfig.RETRY_BACKOFF_MULTIPLIER_MS)
+                .optional(HttpConfig.RETRY_BACKOFF_MAX_MS)
+                .build();
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/HttpSource.java
 
b/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/HttpSource.java
index 8fbb9bff2..69759f16c 100644
--- 
a/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/HttpSource.java
+++ 
b/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/HttpSource.java
@@ -60,7 +60,7 @@ public class HttpSource extends 
AbstractSingleSplitSource<SeaTunnelRow> {
 
     @Override
     public void prepare(Config pluginConfig) throws PrepareFailException {
-        CheckResult result = CheckConfigUtil.checkAllExists(pluginConfig, 
HttpConfig.URL);
+        CheckResult result = CheckConfigUtil.checkAllExists(pluginConfig, 
HttpConfig.URL.key());
         if (!result.isSuccess()) {
             throw new PrepareFailException(getPluginName(), PluginType.SOURCE, 
result.getMsg());
         }
@@ -69,13 +69,13 @@ public class HttpSource extends 
AbstractSingleSplitSource<SeaTunnelRow> {
     }
 
     protected void buildSchemaWithConfig(Config pluginConfig) {
-        if (pluginConfig.hasPath(HttpConfig.SCHEMA)) {
-            Config schema = pluginConfig.getConfig(HttpConfig.SCHEMA);
+        if (pluginConfig.hasPath(SeaTunnelSchema.SCHEMA.key())) {
+            Config schema = 
pluginConfig.getConfig(SeaTunnelSchema.SCHEMA.key());
             this.rowType = 
SeaTunnelSchema.buildWithConfig(schema).getSeaTunnelRowType();
             // default use json format
             String format = HttpConfig.DEFAULT_FORMAT;
-            if (pluginConfig.hasPath(HttpConfig.FORMAT)) {
-                format = pluginConfig.getString(HttpConfig.FORMAT);
+            if (pluginConfig.hasPath(HttpConfig.FORMAT.key())) {
+                format = pluginConfig.getString(HttpConfig.FORMAT.key());
             }
             switch (format) {
                 case HttpConfig.DEFAULT_FORMAT:
diff --git 
a/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/HttpSourceFactory.java
 
b/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/HttpSourceFactory.java
new file mode 100644
index 000000000..7adb9ab61
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/HttpSourceFactory.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.http.source;
+
+import org.apache.seatunnel.api.configuration.util.Condition;
+import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.table.factory.Factory;
+import org.apache.seatunnel.api.table.factory.TableSourceFactory;
+import org.apache.seatunnel.connectors.seatunnel.common.schema.SeaTunnelSchema;
+import org.apache.seatunnel.connectors.seatunnel.http.config.HttpConfig;
+
+import com.google.auto.service.AutoService;
+
+@AutoService(Factory.class)
+public class HttpSourceFactory implements TableSourceFactory {
+
+    @Override
+    public String factoryIdentifier() {
+        return "Http";
+    }
+
+    @Override
+    public OptionRule optionRule() {
+        return OptionRule.builder()
+                .required(HttpConfig.URL)
+                .optional(HttpConfig.METHOD)
+                .optional(HttpConfig.HEADERS)
+                .optional(HttpConfig.PARAMS)
+                .conditional(Condition.of(HttpConfig.METHOD, "post"), 
HttpConfig.BODY)
+                .conditional(Condition.of(HttpConfig.FORMAT, "json"), 
SeaTunnelSchema.SCHEMA)
+                .optional(HttpConfig.FORMAT)
+                .optional(HttpConfig.POLL_INTERVAL_MILLS)
+                .optional(HttpConfig.RETRY)
+                .optional(HttpConfig.RETRY_BACKOFF_MULTIPLIER_MS)
+                .optional(HttpConfig.RETRY_BACKOFF_MAX_MS)
+                .build();
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-http/connector-http-myhours/src/main/java/org/apache/seatunnel/connectors/seatunnel/myhours/source/config/MyHoursSourceConfig.java
 
b/seatunnel-connectors-v2/connector-http/connector-http-feishu/src/main/java/org/apache/seatunnel/connectors/seatunnel/feishu/sink/FeishuSinkFactory.java
similarity index 52%
copy from 
seatunnel-connectors-v2/connector-http/connector-http-myhours/src/main/java/org/apache/seatunnel/connectors/seatunnel/myhours/source/config/MyHoursSourceConfig.java
copy to 
seatunnel-connectors-v2/connector-http/connector-http-feishu/src/main/java/org/apache/seatunnel/connectors/seatunnel/feishu/sink/FeishuSinkFactory.java
index 32466a761..2ee37f048 100644
--- 
a/seatunnel-connectors-v2/connector-http/connector-http-myhours/src/main/java/org/apache/seatunnel/connectors/seatunnel/myhours/source/config/MyHoursSourceConfig.java
+++ 
b/seatunnel-connectors-v2/connector-http/connector-http-feishu/src/main/java/org/apache/seatunnel/connectors/seatunnel/feishu/sink/FeishuSinkFactory.java
@@ -15,18 +15,17 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.connectors.seatunnel.myhours.source.config;
+package org.apache.seatunnel.connectors.seatunnel.feishu.sink;
 
-public class MyHoursSourceConfig {
-    public static final String URL = "url";
-    public static final String POST = "POST";
-    public static final String EMAIL = "email";
-    public static final String PASSWORD = "password";
-    public static final String GRANTTYPE = "grantType";
-    public static final String CLIENTID = "clientId";
-    public static final String API = "api";
-    public static final String AUTHORIZATION = "Authorization";
-    public static final String ACCESSTOKEN = "accessToken";
-    public static final String ACCESSTOKEN_PREFIX = "Bearer";
-    public static final String AUTHORIZATION_URL = 
"https://api2.myhours.com/api/tokens/login";;
+import org.apache.seatunnel.api.table.factory.Factory;
+import org.apache.seatunnel.connectors.seatunnel.http.sink.HttpSinkFactory;
+
+import com.google.auto.service.AutoService;
+
+@AutoService(Factory.class)
+public class FeishuSinkFactory extends HttpSinkFactory {
+    @Override
+    public String factoryIdentifier() {
+        return "Feishu";
+    }
 }
diff --git 
a/seatunnel-connectors-v2/connector-http/connector-http-myhours/src/main/java/org/apache/seatunnel/connectors/seatunnel/myhours/source/MyHoursSource.java
 
b/seatunnel-connectors-v2/connector-http/connector-http-myhours/src/main/java/org/apache/seatunnel/connectors/seatunnel/myhours/source/MyHoursSource.java
index 9d7e7a937..473afb0f6 100644
--- 
a/seatunnel-connectors-v2/connector-http/connector-http-myhours/src/main/java/org/apache/seatunnel/connectors/seatunnel/myhours/source/MyHoursSource.java
+++ 
b/seatunnel-connectors-v2/connector-http/connector-http-myhours/src/main/java/org/apache/seatunnel/connectors/seatunnel/myhours/source/MyHoursSource.java
@@ -41,7 +41,6 @@ import lombok.extern.slf4j.Slf4j;
 
 import java.io.IOException;
 import java.util.Map;
-import java.util.Objects;
 
 @Slf4j
 @AutoService(SeaTunnelSource.class)
@@ -55,13 +54,13 @@ public class MyHoursSource extends HttpSource {
 
     @Override
     public void prepare(Config pluginConfig) throws PrepareFailException {
-        CheckResult result = CheckConfigUtil.checkAllExists(pluginConfig, 
MyHoursSourceConfig.URL, MyHoursSourceConfig.EMAIL, 
MyHoursSourceConfig.PASSWORD);
+        CheckResult result = CheckConfigUtil.checkAllExists(pluginConfig, 
MyHoursSourceConfig.URL.key(),
+                MyHoursSourceConfig.EMAIL.key(), 
MyHoursSourceConfig.PASSWORD.key());
         if (!result.isSuccess()) {
             throw new PrepareFailException(getPluginName(), PluginType.SOURCE, 
result.getMsg());
         }
-        //Login to get accessToken
-        String accessToken = null;
-        accessToken = getAccessToken(pluginConfig);
+        // Login to get accessToken
+        String accessToken = getAccessToken(pluginConfig);
         this.myHoursSourceParameter.buildWithConfig(pluginConfig, accessToken);
         buildSchemaWithConfig(pluginConfig);
     }
@@ -81,7 +80,7 @@ public class MyHoursSource extends HttpSource {
                 String content = response.getContent();
                 if (!Strings.isNullOrEmpty(content)) {
                     Map<String, String> contentMap = JsonUtils.toMap(content);
-                    return contentMap.get(MyHoursSourceConfig.ACCESSTOKEN);
+                    return contentMap.get(MyHoursSourceConfig.ACCESS_TOKEN);
                 }
             }
             throw new RuntimeException(String.format("login http client 
execute exception, http response status code:[%d], content:[%s]",
@@ -90,12 +89,10 @@ public class MyHoursSource extends HttpSource {
         } catch (Exception e) {
             throw new RuntimeException("login http client execute exception");
         } finally {
-            if (Objects.nonNull(loginHttpClient)) {
-                try {
-                    loginHttpClient.close();
-                } catch (IOException e) {
-                    log.warn(e.getMessage(), e);
-                }
+            try {
+                loginHttpClient.close();
+            } catch (IOException e) {
+                log.warn(e.getMessage(), e);
             }
         }
     }
diff --git 
a/seatunnel-connectors-v2/connector-http/connector-http-myhours/src/main/java/org/apache/seatunnel/connectors/seatunnel/myhours/source/MyHoursSourceFactory.java
 
b/seatunnel-connectors-v2/connector-http/connector-http-myhours/src/main/java/org/apache/seatunnel/connectors/seatunnel/myhours/source/MyHoursSourceFactory.java
new file mode 100644
index 000000000..09df13ad3
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-http/connector-http-myhours/src/main/java/org/apache/seatunnel/connectors/seatunnel/myhours/source/MyHoursSourceFactory.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.myhours.source;
+
+import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.table.factory.Factory;
+import org.apache.seatunnel.api.table.factory.TableSourceFactory;
+import 
org.apache.seatunnel.connectors.seatunnel.myhours.source.config.MyHoursSourceConfig;
+
+import com.google.auto.service.AutoService;
+
+@AutoService(Factory.class)
+public class MyHoursSourceFactory implements TableSourceFactory {
+    @Override
+    public String factoryIdentifier() {
+        return "MyHours";
+    }
+
+    @Override
+    public OptionRule optionRule() {
+        return OptionRule.builder()
+                .required(MyHoursSourceConfig.URL)
+                .required(MyHoursSourceConfig.EMAIL)
+                .required(MyHoursSourceConfig.PASSWORD)
+                .optional(MyHoursSourceConfig.RETRY)
+                .optional(MyHoursSourceConfig.RETRY_BACKOFF_MAX_MS)
+                .optional(MyHoursSourceConfig.RETRY_BACKOFF_MULTIPLIER_MS)
+                .build();
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-http/connector-http-myhours/src/main/java/org/apache/seatunnel/connectors/seatunnel/myhours/source/config/MyHoursSourceConfig.java
 
b/seatunnel-connectors-v2/connector-http/connector-http-myhours/src/main/java/org/apache/seatunnel/connectors/seatunnel/myhours/source/config/MyHoursSourceConfig.java
index 32466a761..7f96994bc 100644
--- 
a/seatunnel-connectors-v2/connector-http/connector-http-myhours/src/main/java/org/apache/seatunnel/connectors/seatunnel/myhours/source/config/MyHoursSourceConfig.java
+++ 
b/seatunnel-connectors-v2/connector-http/connector-http-myhours/src/main/java/org/apache/seatunnel/connectors/seatunnel/myhours/source/config/MyHoursSourceConfig.java
@@ -17,16 +17,25 @@
 
 package org.apache.seatunnel.connectors.seatunnel.myhours.source.config;
 
-public class MyHoursSourceConfig {
-    public static final String URL = "url";
+import org.apache.seatunnel.api.configuration.Option;
+import org.apache.seatunnel.api.configuration.Options;
+import org.apache.seatunnel.connectors.seatunnel.http.config.HttpConfig;
+
+public class MyHoursSourceConfig extends HttpConfig {
     public static final String POST = "POST";
-    public static final String EMAIL = "email";
-    public static final String PASSWORD = "password";
-    public static final String GRANTTYPE = "grantType";
-    public static final String CLIENTID = "clientId";
+    public static final String GRANT_TYPE = "grantType";
+    public static final String CLIENT_ID = "clientId";
     public static final String API = "api";
     public static final String AUTHORIZATION = "Authorization";
-    public static final String ACCESSTOKEN = "accessToken";
-    public static final String ACCESSTOKEN_PREFIX = "Bearer";
+    public static final String ACCESS_TOKEN = "accessToken";
+    public static final String ACCESS_TOKEN_PREFIX = "Bearer";
     public static final String AUTHORIZATION_URL = 
"https://api2.myhours.com/api/tokens/login";;
+    public static final Option<String> EMAIL = Options.key("email")
+            .stringType()
+            .noDefaultValue()
+            .withDescription("My hours login email address");
+    public static final Option<String> PASSWORD = Options.key("password")
+            .stringType()
+            .noDefaultValue()
+            .withDescription("My hours login password");
 }
diff --git 
a/seatunnel-connectors-v2/connector-http/connector-http-myhours/src/main/java/org/apache/seatunnel/connectors/seatunnel/myhours/source/config/MyHoursSourceParameter.java
 
b/seatunnel-connectors-v2/connector-http/connector-http-myhours/src/main/java/org/apache/seatunnel/connectors/seatunnel/myhours/source/config/MyHoursSourceParameter.java
index 40beedb27..5bd11d2d1 100644
--- 
a/seatunnel-connectors-v2/connector-http/connector-http-myhours/src/main/java/org/apache/seatunnel/connectors/seatunnel/myhours/source/config/MyHoursSourceParameter.java
+++ 
b/seatunnel-connectors-v2/connector-http/connector-http-myhours/src/main/java/org/apache/seatunnel/connectors/seatunnel/myhours/source/config/MyHoursSourceParameter.java
@@ -18,7 +18,6 @@
 package org.apache.seatunnel.connectors.seatunnel.myhours.source.config;
 
 import org.apache.seatunnel.common.utils.JsonUtils;
-import org.apache.seatunnel.connectors.seatunnel.http.config.HttpConfig;
 import org.apache.seatunnel.connectors.seatunnel.http.config.HttpParameter;
 
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
@@ -29,9 +28,9 @@ import java.util.Map;
 public class MyHoursSourceParameter extends HttpParameter {
     public void buildWithConfig(Config pluginConfig, String accessToken) {
         super.buildWithConfig(pluginConfig);
-        //put authorization in headers
+        // put authorization in headers
         this.headers = this.getHeaders() == null ? new HashMap<>() : this.getHeaders();
-        this.headers.put(MyHoursSourceConfig.AUTHORIZATION, MyHoursSourceConfig.ACCESSTOKEN_PREFIX + " " + accessToken);
+        this.headers.put(MyHoursSourceConfig.AUTHORIZATION, MyHoursSourceConfig.ACCESS_TOKEN_PREFIX + " " + accessToken);
         this.setHeaders(this.headers);
     }
 
@@ -41,24 +40,15 @@ public class MyHoursSourceParameter extends HttpParameter {
         // set method
         this.setMethod(MyHoursSourceConfig.POST);
         // set body
-        Map bodyParams = new HashMap();
-        String email = pluginConfig.getString(MyHoursSourceConfig.EMAIL);
-        String password = pluginConfig.getString(MyHoursSourceConfig.PASSWORD);
-        bodyParams.put(MyHoursSourceConfig.GRANTTYPE, MyHoursSourceConfig.PASSWORD);
-        bodyParams.put(MyHoursSourceConfig.EMAIL, email);
-        bodyParams.put(MyHoursSourceConfig.PASSWORD, password);
-        bodyParams.put(MyHoursSourceConfig.CLIENTID, MyHoursSourceConfig.API);
+        Map<String, String> bodyParams = new HashMap<>();
+        String email = pluginConfig.getString(MyHoursSourceConfig.EMAIL.key());
+        String password = pluginConfig.getString(MyHoursSourceConfig.PASSWORD.key());
+        bodyParams.put(MyHoursSourceConfig.GRANT_TYPE, MyHoursSourceConfig.PASSWORD.key());
+        bodyParams.put(MyHoursSourceConfig.EMAIL.key(), email);
+        bodyParams.put(MyHoursSourceConfig.PASSWORD.key(), password);
+        bodyParams.put(MyHoursSourceConfig.CLIENT_ID, MyHoursSourceConfig.API);
         String body = JsonUtils.toJsonString(bodyParams);
         this.setBody(body);
-        // set retry
-        if (pluginConfig.hasPath(HttpConfig.RETRY)) {
-            this.setRetry(pluginConfig.getInt(HttpConfig.RETRY));
-            if (pluginConfig.hasPath(HttpConfig.RETRY_BACKOFF_MULTIPLIER_MS)) {
-                this.setRetryBackoffMultiplierMillis(pluginConfig.getInt(HttpConfig.RETRY_BACKOFF_MULTIPLIER_MS));
-            }
-            if (pluginConfig.hasPath(HttpConfig.RETRY_BACKOFF_MAX_MS)) {
-                this.setRetryBackoffMaxMillis(pluginConfig.getInt(HttpConfig.RETRY_BACKOFF_MAX_MS));
-            }
-        }
+        this.setRetryParameters(pluginConfig);
     }
 }
diff --git a/seatunnel-connectors-v2/connector-http/connector-http-wechat/src/main/java/org/apache/seatunnel/connectors/seatunnel/wechat/sink/WeChatSinkFactory.java b/seatunnel-connectors-v2/connector-http/connector-http-wechat/src/main/java/org/apache/seatunnel/connectors/seatunnel/wechat/sink/WeChatSinkFactory.java
new file mode 100644
index 000000000..b4b4d9e14
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-http/connector-http-wechat/src/main/java/org/apache/seatunnel/connectors/seatunnel/wechat/sink/WeChatSinkFactory.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.wechat.sink;
+
+import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.table.factory.Factory;
+import org.apache.seatunnel.api.table.factory.TableSinkFactory;
+import org.apache.seatunnel.connectors.seatunnel.wechat.sink.config.WeChatSinkConfig;
+
+import com.google.auto.service.AutoService;
+
+@AutoService(Factory.class)
+public class WeChatSinkFactory implements TableSinkFactory {
+    @Override
+    public String factoryIdentifier() {
+        return "WeChat";
+    }
+
+    @Override
+    public OptionRule optionRule() {
+        return OptionRule.builder()
+                .required(WeChatSinkConfig.URL)
+                .optional(WeChatSinkConfig.MENTIONED_LIST)
+                .optional(WeChatSinkConfig.MENTIONED_MOBILE_LIST)
+                .optional(WeChatSinkConfig.RETRY)
+                .optional(WeChatSinkConfig.RETRY_BACKOFF_MAX_MS)
+                .optional(WeChatSinkConfig.RETRY_BACKOFF_MULTIPLIER_MS)
+                .build();
+    }
+}
diff --git a/seatunnel-connectors-v2/connector-http/connector-http-wechat/src/main/java/org/apache/seatunnel/connectors/seatunnel/wechat/sink/config/WeChatSinkConfig.java b/seatunnel-connectors-v2/connector-http/connector-http-wechat/src/main/java/org/apache/seatunnel/connectors/seatunnel/wechat/sink/config/WeChatSinkConfig.java
index be2baab60..8936fedbf 100644
--- a/seatunnel-connectors-v2/connector-http/connector-http-wechat/src/main/java/org/apache/seatunnel/connectors/seatunnel/wechat/sink/config/WeChatSinkConfig.java
+++ b/seatunnel-connectors-v2/connector-http/connector-http-wechat/src/main/java/org/apache/seatunnel/connectors/seatunnel/wechat/sink/config/WeChatSinkConfig.java
@@ -17,29 +17,39 @@
 
 package org.apache.seatunnel.connectors.seatunnel.wechat.sink.config;
 
+import org.apache.seatunnel.api.configuration.Option;
+import org.apache.seatunnel.api.configuration.Options;
+import org.apache.seatunnel.connectors.seatunnel.http.config.HttpConfig;
+
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
 
-import lombok.Data;
+import lombok.Getter;
 import lombok.NonNull;
 
 import java.util.List;
 
-@Data
-public class WeChatSinkConfig {
+@Getter
+public class WeChatSinkConfig extends HttpConfig {
     public static final String WECHAT_SEND_MSG_SUPPORT_TYPE = "text";
     public static final String WECHAT_SEND_MSG_TYPE_KEY = "msgtype";
     public static final String WECHAT_SEND_MSG_CONTENT_KEY = "content";
-    public static final String MENTIONED_LIST = "mentioned_list";
-    public static final String MENTIONED_MOBILE_LIST = "mentioned_mobile_list";
+    public static final Option<List<String>> MENTIONED_LIST = Options.key("mentioned_list")
+            .listType()
+            .noDefaultValue()
+            .withDescription("A list of userids to remind the specified members in the group (@ a member), @ all means to remind everyone");
+    public static final Option<List<String>> MENTIONED_MOBILE_LIST = Options.key("mentioned_mobile_list")
+            .listType()
+            .noDefaultValue()
+            .withDescription("Mobile phone number list, remind the group member corresponding to the mobile phone number (@ a member), @ all means remind everyone");
     private List<String> mentionedList;
     private List<String> mentionedMobileList;
 
     public WeChatSinkConfig(@NonNull Config pluginConfig){
-        if (pluginConfig.hasPath(MENTIONED_LIST)) {
-            this.mentionedList = pluginConfig.getStringList(MENTIONED_LIST);
+        if (pluginConfig.hasPath(MENTIONED_LIST.key())) {
+            this.mentionedList = pluginConfig.getStringList(MENTIONED_LIST.key());
         }
-        if (pluginConfig.hasPath(MENTIONED_MOBILE_LIST)) {
-            this.mentionedMobileList = pluginConfig.getStringList(MENTIONED_MOBILE_LIST);
+        if (pluginConfig.hasPath(MENTIONED_MOBILE_LIST.key())) {
+            this.mentionedMobileList = pluginConfig.getStringList(MENTIONED_MOBILE_LIST.key());
         }
     }
 }


Reply via email to