This is an automated email from the ASF dual-hosted git repository.

dongeforever pushed a commit to branch 5.0.0-alpha-static-topic
in repository https://gitbox.apache.org/repos/asf/rocketmq.git

commit 61fcf438b12699bfb2dbb8e77fba293ff63b075c
Author: dongeforever <[email protected]>
AuthorDate: Thu Dec 23 16:48:56 2021 +0800

    Finish the topic stats and consume stats in admin client
---
 .../broker/processor/AdminBrokerProcessor.java     |  73 +-------------
 .../common/statictopic/TopicQueueMappingOne.java   |  46 +++++----
 .../common/statictopic/TopicQueueMappingUtils.java |   4 +-
 .../tools/admin/DefaultMQAdminExtImpl.java         |  92 ++++++++++-------
 .../apache/rocketmq/tools/admin/MQAdminUtils.java  | 110 +++++++++++++++++++++
 5 files changed, 199 insertions(+), 126 deletions(-)

diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
 
b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
index 0d2a59b..61a8898 100644
--- 
a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
+++ 
b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
@@ -1019,69 +1019,7 @@ public class AdminBrokerProcessor extends 
AsyncNettyRequestProcessor implements
         return response;
     }
 
-    private RpcResponse handleGetTopicStatsInfoForStaticTopic(RpcRequest 
request, TopicQueueMappingContext mappingContext) {
-        try {
-            assert request.getCode() == RequestCode.GET_TOPIC_STATS_INFO;
-            if (mappingContext.getMappingDetail() == null) {
-                return null;
-            }
-            final GetTopicStatsInfoRequestHeader requestHeader = 
(GetTopicStatsInfoRequestHeader) request.getHeader();
-            String topic = requestHeader.getTopic();
-            TopicQueueMappingDetail mappingDetail = 
mappingContext.getMappingDetail();
-            Map<Integer, LogicQueueMappingItem[]> qidItemMap = new HashMap<>();
-            Set<String> brokers = new HashSet<>();
-            mappingDetail.getHostedQueues().forEach((qid, items) -> {
-                if (TopicQueueMappingUtils.checkIfLeader(items, 
mappingDetail)) {
-                    LogicQueueMappingItem[] itemPair = new 
LogicQueueMappingItem[2];
-                    itemPair[0] = 
TopicQueueMappingUtils.findLogicQueueMappingItem(items, 0, true);
-                    itemPair[1] = 
TopicQueueMappingUtils.findLogicQueueMappingItem(items, Long.MAX_VALUE, true);
-                    assert itemPair[0] != null && itemPair[1] != null;
-                    qidItemMap.put(qid, itemPair);
-                    brokers.add(itemPair[0].getBname());
-                    brokers.add(itemPair[1].getBname());
-                }
-            });
-            Map<String, TopicStatsTable> statsTable = new HashMap<>();
-            for (String broker: brokers) {
-                GetTopicStatsInfoRequestHeader header = new 
GetTopicStatsInfoRequestHeader();
-                header.setTopic(topic);
-                header.setBname(broker);
-                header.setLo(false);
-                RpcRequest rpcRequest = new 
RpcRequest(RequestCode.GET_TOPIC_STATS_INFO, header, null);
-                RpcResponse rpcResponse = rpcClient.invoke(rpcRequest, 
brokerConfig.getForwardTimeout()).get();
-                if (rpcResponse.getException() != null) {
-                    throw rpcResponse.getException();
-                }
-                statsTable.put(broker, (TopicStatsTable) 
rpcResponse.getBody());
-            }
-            TopicStatsTable topicStatsTable = new TopicStatsTable();
-            qidItemMap.forEach((qid, itemPair) -> {
-                LogicQueueMappingItem minItem = itemPair[0];
-                LogicQueueMappingItem maxItem = itemPair[1];
-                TopicOffset minTopicOffset = 
statsTable.get(minItem.getBname()).getOffsetTable().get(new MessageQueue(topic, 
minItem.getBname(), minItem.getQueueId()));
-                TopicOffset maxTopicOffset = 
statsTable.get(maxItem.getBname()).getOffsetTable().get(new MessageQueue(topic, 
maxItem.getBname(), maxItem.getQueueId()));
-
-                assert  minTopicOffset != null && maxTopicOffset != null;
-
-                long min = 
minItem.computeStaticQueueOffsetLoosely(minTopicOffset.getMinOffset());
-                if (min < 0)
-                    min = 0;
-                long max = 
maxItem.computeStaticQueueOffsetStrictly(maxTopicOffset.getMaxOffset());
-                if (max < 0)
-                    max = 0;
-                long timestamp = maxTopicOffset.getLastUpdateTimestamp();
-
-                TopicOffset topicOffset = new TopicOffset();
-                topicOffset.setMinOffset(min);
-                topicOffset.setMaxOffset(max);
-                topicOffset.setLastUpdateTimestamp(timestamp);
-                topicStatsTable.getOffsetTable().put(new MessageQueue(topic, 
TopicQueueMappingUtils.getMockBrokerName(mappingDetail.getScope()), qid), 
topicOffset);
-            });
-            return new RpcResponse(ResponseCode.SUCCESS, null, 
topicStatsTable);
-        } catch (Throwable t) {
-            return new RpcResponse(new RpcException(ResponseCode.SYSTEM_ERROR, 
t.getMessage(), t));
-        }
-    }
+
 
     private RemotingCommand getTopicStatsInfo(ChannelHandlerContext ctx,
         RemotingCommand request) throws RemotingCommandException {
@@ -1097,15 +1035,6 @@ public class AdminBrokerProcessor extends 
AsyncNettyRequestProcessor implements
             return response;
         }
         TopicStatsTable topicStatsTable = new TopicStatsTable();
