This is an automated email from the ASF dual-hosted git repository.

jinrongtong pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new ad72dbee5 [ISSUE #5022] Optimize DefaultMQAdminExtImpl (#5023)
ad72dbee5 is described below

commit ad72dbee54919fc1cb28e98f4b3d284ced0a6b64
Author: zhangjidi2016 <[email protected]>
AuthorDate: Thu Sep 8 11:00:26 2022 +0800

    [ISSUE #5022] Optimize DefaultMQAdminExtImpl (#5023)
    
    Co-authored-by: zhangjidi <[email protected]>
---
 .../tools/admin/DefaultMQAdminExtImpl.java         | 33 +++++++++++-----------
 1 file changed, 16 insertions(+), 17 deletions(-)

diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
index 465555ba2..4b14c8f3c 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
@@ -36,6 +36,7 @@ import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
+import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.rocketmq.client.QueryResult;
 import org.apache.rocketmq.client.admin.MQAdminExtInner;
@@ -123,6 +124,7 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner {
         SYSTEM_GROUP_SET.add(MixAll.DEFAULT_CONSUMER_GROUP);
         SYSTEM_GROUP_SET.add(MixAll.DEFAULT_PRODUCER_GROUP);
         SYSTEM_GROUP_SET.add(MixAll.TOOLS_CONSUMER_GROUP);
+        SYSTEM_GROUP_SET.add(MixAll.SCHEDULE_CONSUMER_GROUP);
         SYSTEM_GROUP_SET.add(MixAll.FILTERSRV_CONSUMER_GROUP);
         SYSTEM_GROUP_SET.add(MixAll.MONITOR_CONSUMER_GROUP);
         SYSTEM_GROUP_SET.add(MixAll.CLIENT_INNER_PRODUCER_GROUP);
@@ -349,7 +351,7 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner {
                 final TopicStatsTable topicStatsTable = new TopicStatsTable();
                 TopicRouteData topicRouteData = examineTopicRouteInfo(topic);
 
-                if (topicRouteData == null || topicRouteData.getBrokerDatas() == null || topicRouteData.getBrokerDatas().size() == 0) {
+                if (topicRouteData == null || CollectionUtils.isEmpty(topicRouteData.getBrokerDatas())) {
                     return AdminToolResult.success(topicStatsTable);
                 }
                 final CountDownLatch latch = new CountDownLatch(topicRouteData.getBrokerDatas().size());
@@ -364,7 +366,7 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner {
                                     topicStatsTable.getOffsetTable().putAll(tst.getOffsetTable());
                                 }
                             } catch (Exception e) {
-                                log.error("getTopicStatsInfo error. topic=" + topic, e);
+                                log.error("getTopicStatsInfo error. topic={}", topic, e);
                             } finally {
                                 latch.countDown();
                             }
@@ -508,7 +510,7 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner {
                         continue;
                     }
                 }
-                if (topicRouteData == null || topicRouteData.getBrokerDatas() == null || topicRouteData.getBrokerDatas().size() == 0) {
+                if (topicRouteData == null || CollectionUtils.isEmpty(topicRouteData.getBrokerDatas())) {
                     return AdminToolResult.failure(AdminToolsResultCodeEnum.TOPIC_ROUTE_INFO_NOT_EXIST, "topic router info not found");
                 }
 
@@ -527,7 +529,7 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner {
                                     consumerTpsMap.put(addr, consumeStats.getConsumeTps());
                                 }
                             } catch (Exception e) {
-                                log.error("getTopicStatsInfo error. topic=" + topic, e);
+                                log.error("getConsumeStats error. topic={}, consumerGroup={}", topic, consumerGroup, e);
                             } finally {
                                 latch.countDown();
                             }
@@ -719,7 +721,7 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner {
                         mqClientInstance.getMQClientAPIImpl().deleteTopicInBroker(addr, topic, timeoutMillis);
                         successList.add(addr);
                     } catch (Exception e) {
-                        log.error("deleteTopicInBrokerConcurrent error. topic=" + topic + ", host=" + addr, e);
+                        log.error("deleteTopicInBroker error. topic={}, broker={}", topic, addr, e);
                         failureList.add(addr);
                     } finally {
                         latch.countDown();
@@ -851,7 +853,7 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner {
             @Override
             public AdminToolResult doExecute() throws Exception {
                 TopicRouteData topicRouteData = examineTopicRouteInfo(topic);
-                if (topicRouteData == null || topicRouteData.getBrokerDatas() == null || topicRouteData.getBrokerDatas().size() == 0) {
+                if (topicRouteData == null || CollectionUtils.isEmpty(topicRouteData.getBrokerDatas())) {
                     return AdminToolResult.failure(AdminToolsResultCodeEnum.TOPIC_ROUTE_INFO_NOT_EXIST, "topic router info not found");
                 }
                 final Map<String, QueueData> topicRouteMap = new HashMap<String, QueueData>();
@@ -1037,10 +1039,7 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner {
             if (addr != null) {
                 return this.mqClientInstance.getMQClientAPIImpl().querySubscriptionByConsumer(addr, group, topic, timeoutMillis);
             }
-
-            break;
         }
-
         return null;
     }
 
@@ -1071,7 +1070,7 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner {
                 String retryTopic = MixAll.getRetryTopic(group);
                 TopicRouteData topicRouteData = examineTopicRouteInfo(retryTopic);
 
-                if (topicRouteData == null || topicRouteData.getBrokerDatas() == null || topicRouteData.getBrokerDatas().size() == 0) {
+                if (topicRouteData == null || CollectionUtils.isEmpty(topicRouteData.getBrokerDatas())) {
                     return AdminToolResult.failure(AdminToolsResultCodeEnum.TOPIC_ROUTE_INFO_NOT_EXIST, "router info not found.");
                 }
                 final TopicList result = new TopicList();
@@ -1087,7 +1086,7 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner {
                                     result.getTopicList().addAll(topicList.getTopicList());
                                 }
                             } catch (Exception e) {
-                                log.error("getTopicStatsInfo error. groupId=" + group, e);
+                                log.error("queryTopicsByConsumer error. group={}", group, e);
                             } finally {
                                 latch.countDown();
                             }
@@ -1137,7 +1136,7 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner {
                                     spanSet.addAll(mqClientInstance.getMQClientAPIImpl().queryConsumeTimeSpan(addr, topic, group, timeoutMillis));
                                 }
                             } catch (Exception e) {
-                                log.error("queryConsumeTimeSpan error. topic=" + topic, e);
+                                log.error("queryConsumeTimeSpan error. topic={}, group={}", topic, group, e);
                             } finally {
                                 latch.countDown();
                             }
@@ -1185,7 +1184,7 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner {
     public boolean cleanExpiredConsumerQueueByAddr(
         String addr) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, MQClientException, InterruptedException {
         boolean result = mqClientInstance.getMQClientAPIImpl().cleanExpiredConsumeQueue(addr, timeoutMillis);
-        log.warn("clean expired ConsumeQueue on target " + addr + " broker " + result);
+        log.warn("clean expired ConsumeQueue on target broker={}, execute result={}", addr, result);
         return result;
     }
 
@@ -1195,7 +1194,7 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner {
         boolean result = false;
         try {
             ClusterInfo clusterInfo = examineBrokerClusterInfo();
-            if (null == cluster || "".equals(cluster)) {
+            if (StringUtils.isEmpty(cluster)) {
                 for (String targetCluster : clusterInfo.retrieveAllClusterNames()) {
                     result = deleteExpiredCommitLogByCluster(clusterInfo, targetCluster);
                 }
@@ -1224,7 +1223,7 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner {
     public boolean deleteExpiredCommitLogByAddr(
         String addr) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, MQClientException, InterruptedException {
         boolean result = mqClientInstance.getMQClientAPIImpl().deleteExpiredCommitLog(addr, timeoutMillis);
-        log.warn("Delete expired CommitLog on target " + addr + " broker " + result);
+        log.warn("Delete expired CommitLog on target broker={}, execute result={}", addr, result);
         return result;
     }
 
@@ -1234,7 +1233,7 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner {
         boolean result = false;
         try {
             ClusterInfo clusterInfo = examineBrokerClusterInfo();
-            if (null == cluster || "".equals(cluster)) {
+            if (StringUtils.isEmpty(cluster)) {
                 for (String targetCluster : clusterInfo.retrieveAllClusterNames()) {
                     result = cleanUnusedTopicByCluster(clusterInfo, targetCluster);
                 }
@@ -1262,7 +1261,7 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner {
     public boolean cleanUnusedTopicByAddr(
         String addr) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, MQClientException, InterruptedException {
         boolean result = mqClientInstance.getMQClientAPIImpl().cleanUnusedTopicByAddr(addr, timeoutMillis);
-        log.warn("clean expired ConsumeQueue on target " + addr + " broker " + result);
+        log.warn("clean unused topic on target broker={}, execute result={}", addr, result);
         return result;
     }
 

Reply via email to