This is an automated email from the ASF dual-hosted git repository.

zhoubo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git


The following commit(s) were added to refs/heads/master by this push:
     new 23ca9597 [ISSUE #413]Optimize debezium RocketMqDatabaseHistory  (#415)
23ca9597 is described below

commit 23ca9597848e3294722f6fee4bf0aebd5211c0c1
Author: Xiaojian Sun <[email protected]>
AuthorDate: Tue Jan 31 20:02:05 2023 +0800

    [ISSUE #413]Optimize debezium RocketMqDatabaseHistory  (#415)
    
    * fixed null pointer exception #294
    
    * Fix invalid offset submitted by sinktask #310
    
    * RC-413 Optimize debezium RocketMqDatabaseHistory
---
 ...etMQConnectUtil.java => RocketMqAdminUtil.java} | 153 +++++++--------
 .../rocketmq/connect/debezium/RocketMqConfig.java  | 133 +++++++++++++
 .../connect/debezium/RocketMqConnectConfig.java    | 217 ---------------------
 .../connect/debezium/RocketMqDatabaseHistory.java  | 213 ++++++++++++++------
 .../connect/debezium/ZeroMessageQueueSelector.java |  35 ++++
 .../mysql/signal/RocketMqSignalThread.java         |  20 +-
 .../runtime/controller/isolation/PluginUtils.java  |   2 +-
 .../connect/runtime/rest/RestHandlerTest.java      |   2 +-
 8 files changed, 410 insertions(+), 365 deletions(-)

diff --git a/connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-core/src/main/java/org/apache/rocketmq/connect/debezium/RocketMQConnectUtil.java b/connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-core/src/main/java/org/apache/rocketmq/connect/debezium/RocketMqAdminUtil.java
similarity index 63%
rename from connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-core/src/main/java/org/apache/rocketmq/connect/debezium/RocketMQConnectUtil.java
rename to connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-core/src/main/java/org/apache/rocketmq/connect/debezium/RocketMqAdminUtil.java
index 29d054a0..c1ae1cbe 100644
--- a/connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-core/src/main/java/org/apache/rocketmq/connect/debezium/RocketMQConnectUtil.java
+++ b/connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-core/src/main/java/org/apache/rocketmq/connect/debezium/RocketMqAdminUtil.java
@@ -14,55 +14,45 @@
  *  See the License for the specific language governing permissions and
  *  limitations under the License.
  */
-
 package org.apache.rocketmq.connect.debezium;
 
-import com.beust.jcommander.internal.Sets;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.rocketmq.acl.common.AclClientRPCHook;
 import org.apache.rocketmq.acl.common.SessionCredentials;
 import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer;
+import org.apache.rocketmq.client.exception.MQBrokerException;
 import org.apache.rocketmq.client.exception.MQClientException;
 import org.apache.rocketmq.client.producer.DefaultMQProducer;
 import org.apache.rocketmq.common.TopicConfig;
-import org.apache.rocketmq.common.UtilAll;
+import org.apache.rocketmq.common.admin.TopicOffset;
+import org.apache.rocketmq.common.admin.TopicStatsTable;
 import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
 import org.apache.rocketmq.common.message.MessageQueue;
 import org.apache.rocketmq.common.protocol.body.ClusterInfo;
 import org.apache.rocketmq.common.protocol.body.SubscriptionGroupWrapper;
 import org.apache.rocketmq.common.protocol.route.BrokerData;
-import org.apache.rocketmq.common.protocol.route.QueueData;
 import org.apache.rocketmq.common.protocol.route.TopicRouteData;
 import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig;
 import org.apache.rocketmq.remoting.RPCHook;
-import org.apache.rocketmq.remoting.common.RemotingUtil;
+import org.apache.rocketmq.remoting.exception.RemotingException;
 import org.apache.rocketmq.remoting.protocol.LanguageCode;
 import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
 import org.apache.rocketmq.tools.command.CommandUtil;
 
 import java.util.HashMap;
-import java.util.List;
+import java.util.HashSet;
+import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
 import java.util.UUID;
 
-
 /**
- * rocket connect util
+ * Tools for creating RocketMq topic and group
  */
-public class RocketMQConnectUtil {
-
-    public static String createGroupName(String prefix) {
-        StringBuilder sb = new StringBuilder();
-        sb.append(prefix).append("-");
-        sb.append(RemotingUtil.getLocalAddress()).append("-");
-        sb.append(UtilAll.getPid()).append("-");
-        sb.append(System.nanoTime());
-        return sb.toString().replace(".", "-");
-    }
+public class RocketMqAdminUtil {
 
     public static String createUniqInstance(String prefix) {
-        return new 
StringBuffer(prefix).append("-").append(UUID.randomUUID()).toString();
+        return prefix.concat("-").concat(UUID.randomUUID().toString());
     }
 
     public static RPCHook getAclRPCHook(String accessKey, String secretKey) {
@@ -72,68 +62,36 @@ public class RocketMQConnectUtil {
     /**
      * init default lite pull consumer
      *
-     * @param connectConfig
-     * @param autoCommit
-     * @return
-     * @throws MQClientException
-     */
-    public static DefaultLitePullConsumer 
initDefaultLitePullConsumer(RocketMqConnectConfig connectConfig, boolean 
autoCommit) throws MQClientException {
-        DefaultLitePullConsumer consumer = null;
-        if (Objects.isNull(consumer)) {
-            if (StringUtils.isBlank(connectConfig.getAccessKey()) && 
StringUtils.isBlank(connectConfig.getSecretKey())) {
-                consumer = new DefaultLitePullConsumer(
-                        connectConfig.getRmqConsumerGroup()
-                );
-            } else {
-                consumer = new DefaultLitePullConsumer(
-                        connectConfig.getRmqConsumerGroup(),
-                        getAclRPCHook(connectConfig.getAccessKey(), 
connectConfig.getSecretKey())
-                );
-            }
-        }
-        consumer.setNamesrvAddr(connectConfig.getNamesrvAddr());
-        String uniqueName = Thread.currentThread().getName() + "-" + 
System.currentTimeMillis() % 1000;
-        consumer.setInstanceName(uniqueName);
-        consumer.setUnitName(uniqueName);
-        consumer.setAutoCommit(autoCommit);
-        
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
-        return consumer;
-    }
-
-    /**
-     * init default lite pull consumer
-     *
-     * @param connectConfig
-     * @param topic
+     * @param config
      * @param autoCommit
      * @return
      * @throws MQClientException
      */
-    public static DefaultLitePullConsumer 
initDefaultLitePullConsumer(RocketMqConnectConfig connectConfig, String topic, 
boolean autoCommit) throws MQClientException {
+    public static DefaultLitePullConsumer 
initDefaultLitePullConsumer(RocketMqConfig config,
+                                                                      boolean 
autoCommit) throws MQClientException {
         DefaultLitePullConsumer consumer = null;
         if (Objects.isNull(consumer)) {
-            if (StringUtils.isBlank(connectConfig.getAccessKey()) && 
StringUtils.isBlank(connectConfig.getSecretKey())) {
+            if (StringUtils.isBlank(config.getAccessKey()) && 
StringUtils.isBlank(config.getSecretKey())) {
                 consumer = new DefaultLitePullConsumer(
-                        connectConfig.getRmqConsumerGroup()
+                        config.getGroupId()
                 );
             } else {
                 consumer = new DefaultLitePullConsumer(
-                        connectConfig.getRmqConsumerGroup(),
-                        getAclRPCHook(connectConfig.getAccessKey(), 
connectConfig.getSecretKey())
+                        config.getGroupId(),
+                        getAclRPCHook(config.getAccessKey(), 
config.getSecretKey())
                 );
             }
         }
-        consumer.setNamesrvAddr(connectConfig.getNamesrvAddr());
-        String uniqueName = Thread.currentThread().getName() + "-" + 
System.currentTimeMillis() % 1000;
+        consumer.setNamesrvAddr(config.getNamesrvAddr());
+        String uniqueName = createUniqInstance(config.getNamesrvAddr());
         consumer.setInstanceName(uniqueName);
         consumer.setUnitName(uniqueName);
         consumer.setAutoCommit(autoCommit);
         
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
-        consumer.subscribe(topic, "*");
         return consumer;
     }
 
-    public static DefaultMQProducer 
initDefaultMQProducer(RocketMqConnectConfig connectConfig) {
+    public static DefaultMQProducer initDefaultMQProducer(RocketMqConfig 
connectConfig) {
         RPCHook rpcHook = null;
         if (connectConfig.isAclEnable()) {
             rpcHook = new AclClientRPCHook(new 
SessionCredentials(connectConfig.getAccessKey(), connectConfig.getSecretKey()));
@@ -141,31 +99,36 @@ public class RocketMQConnectUtil {
         DefaultMQProducer producer = new DefaultMQProducer(rpcHook);
         producer.setNamesrvAddr(connectConfig.getNamesrvAddr());
         
producer.setInstanceName(createUniqInstance(connectConfig.getNamesrvAddr()));
-        producer.setProducerGroup(connectConfig.getRmqConsumerGroup());
-        producer.setSendMsgTimeout(connectConfig.getOperationTimeout());
+        producer.setProducerGroup(connectConfig.getGroupId());
         producer.setLanguage(LanguageCode.JAVA);
         return producer;
     }
 
-    public static DefaultMQAdminExt startMQAdminTool(RocketMqConnectConfig 
connectConfig) throws MQClientException {
+    private static DefaultMQAdminExt startMQAdminTool(RocketMqConfig config) 
throws MQClientException {
         DefaultMQAdminExt admin;
-        if (connectConfig.isAclEnable()) {
-            admin = new DefaultMQAdminExt(new AclClientRPCHook(new 
SessionCredentials(connectConfig.getAccessKey(), 
connectConfig.getSecretKey())));
+        if (config.isAclEnable()) {
+            admin = new DefaultMQAdminExt(new AclClientRPCHook(new 
SessionCredentials(config.getAccessKey(), config.getSecretKey())));
         } else {
             admin = new DefaultMQAdminExt();
         }
-        admin.setNamesrvAddr(connectConfig.getNamesrvAddr());
-        admin.setAdminExtGroup(connectConfig.getRmqConsumerGroup());
-        
admin.setInstanceName(RocketMQConnectUtil.createUniqInstance(connectConfig.getNamesrvAddr()));
+        admin.setNamesrvAddr(config.getNamesrvAddr());
+        admin.setAdminExtGroup(config.getGroupId());
+        admin.setInstanceName(createUniqInstance(config.getNamesrvAddr()));
         admin.start();
         return admin;
     }
 
 
-    public static void createTopic(RocketMqConnectConfig connectConfig, 
TopicConfig topicConfig) {
+    /**
+     * Create rocketMq topic
+     *
+     * @param config
+     * @param topicConfig
+     */
+    public static void createTopic(RocketMqConfig config, TopicConfig 
topicConfig) {
         DefaultMQAdminExt defaultMQAdminExt = null;
         try {
-            defaultMQAdminExt = startMQAdminTool(connectConfig);
+            defaultMQAdminExt = startMQAdminTool(config);
             ClusterInfo clusterInfo = 
defaultMQAdminExt.examineBrokerClusterInfo();
             HashMap<String, Set<String>> clusterAddrTable = 
clusterInfo.getClusterAddrTable();
             Set<String> clusterNameSet = clusterAddrTable.keySet();
@@ -176,7 +139,8 @@ public class RocketMQConnectUtil {
                 }
             }
         } catch (Exception e) {
-            throw new RuntimeException("create topic: " + 
topicConfig.getTopicName() + " failed", e);
+            throw new RuntimeException("RocketMq create schema history topic: 
" + topicConfig.getTopicName() + " " +
+                    " failed", e);
         } finally {
             if (defaultMQAdminExt != null) {
                 defaultMQAdminExt.shutdown();
@@ -184,11 +148,18 @@ public class RocketMQConnectUtil {
         }
     }
 
-    public static boolean topicExist(RocketMqConnectConfig connectConfig, 
String topic) {
+    /**
+     * check topic exist
+     *
+     * @param config
+     * @param topic
+     * @return
+     */
+    public static boolean topicExist(RocketMqConfig config, String topic) {
         DefaultMQAdminExt defaultMQAdminExt = null;
         boolean foundTopicRouteInfo = false;
         try {
-            defaultMQAdminExt = startMQAdminTool(connectConfig);
+            defaultMQAdminExt = startMQAdminTool(config);
             TopicRouteData topicRouteData = 
defaultMQAdminExt.examineTopicRouteInfo(topic);
             if (topicRouteData != null) {
                 foundTopicRouteInfo = true;
@@ -203,9 +174,8 @@ public class RocketMQConnectUtil {
         return foundTopicRouteInfo;
     }
 
-
-    public static Set<String> fetchAllConsumerGroup(RocketMqConnectConfig 
connectConfig) {
-        Set<String> consumerGroupSet = Sets.newHashSet();
+    public static Set<String> fetchAllConsumerGroup(RocketMqConfig 
connectConfig) {
+        Set<String> consumerGroupSet = new HashSet<>();
         DefaultMQAdminExt defaultMQAdminExt = null;
         try {
             defaultMQAdminExt = startMQAdminTool(connectConfig);
@@ -215,8 +185,7 @@ public class RocketMQConnectUtil {
                 
consumerGroupSet.addAll(subscriptionGroupWrapper.getSubscriptionGroupTable().keySet());
             }
         } catch (Exception e) {
-            e.printStackTrace();
-            throw new RuntimeException("fetch all topic  failed", e);
+            throw new RuntimeException("RocketMq admin fetch all topic 
failed", e);
         } finally {
             if (defaultMQAdminExt != null) {
                 defaultMQAdminExt.shutdown();
@@ -225,7 +194,7 @@ public class RocketMQConnectUtil {
         return consumerGroupSet;
     }
 
-    public static String createSubGroup(RocketMqConnectConfig connectConfig, 
String subGroup) {
+    public static String createSubGroup(RocketMqConfig connectConfig, String 
subGroup) {
         DefaultMQAdminExt defaultMQAdminExt = null;
         try {
             defaultMQAdminExt = startMQAdminTool(connectConfig);
@@ -249,6 +218,28 @@ public class RocketMQConnectUtil {
         }
         return subGroup;
     }
-}
 
+    /**
+     * Get topic offsets
+     *
+     * @param config
+     * @param topic
+     * @return
+     */
+    public static Map<MessageQueue, TopicOffset> offsets(RocketMqConfig 
config, String topic) {
+        // Get db schema topic min and max offset
+        DefaultMQAdminExt adminClient = null;
+        try {
+            adminClient = RocketMqAdminUtil.startMQAdminTool(config);
+            TopicStatsTable topicStatsTable = 
adminClient.examineTopicStats(topic);
+            return topicStatsTable.getOffsetTable();
+        } catch (MQClientException | MQBrokerException | RemotingException | 
InterruptedException e) {
+            throw new RuntimeException(e);
+        } finally {
+            if (adminClient != null) {
+                adminClient.shutdown();
+            }
+        }
+    }
 
+}
diff --git a/connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-core/src/main/java/org/apache/rocketmq/connect/debezium/RocketMqConfig.java b/connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-core/src/main/java/org/apache/rocketmq/connect/debezium/RocketMqConfig.java
new file mode 100644
index 00000000..16d81978
--- /dev/null
+++ b/connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-core/src/main/java/org/apache/rocketmq/connect/debezium/RocketMqConfig.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.debezium;
+
+import java.util.Objects;
+
+/**
+ * Configuration for connecting RocketMq
+ */
+public class RocketMqConfig {
+    private String namesrvAddr;
+
+    private String groupId;
+
+    /**
+     * set acl config
+     **/
+    private boolean aclEnable;
+    private String accessKey;
+    private String secretKey;
+
+    public static Builder newBuilder() {
+        return new Builder();
+    }
+
+    private RocketMqConfig(String rmqConsumerGroup, String namesrvAddr, 
boolean aclEnable, String accessKey,
+                           String secretKey) {
+        this.groupId = rmqConsumerGroup;
+        this.namesrvAddr = namesrvAddr;
+        this.aclEnable = aclEnable;
+        this.accessKey = accessKey;
+        this.secretKey = secretKey;
+    }
+
+    public String getNamesrvAddr() {
+        return namesrvAddr;
+    }
+
+    public String getGroupId() {
+        return groupId;
+    }
+
+    public boolean isAclEnable() {
+        return aclEnable;
+    }
+
+    public String getAccessKey() {
+        return accessKey;
+    }
+
+    public String getSecretKey() {
+        return secretKey;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+        RocketMqConfig that = (RocketMqConfig) o;
+        return aclEnable == that.aclEnable && Objects.equals(namesrvAddr, 
that.namesrvAddr) && Objects.equals(groupId, that.groupId) && 
Objects.equals(accessKey, that.accessKey) && Objects.equals(secretKey, 
that.secretKey);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(namesrvAddr, groupId, aclEnable, accessKey, 
secretKey);
+    }
+
+    @Override
+    public String toString() {
+        return "RocketMqConfig{" +
+                "namesrvAddr='" + namesrvAddr + '\'' +
+                ", groupId='" + groupId + '\'' +
+                ", aclEnable=" + aclEnable +
+                ", accessKey='" + accessKey + '\'' +
+                ", secretKey='" + secretKey + '\'' +
+                '}';
+    }
+
+    public static class Builder {
+        private String namesrvAddr;
+        private String groupId;
+        /**
+         * set acl config
+         **/
+        private boolean aclEnable;
+        private String accessKey;
+        private String secretKey;
+
+        public Builder namesrvAddr(String namesrvAddr) {
+            this.namesrvAddr = namesrvAddr;
+            return this;
+        }
+
+        public Builder groupId(String groupId) {
+            this.groupId = groupId;
+            return this;
+        }
+
+        public Builder aclEnable(boolean aclEnable) {
+            this.aclEnable = aclEnable;
+            return this;
+        }
+
+        public Builder accessKey(String accessKey) {
+            this.accessKey = accessKey;
+            return this;
+        }
+
+        public Builder secretKey(String secretKey) {
+            this.secretKey = secretKey;
+            return this;
+        }
+
+        public RocketMqConfig build() {
+            return new RocketMqConfig(groupId, namesrvAddr, aclEnable, 
accessKey, secretKey);
+        }
+    }
+}
diff --git a/connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-core/src/main/java/org/apache/rocketmq/connect/debezium/RocketMqConnectConfig.java b/connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-core/src/main/java/org/apache/rocketmq/connect/debezium/RocketMqConnectConfig.java
deleted file mode 100644
index 267f29ec..00000000
--- a/connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-core/src/main/java/org/apache/rocketmq/connect/debezium/RocketMqConnectConfig.java
+++ /dev/null
@@ -1,217 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-
-package org.apache.rocketmq.connect.debezium;
-
-
-import io.debezium.config.Configuration;
-
-import java.util.Objects;
-
-/**
- * rocketmq connect config
- */
-public class RocketMqConnectConfig {
-    private String namesrvAddr;
-
-    private int operationTimeout = 3000;
-
-    private String rmqConsumerGroup;
-
-    private int rmqMaxRedeliveryTimes;
-
-    private int rmqMessageConsumeTimeout = 3000;
-
-    private int rmqMaxConsumeThreadNums = 32;
-
-    private int rmqMinConsumeThreadNums = 1;
-
-
-    /** set acl config **/
-    private boolean aclEnable;
-    private String accessKey;
-    private String secretKey;
-
-
-    public static Builder newBuilder(){
-        return new Builder();
-    }
-
-    public RocketMqConnectConfig() {}
-
-    public RocketMqConnectConfig(String rmqConsumerGroup, String namesrvAddr, 
boolean aclEnable, String accessKey, String secretKey) {
-        this.rmqConsumerGroup = rmqConsumerGroup;
-        this.namesrvAddr = namesrvAddr;
-        this.aclEnable = aclEnable;
-        this.accessKey = accessKey;
-        this.secretKey = secretKey;
-    }
-
-    public RocketMqConnectConfig(Configuration config, String prefixGroupName) 
{
-        this.rmqConsumerGroup = prefixGroupName.concat("-group");
-        // init rocketmq connection
-        this.namesrvAddr = 
config.getString(RocketMqDatabaseHistory.NAME_SRV_ADDR);
-        this.aclEnable = 
config.getBoolean(RocketMqDatabaseHistory.ROCKETMQ_ACL_ENABLE);
-        this.accessKey = 
config.getString(RocketMqDatabaseHistory.ROCKETMQ_ACCESS_KEY);
-        this.secretKey = 
config.getString(RocketMqDatabaseHistory.ROCKETMQ_SECRET_KEY);
-    }
-
-    public String getNamesrvAddr() {
-        return namesrvAddr;
-    }
-
-    public void setNamesrvAddr(String namesrvAddr) {
-        this.namesrvAddr = namesrvAddr;
-    }
-
-    public int getOperationTimeout() {
-        return operationTimeout;
-    }
-
-    public void setOperationTimeout(int operationTimeout) {
-        this.operationTimeout = operationTimeout;
-    }
-
-    public String getRmqConsumerGroup() {
-        return rmqConsumerGroup;
-    }
-
-    public void setRmqConsumerGroup(String rmqConsumerGroup) {
-        this.rmqConsumerGroup = rmqConsumerGroup;
-    }
-
-    public int getRmqMaxRedeliveryTimes() {
-        return rmqMaxRedeliveryTimes;
-    }
-
-    public void setRmqMaxRedeliveryTimes(int rmqMaxRedeliveryTimes) {
-        this.rmqMaxRedeliveryTimes = rmqMaxRedeliveryTimes;
-    }
-
-    public int getRmqMessageConsumeTimeout() {
-        return rmqMessageConsumeTimeout;
-    }
-
-    public void setRmqMessageConsumeTimeout(int rmqMessageConsumeTimeout) {
-        this.rmqMessageConsumeTimeout = rmqMessageConsumeTimeout;
-    }
-
-    public int getRmqMaxConsumeThreadNums() {
-        return rmqMaxConsumeThreadNums;
-    }
-
-    public void setRmqMaxConsumeThreadNums(int rmqMaxConsumeThreadNums) {
-        this.rmqMaxConsumeThreadNums = rmqMaxConsumeThreadNums;
-    }
-
-    public int getRmqMinConsumeThreadNums() {
-        return rmqMinConsumeThreadNums;
-    }
-
-    public void setRmqMinConsumeThreadNums(int rmqMinConsumeThreadNums) {
-        this.rmqMinConsumeThreadNums = rmqMinConsumeThreadNums;
-    }
-
-    public boolean isAclEnable() {
-        return aclEnable;
-    }
-
-    public void setAclEnable(boolean aclEnable) {
-        this.aclEnable = aclEnable;
-    }
-
-    public String getAccessKey() {
-        return accessKey;
-    }
-
-    public void setAccessKey(String accessKey) {
-        this.accessKey = accessKey;
-    }
-
-    public String getSecretKey() {
-        return secretKey;
-    }
-
-    public void setSecretKey(String secretKey) {
-        this.secretKey = secretKey;
-    }
-
-    @Override
-    public boolean equals(Object o) {
-        if (this == o) return true;
-        if (o == null || getClass() != o.getClass()) return false;
-        RocketMqConnectConfig that = (RocketMqConnectConfig) o;
-        return operationTimeout == that.operationTimeout && 
rmqMaxRedeliveryTimes == that.rmqMaxRedeliveryTimes && rmqMessageConsumeTimeout 
== that.rmqMessageConsumeTimeout && rmqMaxConsumeThreadNums == 
that.rmqMaxConsumeThreadNums && rmqMinConsumeThreadNums == 
that.rmqMinConsumeThreadNums && aclEnable == that.aclEnable && 
Objects.equals(namesrvAddr, that.namesrvAddr) && 
Objects.equals(rmqConsumerGroup, that.rmqConsumerGroup) && 
Objects.equals(accessKey, that.accessKey) && Objects.equals [...]
-    }
-
-    @Override
-    public int hashCode() {
-        return Objects.hash(namesrvAddr, operationTimeout, rmqConsumerGroup, 
rmqMaxRedeliveryTimes, rmqMessageConsumeTimeout, rmqMaxConsumeThreadNums, 
rmqMinConsumeThreadNums, aclEnable, accessKey, secretKey);
-    }
-
-    @Override
-    public String toString() {
-        return "RocketMqConnectConfig{" +
-                "namesrvAddr='" + namesrvAddr + '\'' +
-                ", operationTimeout=" + operationTimeout +
-                ", rmqConsumerGroup='" + rmqConsumerGroup + '\'' +
-                ", rmqMaxRedeliveryTimes=" + rmqMaxRedeliveryTimes +
-                ", rmqMessageConsumeTimeout=" + rmqMessageConsumeTimeout +
-                ", rmqMaxConsumeThreadNums=" + rmqMaxConsumeThreadNums +
-                ", rmqMinConsumeThreadNums=" + rmqMinConsumeThreadNums +
-                ", aclEnable=" + aclEnable +
-                ", accessKey='" + accessKey + '\'' +
-                ", secretKey='" + secretKey + '\'' +
-                '}';
-    }
-
-    public static class Builder{
-        private String namesrvAddr;
-        private String rmqConsumerGroup;
-        /** set acl config **/
-        private boolean aclEnable;
-        private String accessKey;
-        private String secretKey;
-
-        public Builder namesrvAddr(String namesrvAddr){
-            this.namesrvAddr = namesrvAddr;
-            return this;
-        }
-
-        public Builder rmqConsumerGroup(String rmqConsumerGroup){
-            this.rmqConsumerGroup = rmqConsumerGroup;
-            return this;
-        }
-
-        public Builder aclEnable(boolean aclEnable){
-            this.aclEnable = aclEnable;
-            return this;
-        }
-
-        public Builder accessKey(String accessKey){
-            this.accessKey = accessKey;
-            return this;
-        }
-        public Builder secretKey(String secretKey){
-            this.secretKey = secretKey;
-            return this;
-        }
-        public  RocketMqConnectConfig build(){
-            return new RocketMqConnectConfig(rmqConsumerGroup, namesrvAddr, 
aclEnable, accessKey, secretKey);
-        }
-    }
-}
diff --git a/connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-core/src/main/java/org/apache/rocketmq/connect/debezium/RocketMqDatabaseHistory.java b/connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-core/src/main/java/org/apache/rocketmq/connect/debezium/RocketMqDatabaseHistory.java
index a8fb38d8..7fd7941e 100644
--- a/connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-core/src/main/java/org/apache/rocketmq/connect/debezium/RocketMqDatabaseHistory.java
+++ b/connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-core/src/main/java/org/apache/rocketmq/connect/debezium/RocketMqDatabaseHistory.java
@@ -28,23 +28,30 @@ import io.debezium.relational.history.HistoryRecord;
 import io.debezium.relational.history.HistoryRecordComparator;
 import org.apache.kafka.common.config.ConfigDef;
 import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer;
+import org.apache.rocketmq.client.exception.MQBrokerException;
 import org.apache.rocketmq.client.exception.MQClientException;
 import org.apache.rocketmq.client.producer.DefaultMQProducer;
+import org.apache.rocketmq.client.producer.SendCallback;
+import org.apache.rocketmq.client.producer.SendResult;
 import org.apache.rocketmq.common.TopicConfig;
+import org.apache.rocketmq.common.admin.TopicOffset;
 import org.apache.rocketmq.common.message.Message;
 import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.common.message.MessageQueue;
 import 
org.apache.rocketmq.connect.kafka.connect.adaptor.task.AbstractKafkaConnectSource;
+import org.apache.rocketmq.remoting.exception.RemotingException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
-import java.util.concurrent.CountDownLatch;
 import java.util.function.Consumer;
 
-
 /**
  * rocketmq database history
  */
@@ -86,14 +93,41 @@ public final class RocketMqDatabaseHistory extends AbstractDatabaseHistory {
             .withWidth(ConfigDef.Width.LONG)
             .withImportance(ConfigDef.Importance.HIGH)
             .withDescription("Rocketmq secret key");
+
+    public static final Field RECOVERY_POLL_ATTEMPTS = 
Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "recovery.attempts")
+            .withDisplayName("Max attempts to recovery database schema 
history")
+            .withType(ConfigDef.Type.INT)
+            .withGroup(Field.createGroupEntry(Field.Group.ADVANCED, 0))
+            .withWidth(ConfigDef.Width.SHORT)
+            .withImportance(ConfigDef.Importance.LOW)
+            .withDescription("The number of attempts in a row that no data are 
returned from RocketMQ before recover " +
+                    "completes. "
+                    + "The maximum amount of time to wait after receiving no 
data is (recovery.attempts) x (recovery.poll.interval.ms).")
+            .withDefault(60)
+            .withValidation(Field::isInteger);
+
+    public static final Field RECOVERY_POLL_INTERVAL_MS = 
Field.create(CONFIGURATION_FIELD_PREFIX_STRING
+                    + "recovery.poll.interval.ms")
+            .withDisplayName("Poll interval during database schema history 
recovery (ms)")
+            .withType(ConfigDef.Type.INT)
+            .withGroup(Field.createGroupEntry(Field.Group.ADVANCED, 1))
+            .withWidth(ConfigDef.Width.SHORT)
+            .withImportance(ConfigDef.Importance.LOW)
+            .withDescription("The number of milliseconds to wait while polling 
for persisted data during recovery.")
+            .withDefault(1000)
+            .withValidation(Field::isLong);
+
     private static final Logger log = 
LoggerFactory.getLogger(AbstractKafkaConnectSource.class);
+    private static final int MESSAGE_QUEUE = 0;
+    private static final int UNLIMITED_VALUE = -1;
+
     private final DocumentReader reader = DocumentReader.defaultReader();
     private String topicName;
     private String dbHistoryName;
-    private RocketMqConnectConfig connectConfig;
+    private RocketMqConfig rocketMqConfig;
     private DefaultMQProducer producer;
-
-
+    private int maxRecoveryAttempts;
+    private Long pollInterval;
 
     @Override
     public void configure(
@@ -104,29 +138,39 @@ public final class RocketMqDatabaseHistory extends 
AbstractDatabaseHistory {
         super.configure(config, comparator, listener, useCatalogBeforeSchema);
         this.topicName = config.getString(TOPIC);
         this.dbHistoryName = config.getString(DatabaseHistory.NAME, 
UUID.randomUUID().toString());
+        this.maxRecoveryAttempts = config.getInteger(RECOVERY_POLL_ATTEMPTS);
+        this.pollInterval = config.getLong(RECOVERY_POLL_INTERVAL_MS);
         log.info("Configure to store the debezium database history {} to 
rocketmq topic {}",
                 dbHistoryName, topicName);
-        // init config
-        connectConfig = new RocketMqConnectConfig(config, dbHistoryName);
+        // build config
+        this.rocketMqConfig = RocketMqConfig.newBuilder()
+                
.aclEnable(config.getBoolean(RocketMqDatabaseHistory.ROCKETMQ_ACL_ENABLE))
+                
.accessKey(config.getString(RocketMqDatabaseHistory.ROCKETMQ_ACCESS_KEY))
+                
.secretKey(config.getString(RocketMqDatabaseHistory.ROCKETMQ_SECRET_KEY))
+                
.namesrvAddr(config.getString(RocketMqDatabaseHistory.NAME_SRV_ADDR))
+                .groupId(dbHistoryName)
+                .build();
     }
 
     @Override
     public void initializeStorage() {
         super.initializeStorage();
         log.info("try to create history topic: {}!", this.topicName);
-        TopicConfig topicConfig = new TopicConfig(this.topicName);
-        RocketMQConnectUtil.createTopic(connectConfig, topicConfig);
+        TopicConfig topicConfig = new TopicConfig(this.topicName, 1, 1, 6);
+        RocketMqAdminUtil.createTopic(rocketMqConfig, topicConfig);
     }
 
     @Override
     public void start() {
         super.start();
         try {
-            Set<String> consumerGroupSet = 
RocketMQConnectUtil.fetchAllConsumerGroup(connectConfig);
-            if 
(!consumerGroupSet.contains(connectConfig.getRmqConsumerGroup())) {
-                RocketMQConnectUtil.createSubGroup(connectConfig, 
connectConfig.getRmqConsumerGroup());
+            // Check and create group
+            Set<String> consumerGroupSet = 
RocketMqAdminUtil.fetchAllConsumerGroup(rocketMqConfig);
+            if (!consumerGroupSet.contains(rocketMqConfig.getGroupId())) {
+                RocketMqAdminUtil.createSubGroup(rocketMqConfig, 
rocketMqConfig.getGroupId());
             }
-            this.producer = 
RocketMQConnectUtil.initDefaultMQProducer(connectConfig);
+            // Start rocketmq producer
+            this.producer = 
RocketMqAdminUtil.initDefaultMQProducer(rocketMqConfig);
             this.producer.start();
         } catch (MQClientException e) {
             throw new DatabaseHistoryException(e);
@@ -145,8 +189,39 @@ public final class RocketMqDatabaseHistory extends 
AbstractDatabaseHistory {
         }
     }
 
+    /**
+     * Sends one schema history record to the history topic, using
+     * {@link ZeroMessageQueueSelector} to choose the target queue.
+     *
+     * @param record the history record to persist; its {@code toString()} form is the message body
+     * @throws IllegalStateException    if the producer has not been created yet
+     * @throws DatabaseHistoryException if the send call is interrupted or fails with a client/remoting error
+     */
+    @Override
+    protected void storeRecord(HistoryRecord record) throws DatabaseHistoryException {
+        if (this.producer == null) {
+            // The producer is created and started in start(), not in initializeStorage().
+            throw new IllegalStateException("No producer is available. Ensure that 'start()'"
+                    + " is called before storing database schema history records.");
+        }
+
+        log.trace("Storing record into database schema history: {}", record);
+        try {
+            // Encode explicitly as UTF-8 so the stored bytes do not depend on the JVM default charset.
+            Message message = new Message(this.topicName,
+                    record.toString().getBytes(java.nio.charset.StandardCharsets.UTF_8));
+            producer.send(message, new ZeroMessageQueueSelector(), null, new SendCallback() {
+                @Override
+                public void onSuccess(SendResult sendResult) {
+                    log.debug("Stored record in topic '{}' partition {} at offset {} ",
+                            message.getTopic(), sendResult.getMessageQueue(), sendResult.getQueueOffset());
+                }
+
+                @Override
+                public void onException(Throwable e) {
+                    // NOTE(review): the send is asynchronous, so a failure reported here is only logged
+                    // and is not propagated to the caller of storeRecord - confirm this is intended.
+                    log.error("Store record into database schema history failed", e);
+                }
+            });
+        } catch (InterruptedException e) {
+            log.error("Interrupted before record was written into database schema history: {}", record);
+            // Restore the interrupt flag before translating the exception.
+            Thread.currentThread().interrupt();
+            throw new DatabaseHistoryException(e);
+        } catch (MQClientException | RemotingException e) {
+            throw new DatabaseHistoryException(e);
+        }
+    }
+
+
     /**
-     * recover record
+     * Recover records
      *
      * @param records
      */
@@ -154,29 +229,57 @@ public final class RocketMqDatabaseHistory extends 
AbstractDatabaseHistory {
     protected void recoverRecords(Consumer<HistoryRecord> records) {
         DefaultLitePullConsumer consumer = null;
         try {
-            consumer = 
RocketMQConnectUtil.initDefaultLitePullConsumer(connectConfig, topicName, 
false);
+            consumer = 
RocketMqAdminUtil.initDefaultLitePullConsumer(rocketMqConfig, false);
             consumer.start();
-            while (true) {
-                List<MessageExt> result = consumer.poll(10000);
-                if (result == null || result.isEmpty()) {
-                    break;
+
+            // Select message queue
+            MessageQueue messageQueue = new 
ZeroMessageQueueSelector().select(new 
ArrayList<>(consumer.fetchMessageQueues(topicName)), null, null);
+            consumer.assign(Collections.singleton(messageQueue));
+            consumer.seekToBegin(messageQueue);
+            // Read all messages in the topic ...
+            long lastProcessedOffset = UNLIMITED_VALUE;
+            Long maxOffset = null;
+            int recoveryAttempts = 0;
+
+            do {
+                if (recoveryAttempts > maxRecoveryAttempts) {
+                    throw new IllegalStateException(
+                            "The database schema history couldn't be 
recovered.");
                 }
-                for (MessageExt message : result) {
-                    HistoryRecord recordObj = new 
HistoryRecord(reader.read(message.getBody()));
-                    log.trace("Recovering database history: {}", recordObj);
-                    if (recordObj == null || !recordObj.isValid()) {
-                        log.warn("Skipping invalid database history record 
'{}'. " +
-                                        "This is often not an issue, but if it 
happens repeatedly please check the '{}' topic.",
-                                recordObj, topicName);
-                    } else {
-                        records.accept(recordObj);
-                        log.trace("Recovered database history: {}", recordObj);
+                // Get db schema history topic end offset
+                maxOffset = getMaxOffsetOfSchemaHistoryTopic(maxOffset, 
messageQueue);
+                log.debug("End offset of database schema history topic is {}", 
maxOffset);
+
+                // Poll record from db schema history topic
+                List<MessageExt> recoveredRecords = 
consumer.poll(pollInterval);
+                int numRecordsProcessed = 0;
+
+                for (MessageExt message : recoveredRecords) {
+                    if (message.getQueueOffset() > lastProcessedOffset) {
+                        HistoryRecord recordObj = new 
HistoryRecord(reader.read(message.getBody()));
+                        log.trace("Recovering database history: {}", 
recordObj);
+                        if (recordObj == null || !recordObj.isValid()) {
+                            log.warn("Skipping invalid database history record 
'{}'. " +
+                                            "This is often not an issue, but 
if it happens repeatedly please check the '{}' topic.",
+                                    recordObj, topicName);
+                        } else {
+                            records.accept(recordObj);
+                            log.trace("Recovered database history: {}", 
recordObj);
+                        }
+                        lastProcessedOffset = message.getQueueOffset();
+                        ++numRecordsProcessed;
                     }
                 }
-            }
-        } catch (MQClientException ce) {
-            throw new DatabaseHistoryException(ce);
-        } catch (IOException e) {
+                if (numRecordsProcessed == 0) {
+                    log.debug("No new records found in the database schema 
history; will retry");
+                    recoveryAttempts++;
+                } else {
+                    log.debug("Processed {} records from database schema 
history", numRecordsProcessed);
+                }
+
+            } while (lastProcessedOffset < maxOffset - 1);
+
+        } catch (MQClientException | MQBrokerException | IOException | 
RemotingException | InterruptedException e) {
             throw new DatabaseHistoryException(e);
         } finally {
             if (consumer != null) {
@@ -185,37 +288,37 @@ public final class RocketMqDatabaseHistory extends 
AbstractDatabaseHistory {
         }
     }
 
-    @Override
-    protected void storeRecord(HistoryRecord record) throws 
DatabaseHistoryException {
-        log.info(record.toString());
-        if (this.producer == null) {
-            throw new IllegalStateException("No producer is available. Ensure 
that 'start()'"
-                    + " is called before storing database history records.");
-        }
-        if (log.isTraceEnabled()) {
-            log.trace("Storing record into database history: {}", record);
-        }
-        try {
-            Message sourceMessage = new Message();
-            sourceMessage.setTopic(this.topicName);
-            final byte[] messageBody = record.toString().getBytes();
-            sourceMessage.setBody(messageBody);
-            producer.send(sourceMessage);
-        } catch (Exception e) {
-            throw new DatabaseHistoryException(e);
+    private Long getMaxOffsetOfSchemaHistoryTopic(Long previousEndOffset, 
MessageQueue messageQueue) throws MQBrokerException, RemotingException, 
InterruptedException, MQClientException {
+        Map<MessageQueue, TopicOffset> minAndMaxOffsets = 
RocketMqAdminUtil.offsets(this.rocketMqConfig, topicName);
+        Long maxOffset = minAndMaxOffsets.get(messageQueue).getMaxOffset();
+        if (previousEndOffset != null && !previousEndOffset.equals(maxOffset)) 
{
+            log.warn("Detected changed end offset of database schema history 
topic (previous: "
+                    + previousEndOffset + ", current: " + maxOffset
+                    + "). Make sure that the same history topic isn't shared 
by multiple connector instances.");
         }
+        return maxOffset;
     }
 
-
     @Override
     public boolean exists() {
-        return this.storageExists();
+        boolean exists = false;
+        if (this.storageExists()) {
+            Map<MessageQueue, TopicOffset> minAndMaxOffset = 
RocketMqAdminUtil.offsets(this.rocketMqConfig,
+                    topicName);
+            for (MessageQueue messageQueue : minAndMaxOffset.keySet()) {
+                if (MESSAGE_QUEUE == messageQueue.getQueueId()) {
+                    exists =
+                            minAndMaxOffset.get(messageQueue).getMaxOffset() > 
minAndMaxOffset.get(messageQueue).getMinOffset();
+                }
+            }
+        }
+        return exists;
     }
 
     @Override
     public boolean storageExists() {
-        // check topic is exist
-        return RocketMQConnectUtil.topicExist(connectConfig, this.topicName);
+        // Check whether topic exists
+        return RocketMqAdminUtil.topicExist(rocketMqConfig, this.topicName);
     }
 
     @Override
diff --git 
a/connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-core/src/main/java/org/apache/rocketmq/connect/debezium/ZeroMessageQueueSelector.java
 
b/connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-core/src/main/java/org/apache/rocketmq/connect/debezium/ZeroMessageQueueSelector.java
new file mode 100644
index 00000000..f128ed97
--- /dev/null
+++ 
b/connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-core/src/main/java/org/apache/rocketmq/connect/debezium/ZeroMessageQueueSelector.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.debezium;
+
+import org.apache.rocketmq.client.producer.MessageQueueSelector;
+import org.apache.rocketmq.common.message.Message;
+import org.apache.rocketmq.common.message.MessageQueue;
+
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * Always selects a {@link MessageQueue} whose queue id is 0.
+ */
+public class ZeroMessageQueueSelector implements MessageQueueSelector {
+    private static final int TARGET_QUEUE_ID = 0;
+
+    /**
+     * Returns a queue with id 0 from the candidates.
+     *
+     * @param mqs candidate queues to choose from
+     * @param msg not used by this selector
+     * @param arg not used by this selector
+     * @return a candidate whose queue id is 0
+     * @throws java.util.NoSuchElementException if no candidate has queue id 0
+     */
+    @Override
+    public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
+        // NOTE(review): the intermediate Set is kept from the original implementation so that the
+        // queue chosen when several candidates have id 0 does not change - confirm before simplifying.
+        return mqs.stream()
+                .filter(messageQueue -> messageQueue.getQueueId() == TARGET_QUEUE_ID)
+                .collect(Collectors.toSet())
+                .stream()
+                .findFirst()
+                .orElseThrow(() -> new java.util.NoSuchElementException(
+                        "No message queue with queue id " + TARGET_QUEUE_ID + " found in " + mqs));
+    }
+}
\ No newline at end of file
diff --git 
a/connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-mysql/src/main/java/io/debezium/connector/mysql/signal/RocketMqSignalThread.java
 
b/connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-mysql/src/main/java/io/debezium/connector/mysql/signal/RocketMqSignalThread.java
index b2f7535b..54750e2c 100644
--- 
a/connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-mysql/src/main/java/io/debezium/connector/mysql/signal/RocketMqSignalThread.java
+++ 
b/connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-mysql/src/main/java/io/debezium/connector/mysql/signal/RocketMqSignalThread.java
@@ -34,8 +34,8 @@ import org.apache.rocketmq.client.exception.MQClientException;
 import org.apache.rocketmq.common.TopicConfig;
 import org.apache.rocketmq.common.message.MessageExt;
 import org.apache.rocketmq.common.message.MessageQueue;
-import org.apache.rocketmq.connect.debezium.RocketMQConnectUtil;
-import org.apache.rocketmq.connect.debezium.RocketMqConnectConfig;
+import org.apache.rocketmq.connect.debezium.RocketMqAdminUtil;
+import org.apache.rocketmq.connect.debezium.RocketMqConfig;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -100,7 +100,7 @@ public class RocketMqSignalThread<T extends 
DataCollectionId> {
     private final String accessKey;
     private final String secretKey;
     private final String connectorName;
-    private final RocketMqConnectConfig rocketMqConnectConfig;
+    private final RocketMqConfig rocketMqConfig;
     private final MySqlReadOnlyIncrementalSnapshotChangeEventSource<T> 
eventSource;
     private final DefaultLitePullConsumer signalsConsumer;
 
@@ -119,8 +119,8 @@ public class RocketMqSignalThread<T extends 
DataCollectionId> {
         this.accessKey = signalConfig.getString(ROCKETMQ_ACCESS_KEY);
         this.secretKey = signalConfig.getString(ROCKETMQ_SECRET_KEY);
         this.nameSrvAddrs = signalConfig.getString(NAME_SRV_ADDR);
-        this.rocketMqConnectConfig = RocketMqConnectConfig.newBuilder()
-                .rmqConsumerGroup(connectorName.concat("-signal-group"))
+        this.rocketMqConfig = RocketMqConfig.newBuilder()
+                .groupId(connectorName.concat("-signal-group"))
                 .namesrvAddr(this.nameSrvAddrs)
                 .aclEnable(this.aclEnabled)
                 .accessKey(this.accessKey)
@@ -145,18 +145,18 @@ public class RocketMqSignalThread<T extends 
DataCollectionId> {
 
     private DefaultLitePullConsumer initDefaultLitePullConsumer() throws 
MQClientException {
         // create topic
-        if (!RocketMQConnectUtil.topicExist(this.rocketMqConnectConfig, 
this.topicName)) {
+        if (!RocketMqAdminUtil.topicExist(this.rocketMqConfig, 
this.topicName)) {
             // read queue 1, write queue 1, prem 1
-            RocketMQConnectUtil.createTopic(this.rocketMqConnectConfig, new 
TopicConfig(this.topicName, 1, 1, 6));
+            RocketMqAdminUtil.createTopic(this.rocketMqConfig, new 
TopicConfig(this.topicName, 1, 1, 6));
             LOGGER.info("Create rocketmq signal topic {}", this.topicName);
         }
         String groupName = connectorName.concat("-signal-group");
-        Set<String> groupSet = 
RocketMQConnectUtil.fetchAllConsumerGroup(this.rocketMqConnectConfig);
+        Set<String> groupSet = 
RocketMqAdminUtil.fetchAllConsumerGroup(this.rocketMqConfig);
         if (!groupSet.contains(groupName)) {
             // create consumer group
-            RocketMQConnectUtil.createSubGroup(this.rocketMqConnectConfig, 
rocketMqConnectConfig.getRmqConsumerGroup());
+            RocketMqAdminUtil.createSubGroup(this.rocketMqConfig, 
rocketMqConfig.getGroupId());
         }
-        return 
RocketMQConnectUtil.initDefaultLitePullConsumer(rocketMqConnectConfig, false);
+        return RocketMqAdminUtil.initDefaultLitePullConsumer(rocketMqConfig, 
false);
 
     }
 
diff --git 
a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/controller/isolation/PluginUtils.java
 
b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/controller/isolation/PluginUtils.java
index 454f354c..a8f11da4 100644
--- 
a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/controller/isolation/PluginUtils.java
+++ 
b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/controller/isolation/PluginUtils.java
@@ -119,7 +119,7 @@ public class PluginUtils {
             + "|org\\.xml\\.sax"
             + "|io\\.openmessaging\\.connector\\.api"
             + "|org\\.slf4j"
-            + "|org\\.apache\\.rocketmq\\.client"
+            + "|org\\.apache\\.rocketmq"
             + ")\\..*$"
             + "|io\\.openmessaging\\.KeyValue");
 
diff --git 
a/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/rest/RestHandlerTest.java
 
b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/rest/RestHandlerTest.java
index c22e155f..39050d4d 100644
--- 
a/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/rest/RestHandlerTest.java
+++ 
b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/rest/RestHandlerTest.java
@@ -313,4 +313,4 @@ public class RestHandlerTest {
 
     }
 
-}
\ No newline at end of file
+}

Reply via email to