This is an automated email from the ASF dual-hosted git repository.

shenlin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git

commit 395c3f325828bce0497fa5f54a385f071d02484d
Author: zh378814 <[email protected]>
AuthorDate: Fri Apr 22 14:04:13 2022 +0800

    Remove redundant verification, complete document example parameters, and
    optimize RocketMQ's key and tag acquisition logic.
---
 .../aliyun/rocketmq-connect-dingtalk/README.md     |  6 +-
 .../connect/dingtalk/sink/DingTalkSinkTask.java    |  5 --
 connectors/aliyun/rocketmq-connect-fc/README.md    |  5 +-
 .../rocketmq/connect/fc/sink/FcSinkTask.java       |  8 ---
 connectors/aliyun/rocketmq-connect-mns/README.md   |  5 +-
 .../rocketmq/connect/mns/source/MNSSourceTask.java |  8 ---
 .../aliyun/rocketmq-connect-rocketmq/README.md     | 24 ++++----
 .../connect/rocketmq/RocketMQSinkConnector.java    | 25 ++++++++
 .../connect/rocketmq/RocketMQSinkTask.java         | 39 ++----------
 .../connect/rocketmq/RocketMQSourceConnector.java  | 34 +++++++++++
 .../connect/rocketmq/RocketMQSourceTask.java       | 70 ++++++----------------
 .../connect/rocketmq/common/RocketMQConstant.java  |  4 ++
 .../rocketmq/RocketMQSinkConnectorTest.java        |  3 +-
 .../rocketmq/RocketMQSourceConnectorTest.java      | 12 ++++
 connectors/rocketmq-connect-http/README.md         |  5 +-
 .../rocketmq/connect/http/sink/HttpSinkTask.java   |  8 +--
 16 files changed, 127 insertions(+), 134 deletions(-)

diff --git a/connectors/aliyun/rocketmq-connect-dingtalk/README.md 
b/connectors/aliyun/rocketmq-connect-dingtalk/README.md
index 127036a..2ab5385 100644
--- a/connectors/aliyun/rocketmq-connect-dingtalk/README.md
+++ b/connectors/aliyun/rocketmq-connect-dingtalk/README.md
@@ -15,13 +15,13 @@ mvn clean install -Dmaven.test.skip=true
 
 ```
 
http://${runtime-ip}:${runtime-port}/connectors/${rocketmq-ding-talk-sink-connector-name}
-?config={"source-rocketmq":"${runtime-ip}:${runtime-port}","source-cluster":"${broker-cluster}","connector-class":"org.apache.rocketmq.connect.dingtalk.sink.DingTalkSinkConnector",“webHook”:"${webHook}",“msgtype”:"${msgtype}","secretKey":"${secretKey}"}
+?config={"source-rocketmq":"${runtime-ip}:${runtime-port}","source-cluster":"${broker-cluster}","connector-class":"org.apache.rocketmq.connect.dingtalk.sink.DingTalkSinkConnector","connect-topicname"
 : 
"${connect-topicname}","webHook":"${webHook}","msgtype":"${msgtype}","secretKey":"${secretKey}"}
 ```
 
 例子 
 ```
 
http://localhost:8081/connectors/dingTalkConnectorSink?config={"source-rocketmq":"localhost:9876","source-cluster":"DefaultCluster",
-"connector-class":"org.apache.rocketmq.connect.dingtalk.sink.DingTalkSinkConnector","webHook":"192.168.1.2","msgtype":"text","secretKey":"xxxx"}
+"connector-class":"org.apache.rocketmq.connect.dingtalk.sink.DingTalkSinkConnector","connect-topicname"
 : 
"ding-talk-topic","webHook":"192.168.1.2","msgtype":"text","secretKey":"xxxx"}
 ```
 
 >**注:** `rocketmq-ding-talk-connect` 
 >的启动依赖于`rocketmq-connect-runtime`项目的启动,需将打好的所有`jar`包放置到`runtime`项目中`pluginPaths`配置的路径后再执行上面的启动请求,该值配置在`runtime`项目下的`connect.conf`文件中
@@ -40,4 +40,4 @@ 
http://${runtime-ip}:${runtime-port}/connectors/${rocketmq-jdbc-connector-name}/
 |webHook                 | String  | YES            | 机器人的Webhook地址 | 
https://oapi.dingtalk.com/robot/send?access_token=XXXXXX |
 |msgtype                 | String  | NO             | 消息类型      | text         
                                            |             |
 |secretKey               | String  | NO             | 密钥        | SC           
                                            |
-
+|connect-topicname       | String  | YES            | sink需要处理数据消息topic        
             | xxxx |
diff --git 
a/connectors/aliyun/rocketmq-connect-dingtalk/src/main/java/org/apache/rocketmq/connect/dingtalk/sink/DingTalkSinkTask.java
 
b/connectors/aliyun/rocketmq-connect-dingtalk/src/main/java/org/apache/rocketmq/connect/dingtalk/sink/DingTalkSinkTask.java
index cb4e158..98e52fb 100644
--- 
a/connectors/aliyun/rocketmq-connect-dingtalk/src/main/java/org/apache/rocketmq/connect/dingtalk/sink/DingTalkSinkTask.java
+++ 
b/connectors/aliyun/rocketmq-connect-dingtalk/src/main/java/org/apache/rocketmq/connect/dingtalk/sink/DingTalkSinkTask.java
@@ -7,7 +7,6 @@ import 
io.openmessaging.connector.api.component.task.sink.SinkTask;
 import io.openmessaging.connector.api.component.task.sink.SinkTaskContext;
 import io.openmessaging.connector.api.data.ConnectRecord;
 import io.openmessaging.connector.api.errors.ConnectException;
