This is an automated email from the ASF dual-hosted git repository. dongeforever pushed a commit to branch 5.0.0-alpha-static-topic in repository https://gitbox.apache.org/repos/asf/rocketmq.git
commit 61fcf438b12699bfb2dbb8e77fba293ff63b075c Author: dongeforever <[email protected]> AuthorDate: Thu Dec 23 16:48:56 2021 +0800 Finish the topic stats and consume stats in admin client --- .../broker/processor/AdminBrokerProcessor.java | 73 +------------- .../common/statictopic/TopicQueueMappingOne.java | 46 +++++---- .../common/statictopic/TopicQueueMappingUtils.java | 4 +- .../tools/admin/DefaultMQAdminExtImpl.java | 92 ++++++++++------- .../apache/rocketmq/tools/admin/MQAdminUtils.java | 110 +++++++++++++++++++++ 5 files changed, 199 insertions(+), 126 deletions(-) diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java index 0d2a59b..61a8898 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java @@ -1019,69 +1019,7 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements return response; } - private RpcResponse handleGetTopicStatsInfoForStaticTopic(RpcRequest request, TopicQueueMappingContext mappingContext) { - try { - assert request.getCode() == RequestCode.GET_TOPIC_STATS_INFO; - if (mappingContext.getMappingDetail() == null) { - return null; - } - final GetTopicStatsInfoRequestHeader requestHeader = (GetTopicStatsInfoRequestHeader) request.getHeader(); - String topic = requestHeader.getTopic(); - TopicQueueMappingDetail mappingDetail = mappingContext.getMappingDetail(); - Map<Integer, LogicQueueMappingItem[]> qidItemMap = new HashMap<>(); - Set<String> brokers = new HashSet<>(); - mappingDetail.getHostedQueues().forEach((qid, items) -> { - if (TopicQueueMappingUtils.checkIfLeader(items, mappingDetail)) { - LogicQueueMappingItem[] itemPair = new LogicQueueMappingItem[2]; - itemPair[0] = TopicQueueMappingUtils.findLogicQueueMappingItem(items, 0, true); - itemPair[1] = TopicQueueMappingUtils.findLogicQueueMappingItem(items, Long.MAX_VALUE, true); - assert itemPair[0] != null && itemPair[1] != null; - qidItemMap.put(qid, itemPair); - brokers.add(itemPair[0].getBname()); - brokers.add(itemPair[1].getBname()); - } - }); - Map<String, TopicStatsTable> statsTable = new HashMap<>(); - for (String broker: brokers) { - GetTopicStatsInfoRequestHeader header = new GetTopicStatsInfoRequestHeader(); - header.setTopic(topic); - header.setBname(broker); - header.setLo(false); - RpcRequest rpcRequest = new RpcRequest(RequestCode.GET_TOPIC_STATS_INFO, header, null); - RpcResponse rpcResponse = rpcClient.invoke(rpcRequest, brokerConfig.getForwardTimeout()).get(); - if (rpcResponse.getException() != null) { - throw rpcResponse.getException(); - } - statsTable.put(broker, (TopicStatsTable) rpcResponse.getBody()); - } - TopicStatsTable topicStatsTable = new TopicStatsTable(); - qidItemMap.forEach((qid, itemPair) -> { - LogicQueueMappingItem minItem = itemPair[0]; - LogicQueueMappingItem maxItem = itemPair[1]; - TopicOffset minTopicOffset = statsTable.get(minItem.getBname()).getOffsetTable().get(new MessageQueue(topic, minItem.getBname(), minItem.getQueueId())); - TopicOffset maxTopicOffset = statsTable.get(maxItem.getBname()).getOffsetTable().get(new MessageQueue(topic, maxItem.getBname(), maxItem.getQueueId())); - - assert minTopicOffset != null && maxTopicOffset != null; - - long min = minItem.computeStaticQueueOffsetLoosely(minTopicOffset.getMinOffset()); - if (min < 0) - min = 0; - long max = maxItem.computeStaticQueueOffsetStrictly(maxTopicOffset.getMaxOffset()); - if (max < 0) - max = 0; - long timestamp = maxTopicOffset.getLastUpdateTimestamp(); - - TopicOffset topicOffset = new TopicOffset(); - topicOffset.setMinOffset(min); - topicOffset.setMaxOffset(max); - topicOffset.setLastUpdateTimestamp(timestamp); - topicStatsTable.getOffsetTable().put(new MessageQueue(topic, TopicQueueMappingUtils.getMockBrokerName(mappingDetail.getScope()), qid), topicOffset); - }); - return new RpcResponse(ResponseCode.SUCCESS, null, topicStatsTable); - } catch (Throwable t) { - return new RpcResponse(new RpcException(ResponseCode.SYSTEM_ERROR, t.getMessage(), t)); - } - } + private RemotingCommand getTopicStatsInfo(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { @@ -1097,15 +1035,6 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements return response; } TopicStatsTable topicStatsTable = new TopicStatsTable(); - TopicQueueMappingContext mappingContext = this.brokerController.getTopicQueueMappingManager().buildTopicQueueMappingContext(requestHeader, false); - RpcResponse rpcResponse = handleGetTopicStatsInfoForStaticTopic(new RpcRequest(RequestCode.GET_TOPIC_STATS_INFO, requestHeader, null), mappingContext); - if (rpcResponse != null) { - if (rpcResponse.getException() != null) { - return RpcClientUtils.createCommandForRpcResponse(rpcResponse); - } else { - topicStatsTable.getOffsetTable().putAll(((TopicStatsTable)rpcResponse.getBody()).getOffsetTable()); - } - } for (int i = 0; i < topicConfig.getWriteQueueNums(); i++) { MessageQueue mq = new MessageQueue(); diff --git a/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingOne.java b/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingOne.java index 319e113..636f1d5 100644 --- a/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingOne.java +++ b/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingOne.java @@ -28,8 +28,10 @@ public class TopicQueueMappingOne extends RemotingSerializable { String bname; //identify the hosted broker name Integer globalId; List<LogicQueueMappingItem> items; + TopicQueueMappingDetail mappingDetail; - public TopicQueueMappingOne(String topic, String bname, Integer globalId, List<LogicQueueMappingItem> items) { + public TopicQueueMappingOne(TopicQueueMappingDetail mappingDetail, String topic, String bname, Integer globalId, List<LogicQueueMappingItem> items) { + this.mappingDetail = mappingDetail; this.topic = topic; this.bname = bname; this.globalId = globalId; @@ -52,29 +54,35 @@ public class TopicQueueMappingOne extends RemotingSerializable { return items; } - @Override - public boolean equals(Object o) { - if (this == o) return true; + public TopicQueueMappingDetail getMappingDetail() { + return mappingDetail; + } - if (!(o instanceof TopicQueueMappingOne)) return false; + @Override public boolean equals(Object o) { + if (this == o) + return true; + if (!(o instanceof TopicQueueMappingOne)) + return false; TopicQueueMappingOne that = (TopicQueueMappingOne) o; - return new EqualsBuilder() - .append(topic, that.topic) - .append(bname, that.bname) - .append(globalId, that.globalId) - .append(items, that.items) - .isEquals(); + if (topic != null ? !topic.equals(that.topic) : that.topic != null) + return false; + if (bname != null ? !bname.equals(that.bname) : that.bname != null) + return false; + if (globalId != null ? !globalId.equals(that.globalId) : that.globalId != null) + return false; + if (items != null ? !items.equals(that.items) : that.items != null) + return false; + return mappingDetail != null ? mappingDetail.equals(that.mappingDetail) : that.mappingDetail == null; } - @Override - public int hashCode() { - return new HashCodeBuilder(17, 37) - .append(topic) - .append(bname) - .append(globalId) - .append(items) - .toHashCode(); + @Override public int hashCode() { + int result = topic != null ? topic.hashCode() : 0; + result = 31 * result + (bname != null ? bname.hashCode() : 0); + result = 31 * result + (globalId != null ? globalId.hashCode() : 0); + result = 31 * result + (items != null ? items.hashCode() : 0); + result = 31 * result + (mappingDetail != null ? mappingDetail.hashCode() : 0); + return result; } } diff --git a/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingUtils.java b/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingUtils.java index 6a1c39c..bf02ccd 100644 --- a/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingUtils.java +++ b/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingUtils.java @@ -371,7 +371,7 @@ public class TopicQueueMappingUtils { throw new RuntimeException(String.format("The queue id is duplicated in broker %s %s", leaderBrokerName, mappingDetail.getBname())); } } else { - globalIdMap.put(globalid, new TopicQueueMappingOne(mappingDetail.topic, mappingDetail.bname, globalid, entry.getValue())); + globalIdMap.put(globalid, new TopicQueueMappingOne(mappingDetail, mappingDetail.topic, mappingDetail.bname, globalid, entry.getValue())); } } } @@ -384,8 +384,8 @@ public class TopicQueueMappingUtils { throw new RuntimeException(String.format("The queue number %s is not in globalIdMap", i)); } } - checkIfReusePhysicalQueue(globalIdMap.values()); } + checkIfReusePhysicalQueue(globalIdMap.values()); return globalIdMap; } diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java index fce4318..b7e4816 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java @@ -16,6 +16,19 @@ */ package org.apache.rocketmq.tools.admin; +import java.io.UnsupportedEncodingException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Properties; +import java.util.Random; +import java.util.Set; import org.apache.commons.lang3.StringUtils; import org.apache.rocketmq.client.QueryResult; import org.apache.rocketmq.client.admin.MQAdminExtInner; @@ -64,7 +77,11 @@ import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData; import org.apache.rocketmq.common.protocol.route.BrokerData; import org.apache.rocketmq.common.protocol.route.QueueData; import org.apache.rocketmq.common.protocol.route.TopicRouteData; +import org.apache.rocketmq.common.statictopic.LogicQueueMappingItem; +import org.apache.rocketmq.common.statictopic.TopicConfigAndQueueMapping; import org.apache.rocketmq.common.statictopic.TopicQueueMappingDetail; +import org.apache.rocketmq.common.statictopic.TopicQueueMappingOne; +import org.apache.rocketmq.common.statictopic.TopicQueueMappingUtils; import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig; import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.remoting.RPCHook; @@ -78,30 +95,11 @@ import org.apache.rocketmq.remoting.exception.RemotingTimeoutException; import org.apache.rocketmq.tools.admin.api.MessageTrack; import org.apache.rocketmq.tools.admin.api.TrackType; -import java.io.UnsupportedEncodingException; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.Map.Entry; -import java.util.Properties; -import java.util.Random; -import java.util.Set; +import static org.apache.rocketmq.common.statictopic.TopicQueueMappingUtils.checkAndBuildMappingItems; +import static org.apache.rocketmq.common.statictopic.TopicQueueMappingUtils.getMappingDetailFromConfig; public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner { - private final InternalLogger log = ClientLogger.getLog(); - private final DefaultMQAdminExt defaultMQAdminExt; - private ServiceState serviceState = ServiceState.CREATE_JUST; - private MQClientInstance mqClientInstance; - private RPCHook rpcHook; - private long timeoutMillis = 20000; - private Random random = new Random(); - private static final Set<String> SYSTEM_GROUP_SET = new HashSet<String>(); static { @@ -120,6 +118,14 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner { SYSTEM_GROUP_SET.add(MixAll.CID_SYS_RMQ_TRANS); } + private final InternalLogger log = ClientLogger.getLog(); + private final DefaultMQAdminExt defaultMQAdminExt; + private ServiceState serviceState = ServiceState.CREATE_JUST; + private MQClientInstance mqClientInstance; + private RPCHook rpcHook; + private long timeoutMillis = 20000; + private Random random = new Random(); + public DefaultMQAdminExtImpl(DefaultMQAdminExt defaultMQAdminExt, long timeoutMillis) { this(defaultMQAdminExt, null, timeoutMillis); } @@ -245,7 +251,8 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner { } @Override - public TopicConfig examineTopicConfig(String addr, String topic) throws InterruptedException, MQBrokerException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException { + public TopicConfig examineTopicConfig(String addr, + String topic) throws InterruptedException, MQBrokerException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException { return this.mqClientInstance.getMQClientAPIImpl().getTopicConfig(addr, topic, timeoutMillis); } @@ -264,6 +271,9 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner { } } + //Get the static stats + Map<String, TopicConfigAndQueueMapping> brokerConfigMap = MQAdminUtils.examineTopicConfigFromRoute(topic, topicRouteData, defaultMQAdminExt); + MQAdminUtils.convertPhysicalTopicStats(topic, brokerConfigMap, topicStatsTable); if (topicStatsTable.getOffsetTable().isEmpty()) { throw new MQClientException("Not found the topic stats info", null); @@ -272,10 +282,10 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner { return topicStatsTable; } - @Override - public TopicStatsTable examineTopicStats(String brokerAddr, String topic) throws RemotingException, MQClientException, InterruptedException, - MQBrokerException { + public TopicStatsTable examineTopicStats(String brokerAddr, + String topic) throws RemotingException, MQClientException, InterruptedException, + MQBrokerException { return this.mqClientInstance.getMQClientAPIImpl().getTopicStatsInfo(brokerAddr, topic, timeoutMillis); } @@ -323,12 +333,29 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner { } } - if (result.getOffsetTable().isEmpty()) { + Set<String> topics = new HashSet<>(); + for (MessageQueue messageQueue: result.getOffsetTable().keySet()) { + topics.add(messageQueue.getTopic()); + } + + ConsumeStats staticResult = new ConsumeStats(); + staticResult.setConsumeTps(result.getConsumeTps()); + // for topic, we put the physical stats, how about group? + // staticResult.getOffsetTable().putAll(result.getOffsetTable()); + + for (String currentTopic: topics) { + TopicRouteData currentRoute = this.examineTopicRouteInfo(currentTopic); + Map<String, TopicConfigAndQueueMapping> brokerConfigMap = MQAdminUtils.examineTopicConfigFromRoute(currentTopic, currentRoute, defaultMQAdminExt); + ConsumeStats consumeStats = MQAdminUtils.convertPhysicalConsumeStats(brokerConfigMap, result); + staticResult.getOffsetTable().putAll(consumeStats.getOffsetTable()); + } + + if (staticResult.getOffsetTable().isEmpty()) { throw new MQClientException(ResponseCode.CONSUMER_NOT_ONLINE, "Not found the consumer group consume stats, because return offset table is empty, maybe the consumer not consume any message"); } - return result; + return staticResult; } @Override @@ -355,7 +382,6 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner { return this.mqClientInstance.getMQAdminImpl().queryMessageByUniqKey(topic, msgId); } - @Override public ConsumerConnection examineConsumerConnectionInfo( String consumerGroup) throws InterruptedException, MQBrokerException, @@ -416,7 +442,7 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner { @Override public int addWritePermOfBroker(String namesrvAddr, String brokerName) throws RemotingCommandException, - RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException, MQClientException { + RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException, MQClientException { return this.mqClientInstance.getMQClientAPIImpl().addWritePermOfBroker(namesrvAddr, brokerName, timeoutMillis); } @@ -1047,11 +1073,12 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner { } @Override - public void createStaticTopic(final String addr, final String defaultTopic, final TopicConfig topicConfig, final TopicQueueMappingDetail mappingDetail, final boolean force) throws RemotingException, InterruptedException, MQBrokerException { + public void createStaticTopic(final String addr, final String defaultTopic, final TopicConfig topicConfig, + final TopicQueueMappingDetail mappingDetail, + final boolean force) throws RemotingException, InterruptedException, MQBrokerException { this.mqClientInstance.getMQClientAPIImpl().createStaticTopic(addr, defaultTopic, topicConfig, mappingDetail, force, timeoutMillis); } - @Override public long searchOffset(MessageQueue mq, long timestamp) throws MQClientException { return this.mqClientInstance.getMQAdminImpl().searchOffset(mq, timestamp); @@ -1062,7 +1089,6 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner { return this.mqClientInstance.getMQAdminImpl().maxOffset(mq); } - @Override public long minOffset(MessageQueue mq) throws MQClientException { return this.mqClientInstance.getMQAdminImpl().minOffset(mq); @@ -1150,7 +1176,7 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner { @Override public void setMessageRequestMode(final String brokerAddr, final String topic, final String consumerGroup, final - MessageRequestMode mode, final int popShareQueueNum, final long timeoutMillis) + MessageRequestMode mode, final int popShareQueueNum, final long timeoutMillis) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException, MQClientException { this.mqClientInstance.getMQClientAPIImpl().setMessageRequestMode(brokerAddr, topic, consumerGroup, mode, popShareQueueNum, timeoutMillis); diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminUtils.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminUtils.java index a5aab4d..cd2c4ac 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminUtils.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminUtils.java @@ -19,16 +19,20 @@ package org.apache.rocketmq.tools.admin; import org.apache.rocketmq.client.exception.MQBrokerException; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.common.TopicConfig; +import org.apache.rocketmq.common.admin.ConsumeStats; +import org.apache.rocketmq.common.admin.OffsetWrapper; import org.apache.rocketmq.common.admin.TopicOffset; import org.apache.rocketmq.common.admin.TopicStatsTable; import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.common.protocol.ResponseCode; import org.apache.rocketmq.common.protocol.body.ClusterInfo; +import org.apache.rocketmq.common.protocol.route.BrokerData; import org.apache.rocketmq.common.protocol.route.TopicRouteData; import org.apache.rocketmq.common.rpc.ClientMetadata; import org.apache.rocketmq.common.statictopic.LogicQueueMappingItem; import org.apache.rocketmq.common.statictopic.TopicConfigAndQueueMapping; import org.apache.rocketmq.common.statictopic.TopicQueueMappingDetail; +import org.apache.rocketmq.common.statictopic.TopicQueueMappingOne; import org.apache.rocketmq.common.statictopic.TopicQueueMappingUtils; import org.apache.rocketmq.remoting.exception.RemotingConnectException; import org.apache.rocketmq.remoting.exception.RemotingException; @@ -42,6 +46,9 @@ import java.util.List; import java.util.Map; import java.util.Set; +import static org.apache.rocketmq.common.statictopic.TopicQueueMappingUtils.checkAndBuildMappingItems; +import static org.apache.rocketmq.common.statictopic.TopicQueueMappingUtils.getMappingDetailFromConfig; + public class MQAdminUtils { @@ -229,4 +236,107 @@ public class MQAdminUtils { } return brokerConfigMap; } + + + public static Map<String, TopicConfigAndQueueMapping> examineTopicConfigFromRoute(String topic, TopicRouteData topicRouteData, DefaultMQAdminExt defaultMQAdminExt) throws RemotingException, InterruptedException, MQBrokerException { + Map<String, TopicConfigAndQueueMapping> brokerConfigMap = new HashMap<>(); + for (BrokerData bd : topicRouteData.getBrokerDatas()) { + String broker = bd.getBrokerName(); + String addr = bd.selectBrokerAddr(); + if (addr == null) { + continue; + } + try { + TopicConfigAndQueueMapping mapping = (TopicConfigAndQueueMapping) defaultMQAdminExt.examineTopicConfig(addr, topic); + //allow the config is null + if (mapping != null) { + if (mapping.getMappingDetail() != null) { + assert mapping.getMappingDetail().getBname().equals(broker); + } + brokerConfigMap.put(broker, mapping); + } + } catch (MQBrokerException exception) { + if (exception.getResponseCode() != ResponseCode.TOPIC_NOT_EXIST) { + throw exception; + } + } + } + return brokerConfigMap; + } + + public static void convertPhysicalTopicStats(String topic, Map<String, TopicConfigAndQueueMapping> brokerConfigMap, TopicStatsTable topicStatsTable) { + Map<Integer, TopicQueueMappingOne> globalIdMap = checkAndBuildMappingItems(getMappingDetailFromConfig(brokerConfigMap.values()), true, false); + for (Map.Entry<Integer, TopicQueueMappingOne> entry: globalIdMap.entrySet()) { + Integer qid = entry.getKey(); + TopicQueueMappingOne mappingOne = entry.getValue(); + LogicQueueMappingItem minItem = TopicQueueMappingUtils.findLogicQueueMappingItem(mappingOne.getItems(), 0, true); + LogicQueueMappingItem maxItem = TopicQueueMappingUtils.findLogicQueueMappingItem(mappingOne.getItems(), Long.MAX_VALUE, true); + assert minItem != null && maxItem != null; + TopicOffset minTopicOffset = topicStatsTable.getOffsetTable().get(new MessageQueue(topic, minItem.getBname(), minItem.getQueueId())); + TopicOffset maxTopicOffset = topicStatsTable.getOffsetTable().get(new MessageQueue(topic, maxItem.getBname(), maxItem.getQueueId())); + + if (minTopicOffset == null + || maxTopicOffset == null) { + continue; + } + long min = minItem.computeStaticQueueOffsetLoosely(minTopicOffset.getMinOffset()); + if (min < 0) + min = 0; + long max = maxItem.computeStaticQueueOffsetStrictly(maxTopicOffset.getMaxOffset()); + if (max < 0) + max = 0; + long timestamp = maxTopicOffset.getLastUpdateTimestamp(); + + TopicOffset topicOffset = new TopicOffset(); + topicOffset.setMinOffset(min); + topicOffset.setMaxOffset(max); + topicOffset.setLastUpdateTimestamp(timestamp); + topicStatsTable.getOffsetTable().put(new MessageQueue(topic, TopicQueueMappingUtils.getMockBrokerName(mappingOne.getMappingDetail().getScope()), qid), topicOffset); + } + } + + + public static ConsumeStats convertPhysicalConsumeStats(Map<String, TopicConfigAndQueueMapping> brokerConfigMap, ConsumeStats physicalResult) { + Map<Integer, TopicQueueMappingOne> globalIdMap = checkAndBuildMappingItems(getMappingDetailFromConfig(brokerConfigMap.values()), true, false); + ConsumeStats result = new ConsumeStats(); + result.setConsumeTps(physicalResult.getConsumeTps()); + for (Map.Entry<Integer, TopicQueueMappingOne> entry : globalIdMap.entrySet()) { + Integer qid = entry.getKey(); + TopicQueueMappingOne mappingOne = entry.getValue(); + MessageQueue messageQueue = new MessageQueue(mappingOne.getTopic(), TopicQueueMappingUtils.getMockBrokerName(mappingOne.getMappingDetail().getScope()), qid); + OffsetWrapper offsetWrapper = new OffsetWrapper(); + long brokerOffset = -1; + long consumerOffset = -1; + long lastTimestamp = -1; //maybe need to be polished + for (int i = mappingOne.getItems().size() - 1; i >= 0; i--) { + LogicQueueMappingItem item = mappingOne.getItems().get(i); + MessageQueue phyQueue = new MessageQueue(mappingOne.getTopic(), item.getBname(), item.getQueueId()); + OffsetWrapper phyOffsetWrapper = physicalResult.getOffsetTable().get(phyQueue); + if (phyOffsetWrapper == null) { + continue; + } + if (consumerOffset == -1 + && phyOffsetWrapper.getConsumerOffset() >= 0) { + consumerOffset = phyOffsetWrapper.getConsumerOffset(); + lastTimestamp = phyOffsetWrapper.getLastTimestamp(); + } + if (brokerOffset == -1 + && item.getLogicOffset() >= 0) { + brokerOffset = item.computeStaticQueueOffsetStrictly(offsetWrapper.getBrokerOffset()); + } + if (consumerOffset >= 0 + && brokerOffset >= 0) { + break; + } + } + if (brokerOffset >= 0 + && consumerOffset >= 0) { + offsetWrapper.setBrokerOffset(brokerOffset); + offsetWrapper.setConsumerOffset(consumerOffset); + offsetWrapper.setLastTimestamp(lastTimestamp); + result.getOffsetTable().put(messageQueue, offsetWrapper); + } + } + return result; + } }
