This is an automated email from the ASF dual-hosted git repository.
jinrongtong pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new ad72dbee5 [ISSUE #5022] Optimize DefaultMQAdminExtImpl (#5023)
ad72dbee5 is described below
commit ad72dbee54919fc1cb28e98f4b3d284ced0a6b64
Author: zhangjidi2016 <[email protected]>
AuthorDate: Thu Sep 8 11:00:26 2022 +0800
[ISSUE #5022] Optimize DefaultMQAdminExtImpl (#5023)
Co-authored-by: zhangjidi <[email protected]>
---
.../tools/admin/DefaultMQAdminExtImpl.java | 33 +++++++++++-----------
1 file changed, 16 insertions(+), 17 deletions(-)
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
index 465555ba2..4b14c8f3c 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
@@ -36,6 +36,7 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
+import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.QueryResult;
import org.apache.rocketmq.client.admin.MQAdminExtInner;
@@ -123,6 +124,7 @@ public class DefaultMQAdminExtImpl implements MQAdminExt,
MQAdminExtInner {
SYSTEM_GROUP_SET.add(MixAll.DEFAULT_CONSUMER_GROUP);
SYSTEM_GROUP_SET.add(MixAll.DEFAULT_PRODUCER_GROUP);
SYSTEM_GROUP_SET.add(MixAll.TOOLS_CONSUMER_GROUP);
+ SYSTEM_GROUP_SET.add(MixAll.SCHEDULE_CONSUMER_GROUP);
SYSTEM_GROUP_SET.add(MixAll.FILTERSRV_CONSUMER_GROUP);
SYSTEM_GROUP_SET.add(MixAll.MONITOR_CONSUMER_GROUP);
SYSTEM_GROUP_SET.add(MixAll.CLIENT_INNER_PRODUCER_GROUP);
@@ -349,7 +351,7 @@ public class DefaultMQAdminExtImpl implements MQAdminExt,
MQAdminExtInner {
final TopicStatsTable topicStatsTable = new TopicStatsTable();
TopicRouteData topicRouteData = examineTopicRouteInfo(topic);
- if (topicRouteData == null || topicRouteData.getBrokerDatas()
== null || topicRouteData.getBrokerDatas().size() == 0) {
+ if (topicRouteData == null ||
CollectionUtils.isEmpty(topicRouteData.getBrokerDatas())) {
return AdminToolResult.success(topicStatsTable);
}
final CountDownLatch latch = new
CountDownLatch(topicRouteData.getBrokerDatas().size());
@@ -364,7 +366,7 @@ public class DefaultMQAdminExtImpl implements MQAdminExt,
MQAdminExtInner {
topicStatsTable.getOffsetTable().putAll(tst.getOffsetTable());
}
} catch (Exception e) {
- log.error("getTopicStatsInfo error. topic=" +
topic, e);
+ log.error("getTopicStatsInfo error. topic={}",
topic, e);
} finally {
latch.countDown();
}
@@ -508,7 +510,7 @@ public class DefaultMQAdminExtImpl implements MQAdminExt,
MQAdminExtInner {
continue;
}
}
- if (topicRouteData == null || topicRouteData.getBrokerDatas()
== null || topicRouteData.getBrokerDatas().size() == 0) {
+ if (topicRouteData == null ||
CollectionUtils.isEmpty(topicRouteData.getBrokerDatas())) {
return
AdminToolResult.failure(AdminToolsResultCodeEnum.TOPIC_ROUTE_INFO_NOT_EXIST,
"topic router info not found");
}
@@ -527,7 +529,7 @@ public class DefaultMQAdminExtImpl implements MQAdminExt,
MQAdminExtInner {
consumerTpsMap.put(addr,
consumeStats.getConsumeTps());
}
} catch (Exception e) {
- log.error("getTopicStatsInfo error. topic=" + topic, e);
+ log.error("getConsumeStats error. topic={}, consumerGroup={}", topic, consumerGroup, e);
} finally {
latch.countDown();
}
@@ -719,7 +721,7 @@ public class DefaultMQAdminExtImpl implements MQAdminExt,
MQAdminExtInner {
mqClientInstance.getMQClientAPIImpl().deleteTopicInBroker(addr, topic,
timeoutMillis);
successList.add(addr);
} catch (Exception e) {
- log.error("deleteTopicInBrokerConcurrent error. topic=" + topic + ", host=" + addr, e);
+ log.error("deleteTopicInBroker error. topic={}, broker={}", topic, addr, e);
failureList.add(addr);
} finally {
latch.countDown();
@@ -851,7 +853,7 @@ public class DefaultMQAdminExtImpl implements MQAdminExt,
MQAdminExtInner {
@Override
public AdminToolResult doExecute() throws Exception {
TopicRouteData topicRouteData = examineTopicRouteInfo(topic);
- if (topicRouteData == null || topicRouteData.getBrokerDatas()
== null || topicRouteData.getBrokerDatas().size() == 0) {
+ if (topicRouteData == null ||
CollectionUtils.isEmpty(topicRouteData.getBrokerDatas())) {
return
AdminToolResult.failure(AdminToolsResultCodeEnum.TOPIC_ROUTE_INFO_NOT_EXIST,
"topic router info not found");
}
final Map<String, QueueData> topicRouteMap = new
HashMap<String, QueueData>();
@@ -1037,10 +1039,7 @@ public class DefaultMQAdminExtImpl implements
MQAdminExt, MQAdminExtInner {
if (addr != null) {
return
this.mqClientInstance.getMQClientAPIImpl().querySubscriptionByConsumer(addr,
group, topic, timeoutMillis);
}
-
- break;
}
-
return null;
}
@@ -1071,7 +1070,7 @@ public class DefaultMQAdminExtImpl implements MQAdminExt,
MQAdminExtInner {
String retryTopic = MixAll.getRetryTopic(group);
TopicRouteData topicRouteData =
examineTopicRouteInfo(retryTopic);
- if (topicRouteData == null || topicRouteData.getBrokerDatas()
== null || topicRouteData.getBrokerDatas().size() == 0) {
+ if (topicRouteData == null ||
CollectionUtils.isEmpty(topicRouteData.getBrokerDatas())) {
return
AdminToolResult.failure(AdminToolsResultCodeEnum.TOPIC_ROUTE_INFO_NOT_EXIST,
"router info not found.");
}
final TopicList result = new TopicList();
@@ -1087,7 +1086,7 @@ public class DefaultMQAdminExtImpl implements MQAdminExt,
MQAdminExtInner {
result.getTopicList().addAll(topicList.getTopicList());
}
} catch (Exception e) {
- log.error("getTopicStatsInfo error. groupId=" + group, e);
+ log.error("queryTopicsByConsumer error. group={}", group, e);
} finally {
latch.countDown();
}
@@ -1137,7 +1136,7 @@ public class DefaultMQAdminExtImpl implements MQAdminExt,
MQAdminExtInner {
spanSet.addAll(mqClientInstance.getMQClientAPIImpl().queryConsumeTimeSpan(addr,
topic, group, timeoutMillis));
}
} catch (Exception e) {
- log.error("queryConsumeTimeSpan error. topic=" + topic, e);
+ log.error("queryConsumeTimeSpan error. topic={}, group={}", topic, group, e);
} finally {
latch.countDown();
}
@@ -1185,7 +1184,7 @@ public class DefaultMQAdminExtImpl implements MQAdminExt,
MQAdminExtInner {
public boolean cleanExpiredConsumerQueueByAddr(
String addr) throws RemotingConnectException,
RemotingSendRequestException, RemotingTimeoutException, MQClientException,
InterruptedException {
boolean result =
mqClientInstance.getMQClientAPIImpl().cleanExpiredConsumeQueue(addr,
timeoutMillis);
- log.warn("clean expired ConsumeQueue on target " + addr + " broker " + result);
+ log.warn("clean expired ConsumeQueue on target broker={}, execute result={}", addr, result);
return result;
}
@@ -1195,7 +1194,7 @@ public class DefaultMQAdminExtImpl implements MQAdminExt,
MQAdminExtInner {
boolean result = false;
try {
ClusterInfo clusterInfo = examineBrokerClusterInfo();
- if (null == cluster || "".equals(cluster)) {
+ if (StringUtils.isEmpty(cluster)) {
for (String targetCluster :
clusterInfo.retrieveAllClusterNames()) {
result = deleteExpiredCommitLogByCluster(clusterInfo,
targetCluster);
}
@@ -1224,7 +1223,7 @@ public class DefaultMQAdminExtImpl implements MQAdminExt,
MQAdminExtInner {
public boolean deleteExpiredCommitLogByAddr(
String addr) throws RemotingConnectException,
RemotingSendRequestException, RemotingTimeoutException, MQClientException,
InterruptedException {
boolean result =
mqClientInstance.getMQClientAPIImpl().deleteExpiredCommitLog(addr,
timeoutMillis);
- log.warn("Delete expired CommitLog on target " + addr + " broker " + result);
+ log.warn("Delete expired CommitLog on target broker={}, execute result={}", addr, result);
return result;
}
@@ -1234,7 +1233,7 @@ public class DefaultMQAdminExtImpl implements MQAdminExt,
MQAdminExtInner {
boolean result = false;
try {
ClusterInfo clusterInfo = examineBrokerClusterInfo();
- if (null == cluster || "".equals(cluster)) {
+ if (StringUtils.isEmpty(cluster)) {
for (String targetCluster :
clusterInfo.retrieveAllClusterNames()) {
result = cleanUnusedTopicByCluster(clusterInfo,
targetCluster);
}
@@ -1262,7 +1261,7 @@ public class DefaultMQAdminExtImpl implements MQAdminExt,
MQAdminExtInner {
public boolean cleanUnusedTopicByAddr(
String addr) throws RemotingConnectException,
RemotingSendRequestException, RemotingTimeoutException, MQClientException,
InterruptedException {
boolean result =
mqClientInstance.getMQClientAPIImpl().cleanUnusedTopicByAddr(addr,
timeoutMillis);
- log.warn("clean expired ConsumeQueue on target " + addr + " broker " +
result);
+ log.warn("clean unused topic on target broker={}, execute result={}",
addr, result);
return result;
}