This is an automated email from the ASF dual-hosted git repository.
zhoubo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git
The following commit(s) were added to refs/heads/master by this push:
new 23ca9597 [ISSUE #413]Optimize debezium RocketMqDatabaseHistory (#415)
23ca9597 is described below
commit 23ca9597848e3294722f6fee4bf0aebd5211c0c1
Author: Xiaojian Sun <[email protected]>
AuthorDate: Tue Jan 31 20:02:05 2023 +0800
[ISSUE #413]Optimize debezium RocketMqDatabaseHistory (#415)
* fixed null pointer exception #294
* Fix invalid offset submitted by sinktask #310
* RC-413 Optimize debezium RocketMqDatabaseHistory
---
...etMQConnectUtil.java => RocketMqAdminUtil.java} | 153 +++++++--------
.../rocketmq/connect/debezium/RocketMqConfig.java | 133 +++++++++++++
.../connect/debezium/RocketMqConnectConfig.java | 217 ---------------------
.../connect/debezium/RocketMqDatabaseHistory.java | 213 ++++++++++++++------
.../connect/debezium/ZeroMessageQueueSelector.java | 35 ++++
.../mysql/signal/RocketMqSignalThread.java | 20 +-
.../runtime/controller/isolation/PluginUtils.java | 2 +-
.../connect/runtime/rest/RestHandlerTest.java | 2 +-
8 files changed, 410 insertions(+), 365 deletions(-)
diff --git a/connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-core/src/main/java/org/apache/rocketmq/connect/debezium/RocketMQConnectUtil.java b/connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-core/src/main/java/org/apache/rocketmq/connect/debezium/RocketMqAdminUtil.java
similarity index 63%
rename from connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-core/src/main/java/org/apache/rocketmq/connect/debezium/RocketMQConnectUtil.java
rename to connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-core/src/main/java/org/apache/rocketmq/connect/debezium/RocketMqAdminUtil.java
index 29d054a0..c1ae1cbe 100644
--- a/connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-core/src/main/java/org/apache/rocketmq/connect/debezium/RocketMQConnectUtil.java
+++ b/connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-core/src/main/java/org/apache/rocketmq/connect/debezium/RocketMqAdminUtil.java
@@ -14,55 +14,45 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
package org.apache.rocketmq.connect.debezium;
-import com.beust.jcommander.internal.Sets;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer;
+import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.TopicConfig;
-import org.apache.rocketmq.common.UtilAll;
+import org.apache.rocketmq.common.admin.TopicOffset;
+import org.apache.rocketmq.common.admin.TopicStatsTable;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.body.ClusterInfo;
import org.apache.rocketmq.common.protocol.body.SubscriptionGroupWrapper;
import org.apache.rocketmq.common.protocol.route.BrokerData;
-import org.apache.rocketmq.common.protocol.route.QueueData;
import org.apache.rocketmq.common.protocol.route.TopicRouteData;
import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig;
import org.apache.rocketmq.remoting.RPCHook;
-import org.apache.rocketmq.remoting.common.RemotingUtil;
+import org.apache.rocketmq.remoting.exception.RemotingException;
import org.apache.rocketmq.remoting.protocol.LanguageCode;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
import org.apache.rocketmq.tools.command.CommandUtil;
import java.util.HashMap;
-import java.util.List;
+import java.util.HashSet;
+import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.UUID;
-
/**
- * rocket connect util
+ * Tools for creating RocketMq topic and group
*/
-public class RocketMQConnectUtil {
-
- public static String createGroupName(String prefix) {
- StringBuilder sb = new StringBuilder();
- sb.append(prefix).append("-");
- sb.append(RemotingUtil.getLocalAddress()).append("-");
- sb.append(UtilAll.getPid()).append("-");
- sb.append(System.nanoTime());
- return sb.toString().replace(".", "-");
- }
+public class RocketMqAdminUtil {
public static String createUniqInstance(String prefix) {
- return new
StringBuffer(prefix).append("-").append(UUID.randomUUID()).toString();
+ return prefix.concat("-").concat(UUID.randomUUID().toString());
}
public static RPCHook getAclRPCHook(String accessKey, String secretKey) {
@@ -72,68 +62,36 @@ public class RocketMQConnectUtil {
/**
* init default lite pull consumer
*
- * @param connectConfig
- * @param autoCommit
- * @return
- * @throws MQClientException
- */
- public static DefaultLitePullConsumer
initDefaultLitePullConsumer(RocketMqConnectConfig connectConfig, boolean
autoCommit) throws MQClientException {
- DefaultLitePullConsumer consumer = null;
- if (Objects.isNull(consumer)) {
- if (StringUtils.isBlank(connectConfig.getAccessKey()) &&
StringUtils.isBlank(connectConfig.getSecretKey())) {
- consumer = new DefaultLitePullConsumer(
- connectConfig.getRmqConsumerGroup()
- );
- } else {
- consumer = new DefaultLitePullConsumer(
- connectConfig.getRmqConsumerGroup(),
- getAclRPCHook(connectConfig.getAccessKey(),
connectConfig.getSecretKey())
- );
- }
- }
- consumer.setNamesrvAddr(connectConfig.getNamesrvAddr());
- String uniqueName = Thread.currentThread().getName() + "-" +
System.currentTimeMillis() % 1000;
- consumer.setInstanceName(uniqueName);
- consumer.setUnitName(uniqueName);
- consumer.setAutoCommit(autoCommit);
-
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
- return consumer;
- }
-
- /**
- * init default lite pull consumer
- *
- * @param connectConfig
- * @param topic
+ * @param config
* @param autoCommit
* @return
* @throws MQClientException
*/
- public static DefaultLitePullConsumer
initDefaultLitePullConsumer(RocketMqConnectConfig connectConfig, String topic,
boolean autoCommit) throws MQClientException {
+ public static DefaultLitePullConsumer
initDefaultLitePullConsumer(RocketMqConfig config,
+ boolean
autoCommit) throws MQClientException {
DefaultLitePullConsumer consumer = null;
if (Objects.isNull(consumer)) {
- if (StringUtils.isBlank(connectConfig.getAccessKey()) &&
StringUtils.isBlank(connectConfig.getSecretKey())) {
+ if (StringUtils.isBlank(config.getAccessKey()) &&
StringUtils.isBlank(config.getSecretKey())) {
consumer = new DefaultLitePullConsumer(
- connectConfig.getRmqConsumerGroup()
+ config.getGroupId()
);
} else {
consumer = new DefaultLitePullConsumer(
- connectConfig.getRmqConsumerGroup(),
- getAclRPCHook(connectConfig.getAccessKey(),
connectConfig.getSecretKey())
+ config.getGroupId(),
+ getAclRPCHook(config.getAccessKey(),
config.getSecretKey())
);
}
}
- consumer.setNamesrvAddr(connectConfig.getNamesrvAddr());
- String uniqueName = Thread.currentThread().getName() + "-" +
System.currentTimeMillis() % 1000;
+ consumer.setNamesrvAddr(config.getNamesrvAddr());
+ String uniqueName = createUniqInstance(config.getNamesrvAddr());
consumer.setInstanceName(uniqueName);
consumer.setUnitName(uniqueName);
consumer.setAutoCommit(autoCommit);
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
- consumer.subscribe(topic, "*");
return consumer;
}
- public static DefaultMQProducer
initDefaultMQProducer(RocketMqConnectConfig connectConfig) {
+ public static DefaultMQProducer initDefaultMQProducer(RocketMqConfig
connectConfig) {
RPCHook rpcHook = null;
if (connectConfig.isAclEnable()) {
rpcHook = new AclClientRPCHook(new
SessionCredentials(connectConfig.getAccessKey(), connectConfig.getSecretKey()));
@@ -141,31 +99,36 @@ public class RocketMQConnectUtil {
DefaultMQProducer producer = new DefaultMQProducer(rpcHook);
producer.setNamesrvAddr(connectConfig.getNamesrvAddr());
producer.setInstanceName(createUniqInstance(connectConfig.getNamesrvAddr()));
- producer.setProducerGroup(connectConfig.getRmqConsumerGroup());
- producer.setSendMsgTimeout(connectConfig.getOperationTimeout());
+ producer.setProducerGroup(connectConfig.getGroupId());
producer.setLanguage(LanguageCode.JAVA);
return producer;
}
- public static DefaultMQAdminExt startMQAdminTool(RocketMqConnectConfig
connectConfig) throws MQClientException {
+ private static DefaultMQAdminExt startMQAdminTool(RocketMqConfig config)
throws MQClientException {
DefaultMQAdminExt admin;
- if (connectConfig.isAclEnable()) {
- admin = new DefaultMQAdminExt(new AclClientRPCHook(new
SessionCredentials(connectConfig.getAccessKey(),
connectConfig.getSecretKey())));
+ if (config.isAclEnable()) {
+ admin = new DefaultMQAdminExt(new AclClientRPCHook(new
SessionCredentials(config.getAccessKey(), config.getSecretKey())));
} else {
admin = new DefaultMQAdminExt();
}
- admin.setNamesrvAddr(connectConfig.getNamesrvAddr());
- admin.setAdminExtGroup(connectConfig.getRmqConsumerGroup());
-
admin.setInstanceName(RocketMQConnectUtil.createUniqInstance(connectConfig.getNamesrvAddr()));
+ admin.setNamesrvAddr(config.getNamesrvAddr());
+ admin.setAdminExtGroup(config.getGroupId());
+ admin.setInstanceName(createUniqInstance(config.getNamesrvAddr()));
admin.start();
return admin;
}
- public static void createTopic(RocketMqConnectConfig connectConfig,
TopicConfig topicConfig) {
+ /**
+ * Create rocketMq topic
+ *
+ * @param config
+ * @param topicConfig
+ */
+ public static void createTopic(RocketMqConfig config, TopicConfig
topicConfig) {
DefaultMQAdminExt defaultMQAdminExt = null;
try {
- defaultMQAdminExt = startMQAdminTool(connectConfig);
+ defaultMQAdminExt = startMQAdminTool(config);
ClusterInfo clusterInfo =
defaultMQAdminExt.examineBrokerClusterInfo();
HashMap<String, Set<String>> clusterAddrTable =
clusterInfo.getClusterAddrTable();
Set<String> clusterNameSet = clusterAddrTable.keySet();
@@ -176,7 +139,8 @@ public class RocketMQConnectUtil {
}
}
} catch (Exception e) {
- throw new RuntimeException("create topic: " +
topicConfig.getTopicName() + " failed", e);
+ throw new RuntimeException("RocketMq create schema history topic: " + topicConfig.getTopicName() + " " +
+ " failed", e);
} finally {
if (defaultMQAdminExt != null) {
defaultMQAdminExt.shutdown();
@@ -184,11 +148,18 @@ public class RocketMQConnectUtil {
}
}
- public static boolean topicExist(RocketMqConnectConfig connectConfig,
String topic) {
+ /**
+ * check topic exist
+ *
+ * @param config
+ * @param topic
+ * @return
+ */
+ public static boolean topicExist(RocketMqConfig config, String topic) {
DefaultMQAdminExt defaultMQAdminExt = null;
boolean foundTopicRouteInfo = false;
try {
- defaultMQAdminExt = startMQAdminTool(connectConfig);
+ defaultMQAdminExt = startMQAdminTool(config);
TopicRouteData topicRouteData =
defaultMQAdminExt.examineTopicRouteInfo(topic);
if (topicRouteData != null) {
foundTopicRouteInfo = true;
@@ -203,9 +174,8 @@ public class RocketMQConnectUtil {
return foundTopicRouteInfo;
}
-
- public static Set<String> fetchAllConsumerGroup(RocketMqConnectConfig
connectConfig) {
- Set<String> consumerGroupSet = Sets.newHashSet();
+ public static Set<String> fetchAllConsumerGroup(RocketMqConfig
connectConfig) {
+ Set<String> consumerGroupSet = new HashSet<>();
DefaultMQAdminExt defaultMQAdminExt = null;
try {
defaultMQAdminExt = startMQAdminTool(connectConfig);
@@ -215,8 +185,7 @@ public class RocketMQConnectUtil {
consumerGroupSet.addAll(subscriptionGroupWrapper.getSubscriptionGroupTable().keySet());
}
} catch (Exception e) {
- e.printStackTrace();
- throw new RuntimeException("fetch all topic failed", e);
+ throw new RuntimeException("RocketMq admin fetch all topic failed", e);
} finally {
if (defaultMQAdminExt != null) {
defaultMQAdminExt.shutdown();
@@ -225,7 +194,7 @@ public class RocketMQConnectUtil {
return consumerGroupSet;
}
- public static String createSubGroup(RocketMqConnectConfig connectConfig,
String subGroup) {
+ public static String createSubGroup(RocketMqConfig connectConfig, String
subGroup) {
DefaultMQAdminExt defaultMQAdminExt = null;
try {
defaultMQAdminExt = startMQAdminTool(connectConfig);
@@ -249,6 +218,28 @@ public class RocketMQConnectUtil {
}
return subGroup;
}
-}
+ /**
+ * Get topic offsets
+ *
+ * @param config
+ * @param topic
+ * @return
+ */
+ public static Map<MessageQueue, TopicOffset> offsets(RocketMqConfig
config, String topic) {
+ // Get db schema topic min and max offset
+ DefaultMQAdminExt adminClient = null;
+ try {
+ adminClient = RocketMqAdminUtil.startMQAdminTool(config);
+ TopicStatsTable topicStatsTable =
adminClient.examineTopicStats(topic);
+ return topicStatsTable.getOffsetTable();
+ } catch (MQClientException | MQBrokerException | RemotingException |
InterruptedException e) {
+ throw new RuntimeException(e);
+ } finally {
+ if (adminClient != null) {
+ adminClient.shutdown();
+ }
+ }
+ }
+}
diff --git a/connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-core/src/main/java/org/apache/rocketmq/connect/debezium/RocketMqConfig.java b/connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-core/src/main/java/org/apache/rocketmq/connect/debezium/RocketMqConfig.java
new file mode 100644
index 00000000..16d81978
--- /dev/null
+++ b/connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-core/src/main/java/org/apache/rocketmq/connect/debezium/RocketMqConfig.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.debezium;
+
+import java.util.Objects;
+
+/**
+ * Configuration for connecting RocketMq
+ */
+public class RocketMqConfig {
+ private String namesrvAddr;
+
+ private String groupId;
+
+ /**
+ * set acl config
+ **/
+ private boolean aclEnable;
+ private String accessKey;
+ private String secretKey;
+
+ public static Builder newBuilder() {
+ return new Builder();
+ }
+
+ private RocketMqConfig(String rmqConsumerGroup, String namesrvAddr,
boolean aclEnable, String accessKey,
+ String secretKey) {
+ this.groupId = rmqConsumerGroup;
+ this.namesrvAddr = namesrvAddr;
+ this.aclEnable = aclEnable;
+ this.accessKey = accessKey;
+ this.secretKey = secretKey;
+ }
+
+ public String getNamesrvAddr() {
+ return namesrvAddr;
+ }
+
+ public String getGroupId() {
+ return groupId;
+ }
+
+ public boolean isAclEnable() {
+ return aclEnable;
+ }
+
+ public String getAccessKey() {
+ return accessKey;
+ }
+
+ public String getSecretKey() {
+ return secretKey;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+ RocketMqConfig that = (RocketMqConfig) o;
+ return aclEnable == that.aclEnable && Objects.equals(namesrvAddr,
that.namesrvAddr) && Objects.equals(groupId, that.groupId) &&
Objects.equals(accessKey, that.accessKey) && Objects.equals(secretKey,
that.secretKey);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(namesrvAddr, groupId, aclEnable, accessKey,
secretKey);
+ }
+
+ @Override
+ public String toString() {
+ return "RocketMqConfig{" +
+ "namesrvAddr='" + namesrvAddr + '\'' +
+ ", groupId='" + groupId + '\'' +
+ ", aclEnable=" + aclEnable +
+ ", accessKey='" + accessKey + '\'' +
+ ", secretKey='" + secretKey + '\'' +
+ '}';
+ }
+
+ public static class Builder {
+ private String namesrvAddr;
+ private String groupId;
+ /**
+ * set acl config
+ **/
+ private boolean aclEnable;
+ private String accessKey;
+ private String secretKey;
+
+ public Builder namesrvAddr(String namesrvAddr) {
+ this.namesrvAddr = namesrvAddr;
+ return this;
+ }
+
+ public Builder groupId(String groupId) {
+ this.groupId = groupId;
+ return this;
+ }
+
+ public Builder aclEnable(boolean aclEnable) {
+ this.aclEnable = aclEnable;
+ return this;
+ }
+
+ public Builder accessKey(String accessKey) {
+ this.accessKey = accessKey;
+ return this;
+ }
+
+ public Builder secretKey(String secretKey) {
+ this.secretKey = secretKey;
+ return this;
+ }
+
+ public RocketMqConfig build() {
+ return new RocketMqConfig(groupId, namesrvAddr, aclEnable,
accessKey, secretKey);
+ }
+ }
+}
diff --git a/connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-core/src/main/java/org/apache/rocketmq/connect/debezium/RocketMqConnectConfig.java b/connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-core/src/main/java/org/apache/rocketmq/connect/debezium/RocketMqConnectConfig.java
deleted file mode 100644
index 267f29ec..00000000
--- a/connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-core/src/main/java/org/apache/rocketmq/connect/debezium/RocketMqConnectConfig.java
+++ /dev/null
@@ -1,217 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.rocketmq.connect.debezium;
-
-
-import io.debezium.config.Configuration;
-
-import java.util.Objects;
-
-/**
- * rocketmq connect config
- */
-public class RocketMqConnectConfig {
- private String namesrvAddr;
-
- private int operationTimeout = 3000;
-
- private String rmqConsumerGroup;
-
- private int rmqMaxRedeliveryTimes;
-
- private int rmqMessageConsumeTimeout = 3000;
-
- private int rmqMaxConsumeThreadNums = 32;
-
- private int rmqMinConsumeThreadNums = 1;
-
-
- /** set acl config **/
- private boolean aclEnable;
- private String accessKey;
- private String secretKey;
-
-
- public static Builder newBuilder(){
- return new Builder();
- }
-
- public RocketMqConnectConfig() {}
-
- public RocketMqConnectConfig(String rmqConsumerGroup, String namesrvAddr,
boolean aclEnable, String accessKey, String secretKey) {
- this.rmqConsumerGroup = rmqConsumerGroup;
- this.namesrvAddr = namesrvAddr;
- this.aclEnable = aclEnable;
- this.accessKey = accessKey;
- this.secretKey = secretKey;
- }
-
- public RocketMqConnectConfig(Configuration config, String prefixGroupName)
{
- this.rmqConsumerGroup = prefixGroupName.concat("-group");
- // init rocketmq connection
- this.namesrvAddr =
config.getString(RocketMqDatabaseHistory.NAME_SRV_ADDR);
- this.aclEnable =
config.getBoolean(RocketMqDatabaseHistory.ROCKETMQ_ACL_ENABLE);
- this.accessKey =
config.getString(RocketMqDatabaseHistory.ROCKETMQ_ACCESS_KEY);
- this.secretKey =
config.getString(RocketMqDatabaseHistory.ROCKETMQ_SECRET_KEY);
- }
-
- public String getNamesrvAddr() {
- return namesrvAddr;
- }
-
- public void setNamesrvAddr(String namesrvAddr) {
- this.namesrvAddr = namesrvAddr;
- }
-
- public int getOperationTimeout() {
- return operationTimeout;
- }
-
- public void setOperationTimeout(int operationTimeout) {
- this.operationTimeout = operationTimeout;
- }
-
- public String getRmqConsumerGroup() {
- return rmqConsumerGroup;
- }
-
- public void setRmqConsumerGroup(String rmqConsumerGroup) {
- this.rmqConsumerGroup = rmqConsumerGroup;
- }
-
- public int getRmqMaxRedeliveryTimes() {
- return rmqMaxRedeliveryTimes;
- }
-
- public void setRmqMaxRedeliveryTimes(int rmqMaxRedeliveryTimes) {
- this.rmqMaxRedeliveryTimes = rmqMaxRedeliveryTimes;
- }
-
- public int getRmqMessageConsumeTimeout() {
- return rmqMessageConsumeTimeout;
- }
-
- public void setRmqMessageConsumeTimeout(int rmqMessageConsumeTimeout) {
- this.rmqMessageConsumeTimeout = rmqMessageConsumeTimeout;
- }
-
- public int getRmqMaxConsumeThreadNums() {
- return rmqMaxConsumeThreadNums;
- }
-
- public void setRmqMaxConsumeThreadNums(int rmqMaxConsumeThreadNums) {
- this.rmqMaxConsumeThreadNums = rmqMaxConsumeThreadNums;
- }
-
- public int getRmqMinConsumeThreadNums() {
- return rmqMinConsumeThreadNums;
- }
-
- public void setRmqMinConsumeThreadNums(int rmqMinConsumeThreadNums) {
- this.rmqMinConsumeThreadNums = rmqMinConsumeThreadNums;
- }
-
- public boolean isAclEnable() {
- return aclEnable;
- }
-
- public void setAclEnable(boolean aclEnable) {
- this.aclEnable = aclEnable;
- }
-
- public String getAccessKey() {
- return accessKey;
- }
-
- public void setAccessKey(String accessKey) {
- this.accessKey = accessKey;
- }
-
- public String getSecretKey() {
- return secretKey;
- }
-
- public void setSecretKey(String secretKey) {
- this.secretKey = secretKey;
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) return true;
- if (o == null || getClass() != o.getClass()) return false;
- RocketMqConnectConfig that = (RocketMqConnectConfig) o;
- return operationTimeout == that.operationTimeout &&
rmqMaxRedeliveryTimes == that.rmqMaxRedeliveryTimes && rmqMessageConsumeTimeout
== that.rmqMessageConsumeTimeout && rmqMaxConsumeThreadNums ==
that.rmqMaxConsumeThreadNums && rmqMinConsumeThreadNums ==
that.rmqMinConsumeThreadNums && aclEnable == that.aclEnable &&
Objects.equals(namesrvAddr, that.namesrvAddr) &&
Objects.equals(rmqConsumerGroup, that.rmqConsumerGroup) &&
Objects.equals(accessKey, that.accessKey) && Objects.equals [...]
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(namesrvAddr, operationTimeout, rmqConsumerGroup,
rmqMaxRedeliveryTimes, rmqMessageConsumeTimeout, rmqMaxConsumeThreadNums,
rmqMinConsumeThreadNums, aclEnable, accessKey, secretKey);
- }
-
- @Override
- public String toString() {
- return "RocketMqConnectConfig{" +
- "namesrvAddr='" + namesrvAddr + '\'' +
- ", operationTimeout=" + operationTimeout +
- ", rmqConsumerGroup='" + rmqConsumerGroup + '\'' +
- ", rmqMaxRedeliveryTimes=" + rmqMaxRedeliveryTimes +
- ", rmqMessageConsumeTimeout=" + rmqMessageConsumeTimeout +
- ", rmqMaxConsumeThreadNums=" + rmqMaxConsumeThreadNums +
- ", rmqMinConsumeThreadNums=" + rmqMinConsumeThreadNums +
- ", aclEnable=" + aclEnable +
- ", accessKey='" + accessKey + '\'' +
- ", secretKey='" + secretKey + '\'' +
- '}';
- }
-
- public static class Builder{
- private String namesrvAddr;
- private String rmqConsumerGroup;
- /** set acl config **/
- private boolean aclEnable;
- private String accessKey;
- private String secretKey;
-
- public Builder namesrvAddr(String namesrvAddr){
- this.namesrvAddr = namesrvAddr;
- return this;
- }
-
- public Builder rmqConsumerGroup(String rmqConsumerGroup){
- this.rmqConsumerGroup = rmqConsumerGroup;
- return this;
- }
-
- public Builder aclEnable(boolean aclEnable){
- this.aclEnable = aclEnable;
- return this;
- }
-
- public Builder accessKey(String accessKey){
- this.accessKey = accessKey;
- return this;
- }
- public Builder secretKey(String secretKey){
- this.secretKey = secretKey;
- return this;
- }
- public RocketMqConnectConfig build(){
- return new RocketMqConnectConfig(rmqConsumerGroup, namesrvAddr,
aclEnable, accessKey, secretKey);
- }
- }
-}
diff --git a/connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-core/src/main/java/org/apache/rocketmq/connect/debezium/RocketMqDatabaseHistory.java b/connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-core/src/main/java/org/apache/rocketmq/connect/debezium/RocketMqDatabaseHistory.java
index a8fb38d8..7fd7941e 100644
--- a/connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-core/src/main/java/org/apache/rocketmq/connect/debezium/RocketMqDatabaseHistory.java
+++ b/connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-core/src/main/java/org/apache/rocketmq/connect/debezium/RocketMqDatabaseHistory.java
@@ -28,23 +28,30 @@ import io.debezium.relational.history.HistoryRecord;
import io.debezium.relational.history.HistoryRecordComparator;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer;
+import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
+import org.apache.rocketmq.client.producer.SendCallback;
+import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.TopicConfig;
+import org.apache.rocketmq.common.admin.TopicOffset;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.common.message.MessageQueue;
import
org.apache.rocketmq.connect.kafka.connect.adaptor.task.AbstractKafkaConnectSource;
+import org.apache.rocketmq.remoting.exception.RemotingException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
+import java.util.Map;
import java.util.Set;
import java.util.UUID;
-import java.util.concurrent.CountDownLatch;
import java.util.function.Consumer;
-
/**
* rocketmq database history
*/
@@ -86,14 +93,41 @@ public final class RocketMqDatabaseHistory extends AbstractDatabaseHistory {
.withWidth(ConfigDef.Width.LONG)
.withImportance(ConfigDef.Importance.HIGH)
.withDescription("Rocketmq secret key");
+
+ public static final Field RECOVERY_POLL_ATTEMPTS =
Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "recovery.attempts")
+ .withDisplayName("Max attempts to recovery database schema
history")
+ .withType(ConfigDef.Type.INT)
+ .withGroup(Field.createGroupEntry(Field.Group.ADVANCED, 0))
+ .withWidth(ConfigDef.Width.SHORT)
+ .withImportance(ConfigDef.Importance.LOW)
+ .withDescription("The number of attempts in a row that no data are
returned from RocketMQ before recover " +
+ "completes. "
+ + "The maximum amount of time to wait after receiving no
data is (recovery.attempts) x (recovery.poll.interval.ms).")
+ .withDefault(60)
+ .withValidation(Field::isInteger);
+
+ public static final Field RECOVERY_POLL_INTERVAL_MS =
Field.create(CONFIGURATION_FIELD_PREFIX_STRING
+ + "recovery.poll.interval.ms")
+ .withDisplayName("Poll interval during database schema history
recovery (ms)")
+ .withType(ConfigDef.Type.INT)
+ .withGroup(Field.createGroupEntry(Field.Group.ADVANCED, 1))
+ .withWidth(ConfigDef.Width.SHORT)
+ .withImportance(ConfigDef.Importance.LOW)
+ .withDescription("The number of milliseconds to wait while polling
for persisted data during recovery.")
+ .withDefault(1000)
+ .withValidation(Field::isLong);
+
private static final Logger log =
LoggerFactory.getLogger(AbstractKafkaConnectSource.class);
+ private static final int MESSAGE_QUEUE = 0;
+ private static final int UNLIMITED_VALUE = -1;
+
private final DocumentReader reader = DocumentReader.defaultReader();
private String topicName;
private String dbHistoryName;
- private RocketMqConnectConfig connectConfig;
+ private RocketMqConfig rocketMqConfig;
private DefaultMQProducer producer;
-
-
+ private int maxRecoveryAttempts;
+ private Long pollInterval;
@Override
public void configure(
@@ -104,29 +138,39 @@ public final class RocketMqDatabaseHistory extends AbstractDatabaseHistory {
super.configure(config, comparator, listener, useCatalogBeforeSchema);
this.topicName = config.getString(TOPIC);
this.dbHistoryName = config.getString(DatabaseHistory.NAME,
UUID.randomUUID().toString());
+ this.maxRecoveryAttempts = config.getInteger(RECOVERY_POLL_ATTEMPTS);
+ this.pollInterval = config.getLong(RECOVERY_POLL_INTERVAL_MS);
log.info("Configure to store the debezium database history {} to
rocketmq topic {}",
dbHistoryName, topicName);
- // init config
- connectConfig = new RocketMqConnectConfig(config, dbHistoryName);
+ // build config
+ this.rocketMqConfig = RocketMqConfig.newBuilder()
+
.aclEnable(config.getBoolean(RocketMqDatabaseHistory.ROCKETMQ_ACL_ENABLE))
+
.accessKey(config.getString(RocketMqDatabaseHistory.ROCKETMQ_ACCESS_KEY))
+
.secretKey(config.getString(RocketMqDatabaseHistory.ROCKETMQ_SECRET_KEY))
+
.namesrvAddr(config.getString(RocketMqDatabaseHistory.NAME_SRV_ADDR))
+ .groupId(dbHistoryName)
+ .build();
}
@Override
public void initializeStorage() {
super.initializeStorage();
log.info("try to create history topic: {}!", this.topicName);
- TopicConfig topicConfig = new TopicConfig(this.topicName);
- RocketMQConnectUtil.createTopic(connectConfig, topicConfig);
+ TopicConfig topicConfig = new TopicConfig(this.topicName, 1, 1, 6);
+ RocketMqAdminUtil.createTopic(rocketMqConfig, topicConfig);
}
@Override
public void start() {
super.start();
try {
- Set<String> consumerGroupSet =
RocketMQConnectUtil.fetchAllConsumerGroup(connectConfig);
- if
(!consumerGroupSet.contains(connectConfig.getRmqConsumerGroup())) {
- RocketMQConnectUtil.createSubGroup(connectConfig,
connectConfig.getRmqConsumerGroup());
+ // Check and create group
+ Set<String> consumerGroupSet =
RocketMqAdminUtil.fetchAllConsumerGroup(rocketMqConfig);
+ if (!consumerGroupSet.contains(rocketMqConfig.getGroupId())) {
+ RocketMqAdminUtil.createSubGroup(rocketMqConfig,
rocketMqConfig.getGroupId());
}
- this.producer =
RocketMQConnectUtil.initDefaultMQProducer(connectConfig);
+ // Start rocketmq producer
+ this.producer =
RocketMqAdminUtil.initDefaultMQProducer(rocketMqConfig);
this.producer.start();
} catch (MQClientException e) {
throw new DatabaseHistoryException(e);
@@ -145,8 +189,39 @@ public final class RocketMqDatabaseHistory extends AbstractDatabaseHistory {
}
}
+ @Override
+ protected void storeRecord(HistoryRecord record) throws
DatabaseHistoryException {
+ if (this.producer == null) {
+ throw new IllegalStateException("No producer is available. Ensure
that 'initializeStorage()'"
+ + " is called before storing database schema history
records.");
+ }
+
+ log.trace("Storing record into database schema history: {}", record);
+ try {
+ Message message = new Message(this.topicName,
record.toString().getBytes());
+ producer.send(message, new ZeroMessageQueueSelector(), null, new
SendCallback() {
+ @Override
+ public void onSuccess(SendResult sendResult) {
+ log.debug("Stored record in topic '{}' partition {} at
offset {} ",
+ message.getTopic(), sendResult.getMessageQueue(),
sendResult.getMessageQueue());
+ }
+
+ @Override
+ public void onException(Throwable e) {
+ log.error("Store record into database schema history
failed : {}", e);
+ }
+ });
+ } catch (InterruptedException e) {
+ log.error("Interrupted before record was written into database
schema history: {}", record);
+ Thread.currentThread().interrupt();
+ throw new DatabaseHistoryException(e);
+ } catch (MQClientException | RemotingException e) {
+ throw new DatabaseHistoryException(e);
+ }
+ }
+
/**
- * recover record
+ * Recover records
*
* @param records
*/
@@ -154,29 +229,57 @@ public final class RocketMqDatabaseHistory extends AbstractDatabaseHistory {
protected void recoverRecords(Consumer<HistoryRecord> records) {
DefaultLitePullConsumer consumer = null;
try {
- consumer =
RocketMQConnectUtil.initDefaultLitePullConsumer(connectConfig, topicName,
false);
+ consumer =
RocketMqAdminUtil.initDefaultLitePullConsumer(rocketMqConfig, false);
consumer.start();
- while (true) {
- List<MessageExt> result = consumer.poll(10000);
- if (result == null || result.isEmpty()) {
- break;
+
+ // Select message queue
+ MessageQueue messageQueue = new
ZeroMessageQueueSelector().select(new
ArrayList<>(consumer.fetchMessageQueues(topicName)), null, null);
+ consumer.assign(Collections.singleton(messageQueue));
+ consumer.seekToBegin(messageQueue);
+ // Read all messages in the topic ...
+ long lastProcessedOffset = UNLIMITED_VALUE;
+ Long maxOffset = null;
+ int recoveryAttempts = 0;
+
+ do {
+ if (recoveryAttempts > maxRecoveryAttempts) {
+ throw new IllegalStateException(
+ "The database schema history couldn't be
recovered.");
}
- for (MessageExt message : result) {
- HistoryRecord recordObj = new
HistoryRecord(reader.read(message.getBody()));
- log.trace("Recovering database history: {}", recordObj);
- if (recordObj == null || !recordObj.isValid()) {
- log.warn("Skipping invalid database history record
'{}'. " +
- "This is often not an issue, but if it
happens repeatedly please check the '{}' topic.",
- recordObj, topicName);
- } else {
- records.accept(recordObj);
- log.trace("Recovered database history: {}", recordObj);
+ // Get db schema history topic end offset
+ maxOffset = getMaxOffsetOfSchemaHistoryTopic(maxOffset,
messageQueue);
+ log.debug("End offset of database schema history topic is {}",
maxOffset);
+
+ // Poll record from db schema history topic
+ List<MessageExt> recoveredRecords =
consumer.poll(pollInterval);
+ int numRecordsProcessed = 0;
+
+ for (MessageExt message : recoveredRecords) {
+ if (message.getQueueOffset() > lastProcessedOffset) {
+ HistoryRecord recordObj = new
HistoryRecord(reader.read(message.getBody()));
+ log.trace("Recovering database history: {}",
recordObj);
+ if (recordObj == null || !recordObj.isValid()) {
+ log.warn("Skipping invalid database history record
'{}'. " +
+ "This is often not an issue, but
if it happens repeatedly please check the '{}' topic.",
+ recordObj, topicName);
+ } else {
+ records.accept(recordObj);
+ log.trace("Recovered database history: {}",
recordObj);
+ }
+ lastProcessedOffset = message.getQueueOffset();
+ ++numRecordsProcessed;
}
}
- }
- } catch (MQClientException ce) {
- throw new DatabaseHistoryException(ce);
- } catch (IOException e) {
+ if (numRecordsProcessed == 0) {
+ log.debug("No new records found in the database schema
history; will retry");
+ recoveryAttempts++;
+ } else {
+ log.debug("Processed {} records from database schema
history", numRecordsProcessed);
+ }
+
+ } while (lastProcessedOffset < maxOffset - 1);
+
+ } catch (MQClientException | MQBrokerException | IOException |
RemotingException | InterruptedException e) {
throw new DatabaseHistoryException(e);
} finally {
if (consumer != null) {
@@ -185,37 +288,37 @@ public final class RocketMqDatabaseHistory extends
AbstractDatabaseHistory {
}
}
- @Override
- protected void storeRecord(HistoryRecord record) throws
DatabaseHistoryException {
- log.info(record.toString());
- if (this.producer == null) {
- throw new IllegalStateException("No producer is available. Ensure
that 'start()'"
- + " is called before storing database history records.");
- }
- if (log.isTraceEnabled()) {
- log.trace("Storing record into database history: {}", record);
- }
- try {
- Message sourceMessage = new Message();
- sourceMessage.setTopic(this.topicName);
- final byte[] messageBody = record.toString().getBytes();
- sourceMessage.setBody(messageBody);
- producer.send(sourceMessage);
- } catch (Exception e) {
- throw new DatabaseHistoryException(e);
+ private Long getMaxOffsetOfSchemaHistoryTopic(Long previousEndOffset,
MessageQueue messageQueue) throws MQBrokerException, RemotingException,
InterruptedException, MQClientException {
+ Map<MessageQueue, TopicOffset> minAndMaxOffsets =
RocketMqAdminUtil.offsets(this.rocketMqConfig, topicName);
+ Long maxOffset = minAndMaxOffsets.get(messageQueue).getMaxOffset();
+ if (previousEndOffset != null && !previousEndOffset.equals(maxOffset))
{
+ log.warn("Detected changed end offset of database schema history
topic (previous: "
+ + previousEndOffset + ", current: " + maxOffset
+ + "). Make sure that the same history topic isn't shared
by multiple connector instances.");
}
+ return maxOffset;
}
-
@Override
public boolean exists() {
- return this.storageExists();
+ boolean exists = false;
+ if (this.storageExists()) {
+ Map<MessageQueue, TopicOffset> minAndMaxOffset =
RocketMqAdminUtil.offsets(this.rocketMqConfig,
+ topicName);
+ for (MessageQueue messageQueue : minAndMaxOffset.keySet()) {
+ if (MESSAGE_QUEUE == messageQueue.getQueueId()) {
+ exists =
+ minAndMaxOffset.get(messageQueue).getMaxOffset() >
minAndMaxOffset.get(messageQueue).getMinOffset();
+ }
+ }
+ }
+ return exists;
}
@Override
public boolean storageExists() {
- // check topic is exist
- return RocketMQConnectUtil.topicExist(connectConfig, this.topicName);
+ // Check whether topic exists
+ return RocketMqAdminUtil.topicExist(rocketMqConfig, this.topicName);
}
@Override
diff --git
a/connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-core/src/main/java/org/apache/rocketmq/connect/debezium/ZeroMessageQueueSelector.java
b/connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-core/src/main/java/org/apache/rocketmq/connect/debezium/ZeroMessageQueueSelector.java
new file mode 100644
index 00000000..f128ed97
--- /dev/null
+++
b/connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-core/src/main/java/org/apache/rocketmq/connect/debezium/ZeroMessageQueueSelector.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.debezium;
+
+import org.apache.rocketmq.client.producer.MessageQueueSelector;
+import org.apache.rocketmq.common.message.Message;
+import org.apache.rocketmq.common.message.MessageQueue;
+
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * Always select MessageQueue with queue id 0
+ */
+public class ZeroMessageQueueSelector implements MessageQueueSelector {
+ @Override
+ public MessageQueue select(List<MessageQueue> mqs, Message msg, Object
arg) {
+ return mqs.stream().filter(messageQueue -> (messageQueue.getQueueId()
== 0)).collect(Collectors.toSet()).stream().findFirst().get();
+ }
+}
\ No newline at end of file
diff --git
a/connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-mysql/src/main/java/io/debezium/connector/mysql/signal/RocketMqSignalThread.java
b/connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-mysql/src/main/java/io/debezium/connector/mysql/signal/RocketMqSignalThread.java
index b2f7535b..54750e2c 100644
---
a/connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-mysql/src/main/java/io/debezium/connector/mysql/signal/RocketMqSignalThread.java
+++
b/connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-mysql/src/main/java/io/debezium/connector/mysql/signal/RocketMqSignalThread.java
@@ -34,8 +34,8 @@ import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
-import org.apache.rocketmq.connect.debezium.RocketMQConnectUtil;
-import org.apache.rocketmq.connect.debezium.RocketMqConnectConfig;
+import org.apache.rocketmq.connect.debezium.RocketMqAdminUtil;
+import org.apache.rocketmq.connect.debezium.RocketMqConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -100,7 +100,7 @@ public class RocketMqSignalThread<T extends
DataCollectionId> {
private final String accessKey;
private final String secretKey;
private final String connectorName;
- private final RocketMqConnectConfig rocketMqConnectConfig;
+ private final RocketMqConfig rocketMqConfig;
private final MySqlReadOnlyIncrementalSnapshotChangeEventSource<T>
eventSource;
private final DefaultLitePullConsumer signalsConsumer;
@@ -119,8 +119,8 @@ public class RocketMqSignalThread<T extends
DataCollectionId> {
this.accessKey = signalConfig.getString(ROCKETMQ_ACCESS_KEY);
this.secretKey = signalConfig.getString(ROCKETMQ_SECRET_KEY);
this.nameSrvAddrs = signalConfig.getString(NAME_SRV_ADDR);
- this.rocketMqConnectConfig = RocketMqConnectConfig.newBuilder()
- .rmqConsumerGroup(connectorName.concat("-signal-group"))
+ this.rocketMqConfig = RocketMqConfig.newBuilder()
+ .groupId(connectorName.concat("-signal-group"))
.namesrvAddr(this.nameSrvAddrs)
.aclEnable(this.aclEnabled)
.accessKey(this.accessKey)
@@ -145,18 +145,18 @@ public class RocketMqSignalThread<T extends
DataCollectionId> {
private DefaultLitePullConsumer initDefaultLitePullConsumer() throws
MQClientException {
// create topic
- if (!RocketMQConnectUtil.topicExist(this.rocketMqConnectConfig,
this.topicName)) {
+ if (!RocketMqAdminUtil.topicExist(this.rocketMqConfig,
this.topicName)) {
// read queue 1, write queue 1, prem 1
- RocketMQConnectUtil.createTopic(this.rocketMqConnectConfig, new
TopicConfig(this.topicName, 1, 1, 6));
+ RocketMqAdminUtil.createTopic(this.rocketMqConfig, new
TopicConfig(this.topicName, 1, 1, 6));
LOGGER.info("Create rocketmq signal topic {}", this.topicName);
}
String groupName = connectorName.concat("-signal-group");
- Set<String> groupSet =
RocketMQConnectUtil.fetchAllConsumerGroup(this.rocketMqConnectConfig);
+ Set<String> groupSet =
RocketMqAdminUtil.fetchAllConsumerGroup(this.rocketMqConfig);
if (!groupSet.contains(groupName)) {
// create consumer group
- RocketMQConnectUtil.createSubGroup(this.rocketMqConnectConfig,
rocketMqConnectConfig.getRmqConsumerGroup());
+ RocketMqAdminUtil.createSubGroup(this.rocketMqConfig,
rocketMqConfig.getGroupId());
}
- return
RocketMQConnectUtil.initDefaultLitePullConsumer(rocketMqConnectConfig, false);
+ return RocketMqAdminUtil.initDefaultLitePullConsumer(rocketMqConfig,
false);
}
diff --git
a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/controller/isolation/PluginUtils.java
b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/controller/isolation/PluginUtils.java
index 454f354c..a8f11da4 100644
---
a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/controller/isolation/PluginUtils.java
+++
b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/controller/isolation/PluginUtils.java
@@ -119,7 +119,7 @@ public class PluginUtils {
+ "|org\\.xml\\.sax"
+ "|io\\.openmessaging\\.connector\\.api"
+ "|org\\.slf4j"
- + "|org\\.apache\\.rocketmq\\.client"
+ + "|org\\.apache\\.rocketmq"
+ ")\\..*$"
+ "|io\\.openmessaging\\.KeyValue");
diff --git
a/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/rest/RestHandlerTest.java
b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/rest/RestHandlerTest.java
index c22e155f..39050d4d 100644
---
a/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/rest/RestHandlerTest.java
+++
b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/rest/RestHandlerTest.java
@@ -313,4 +313,4 @@ public class RestHandlerTest {
}
-}
\ No newline at end of file
+}