-        TopicQueueMappingContext mappingContext = 
this.brokerController.getTopicQueueMappingManager().buildTopicQueueMappingContext(requestHeader,
 false);
-        RpcResponse rpcResponse = handleGetTopicStatsInfoForStaticTopic(new 
RpcRequest(RequestCode.GET_TOPIC_STATS_INFO, requestHeader, null), 
mappingContext);
-        if (rpcResponse != null) {
-            if (rpcResponse.getException() != null) {
-                return RpcClientUtils.createCommandForRpcResponse(rpcResponse);
-            } else {
-                
topicStatsTable.getOffsetTable().putAll(((TopicStatsTable)rpcResponse.getBody()).getOffsetTable());
-            }
-        }
 
         for (int i = 0; i < topicConfig.getWriteQueueNums(); i++) {
             MessageQueue mq = new MessageQueue();
diff --git 
a/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingOne.java
 
b/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingOne.java
index 319e113..636f1d5 100644
--- 
a/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingOne.java
+++ 
b/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingOne.java
@@ -28,8 +28,10 @@ public class TopicQueueMappingOne extends 
RemotingSerializable {
     String bname;  //identify the hosted broker name
     Integer globalId;
     List<LogicQueueMappingItem> items;
+    TopicQueueMappingDetail mappingDetail;
 
-    public TopicQueueMappingOne(String topic, String bname, Integer globalId, 
List<LogicQueueMappingItem> items) {
+    public TopicQueueMappingOne(TopicQueueMappingDetail mappingDetail, String 
topic, String bname, Integer globalId, List<LogicQueueMappingItem> items) {
+        this.mappingDetail =  mappingDetail;
         this.topic = topic;
         this.bname = bname;
         this.globalId = globalId;
@@ -52,29 +54,35 @@ public class TopicQueueMappingOne extends 
RemotingSerializable {
         return items;
     }
 
-    @Override
-    public boolean equals(Object o) {
-        if (this == o) return true;
+    public TopicQueueMappingDetail getMappingDetail() {
+        return mappingDetail;
+    }
 
-        if (!(o instanceof TopicQueueMappingOne)) return false;
+    @Override public boolean equals(Object o) {
+        if (this == o)
+            return true;
+        if (!(o instanceof TopicQueueMappingOne))
+            return false;
 
         TopicQueueMappingOne that = (TopicQueueMappingOne) o;
 
-        return new EqualsBuilder()
-                .append(topic, that.topic)
-                .append(bname, that.bname)
-                .append(globalId, that.globalId)
-                .append(items, that.items)
-                .isEquals();
+        if (topic != null ? !topic.equals(that.topic) : that.topic != null)
+            return false;
+        if (bname != null ? !bname.equals(that.bname) : that.bname != null)
+            return false;
+        if (globalId != null ? !globalId.equals(that.globalId) : that.globalId 
!= null)
+            return false;
+        if (items != null ? !items.equals(that.items) : that.items != null)
+            return false;
+        return mappingDetail != null ? 
mappingDetail.equals(that.mappingDetail) : that.mappingDetail == null;
     }
 
-    @Override
-    public int hashCode() {
-        return new HashCodeBuilder(17, 37)
-                .append(topic)
-                .append(bname)
-                .append(globalId)
-                .append(items)
-                .toHashCode();
+    @Override public int hashCode() {
+        int result = topic != null ? topic.hashCode() : 0;
+        result = 31 * result + (bname != null ? bname.hashCode() : 0);
+        result = 31 * result + (globalId != null ? globalId.hashCode() : 0);
+        result = 31 * result + (items != null ? items.hashCode() : 0);
+        result = 31 * result + (mappingDetail != null ? 
mappingDetail.hashCode() : 0);
+        return result;
     }
 }
diff --git 
a/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingUtils.java
 
b/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingUtils.java
index 6a1c39c..bf02ccd 100644
--- 
a/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingUtils.java
+++ 
b/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingUtils.java
@@ -371,7 +371,7 @@ public class TopicQueueMappingUtils {
                         throw new RuntimeException(String.format("The queue id 
is duplicated in broker %s %s", leaderBrokerName, mappingDetail.getBname()));
                     }
                 } else {
-                    globalIdMap.put(globalid, new 
TopicQueueMappingOne(mappingDetail.topic, mappingDetail.bname, globalid, 
entry.getValue()));
+                    globalIdMap.put(globalid, new 
TopicQueueMappingOne(mappingDetail, mappingDetail.topic, mappingDetail.bname, 
globalid, entry.getValue()));
                 }
             }
         }
@@ -384,8 +384,8 @@ public class TopicQueueMappingUtils {
                     throw new RuntimeException(String.format("The queue number 
%s is not in globalIdMap", i));
                 }
             }
-            checkIfReusePhysicalQueue(globalIdMap.values());
         }
+        checkIfReusePhysicalQueue(globalIdMap.values());
         return globalIdMap;
     }
 
diff --git 
a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
 
b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
index fce4318..b7e4816 100644
--- 
a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
+++ 
b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
@@ -16,6 +16,19 @@
  */
 package org.apache.rocketmq.tools.admin;
 
+import java.io.UnsupportedEncodingException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Properties;
+import java.util.Random;
+import java.util.Set;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.rocketmq.client.QueryResult;
 import org.apache.rocketmq.client.admin.MQAdminExtInner;
@@ -64,7 +77,11 @@ import 
org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
 import org.apache.rocketmq.common.protocol.route.BrokerData;
 import org.apache.rocketmq.common.protocol.route.QueueData;
 import org.apache.rocketmq.common.protocol.route.TopicRouteData;
+import org.apache.rocketmq.common.statictopic.LogicQueueMappingItem;
+import org.apache.rocketmq.common.statictopic.TopicConfigAndQueueMapping;
 import org.apache.rocketmq.common.statictopic.TopicQueueMappingDetail;
+import org.apache.rocketmq.common.statictopic.TopicQueueMappingOne;
+import org.apache.rocketmq.common.statictopic.TopicQueueMappingUtils;
 import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig;
 import org.apache.rocketmq.logging.InternalLogger;
 import org.apache.rocketmq.remoting.RPCHook;
@@ -78,30 +95,11 @@ import 
org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
 import org.apache.rocketmq.tools.admin.api.MessageTrack;
 import org.apache.rocketmq.tools.admin.api.TrackType;
 
-import java.io.UnsupportedEncodingException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Properties;
-import java.util.Random;
-import java.util.Set;
+import static 
org.apache.rocketmq.common.statictopic.TopicQueueMappingUtils.checkAndBuildMappingItems;
+import static 
org.apache.rocketmq.common.statictopic.TopicQueueMappingUtils.getMappingDetailFromConfig;
 
 public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner {
 
-    private final InternalLogger log = ClientLogger.getLog();
-    private final DefaultMQAdminExt defaultMQAdminExt;
-    private ServiceState serviceState = ServiceState.CREATE_JUST;
-    private MQClientInstance mqClientInstance;
-    private RPCHook rpcHook;
-    private long timeoutMillis = 20000;
-    private Random random = new Random();
-
     private static final Set<String> SYSTEM_GROUP_SET = new HashSet<String>();
 
     static {
@@ -120,6 +118,14 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, 
MQAdminExtInner {
         SYSTEM_GROUP_SET.add(MixAll.CID_SYS_RMQ_TRANS);
     }
 
+    private final InternalLogger log = ClientLogger.getLog();
+    private final DefaultMQAdminExt defaultMQAdminExt;
+    private ServiceState serviceState = ServiceState.CREATE_JUST;
+    private MQClientInstance mqClientInstance;
+    private RPCHook rpcHook;
+    private long timeoutMillis = 20000;
+    private Random random = new Random();
+
     public DefaultMQAdminExtImpl(DefaultMQAdminExt defaultMQAdminExt, long 
timeoutMillis) {
         this(defaultMQAdminExt, null, timeoutMillis);
     }
@@ -245,7 +251,8 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, 
MQAdminExtInner {
     }
 
     @Override
-    public TopicConfig examineTopicConfig(String addr, String topic) throws 
InterruptedException, MQBrokerException, RemotingTimeoutException, 
RemotingSendRequestException, RemotingConnectException {
+    public TopicConfig examineTopicConfig(String addr,
+        String topic) throws InterruptedException, MQBrokerException, 
RemotingTimeoutException, RemotingSendRequestException, 
RemotingConnectException {
         return this.mqClientInstance.getMQClientAPIImpl().getTopicConfig(addr, 
topic, timeoutMillis);
     }
 
@@ -264,6 +271,9 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, 
MQAdminExtInner {
             }
         }
 
+        //Get the static stats
+        Map<String, TopicConfigAndQueueMapping> brokerConfigMap = 
MQAdminUtils.examineTopicConfigFromRoute(topic, topicRouteData, 
defaultMQAdminExt);
+        MQAdminUtils.convertPhysicalTopicStats(topic, brokerConfigMap, 
topicStatsTable);
 
         if (topicStatsTable.getOffsetTable().isEmpty()) {
             throw new MQClientException("Not found the topic stats info", 
null);
@@ -272,10 +282,10 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, 
MQAdminExtInner {
         return topicStatsTable;
     }
 
-
     @Override
-    public TopicStatsTable examineTopicStats(String brokerAddr, String topic) 
throws RemotingException, MQClientException, InterruptedException,
-            MQBrokerException {
+    public TopicStatsTable examineTopicStats(String brokerAddr,
+        String topic) throws RemotingException, MQClientException, 
InterruptedException,
+        MQBrokerException {
         return 
this.mqClientInstance.getMQClientAPIImpl().getTopicStatsInfo(brokerAddr, topic, 
timeoutMillis);
     }
 
@@ -323,12 +333,29 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, 
MQAdminExtInner {
             }
         }
 
-        if (result.getOffsetTable().isEmpty()) {
+        Set<String> topics = new HashSet<>();
+        for (MessageQueue messageQueue: result.getOffsetTable().keySet()) {
+            topics.add(messageQueue.getTopic());
+        }
+
+        ConsumeStats staticResult = new ConsumeStats();
+        staticResult.setConsumeTps(result.getConsumeTps());
+        // for topic, we put the physical stats, how about group?
+        // staticResult.getOffsetTable().putAll(result.getOffsetTable());
+
+        for (String currentTopic: topics) {
+            TopicRouteData currentRoute = 
this.examineTopicRouteInfo(currentTopic);
+            Map<String, TopicConfigAndQueueMapping> brokerConfigMap = 
MQAdminUtils.examineTopicConfigFromRoute(currentTopic, currentRoute, 
defaultMQAdminExt);
+            ConsumeStats consumeStats = 
MQAdminUtils.convertPhysicalConsumeStats(brokerConfigMap, result);
+            
staticResult.getOffsetTable().putAll(consumeStats.getOffsetTable());
+        }
+
+        if (staticResult.getOffsetTable().isEmpty()) {
             throw new MQClientException(ResponseCode.CONSUMER_NOT_ONLINE,
                 "Not found the consumer group consume stats, because return 
offset table is empty, maybe the consumer not consume any message");
         }
 
-        return result;
+        return staticResult;
     }
 
     @Override
@@ -355,7 +382,6 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, 
MQAdminExtInner {
         return 
this.mqClientInstance.getMQAdminImpl().queryMessageByUniqKey(topic, msgId);
     }
 
-
     @Override
     public ConsumerConnection examineConsumerConnectionInfo(
         String consumerGroup) throws InterruptedException, MQBrokerException,
@@ -416,7 +442,7 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, 
MQAdminExtInner {
 
     @Override
     public int addWritePermOfBroker(String namesrvAddr, String brokerName) 
throws RemotingCommandException,
-            RemotingConnectException, RemotingSendRequestException, 
RemotingTimeoutException, InterruptedException, MQClientException {
+        RemotingConnectException, RemotingSendRequestException, 
RemotingTimeoutException, InterruptedException, MQClientException {
         return 
this.mqClientInstance.getMQClientAPIImpl().addWritePermOfBroker(namesrvAddr, 
brokerName, timeoutMillis);
     }
 
@@ -1047,11 +1073,12 @@ public class DefaultMQAdminExtImpl implements 
MQAdminExt, MQAdminExtInner {
     }
 
     @Override
-    public void createStaticTopic(final String addr, final String 
defaultTopic, final TopicConfig topicConfig, final TopicQueueMappingDetail 
mappingDetail, final boolean force) throws RemotingException,  
InterruptedException, MQBrokerException {
+    public void createStaticTopic(final String addr, final String 
defaultTopic, final TopicConfig topicConfig,
+        final TopicQueueMappingDetail mappingDetail,
+        final boolean force) throws RemotingException, InterruptedException, 
MQBrokerException {
         this.mqClientInstance.getMQClientAPIImpl().createStaticTopic(addr, 
defaultTopic, topicConfig, mappingDetail, force, timeoutMillis);
     }
 
-
     @Override
     public long searchOffset(MessageQueue mq, long timestamp) throws 
MQClientException {
         return this.mqClientInstance.getMQAdminImpl().searchOffset(mq, 
timestamp);
@@ -1062,7 +1089,6 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, 
MQAdminExtInner {
         return this.mqClientInstance.getMQAdminImpl().maxOffset(mq);
     }
 
-
     @Override
     public long minOffset(MessageQueue mq) throws MQClientException {
         return this.mqClientInstance.getMQAdminImpl().minOffset(mq);
@@ -1150,7 +1176,7 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, 
MQAdminExtInner {
 
     @Override
     public void setMessageRequestMode(final String brokerAddr, final String 
topic, final String consumerGroup, final
-        MessageRequestMode mode, final int popShareQueueNum, final long 
timeoutMillis)
+    MessageRequestMode mode, final int popShareQueueNum, final long 
timeoutMillis)
         throws InterruptedException, RemotingTimeoutException, 
RemotingSendRequestException,
         RemotingConnectException, MQClientException {
         
this.mqClientInstance.getMQClientAPIImpl().setMessageRequestMode(brokerAddr, 
topic, consumerGroup, mode, popShareQueueNum, timeoutMillis);
diff --git 
a/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminUtils.java 
b/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminUtils.java
index a5aab4d..cd2c4ac 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminUtils.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminUtils.java
@@ -19,16 +19,20 @@ package org.apache.rocketmq.tools.admin;
 import org.apache.rocketmq.client.exception.MQBrokerException;
 import org.apache.rocketmq.client.exception.MQClientException;
 import org.apache.rocketmq.common.TopicConfig;
+import org.apache.rocketmq.common.admin.ConsumeStats;
+import org.apache.rocketmq.common.admin.OffsetWrapper;
 import org.apache.rocketmq.common.admin.TopicOffset;
 import org.apache.rocketmq.common.admin.TopicStatsTable;
 import org.apache.rocketmq.common.message.MessageQueue;
 import org.apache.rocketmq.common.protocol.ResponseCode;
 import org.apache.rocketmq.common.protocol.body.ClusterInfo;
+import org.apache.rocketmq.common.protocol.route.BrokerData;
 import org.apache.rocketmq.common.protocol.route.TopicRouteData;
 import org.apache.rocketmq.common.rpc.ClientMetadata;
 import org.apache.rocketmq.common.statictopic.LogicQueueMappingItem;
 import org.apache.rocketmq.common.statictopic.TopicConfigAndQueueMapping;
 import org.apache.rocketmq.common.statictopic.TopicQueueMappingDetail;
+import org.apache.rocketmq.common.statictopic.TopicQueueMappingOne;
 import org.apache.rocketmq.common.statictopic.TopicQueueMappingUtils;
 import org.apache.rocketmq.remoting.exception.RemotingConnectException;
 import org.apache.rocketmq.remoting.exception.RemotingException;
@@ -42,6 +46,9 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
+import static 
org.apache.rocketmq.common.statictopic.TopicQueueMappingUtils.checkAndBuildMappingItems;
+import static 
org.apache.rocketmq.common.statictopic.TopicQueueMappingUtils.getMappingDetailFromConfig;
+
 public class MQAdminUtils {
 
 
@@ -229,4 +236,107 @@ public class MQAdminUtils {
         }
         return brokerConfigMap;
     }
+
+    /**
+     * Fetches the topic config (with its queue mapping, if any) from every broker listed in the route.
+     *
+     * Brokers for which selectBrokerAddr() returns null are skipped. A broker that answers with
+     * ResponseCode.TOPIC_NOT_EXIST, or whose returned config is null, is left out of the result;
+     * any other MQBrokerException is rethrown to the caller.
+     *
+     * @param topic topic whose config is requested
+     * @param topicRouteData route data supplying the brokers to query
+     * @param defaultMQAdminExt admin client used to call examineTopicConfig on each broker address
+     * @return map from broker name to the config returned by that broker
+     */
+    public static Map<String, TopicConfigAndQueueMapping> 
examineTopicConfigFromRoute(String topic, TopicRouteData topicRouteData, 
DefaultMQAdminExt defaultMQAdminExt) throws RemotingException,  
InterruptedException, MQBrokerException {
+        Map<String, TopicConfigAndQueueMapping> brokerConfigMap = new 
HashMap<>();
+        for (BrokerData bd : topicRouteData.getBrokerDatas()) {
+            String broker = bd.getBrokerName();
+            String addr = bd.selectBrokerAddr();
+            if (addr == null) {
+                continue;
+            }
+            try {
+                TopicConfigAndQueueMapping mapping = 
(TopicConfigAndQueueMapping) defaultMQAdminExt.examineTopicConfig(addr, topic);
+                // a null config is allowed; such a broker is simply not added to the map
+                if (mapping != null) {
+                    if (mapping.getMappingDetail() != null) { // sanity check below only runs when assertions are enabled (-ea)
+                        assert 
mapping.getMappingDetail().getBname().equals(broker);
+                    }
+                    brokerConfigMap.put(broker, mapping);
+                }
+            } catch (MQBrokerException exception) { // TOPIC_NOT_EXIST is tolerated; everything else propagates
+                if (exception.getResponseCode() != 
ResponseCode.TOPIC_NOT_EXIST) {
+                    throw exception;
+                }
+            }
+        }
+        return brokerConfigMap;
+    }
+    /**
+     * Adds logical (static-topic) queue entries to topicStatsTable, derived from the physical
+     * queue entries already present in it.
+     *
+     * For each global queue id built from the mapping details in brokerConfigMap, the item returned by
+     * findLogicQueueMappingItem for offset 0 supplies the min offset, and the item returned for
+     * Long.MAX_VALUE supplies the max offset and the last-update timestamp. Both offsets are translated
+     * to static-queue offsets and clamped to be non-negative. A logical queue is skipped when either
+     * physical entry is missing from the table. The new entry is keyed by the mock broker name for the
+     * mapping scope plus the global queue id; this method removes nothing from the table.
+     *
+     * @param topic topic name used to build the MessageQueue keys
+     * @param brokerConfigMap per-broker topic configs carrying the queue mapping details
+     * @param topicStatsTable table holding the physical queue stats; modified in place
+     */
+    public static void convertPhysicalTopicStats(String topic, Map<String, 
TopicConfigAndQueueMapping> brokerConfigMap, TopicStatsTable topicStatsTable) {
+        Map<Integer, TopicQueueMappingOne> globalIdMap = 
checkAndBuildMappingItems(getMappingDetailFromConfig(brokerConfigMap.values()), 
true, false);
+        for (Map.Entry<Integer, TopicQueueMappingOne> entry: 
globalIdMap.entrySet()) {
+            Integer qid = entry.getKey();
+            TopicQueueMappingOne mappingOne =  entry.getValue();
+            LogicQueueMappingItem minItem = 
TopicQueueMappingUtils.findLogicQueueMappingItem(mappingOne.getItems(), 0, 
true);
+            LogicQueueMappingItem maxItem = 
TopicQueueMappingUtils.findLogicQueueMappingItem(mappingOne.getItems(), 
Long.MAX_VALUE, true);
+            assert  minItem != null && maxItem != null; // NOTE(review): only enforced with -ea; a null item would NPE on the lookups below - confirm it cannot be null
+            TopicOffset minTopicOffset = 
topicStatsTable.getOffsetTable().get(new MessageQueue(topic, 
minItem.getBname(), minItem.getQueueId()));
+            TopicOffset maxTopicOffset = 
topicStatsTable.getOffsetTable().get(new MessageQueue(topic, 
maxItem.getBname(), maxItem.getQueueId()));
+            // skip logical queues whose physical stats are not present in the table
+            if (minTopicOffset == null
+                || maxTopicOffset == null) {
+                continue;
+            }
+            long min = 
minItem.computeStaticQueueOffsetLoosely(minTopicOffset.getMinOffset());
+            if (min < 0)
+                min = 0;
+            long max = 
maxItem.computeStaticQueueOffsetStrictly(maxTopicOffset.getMaxOffset());
+            if (max < 0)
+                max = 0;
+            long timestamp = maxTopicOffset.getLastUpdateTimestamp();
+            // publish the logical queue under the mock broker name of the mapping scope
+            TopicOffset topicOffset = new TopicOffset();
+            topicOffset.setMinOffset(min);
+            topicOffset.setMaxOffset(max);
+            topicOffset.setLastUpdateTimestamp(timestamp);
+            topicStatsTable.getOffsetTable().put(new MessageQueue(topic, 
TopicQueueMappingUtils.getMockBrokerName(mappingOne.getMappingDetail().getScope()),
 qid), topicOffset);
+        }
+    }
+
+
+    /**
+     * Converts consume stats keyed by physical queues into stats keyed by logical (static-topic) queues.
+     *
+     * <p>For every global queue id built from the mapping details in {@code brokerConfigMap}, the mapping
+     * items are scanned from the last item to the first. Items whose physical queue has no entry in
+     * {@code physicalResult} are ignored. The first remaining item with a non-negative physical consumer
+     * offset supplies the consumer offset and last timestamp; the first remaining item with a non-negative
+     * logic offset supplies the broker offset, translated from that item's physical broker offset with
+     * {@code computeStaticQueueOffsetStrictly}. A logical queue is added to the result only when both
+     * offsets were resolved to non-negative values.
+     *
+     * @param brokerConfigMap per-broker topic configs carrying the queue mapping details
+     * @param physicalResult consume stats keyed by physical message queues; only read, never modified
+     * @return a new {@link ConsumeStats} with the same consume TPS and one entry per resolved logical queue,
+     *         keyed by the mock broker name for the mapping scope plus the global queue id
+     */
+    public static ConsumeStats convertPhysicalConsumeStats(Map<String, TopicConfigAndQueueMapping> brokerConfigMap, ConsumeStats physicalResult) {
+        Map<Integer, TopicQueueMappingOne> globalIdMap = checkAndBuildMappingItems(getMappingDetailFromConfig(brokerConfigMap.values()), true, false);
+        ConsumeStats result = new ConsumeStats();
+        result.setConsumeTps(physicalResult.getConsumeTps());
+        for (Map.Entry<Integer, TopicQueueMappingOne> entry : globalIdMap.entrySet()) {
+            Integer qid = entry.getKey();
+            TopicQueueMappingOne mappingOne = entry.getValue();
+            MessageQueue messageQueue = new MessageQueue(mappingOne.getTopic(), TopicQueueMappingUtils.getMockBrokerName(mappingOne.getMappingDetail().getScope()), qid);
+            long brokerOffset = -1;
+            long consumerOffset = -1;
+            long lastTimestamp = -1; //maybe need to be polished
+            // Scan from the last mapping item backwards; stop as soon as both offsets are resolved.
+            for (int i = mappingOne.getItems().size() - 1; i >= 0; i--) {
+                LogicQueueMappingItem item = mappingOne.getItems().get(i);
+                MessageQueue phyQueue = new MessageQueue(mappingOne.getTopic(), item.getBname(), item.getQueueId());
+                OffsetWrapper phyOffsetWrapper = physicalResult.getOffsetTable().get(phyQueue);
+                if (phyOffsetWrapper == null) {
+                    continue;
+                }
+                if (consumerOffset == -1
+                    && phyOffsetWrapper.getConsumerOffset() >= 0) {
+                    consumerOffset = phyOffsetWrapper.getConsumerOffset();
+                    lastTimestamp = phyOffsetWrapper.getLastTimestamp();
+                }
+                if (brokerOffset == -1
+                    && item.getLogicOffset() >= 0) {
+                    // Must read the PHYSICAL wrapper that was just looked up; the output wrapper
+                    // has not been populated yet, so reading it would ignore the real broker offset.
+                    brokerOffset = item.computeStaticQueueOffsetStrictly(phyOffsetWrapper.getBrokerOffset());
+                }
+                if (consumerOffset >= 0
+                    && brokerOffset >= 0) {
+                    break;
+                }
+            }
+            if (brokerOffset >= 0
+                && consumerOffset >= 0) {
+                // Create the output wrapper only here so it cannot be mistaken for an input above.
+                OffsetWrapper offsetWrapper = new OffsetWrapper();
+                offsetWrapper.setBrokerOffset(brokerOffset);
+                offsetWrapper.setConsumerOffset(consumerOffset);
+                offsetWrapper.setLastTimestamp(lastTimestamp);
+                result.getOffsetTable().put(messageQueue, offsetWrapper);
+            }
+        }
+        return result;
+    }
 }

Reply via email to