-import org.apache.commons.lang3.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -57,10 +56,6 @@ public class DingTalkSinkTask extends SinkTask {
 
     @Override
     public void validate(KeyValue config) {
-        if (StringUtils.isBlank(config.getString(DingTalkConstant.WEB_HOOK)) ||
-            
StringUtils.isBlank(config.getString(DingTalkConstant.SECRET_KEY))) {
-            throw new RuntimeException("ding talk required parameter is null 
!");
-        }
     }
 
     @Override
diff --git a/connectors/aliyun/rocketmq-connect-fc/README.md 
b/connectors/aliyun/rocketmq-connect-fc/README.md
index b22979e..69dc110 100644
--- a/connectors/aliyun/rocketmq-connect-fc/README.md
+++ b/connectors/aliyun/rocketmq-connect-fc/README.md
@@ -15,13 +15,13 @@ mvn clean install -Dmaven.test.skip=true
 
 ```
 
http://${runtime-ip}:${runtime-port}/connectors/${rocketmq-fc-sink-connector-name}
-?config={"source-rocketmq":"${runtime-ip}:${runtime-port}","source-cluster":"${broker-cluster}","connector-class":"org.apache.rocketmq.connect.fc.sink.FcSinkConnector",“region”:"${region}",accessKey”:"${accessKey}",accessSecretKey”:"${accessSecretKey}",accountId”:"${accountId}","serviceName":"${serviceName}","functionName":"${functionName}","invocationType":"${invocationType}",
 "qualifier":"${qualifier}"}
+?config={"source-rocketmq":"${runtime-ip}:${runtime-port}","source-cluster":"${broker-cluster}","connector-class":"org.apache.rocketmq.connect.fc.sink.FcSinkConnector","connect-topicname"
 : 
"${connect-topicname}",“region”:"${region}",accessKey”:"${accessKey}",accessSecretKey”:"${accessSecretKey}",accountId”:"${accountId}","serviceName":"${serviceName}","functionName":"${functionName}","invocationType":"${invocationType}",
 "qualifier":"${qualifier}"}
 ```
 
 例子 
 ```
 
http://localhost:8081/connectors/fcConnectorSink?config={"source-rocketmq":"localhost:9876","source-cluster":"DefaultCluster",
-"connector-class":"org.apache.rocketmq.connect.fc.sink.FcSinkConnector",“region”:"cn-hangzhou",accessKey”:"xxxx",accessSecretKey”:"xxxx",accountId”:"xxxx","serviceName":"xxxx","functionName":"xxxx","invocationType":"",
 "qualifier":"LATEST"}
+"connector-class":"org.apache.rocketmq.connect.fc.sink.FcSinkConnector","connect-topicname"
 : 
"fc-topic",“region”:"cn-hangzhou",accessKey”:"xxxx",accessSecretKey”:"xxxx",accountId”:"xxxx","serviceName":"xxxx","functionName":"xxxx","invocationType":"",
 "qualifier":"LATEST"}
 ```
 
 >**注:** `rocketmq-fc-connect` 
 >的启动依赖于`rocketmq-connect-runtime`项目的启动,需将打好的所有`jar`包放置到`runtime`项目中`pluginPaths`配置的路径后再执行上面的启动请求,该值配置在`runtime`项目下的`connect.conf`文件中
@@ -45,4 +45,5 @@ 
http://${runtime-ip}:${runtime-port}/connectors/${rocketmq-fc-connector-name}/st
 |functionName            | String  | YES            | 函数名称 | xxxx |
 |invocationType          | String | NO             | 同步或者异步                    
       | null |
 |qualifier               | String | NO             | 服务版本和别名                   
       | LATEST |
+|connect-topicname       | String  | YES            | sink需要处理数据消息topic        
             | xxxx |
 
diff --git 
a/connectors/aliyun/rocketmq-connect-fc/src/main/java/org/apache/rocketmq/connect/fc/sink/FcSinkTask.java
 
b/connectors/aliyun/rocketmq-connect-fc/src/main/java/org/apache/rocketmq/connect/fc/sink/FcSinkTask.java
index 8080b82..a56739f 100644
--- 
a/connectors/aliyun/rocketmq-connect-fc/src/main/java/org/apache/rocketmq/connect/fc/sink/FcSinkTask.java
+++ 
b/connectors/aliyun/rocketmq-connect-fc/src/main/java/org/apache/rocketmq/connect/fc/sink/FcSinkTask.java
@@ -80,14 +80,6 @@ public class FcSinkTask extends SinkTask {
 
     @Override
     public void validate(KeyValue config) {
-        if 
(StringUtils.isBlank(config.getString(FcConstant.REGION_ID_CONSTANT))
-            || 
StringUtils.isBlank(config.getString(FcConstant.ACCESS_KEY_ID_CONSTANT))
-            || 
StringUtils.isBlank(config.getString(FcConstant.ACCESS__KEY_SECRET_CONSTANT))
-            || 
StringUtils.isBlank(config.getString(FcConstant.ACCOUNT_ID_CONSTANT))
-            || 
StringUtils.isBlank(config.getString(FcConstant.SERVICE_NAME_CONSTANT))
-            || 
StringUtils.isBlank(config.getString(FcConstant.FUNCTION_NAME_CONSTANT))) {
-            throw new RuntimeException("fc required parameter is null !");
-        }
         try {
             GetServiceRequest getServiceRequest = new 
GetServiceRequest(config.getString(FcConstant.SERVICE_NAME_CONSTANT));
             
getServiceRequest.setQualifier(config.getString(FcConstant.QUALIFIER_CONSTANT));
diff --git a/connectors/aliyun/rocketmq-connect-mns/README.md 
b/connectors/aliyun/rocketmq-connect-mns/README.md
index 8c2effd..bc143b6 100644
--- a/connectors/aliyun/rocketmq-connect-mns/README.md
+++ b/connectors/aliyun/rocketmq-connect-mns/README.md
@@ -15,14 +15,14 @@ mvn clean install -Dmaven.test.skip=true
 
 ```
 
http://${runtime-ip}:${runtime-port}/connectors/${rocketmq-mns-source-connector-name}
-?config={"source-rocketmq":"${runtime-ip}:${runtime-port}","source-cluster":"${broker-cluster}","connector-class":"org.apache.rocketmq.connect.mns.source.MNSSourceConnector",“accessKeyId”:"${accessKeyId}",accessKeySecret”:"${accessKeySecret}",accountEndpoint”:"${accountEndpoint}",queueName”:"${queueName}","accountId":"${accountId}","batchSize":"${batchSize}","isBase64Decode":"${isBase64Decode}"}
+?config={"source-rocketmq":"${runtime-ip}:${runtime-port}","source-cluster":"${broker-cluster}","connector-class":"org.apache.rocketmq.connect.mns.source.MNSSourceConnector","connect-topicname"
 : 
"${connect-topicname}",“accessKeyId”:"${accessKeyId}",accessKeySecret”:"${accessKeySecret}",accountEndpoint”:"${accountEndpoint}",queueName”:"${queueName}","accountId":"${accountId}","batchSize":"${batchSize}","isBase64Decode":"${isBase64Decode}"}
 ```
 
 例子
 
 ```
 
http://localhost:8081/connectors/mnsConnectorSource?config={"source-rocketmq":"localhost:9876","source-cluster":"DefaultCluster",
-"connector-class":"org.apache.rocketmq.connect.mns.source.MNSSourceConnector","accessKeyId":"xxxx","accessKeySecret":"xxxx","accountEndpoint":"xxxx","queueName":"xxxx",
+"connector-class":"org.apache.rocketmq.connect.mns.source.MNSSourceConnector","connect-topicname"
 : 
"mns-topic","accessKeyId":"xxxx","accessKeySecret":"xxxx","accountEndpoint":"xxxx","queueName":"xxxx",
 "accountId":"xxxx","batchSize":"8","isBase64Decode":"true"}
 ```
 
@@ -46,3 +46,4 @@ 
http://${runtime-ip}:${runtime-port}/connectors/${rocketmq-mns-connector-name}/s
 | accountId       | String  | YES            | 阿里云yourAccountId        | 
10000000 |
 | batchSize       | Integer | NO            | 批量接受消息数量                | 8      
  |
 | isBase64Decode  | String  | NO             | 是否开启Base64解码            | true  
   |
+|connect-topicname       | String  | YES            | source需要处理数据消息topic     
| xxxx |
\ No newline at end of file
diff --git 
a/connectors/aliyun/rocketmq-connect-mns/src/main/java/org/apache/rocketmq/connect/mns/source/MNSSourceTask.java
 
b/connectors/aliyun/rocketmq-connect-mns/src/main/java/org/apache/rocketmq/connect/mns/source/MNSSourceTask.java
index ce4a588..d54092c 100644
--- 
a/connectors/aliyun/rocketmq-connect-mns/src/main/java/org/apache/rocketmq/connect/mns/source/MNSSourceTask.java
+++ 
b/connectors/aliyun/rocketmq-connect-mns/src/main/java/org/apache/rocketmq/connect/mns/source/MNSSourceTask.java
@@ -13,7 +13,6 @@ import io.openmessaging.KeyValue;
 import io.openmessaging.connector.api.component.task.source.SourceTask;
 import io.openmessaging.connector.api.component.task.source.SourceTaskContext;
 import io.openmessaging.connector.api.data.ConnectRecord;
-import org.apache.commons.lang3.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -86,13 +85,6 @@ public class MNSSourceTask extends SourceTask {
 
     @Override
     public void validate(KeyValue config) {
-        if (StringUtils.isBlank(config.getString(ACCESS_KEY_ID))
-                || StringUtils.isBlank(config.getString(ACCESS_KEY_SECRET))
-                || StringUtils.isBlank(config.getString(ACCOUNT_ENDPOINT))
-                || StringUtils.isBlank(config.getString(QUEUE_NAME))
-                || StringUtils.isBlank(config.getString(ACCOUNT_ID))) {
-            throw new RuntimeException("mns required parameter is null !");
-        }
         // 检测队列名称是否存在
         PagingListResult<QueueMeta> queueMetaPagingListResult = 
mnsClient.listQueue(queueName, null, 1);
         List<QueueMeta> result = queueMetaPagingListResult.getResult();
diff --git a/connectors/aliyun/rocketmq-connect-rocketmq/README.md 
b/connectors/aliyun/rocketmq-connect-rocketmq/README.md
index cb41006..025a044 100644
--- a/connectors/aliyun/rocketmq-connect-rocketmq/README.md
+++ b/connectors/aliyun/rocketmq-connect-rocketmq/README.md
@@ -11,14 +11,14 @@ mvn clean install -Dmaven.test.skip=true
 
 ```
 
http://${runtime-ip}:${runtime-port}/connectors/${rocketmq-rocketmq-source-connector-name}
-?config={"source-rocketmq":"${runtime-ip}:${runtime-port}","source-cluster":"${broker-cluster}","connector-class":"org.apache.rocketmq.connect.rocketmq.RocketMQSourceConnector",“accessKeyId”:"${accessKeyId}",accessKeySecret”:"${accessKeySecret}",namesrvAddr”:"${namesrvAddr}","topic":"${topic}","instanceId":"${instanceId}","consumerGroup":"${consumerGroup}"}
+?config={"source-rocketmq":"${runtime-ip}:${runtime-port}","source-cluster":"${broker-cluster}","connector-class":"org.apache.rocketmq.connect.rocketmq.RocketMQSourceConnector","connect-topicname"
 : 
"${connect-topicname}",“accessKeyId”:"${accessKeyId}",accessKeySecret”:"${accessKeySecret}",namesrvAddr”:"${namesrvAddr}","topic":"${topic}","instanceId":"${instanceId}","consumerGroup":"${consumerGroup}"}
 ```
 
 例子
 
 ```
 
http://localhost:8081/connectors/rocketmqConnectorSource?config={"source-rocketmq":"localhost:9876","source-cluster":"DefaultCluster",
-"connector-class":"org.apache.rocketmq.connect.rocketmq.RocketMQSourceConnector","accessKeyId":"xxxx","accessKeySecret":"xxxx","namesrvAddr":"http://127.0.0.1:9876","topic":"topic",
+"connector-class":"org.apache.rocketmq.connect.rocketmq.RocketMQSourceConnector","connect-topicname"
 : 
"rocketmq-source-topic","accessKeyId":"xxxx","accessKeySecret":"xxxx","namesrvAddr":"http://127.0.0.1:9876","topic":"topic",
 "instanceId":"xxxx", "consumerGroup":"xxxx"}
 ```
 
@@ -26,13 +26,13 @@ 
http://localhost:8081/connectors/rocketmqConnectorSource?config={"source-rocketm
 
 ```
 
http://${runtime-ip}:${runtime-port}/connectors/${rocketmq-rocketmq-sink-connector-name}
-?config={"source-rocketmq":"${runtime-ip}:${runtime-port}","source-cluster":"${broker-cluster}","connector-class":"org.apache.rocketmq.connect.rocketmq.RocketMQSinkConnector",“accessKeyId”:"${accessKeyId}",accessKeySecret”:"${accessKeySecret}",namesrvAddr”:"${namesrvAddr}","topic":"${topic}","instanceId":"${instanceId}"}
+?config={"source-rocketmq":"${runtime-ip}:${runtime-port}","source-cluster":"${broker-cluster}","connector-class":"org.apache.rocketmq.connect.rocketmq.RocketMQSinkConnector","connect-topicname"
 : "${connect-topicname}", "accessKeyId":"${accessKeyId}", 
"accessKeySecret":"${accessKeySecret}",namesrvAddr”:"${namesrvAddr}","topic":"${topic}","instanceId":"${instanceId}"}
 ```
 
 例子 
 ```
 
http://localhost:8081/connectors/rocketmqConnectorSink?config={"source-rocketmq":"localhost:9876","source-cluster":"DefaultCluster",
-"connector-class":"org.apache.rocketmq.connect.rocketmq.RocketMQSinkConnector","accessKeyId":"xxxx","accessKeySecret":"xxxx","namesrvAddr":"http://127.0.0.1:9876","topic":"topic",
+"connector-class":"org.apache.rocketmq.connect.rocketmq.RocketMQSinkConnector","connect-topicname"
 : 
"rocketmq-sink-topic","accessKeyId":"xxxx","accessKeySecret":"xxxx","namesrvAddr":"http://127.0.0.1:9876","topic":"topic",
 "instanceId":"xxxx"}
 ```
 
@@ -47,14 +47,15 @@ 
http://${runtime-ip}:${runtime-port}/connectors/${rocketmq-rocketmq-connector-na
 ## rocketmq-connect-rocketmq 参数说明
 * **rocketmq-source-connector 参数说明**
 
-|         KEY            |  TYPE   | Must be filled | Description| Example
-|------------------------|---------|----------------|------------|---|
-| accessKeyId           | String  | YES            | AccessKey 
ID阿里云身份验证,在阿里云服务器管理控制台创建 | xxxx    |
-| accessKeySecret       | String  | YES            | AccessKey 
Secret阿里云身份验证,在阿里云服务器管理控制台创建 | xxxx    |
+|         KEY            |  TYPE   | Must be filled | Description              
                  | Example
+|------------------------|---------|----------------|--------------------------------------------|---|
+| accessKeyId           | String  | YES            | AccessKey 
ID阿里云身份验证,在阿里云服务器管理控制台创建         | xxxx    |
+| accessKeySecret       | String  | YES            | AccessKey 
Secret阿里云身份验证,在阿里云服务器管理控制台创建     | xxxx    |
 | namesrvAddr           | String  | YES            | 
设置TCP接入域名,进入消息队列RocketMQ版控制台实例详情页面的接入点区域查看 | xxxx    |
-| topic                 | String  | YES            | 消息主题          | xxxx    |
-| instanceId            | String  | NO             | 阿里云MQ控制台的实例Id | xxxx    |
-| consumerGroup            | String  | YES            | 消息订阅者 | xxxx    |
+| topic                 | String  | YES            | 消息主题                      
                 | xxxx    |
+| instanceId            | String  | NO             | 阿里云MQ控制台的实例Id             
                 | xxxx    |
+| consumerGroup            | String  | YES            | 消息订阅者                  
                    | xxxx    |
+|connect-topicname       | String  | YES            | source需要处理数据消息topic      
                  | xxxx |
 
 ```  
 注:1. source/sink配置文件说明是以rocketmq-connect-rocketmq为demo,不同source/sink 
connector配置有差异,请以具体sourc/sink connector为准
@@ -68,4 +69,5 @@ 
http://${runtime-ip}:${runtime-port}/connectors/${rocketmq-rocketmq-connector-na
 | namesrvAddr           | String  | YES            | 
设置TCP接入域名,进入消息队列RocketMQ版控制台实例详情页面的接入点区域查看 | xxxx    |
 | topic                 | String  | YES            | 消息主题          | xxxx    |
 | instanceId            | String  | NO             | 阿里云MQ控制台的实例Id | xxxx    |
+|connect-topicname       | String  | YES            | sink需要处理数据消息topic        
             | xxxx |
 
diff --git 
a/connectors/aliyun/rocketmq-connect-rocketmq/src/main/java/org/apache/rocketmq/connect/rocketmq/RocketMQSinkConnector.java
 
b/connectors/aliyun/rocketmq-connect-rocketmq/src/main/java/org/apache/rocketmq/connect/rocketmq/RocketMQSinkConnector.java
index 790c6dd..0d13598 100644
--- 
a/connectors/aliyun/rocketmq-connect-rocketmq/src/main/java/org/apache/rocketmq/connect/rocketmq/RocketMQSinkConnector.java
+++ 
b/connectors/aliyun/rocketmq-connect-rocketmq/src/main/java/org/apache/rocketmq/connect/rocketmq/RocketMQSinkConnector.java
@@ -1,17 +1,25 @@
 package org.apache.rocketmq.connect.rocketmq;
 
 
+import com.aliyun.ons20190214.Client;
+import com.aliyun.ons20190214.models.OnsTopicListRequest;
+import com.aliyun.ons20190214.models.OnsTopicListResponse;
 import com.aliyun.openservices.shade.org.apache.commons.lang3.StringUtils;
+import com.aliyun.teaopenapi.models.Config;
 import io.openmessaging.KeyValue;
 import io.openmessaging.connector.api.component.task.Task;
 import io.openmessaging.connector.api.component.task.sink.SinkConnector;
 import io.openmessaging.internal.DefaultKeyValue;
 import org.apache.rocketmq.connect.rocketmq.common.RocketMQConstant;
+import org.apache.rocketmq.connect.rocketmq.utils.OnsUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
 import java.util.List;
 
 public class RocketMQSinkConnector extends SinkConnector {
+    private static final Logger log = 
LoggerFactory.getLogger(RocketMQSinkConnector.class);
 
     private String accessKeyId;
 
@@ -59,6 +67,23 @@ public class RocketMQSinkConnector extends SinkConnector {
                 || 
StringUtils.isBlank(config.getString(RocketMQConstant.TOPIC))) {
             throw new RuntimeException("rocketmq required parameter is null 
!");
         }
+        try {
+            Config onsConfig = new Config()
+                    
.setAccessKeyId(config.getString(RocketMQConstant.ACCESS_KEY_ID))
+                    
.setAccessKeySecret(config.getString(RocketMQConstant.ACCESS_KEY_SECRET));
+            onsConfig.endpoint = 
OnsUtils.parseEndpoint(config.getString(RocketMQConstant.NAMESRV_ADDR));
+            final Client client = new Client(onsConfig);
+            OnsTopicListRequest onsTopicListRequest = new OnsTopicListRequest()
+                    .setTopic(config.getString(RocketMQConstant.TOPIC))
+                    
.setInstanceId(config.getString(RocketMQConstant.INSTANCE_ID));
+            final OnsTopicListResponse onsTopicListResponse = 
client.onsTopicList(onsTopicListRequest);
+            if 
(onsTopicListResponse.getBody().getData().getPublishInfoDo().isEmpty()) {
+                throw new RuntimeException("rocketmq required parameter topic 
does not exist !");
+            }
+        } catch (Exception e) {
+            log.error("RocketMQSinkTask | validate | error => ", e);
+            throw new RuntimeException(e.getMessage());
+        }
     }
 
     @Override
diff --git 
a/connectors/aliyun/rocketmq-connect-rocketmq/src/main/java/org/apache/rocketmq/connect/rocketmq/RocketMQSinkTask.java
 
b/connectors/aliyun/rocketmq-connect-rocketmq/src/main/java/org/apache/rocketmq/connect/rocketmq/RocketMQSinkTask.java
index 6c8eb36..13f2623 100644
--- 
a/connectors/aliyun/rocketmq-connect-rocketmq/src/main/java/org/apache/rocketmq/connect/rocketmq/RocketMQSinkTask.java
+++ 
b/connectors/aliyun/rocketmq-connect-rocketmq/src/main/java/org/apache/rocketmq/connect/rocketmq/RocketMQSinkTask.java
@@ -1,22 +1,18 @@
 package org.apache.rocketmq.connect.rocketmq;
 
-import com.aliyun.ons20190214.Client;
-import com.aliyun.ons20190214.models.OnsTopicListRequest;
-import com.aliyun.ons20190214.models.OnsTopicListResponse;
 import com.aliyun.openservices.ons.api.Message;
 import com.aliyun.openservices.ons.api.ONSFactory;
 import com.aliyun.openservices.ons.api.Producer;
 import com.aliyun.openservices.ons.api.PropertyKeyConst;
+import com.aliyun.openservices.ons.api.SendResult;
 import com.aliyun.openservices.shade.com.alibaba.fastjson.JSON;
 import com.aliyun.openservices.shade.org.apache.commons.lang3.StringUtils;
-import com.aliyun.teaopenapi.models.Config;
 import io.openmessaging.KeyValue;
 import io.openmessaging.connector.api.component.task.sink.SinkTask;
 import io.openmessaging.connector.api.component.task.sink.SinkTaskContext;
 import io.openmessaging.connector.api.data.ConnectRecord;
 import io.openmessaging.connector.api.errors.ConnectException;
 import org.apache.rocketmq.connect.rocketmq.common.RocketMQConstant;
-import org.apache.rocketmq.connect.rocketmq.utils.OnsUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -45,14 +41,15 @@ public class RocketMQSinkTask extends SinkTask {
             sinkRecords.forEach(connectRecord -> {
                 Message message = new Message();
                 
message.setBody(JSON.toJSONString(connectRecord.getData()).getBytes(StandardCharsets.UTF_8));
-                // TODO message.setKey();
-                // TODO message.setTag();
                 final KeyValue extensions = connectRecord.getExtensions();
                 if (extensions != null) {
+                    message.setKey(extensions.getString(RocketMQConstant.KEY));
+                    message.setTag(extensions.getString(RocketMQConstant.TAG));
                     extensions.keySet().forEach(key -> 
message.putUserProperties(key, extensions.getString(key)));
                 }
                 message.setTopic(topic);
-                producer.send(message);
+                final SendResult send = producer.send(message);
+                log.info("RocketMQSinkTask | put | send : {}", send);
             });
         } catch (Exception e) {
             log.error("RocketMQSinkTask | put | error => ", e);
@@ -72,30 +69,6 @@ public class RocketMQSinkTask extends SinkTask {
 
     @Override
     public void validate(KeyValue config) {
-        if 
(StringUtils.isBlank(config.getString(RocketMQConstant.ACCESS_KEY_ID))
-            || 
StringUtils.isBlank(config.getString(RocketMQConstant.ACCESS_KEY_SECRET))
-            || 
StringUtils.isBlank(config.getString(RocketMQConstant.NAMESRV_ADDR))
-            || StringUtils.isBlank(config.getString(RocketMQConstant.TOPIC))) {
-            throw new RuntimeException("rocketmq required parameter is null 
!");
-        }
-        // 检查topic是否存在
-        try {
-            Config onsConfig = new Config()
-                    
.setAccessKeyId(config.getString(RocketMQConstant.ACCESS_KEY_ID))
-                    
.setAccessKeySecret(config.getString(RocketMQConstant.ACCESS_KEY_SECRET));
-            onsConfig.endpoint = 
OnsUtils.parseEndpoint(config.getString(RocketMQConstant.NAMESRV_ADDR));
-            final Client client = new Client(onsConfig);
-            OnsTopicListRequest onsTopicListRequest = new OnsTopicListRequest()
-                    .setTopic(config.getString(RocketMQConstant.TOPIC))
-                    
.setInstanceId(config.getString(RocketMQConstant.INSTANCE_ID));
-            final OnsTopicListResponse onsTopicListResponse = 
client.onsTopicList(onsTopicListRequest);
-            if 
(onsTopicListResponse.getBody().getData().getPublishInfoDo().isEmpty()) {
-                throw new RuntimeException("rocketmq required parameter topic 
does not exist !");
-            }
-        } catch (Exception e) {
-            log.error("RocketMQSinkTask | validate | error => ", e);
-            throw new RuntimeException(e.getMessage());
-        }
     }
 
     @Override
@@ -118,7 +91,7 @@ public class RocketMQSinkTask extends SinkTask {
             properties.put(PropertyKeyConst.AccessKey, accessKeyId);
             properties.put(PropertyKeyConst.SecretKey, accessKeySecret);
             if (StringUtils.isNotBlank(instanceId)) {
-                properties.put(PropertyKeyConst.INSTANCE_ID,  instanceId);
+                properties.put(PropertyKeyConst.INSTANCE_ID, instanceId);
             }
             properties.put(PropertyKeyConst.NAMESRV_ADDR, namesrvAddr);
             producer = ONSFactory.createProducer(properties);
diff --git 
a/connectors/aliyun/rocketmq-connect-rocketmq/src/main/java/org/apache/rocketmq/connect/rocketmq/RocketMQSourceConnector.java
 
b/connectors/aliyun/rocketmq-connect-rocketmq/src/main/java/org/apache/rocketmq/connect/rocketmq/RocketMQSourceConnector.java
index 778aa2b..3197837 100644
--- 
a/connectors/aliyun/rocketmq-connect-rocketmq/src/main/java/org/apache/rocketmq/connect/rocketmq/RocketMQSourceConnector.java
+++ 
b/connectors/aliyun/rocketmq-connect-rocketmq/src/main/java/org/apache/rocketmq/connect/rocketmq/RocketMQSourceConnector.java
@@ -1,16 +1,26 @@
 package org.apache.rocketmq.connect.rocketmq;
 
+import com.aliyun.ons20190214.Client;
+import com.aliyun.ons20190214.models.OnsGroupListRequest;
+import com.aliyun.ons20190214.models.OnsGroupListResponse;
+import com.aliyun.ons20190214.models.OnsTopicListRequest;
+import com.aliyun.ons20190214.models.OnsTopicListResponse;
 import com.aliyun.openservices.shade.org.apache.commons.lang3.StringUtils;
+import com.aliyun.teaopenapi.models.Config;
 import io.openmessaging.KeyValue;
 import io.openmessaging.connector.api.component.task.Task;
 import io.openmessaging.connector.api.component.task.source.SourceConnector;
 import io.openmessaging.internal.DefaultKeyValue;
 import org.apache.rocketmq.connect.rocketmq.common.RocketMQConstant;
+import org.apache.rocketmq.connect.rocketmq.utils.OnsUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
 import java.util.List;
 
 public class RocketMQSourceConnector extends SourceConnector {
+    private static final Logger log = 
LoggerFactory.getLogger(RocketMQSourceConnector.class);
 
     private String accessKeyId;
 
@@ -62,6 +72,30 @@ public class RocketMQSourceConnector extends SourceConnector 
{
                 || 
StringUtils.isBlank(config.getString(RocketMQConstant.CONSUMER_GROUP))) {
             throw new RuntimeException("rocketmq required parameter is null 
!");
         }
+        try {
+            Config onsConfig = new Config()
+                    
.setAccessKeyId(config.getString(RocketMQConstant.ACCESS_KEY_ID))
+                    
.setAccessKeySecret(config.getString(RocketMQConstant.ACCESS_KEY_SECRET));
+            onsConfig.endpoint = 
OnsUtils.parseEndpoint(config.getString(RocketMQConstant.NAMESRV_ADDR));
+            final Client client = new Client(onsConfig);
+            OnsTopicListRequest onsTopicListRequest = new OnsTopicListRequest()
+                    .setTopic(config.getString(RocketMQConstant.TOPIC))
+                    
.setInstanceId(config.getString(RocketMQConstant.INSTANCE_ID));
+            final OnsTopicListResponse onsTopicListResponse = 
client.onsTopicList(onsTopicListRequest);
+            if 
(onsTopicListResponse.getBody().getData().getPublishInfoDo().isEmpty()) {
+                throw new RuntimeException("rocketmq required parameter topic 
does not exist !");
+            }
+            OnsGroupListRequest onsGroupListRequest = new OnsGroupListRequest()
+                    
.setInstanceId(config.getString(RocketMQConstant.INSTANCE_ID))
+                    
.setGroupId(config.getString(RocketMQConstant.CONSUMER_GROUP));
+            final OnsGroupListResponse onsGroupListResponse = 
client.onsGroupList(onsGroupListRequest);
+            if 
(onsGroupListResponse.getBody().getData().getSubscribeInfoDo().isEmpty()) {
+                throw new RuntimeException("rocketmq required parameter 
consumerGroup does not exist !");
+            }
+        } catch (Exception e) {
+            log.error("RocketMQSinkTask | validate | error => ", e);
+            throw new RuntimeException(e.getMessage());
+        }
     }
 
     @Override
diff --git 
a/connectors/aliyun/rocketmq-connect-rocketmq/src/main/java/org/apache/rocketmq/connect/rocketmq/RocketMQSourceTask.java
 
b/connectors/aliyun/rocketmq-connect-rocketmq/src/main/java/org/apache/rocketmq/connect/rocketmq/RocketMQSourceTask.java
index 9de3ace..cf8def1 100644
--- 
a/connectors/aliyun/rocketmq-connect-rocketmq/src/main/java/org/apache/rocketmq/connect/rocketmq/RocketMQSourceTask.java
+++ 
b/connectors/aliyun/rocketmq-connect-rocketmq/src/main/java/org/apache/rocketmq/connect/rocketmq/RocketMQSourceTask.java
@@ -1,17 +1,11 @@
 package org.apache.rocketmq.connect.rocketmq;
 
-import com.aliyun.ons20190214.Client;
-import com.aliyun.ons20190214.models.OnsGroupListRequest;
-import com.aliyun.ons20190214.models.OnsGroupListResponse;
-import com.aliyun.ons20190214.models.OnsTopicListRequest;
-import com.aliyun.ons20190214.models.OnsTopicListResponse;
 import com.aliyun.openservices.ons.api.Action;
 import com.aliyun.openservices.ons.api.Consumer;
 import com.aliyun.openservices.ons.api.ONSFactory;
 import com.aliyun.openservices.ons.api.PropertyKeyConst;
 import com.aliyun.openservices.shade.com.google.common.collect.Maps;
 import com.aliyun.openservices.shade.org.apache.commons.lang3.StringUtils;
-import com.aliyun.teaopenapi.models.Config;
 import io.openmessaging.KeyValue;
 import io.openmessaging.connector.api.component.task.source.SourceTask;
 import io.openmessaging.connector.api.component.task.source.SourceTaskContext;
@@ -19,7 +13,6 @@ import io.openmessaging.connector.api.data.ConnectRecord;
 import io.openmessaging.connector.api.data.RecordOffset;
 import io.openmessaging.connector.api.data.RecordPartition;
 import org.apache.rocketmq.connect.rocketmq.common.RocketMQConstant;
-import org.apache.rocketmq.connect.rocketmq.utils.OnsUtils;
 import org.assertj.core.util.Lists;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -56,9 +49,6 @@ public class RocketMQSourceTask extends SourceTask {
 
     @Override
     public List<ConnectRecord> poll() throws InterruptedException {
-        if (consumer == null) {
-            initConsumer();
-        }
         List<ConnectRecord> connectRecords = Lists.newArrayList();
         blockingQueue.drainTo(connectRecords, BATCH_POLL_SIZE);
         return connectRecords;
@@ -76,38 +66,6 @@ public class RocketMQSourceTask extends SourceTask {
 
     @Override
     public void validate(KeyValue config) {
-        if 
(StringUtils.isBlank(config.getString(RocketMQConstant.ACCESS_KEY_ID))
-                || 
StringUtils.isBlank(config.getString(RocketMQConstant.ACCESS_KEY_SECRET))
-                || 
StringUtils.isBlank(config.getString(RocketMQConstant.NAMESRV_ADDR))
-                || 
StringUtils.isBlank(config.getString(RocketMQConstant.TOPIC))
-                || 
StringUtils.isBlank(config.getString(RocketMQConstant.CONSUMER_GROUP))) {
-            throw new RuntimeException("rocketmq required parameter is null 
!");
-        }
-        // 检查topic和consumer group是否存在
-        try {
-            Config onsConfig = new Config()
-                    
.setAccessKeyId(config.getString(RocketMQConstant.ACCESS_KEY_ID))
-                    
.setAccessKeySecret(config.getString(RocketMQConstant.ACCESS_KEY_SECRET));
-            onsConfig.endpoint = 
OnsUtils.parseEndpoint(config.getString(RocketMQConstant.NAMESRV_ADDR));
-            final Client client = new Client(onsConfig);
-            OnsTopicListRequest onsTopicListRequest = new OnsTopicListRequest()
-                    .setTopic(config.getString(RocketMQConstant.TOPIC))
-                    
.setInstanceId(config.getString(RocketMQConstant.INSTANCE_ID));
-            final OnsTopicListResponse onsTopicListResponse = 
client.onsTopicList(onsTopicListRequest);
-            if 
(onsTopicListResponse.getBody().getData().getPublishInfoDo().isEmpty()) {
-                throw new RuntimeException("rocketmq required parameter topic 
does not exist !");
-            }
-            OnsGroupListRequest onsGroupListRequest = new OnsGroupListRequest()
-                    
.setInstanceId(config.getString(RocketMQConstant.INSTANCE_ID))
-                    
.setGroupId(config.getString(RocketMQConstant.CONSUMER_GROUP));
-            final OnsGroupListResponse onsGroupListResponse = 
client.onsGroupList(onsGroupListRequest);
-            if 
(onsGroupListResponse.getBody().getData().getSubscribeInfoDo().isEmpty()) {
-                throw new RuntimeException("rocketmq required parameter 
consumerGroup does not exist !");
-            }
-        } catch (Exception e) {
-            log.error("RocketMQSinkTask | validate | error => ", e);
-            throw new RuntimeException(e.getMessage());
-        }
     }
 
     @Override
@@ -122,17 +80,10 @@ public class RocketMQSourceTask extends SourceTask {
 
     @Override
     public void start(SourceTaskContext sourceTaskContext) {
-        try {
-            super.start(sourceTaskContext);
-            initConsumer();
-            consumer.start();
-        } catch (Exception e) {
-            log.error("RocketMQSourceTask | start | error => ", e);
-            throw e;
-        }
+        super.start(sourceTaskContext);
     }
 
-    private void initConsumer() {
+    private void initConsumer(String tag) {
         try {
             Properties properties = new Properties();
             properties.put(PropertyKeyConst.GROUP_ID, consumerGroup);
@@ -143,9 +94,9 @@ public class RocketMQSourceTask extends SourceTask {
                 properties.put(PropertyKeyConst.INSTANCE_ID, instanceId);
             }
             consumer = ONSFactory.createConsumer(properties);
-            // TODO TAG先忽略
-            consumer.subscribe(topic, "*", (message, consumeContext) -> {
+            consumer.subscribe(topic, tag, (message, consumeContext) -> {
                 try {
+                    log.info("RocketMQSourceTask | commit | initConsumer | 
message  : {}", message);
                     Map<String, String> sourceRecordPartition = 
Maps.newHashMap();
                     sourceRecordPartition.put("topic", message.getTopic());
                     sourceRecordPartition.put("brokerName", 
message.getBornHost());
@@ -174,6 +125,19 @@ public class RocketMQSourceTask extends SourceTask {
         }
     }
 
+    @Override
+    public void commit(List<ConnectRecord> connectRecords) throws 
InterruptedException {
+        try {
+            if (connectRecords.isEmpty()) return;
+            final ConnectRecord connectRecord = connectRecords.get(0);
+            initConsumer(connectRecord.getExtension(RocketMQConstant.TAG));
+            consumer.start();
+        } catch (Exception e) {
+            log.error("RocketMQSourceTask | commit | error => ", e);
+            throw e;
+        }
+    }
+
     @Override
     public void stop() {
         consumer.shutdown();
diff --git 
a/connectors/aliyun/rocketmq-connect-rocketmq/src/main/java/org/apache/rocketmq/connect/rocketmq/common/RocketMQConstant.java
 
b/connectors/aliyun/rocketmq-connect-rocketmq/src/main/java/org/apache/rocketmq/connect/rocketmq/common/RocketMQConstant.java
index cb1f352..529fd0f 100644
--- 
a/connectors/aliyun/rocketmq-connect-rocketmq/src/main/java/org/apache/rocketmq/connect/rocketmq/common/RocketMQConstant.java
+++ 
b/connectors/aliyun/rocketmq-connect-rocketmq/src/main/java/org/apache/rocketmq/connect/rocketmq/common/RocketMQConstant.java
@@ -14,4 +14,8 @@ public class RocketMQConstant {
 
     public static final String CONSUMER_GROUP = "consumerGroup";
 
+    public static final String KEY = "KEYS";
+
+    public static final String TAG = "TAGS";
+
 }
diff --git 
a/connectors/aliyun/rocketmq-connect-rocketmq/src/test/java/org/apache/rocketmq/connect/rocketmq/RocketMQSinkConnectorTest.java
 
b/connectors/aliyun/rocketmq-connect-rocketmq/src/test/java/org/apache/rocketmq/connect/rocketmq/RocketMQSinkConnectorTest.java
index 897bb09..c05b19b 100644
--- 
a/connectors/aliyun/rocketmq-connect-rocketmq/src/test/java/org/apache/rocketmq/connect/rocketmq/RocketMQSinkConnectorTest.java
+++ 
b/connectors/aliyun/rocketmq-connect-rocketmq/src/test/java/org/apache/rocketmq/connect/rocketmq/RocketMQSinkConnectorTest.java
@@ -75,7 +75,8 @@ public class RocketMQSinkConnectorTest {
         ConnectRecord connectRecord = new ConnectRecord(new 
RecordPartition(new HashMap<>()), new RecordOffset(new HashMap<>()), 
System.currentTimeMillis());
         connectRecord.setData("test message");
         connectRecords.add(connectRecord);
-        connectRecord.addExtension("key", "value");
+        connectRecord.addExtension(RocketMQConstant.KEY, "value");
+        connectRecord.addExtension(RocketMQConstant.TAG, "tag");
         rocketMQSinkTask.put(connectRecords);
     }
 
diff --git 
a/connectors/aliyun/rocketmq-connect-rocketmq/src/test/java/org/apache/rocketmq/connect/rocketmq/RocketMQSourceConnectorTest.java
 
b/connectors/aliyun/rocketmq-connect-rocketmq/src/test/java/org/apache/rocketmq/connect/rocketmq/RocketMQSourceConnectorTest.java
index bbdac55..cd0f2e2 100644
--- 
a/connectors/aliyun/rocketmq-connect-rocketmq/src/test/java/org/apache/rocketmq/connect/rocketmq/RocketMQSourceConnectorTest.java
+++ 
b/connectors/aliyun/rocketmq-connect-rocketmq/src/test/java/org/apache/rocketmq/connect/rocketmq/RocketMQSourceConnectorTest.java
@@ -2,12 +2,19 @@ package org.apache.rocketmq.connect.rocketmq;
 
 import io.openmessaging.KeyValue;
 import io.openmessaging.connector.api.component.task.source.SourceTaskContext;
+import io.openmessaging.connector.api.data.ConnectRecord;
+import io.openmessaging.connector.api.data.RecordOffset;
+import io.openmessaging.connector.api.data.RecordPartition;
 import io.openmessaging.connector.api.storage.OffsetStorageReader;
 import io.openmessaging.internal.DefaultKeyValue;
 import org.apache.rocketmq.connect.rocketmq.common.RocketMQConstant;
 import org.junit.Assert;
 import org.junit.Test;
 
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+
 public class RocketMQSourceConnectorTest {
 
     @Test
@@ -44,6 +51,11 @@ public class RocketMQSourceConnectorTest {
                 return null;
             }
         });
+        List<ConnectRecord> connectRecords = new ArrayList<>(11);
+        ConnectRecord connectRecord = new ConnectRecord(new 
RecordPartition(new HashMap<>()), new RecordOffset(new HashMap<>()), 
System.currentTimeMillis());
+        connectRecord.addExtension(RocketMQConstant.TAG, "*");
+        connectRecords.add(connectRecord);
+        rocketMQSourceTask.commit(connectRecords);
         rocketMQSourceTask.poll();
         Thread.sleep(50000);
     }
diff --git a/connectors/rocketmq-connect-http/README.md 
b/connectors/rocketmq-connect-http/README.md
index 67b4bb8..8b6d5c7 100644
--- a/connectors/rocketmq-connect-http/README.md
+++ b/connectors/rocketmq-connect-http/README.md
@@ -15,13 +15,13 @@ mvn clean install -Dmaven.test.skip=true
 
 ```
 
http://${runtime-ip}:${runtime-port}/connectors/${rocketmq-http-sink-connector-name}
-?config={"source-rocketmq":"${runtime-ip}:${runtime-port}","source-cluster":"${broker-cluster}","connector-class":"org.apache.rocketmq.connect.http.sink.HttpSinkConnector",“url”:"${url}"}
+?config={"source-rocketmq":"${runtime-ip}:${runtime-port}","source-cluster":"${broker-cluster}","connector-class":"org.apache.rocketmq.connect.http.sink.HttpSinkConnector","connect-topicname"
 : "${connect-topicname}","url":"${url}"}
 ```
 
 例子 
 ```
 
http://localhost:8081/connectors/httpConnectorSink?config={"source-rocketmq":"localhost:9876","source-cluster":"DefaultCluster",
-"connector-class":"org.apache.rocketmq.connect.http.sink.HttpSinkConnector","url":"192.168.1.2"}
+"connector-class":"org.apache.rocketmq.connect.http.sink.HttpSinkConnector","connect-topicname"
 : "http-topic","url":"192.168.1.2"}
 ```
 
 >**注:** `rocketmq-http-connect` 
 >的启动依赖于`rocketmq-connect-runtime`项目的启动,需将打好的所有`jar`包放置到`runtime`项目中`pluginPaths`配置的路径后再执行上面的启动请求,该值配置在`runtime`项目下的`connect.conf`文件中
@@ -38,4 +38,5 @@ 
http://${runtime-ip}:${runtime-port}/connectors/${rocketmq-http-connector-name}/
 | KEY |  TYPE   | Must be filled | Description | Example          
 |-----|---------|----------------|-------------|------------------|
 | url | String  | YES            | sink端 域名地址  | http://127.0.0.1 |
+|connect-topicname       | String  | YES            | sink需要处理数据消息topic                     | xxxx |
 
diff --git 
a/connectors/rocketmq-connect-http/src/main/java/org/apache/rocketmq/connect/http/sink/HttpSinkTask.java
 
b/connectors/rocketmq-connect-http/src/main/java/org/apache/rocketmq/connect/http/sink/HttpSinkTask.java
index 679653e..603bafa 100644
--- 
a/connectors/rocketmq-connect-http/src/main/java/org/apache/rocketmq/connect/http/sink/HttpSinkTask.java
+++ 
b/connectors/rocketmq-connect-http/src/main/java/org/apache/rocketmq/connect/http/sink/HttpSinkTask.java
@@ -1,13 +1,12 @@
 package org.apache.rocketmq.connect.http.sink;
 
-import org.apache.rocketmq.connect.http.sink.common.OkHttpUtils;
-import org.apache.rocketmq.connect.http.sink.constant.HttpConstant;
 import io.openmessaging.KeyValue;
 import io.openmessaging.connector.api.component.task.sink.SinkTask;
 import io.openmessaging.connector.api.component.task.sink.SinkTaskContext;
 import io.openmessaging.connector.api.data.ConnectRecord;
 import io.openmessaging.connector.api.errors.ConnectException;
-import org.apache.commons.lang3.StringUtils;
+import org.apache.rocketmq.connect.http.sink.common.OkHttpUtils;
+import org.apache.rocketmq.connect.http.sink.constant.HttpConstant;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -43,9 +42,6 @@ public class HttpSinkTask extends SinkTask {
 
     @Override
     public void validate(KeyValue config) {
-        if (StringUtils.isBlank(config.getString(HttpConstant.URL_CONSTANT))) {
-            throw new RuntimeException("http required parameter is null !");
-        }
     }
 
     @Override

Reply via email to