This is an automated email from the ASF dual-hosted git repository. yuanbo pushed a commit to branch TUBEMQ-421 in repository https://gitbox.apache.org/repos/asf/incubator-tubemq.git
commit d8580f21b44e4866b5f63352afd7664b7e67bfe4 Author: gosonzhang <[email protected]> AuthorDate: Fri Dec 25 16:37:14 2020 +0800 [TUBEMQ-482] Add offset query api --- .../server/broker/metadata/TopicMetadata.java | 29 ++++++++ .../server/broker/msgstore/MessageStore.java | 11 ++- .../broker/msgstore/MessageStoreManager.java | 62 +++++++++++++++- .../server/broker/msgstore/StoreService.java | 6 ++ .../server/broker/offset/DefaultOffsetManager.java | 46 +++++++----- .../tubemq/server/broker/offset/OffsetService.java | 6 +- .../server/broker/utils/GroupOffsetInfo.java | 85 ++++++++++++++++++++++ .../server/broker/utils/TopicPubStoreInfo.java | 45 ++++++++++++ .../server/broker/web/BrokerAdminServlet.java | 79 +++++++++++++++----- 9 files changed, 323 insertions(+), 46 deletions(-) diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/metadata/TopicMetadata.java b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/metadata/TopicMetadata.java index c582606..800254b 100644 --- a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/metadata/TopicMetadata.java +++ b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/metadata/TopicMetadata.java @@ -17,7 +17,9 @@ package org.apache.tubemq.server.broker.metadata; +import java.util.HashMap; import java.util.HashSet; +import java.util.Map; import java.util.Set; import org.apache.tubemq.corebase.TBaseConstants; @@ -251,6 +253,33 @@ public class TopicMetadata { return partIds; } + // builder the partitionId set for each store + public Map<Integer, Set<Integer>> getStorePartIdMap() { + Map<Integer, Set<Integer>> storePartIds = new HashMap<>(); + for (int i = 0; i < numTopicStores; i++) { + Set<Integer> partIds = new HashSet<>(); + for (int j = 0; j < numPartitions; j++) { + partIds.add(i * TBaseConstants.META_STORE_INS_BASE + j); + } + storePartIds.put(i, partIds); + } + return storePartIds; + } + + public int getStoreIdByPartitionId(int partitionId) { + return partitionId % TBaseConstants.META_STORE_INS_BASE; + } + + public Set<Integer> getPartIdsByStoreId(int storeId) { + Set<Integer> partIds = new HashSet<>(); + if (storeId >= 0 && storeId < numTopicStores) { + for (int i = 0; i < numPartitions; i++) { + partIds.add(storeId * TBaseConstants.META_STORE_INS_BASE + i); + } + } + return partIds; + } + public int getStatusId() { return statusId; } diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/msgstore/MessageStore.java b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/msgstore/MessageStore.java index a827723..e610a8b 100644 --- a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/msgstore/MessageStore.java +++ b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/msgstore/MessageStore.java @@ -133,9 +133,9 @@ public class MessageStore implements Closeable { this.writeCacheFlushIntvl = topicMetadata.getMemCacheFlushIntvl(); int tmpIndexReadCnt = tubeConfig.getIndexTransCount() * partitionNum; memMaxIndexReadCnt.set(tmpIndexReadCnt <= 6000 - ? 6000 : (tmpIndexReadCnt >= 10000 ? 10000 : tmpIndexReadCnt)); + ? 6000 : (Math.min(tmpIndexReadCnt, 10000))); fileMaxIndexReadCnt.set(tmpIndexReadCnt < 8000 - ? 8000 : (tmpIndexReadCnt >= 13500 ? 13500 : tmpIndexReadCnt)); + ? 8000 : (Math.min(tmpIndexReadCnt, 13500))); memMaxFilterIndexReadCnt.set(memMaxIndexReadCnt.get() * 2); fileMaxFilterIndexReadCnt.set(fileMaxIndexReadCnt.get() * 3); fileLowReqMaxFilterIndexReadCnt.set(fileMaxFilterIndexReadCnt.get() * 10); @@ -250,8 +250,7 @@ public class MessageStore implements Closeable { } } // before read from file, adjust request's offset. - long reqNewOffset = requestOffset < this.msgFileStore.getIndexMinOffset() - ? this.msgFileStore.getIndexMinOffset() : requestOffset; + long reqNewOffset = Math.max(requestOffset, this.msgFileStore.getIndexMinOffset()); if (reqSwitch <= 1 && reqNewOffset >= getFileIndexMaxOffset()) { return new GetMessageResult(false, TErrCodeConstants.NOT_FOUND, reqNewOffset, 0, "current offset is exceed max file offset"); @@ -409,9 +408,9 @@ public class MessageStore implements Closeable { maxFileValidDurMs.set(parseDeletePolicy(topicMetadata.getDeletePolicy())); int tmpIndexReadCnt = tubeConfig.getIndexTransCount() * partitionNum; memMaxIndexReadCnt.set(tmpIndexReadCnt <= 6000 - ? 6000 : (tmpIndexReadCnt >= 10000 ? 10000 : tmpIndexReadCnt)); + ? 6000 : (Math.min(tmpIndexReadCnt, 10000))); fileMaxIndexReadCnt.set(tmpIndexReadCnt < 8000 - ? 8000 : (tmpIndexReadCnt >= 13500 ? 13500 : tmpIndexReadCnt)); + ? 8000 : (Math.min(tmpIndexReadCnt, 13500))); memMaxFilterIndexReadCnt.set(memMaxIndexReadCnt.get() * 2); fileMaxFilterIndexReadCnt.set(fileMaxIndexReadCnt.get() * 3); fileLowReqMaxFilterIndexReadCnt.set(fileMaxFilterIndexReadCnt.get() * 10); diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/msgstore/MessageStoreManager.java b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/msgstore/MessageStoreManager.java index 6275f3a..2d55ba1 100644 --- a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/msgstore/MessageStoreManager.java +++ b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/msgstore/MessageStoreManager.java @@ -24,6 +24,7 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; +import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -51,6 +52,7 @@ import org.apache.tubemq.server.broker.metadata.TopicMetadata; import org.apache.tubemq.server.broker.msgstore.disk.GetMessageResult; import org.apache.tubemq.server.broker.nodeinfo.ConsumerNodeInfo; import org.apache.tubemq.server.broker.utils.DataStoreUtils; +import org.apache.tubemq.server.broker.utils.TopicPubStoreInfo; import org.apache.tubemq.server.common.TStatusConstants; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -90,8 +92,7 @@ public class MessageStoreManager implements StoreService { this.metadataManager = this.tubeBroker.getMetadataManager(); this.isRemovingTopic.set(false); this.maxMsgTransferSize = - tubeConfig.getTransferSize() > DataStoreUtils.MAX_MSG_TRANSFER_SIZE - ? DataStoreUtils.MAX_MSG_TRANSFER_SIZE : tubeConfig.getTransferSize(); + Math.min(tubeConfig.getTransferSize(), DataStoreUtils.MAX_MSG_TRANSFER_SIZE); this.metadataManager.addPropertyChangeListener("topicConfigMap", new PropertyChangeListener() { @Override public void propertyChange(final PropertyChangeEvent evt) { @@ -385,6 +386,63 @@ public class MessageStoreManager implements StoreService { return Collections.unmodifiableMap(this.dataStores); } + /*** + * Query topic's publish info. + * + * @param topicSet query's topic set + * + * @return the topic's offset info + */ + @Override + public Map<String, Map<Integer, TopicPubStoreInfo>> getTopicPublishInfos( + Set<String> topicSet) { + MessageStore store = null; + TopicMetadata topicMetadata = null; + Set<String> qryTopicSet = new HashSet<>(); + Map<String, Map<Integer, TopicPubStoreInfo>> topicPubStoreInfoMap = new HashMap<>(); + Map<String, TopicMetadata> confTopicInfo = metadataManager.getTopicConfigMap(); + if (topicSet == null || topicSet.isEmpty()) { + qryTopicSet.addAll(confTopicInfo.keySet()); + } else { + for (String topic : topicSet) { + if (confTopicInfo.containsKey(topic)) { + qryTopicSet.add(topic); + } + } + } + if (qryTopicSet.isEmpty()) { + return topicPubStoreInfoMap; + } + for (String topic : qryTopicSet) { + topicMetadata = confTopicInfo.get(topic); + if (topicMetadata == null) { + continue; + } + Map<Integer, MessageStore> storeMap = dataStores.get(topic); + if (storeMap == null) { + continue; + } + Map<Integer, TopicPubStoreInfo> storeInfoMap = new HashMap<>(); + for (Map.Entry<Integer, MessageStore> entry : storeMap.entrySet()) { + if (entry == null + || entry.getKey() == null + || entry.getValue() == null) { + continue; + } + store = entry.getValue(); + for (Integer partitionId : topicMetadata.getPartIdsByStoreId(entry.getKey())) { + TopicPubStoreInfo storeInfo = + new TopicPubStoreInfo(topic, entry.getKey(), partitionId, + store.getIndexMinOffset(), store.getIndexMaxOffset(), + store.getDataMinOffset(), store.getDataMaxOffset()); + storeInfoMap.put(partitionId, storeInfo); + } + } + topicPubStoreInfoMap.put(topic, storeInfoMap); + } + return topicPubStoreInfoMap; + } + private Set<File> getLogDirSet(final BrokerConfig tubeConfig) throws IOException { TopicMetadata topicMetadata = null; final Set<String> paths = new HashSet<>(); diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/msgstore/StoreService.java b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/msgstore/StoreService.java index d5f7f32..184fb4d 100644 --- a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/msgstore/StoreService.java +++ b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/msgstore/StoreService.java @@ -20,6 +20,10 @@ package org.apache.tubemq.server.broker.msgstore; import java.util.Collection; import java.util.List; +import java.util.Map; +import java.util.Set; +import org.apache.tubemq.server.broker.utils.TopicPubStoreInfo; + /*** * Store service interface. @@ -35,4 +39,6 @@ public interface StoreService { MessageStore getOrCreateMessageStore(final String topic, final int partition) throws Throwable; + + Map<String, Map<Integer, TopicPubStoreInfo>> getTopicPublishInfos(Set<String> topicSet); } diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/offset/DefaultOffsetManager.java b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/offset/DefaultOffsetManager.java index bdd85b3..df3afc4 100644 --- a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/offset/DefaultOffsetManager.java +++ b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/offset/DefaultOffsetManager.java @@ -25,6 +25,7 @@ import java.util.concurrent.ConcurrentHashMap; import org.apache.tubemq.corebase.TBaseConstants; import org.apache.tubemq.corebase.daemon.AbstractDaemonService; import org.apache.tubemq.corebase.utils.TStringUtils; +import org.apache.tubemq.corebase.utils.Tuple2; import org.apache.tubemq.server.broker.BrokerConfig; import org.apache.tubemq.server.broker.msgstore.MessageStore; import org.apache.tubemq.server.broker.msgstore.MessageStoreManager; @@ -399,22 +400,26 @@ public class DefaultOffsetManager extends AbstractDaemonService implements Offse * @return group offset info in memory or zk */ @Override - public Map<String, Map<Integer, Long>> queryGroupOffset( + public Map<String, Map<Integer, Tuple2<Long, Long>>> queryGroupOffset( String group, Map<String, Set<Integer>> topicPartMap) { - Map<String, Map<Integer, Long>> result = new HashMap<>(); + Map<String, Map<Integer, Tuple2<Long, Long>>> result = new HashMap<>(); // search group from memory Map<String, OffsetStorageInfo> topicPartOffsetMap = cfmOffsetMap.get(group); if (topicPartOffsetMap == null) { // query from zookeeper for (Map.Entry<String, Set<Integer>> entry : topicPartMap.entrySet()) { + if (entry == null || entry.getKey() == null || entry.getValue() == null) { + continue; + } Map<Integer, Long> qryResult = - zkOffsetStorage.queryGroupOffsetInfo( - group, entry.getKey(), entry.getValue()); - Map<Integer, Long> offsetMap = new HashMap<>(); + zkOffsetStorage.queryGroupOffsetInfo(group, + entry.getKey(), entry.getValue()); + Map<Integer, Tuple2<Long, Long>> offsetMap = new HashMap<>(); for (Map.Entry<Integer, Long> item : qryResult.entrySet()) { - if (item.getValue() != null) { - offsetMap.put(item.getKey(), item.getValue()); + if (item == null || item.getKey() == null || item.getValue() == null) { + continue; } + offsetMap.put(item.getKey(), new Tuple2<>(item.getValue(), 0L)); } if (!offsetMap.isEmpty()) { result.put(entry.getKey(), offsetMap); @@ -422,14 +427,18 @@ public class DefaultOffsetManager extends AbstractDaemonService implements Offse } } else { // found in memory, get offset values + Map<String, Long> tmpPartOffsetMap = tmpOffsetMap.get(group); for (Map.Entry<String, Set<Integer>> entry : topicPartMap.entrySet()) { - Map<Integer, Long> offsetMap = new HashMap<>(); + Map<Integer, Tuple2<Long, Long>> offsetMap = new HashMap<>(); for (Integer partitionId : entry.getValue()) { String offsetCacheKey = getOffsetCacheKey(entry.getKey(), partitionId); OffsetStorageInfo offsetInfo = topicPartOffsetMap.get(offsetCacheKey); + Long tmpOffset = tmpPartOffsetMap.get(offsetCacheKey); if (offsetInfo != null) { - offsetMap.put(partitionId, offsetInfo.getOffset()); + offsetMap.put(partitionId, + new Tuple2<>(offsetInfo.getOffset(), + (tmpOffset == null ? 0 : tmpOffset))); } } if (!offsetMap.isEmpty()) { @@ -451,9 +460,9 @@ public class DefaultOffsetManager extends AbstractDaemonService implements Offse * @return at least one record modified */ @Override - public boolean modifyGroupOffset(MessageStoreManager storeManager, Set<String> groups, - Map<String, Map<Integer, Long>> topicPartOffsetMap, - String modifier) { + public boolean modifyGroupOffset( + MessageStoreManager storeManager, Set<String> groups, + Map<String, Map<Integer, Tuple2<Long, Long>>> topicPartOffsetMap, String modifier) { long oldOffset = -1; long reSetOffset = -1; boolean changed = false; @@ -461,17 +470,18 @@ public class DefaultOffsetManager extends AbstractDaemonService implements Offse StringBuilder strBuidler = new StringBuilder(512); // set offset by group for (String group : groups) { - for (Map.Entry<String, Map<Integer, Long>> entry : topicPartOffsetMap.entrySet()) { - Map<Integer, Long> partOffsetMap = entry.getValue(); + for (Map.Entry<String, Map<Integer, Tuple2<Long, Long>>> entry + : topicPartOffsetMap.entrySet()) { + Map<Integer, Tuple2<Long, Long>> partOffsetMap = entry.getValue(); if (partOffsetMap == null) { continue; } // set offset - for (Map.Entry<Integer, Long> entry1 : partOffsetMap.entrySet()) { + for (Map.Entry<Integer, Tuple2<Long, Long>> entry1 : partOffsetMap.entrySet()) { if (entry1.getValue() == null) { continue; } - reSetOffset = entry1.getValue(); + Tuple2<Long, Long> offsetTuple = entry1.getValue(); // get topic store try { store = storeManager.getOrCreateMessageStore( @@ -485,8 +495,8 @@ public class DefaultOffsetManager extends AbstractDaemonService implements Offse long firstOffset = store.getIndexMinOffset(); long lastOffset = store.getIndexMaxOffset(); // adjust reseted offset value - reSetOffset = reSetOffset < firstOffset - ? firstOffset : Math.min(reSetOffset, lastOffset); + reSetOffset = offsetTuple.f0 < firstOffset + ? firstOffset : Math.min(offsetTuple.f0, lastOffset); String offsetCacheKey = getOffsetCacheKey(entry.getKey(), entry1.getKey()); getAndResetTmpOffset(group, offsetCacheKey); diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/offset/OffsetService.java b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/offset/OffsetService.java index 05f0724..fcebdfc 100644 --- a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/offset/OffsetService.java +++ b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/offset/OffsetService.java @@ -19,6 +19,8 @@ package org.apache.tubemq.server.broker.offset; import java.util.Map; import java.util.Set; + +import org.apache.tubemq.corebase.utils.Tuple2; import org.apache.tubemq.server.broker.msgstore.MessageStore; import org.apache.tubemq.server.broker.msgstore.MessageStoreManager; import org.apache.tubemq.server.common.offsetstorage.OffsetStorageInfo; @@ -63,10 +65,10 @@ public interface OffsetService { Set<String> getGroupSubInfo(String group); - Map<String, Map<Integer, Long>> queryGroupOffset( + Map<String, Map<Integer, Tuple2<Long, Long>>> queryGroupOffset( String group, Map<String, Set<Integer>> topicPartMap); boolean modifyGroupOffset(MessageStoreManager storeManager, Set<String> groups, - Map<String, Map<Integer, Long>> topicPartOffsetMap, + Map<String, Map<Integer, Tuple2<Long, Long>>> topicPartOffsetMap, String modifier); } diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/utils/GroupOffsetInfo.java b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/utils/GroupOffsetInfo.java new file mode 100644 index 0000000..9a4abe3 --- /dev/null +++ b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/utils/GroupOffsetInfo.java @@ -0,0 +1,85 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tubemq.server.broker.utils; + +import org.apache.tubemq.corebase.TBaseConstants; +import org.apache.tubemq.corebase.utils.Tuple2; + + +public class GroupOffsetInfo { + public int partitionId = TBaseConstants.META_VALUE_UNDEFINED; + public long offsetMin = TBaseConstants.META_VALUE_UNDEFINED; + public long offsetMax = TBaseConstants.META_VALUE_UNDEFINED; + public long dataMin = TBaseConstants.META_VALUE_UNDEFINED; + public long dataMax = TBaseConstants.META_VALUE_UNDEFINED; + public long curOffset = TBaseConstants.META_VALUE_UNDEFINED; + public long flightOffset = TBaseConstants.META_VALUE_UNDEFINED; + public long offsetLag = TBaseConstants.META_VALUE_UNDEFINED; + public long curDataOffset = TBaseConstants.META_VALUE_UNDEFINED; + public long dataLag = TBaseConstants.META_VALUE_UNDEFINED; + + public GroupOffsetInfo(int partitionId) { + this.partitionId = partitionId; + } + + public void setPartPubStoreInfo(TopicPubStoreInfo pubStoreInfo) { + if (pubStoreInfo != null) { + this.offsetMin = pubStoreInfo.indexStart; + this.offsetMax = pubStoreInfo.indexEnd; + this.dataMin = pubStoreInfo.dataStart; + this.dataMax = pubStoreInfo.dataEnd; + } + } + + public void setConsumeOffsetInfo(Tuple2<Long, Long> offsetInfo) { + if (offsetInfo != null) { + this.curOffset = offsetInfo.f0; + this.flightOffset = offsetInfo.f1; + } + } + + public void setConsumeDataOffsetInfo(long curDataOffset) { + if (curDataOffset >= 0) { + this.curDataOffset = curDataOffset; + } + } + + public void calculateLag() { + if (offsetMax != TBaseConstants.META_VALUE_UNDEFINED + && curOffset != TBaseConstants.META_VALUE_UNDEFINED) { + offsetLag = offsetMax - curOffset; + } + if (dataMax != TBaseConstants.META_VALUE_UNDEFINED + && curDataOffset != TBaseConstants.META_VALUE_UNDEFINED) { + dataLag = dataMax - curDataOffset; + } + } + + public StringBuilder buildOffsetInfo(StringBuilder sBuilder) { + sBuilder.append("{\"partitionId\":").append(partitionId) + .append(",\"curOffset\":").append(curOffset) + .append(",\"flightOffset\":").append(flightOffset) + .append(",\"curDataOffset\":").append(curDataOffset) + .append(",\"offsetLag\":").append(offsetLag) + .append(",\"dataLag\":").append(dataLag) + .append(",\"offsetMax\":").append(offsetMax) + .append(",\"dataMax\":").append(dataMax) + .append("}"); + return sBuilder; + } +} diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/utils/TopicPubStoreInfo.java b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/utils/TopicPubStoreInfo.java new file mode 100644 index 0000000..b2257dd --- /dev/null +++ b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/utils/TopicPubStoreInfo.java @@ -0,0 +1,45 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tubemq.server.broker.utils; + +import org.apache.tubemq.corebase.TBaseConstants; + + + +public class TopicPubStoreInfo { + + public String topicName = null; + public int storeId = TBaseConstants.META_VALUE_UNDEFINED; + public int partitionId = TBaseConstants.META_VALUE_UNDEFINED; + public long indexStart = 0L; + public long indexEnd = 0L; + public long dataStart = 0L; + public long dataEnd = 0L; + + public TopicPubStoreInfo(String topicName, int storeId, int partitionId, + long indexStart, long indexEnd, long dataStart, long dataEnd) { + this.topicName = topicName; + this.storeId = storeId; + this.partitionId = partitionId; + this.indexStart = indexStart; + this.indexEnd = indexEnd; + this.dataStart = dataStart; + this.dataEnd = dataEnd; + } + +} diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/web/BrokerAdminServlet.java b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/web/BrokerAdminServlet.java index 91bfd23..9a0506e 100644 --- a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/web/BrokerAdminServlet.java +++ b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/web/BrokerAdminServlet.java @@ -27,12 +27,15 @@ import java.util.concurrent.ConcurrentHashMap; import javax.servlet.http.HttpServletRequest; import org.apache.tubemq.corebase.TokenConstants; import org.apache.tubemq.corebase.utils.TStringUtils; +import org.apache.tubemq.corebase.utils.Tuple2; import org.apache.tubemq.server.broker.TubeBroker; import org.apache.tubemq.server.broker.metadata.TopicMetadata; import org.apache.tubemq.server.broker.msgstore.MessageStore; import org.apache.tubemq.server.broker.msgstore.MessageStoreManager; import org.apache.tubemq.server.broker.nodeinfo.ConsumerNodeInfo; import org.apache.tubemq.server.broker.offset.OffsetService; +import org.apache.tubemq.server.broker.utils.GroupOffsetInfo; +import org.apache.tubemq.server.broker.utils.TopicPubStoreInfo; import org.apache.tubemq.server.common.fielddef.WebFieldDef; import org.apache.tubemq.server.common.utils.ProcessResult; import org.apache.tubemq.server.common.utils.WebParameterUtils; @@ -672,42 +675,34 @@ public class BrokerAdminServlet extends AbstractWebHandler { Set<String> topicSet = (Set<String>) result.retData1; // verify the acquired Topic set and // query the corresponding offset information - Map<String, Map<String, Map<Integer, Long>>> groupOffsetMaps = new HashMap<>(); - for (String group : qryGroupNameSet) { - Map<String, Set<Integer>> topicPartMap = - validAndGetPartitions(group, topicSet); - Map<String, Map<Integer, Long>> groupOffsetMap = - broker.getOffsetManager().queryGroupOffset(group, topicPartMap); - groupOffsetMaps.put(group, groupOffsetMap); - } + Map<String, Map<String, Map<Integer, GroupOffsetInfo>>> groupOffsetMaps = + getGroupOffsetInfo(qryGroupNameSet, topicSet); // builder result int totalCnt = 0; sBuilder.append("{\"result\":true,\"errCode\":0,\"errMsg\":\"Success!\",\"dataSet\":["); - for (Map.Entry<String, Map<String, Map<Integer, Long>>> entry + for (Map.Entry<String, Map<String, Map<Integer, GroupOffsetInfo>>> entry : groupOffsetMaps.entrySet()) { if (totalCnt++ > 0) { sBuilder.append(","); } - Map<String, Map<Integer, Long>> topicPartMap = entry.getValue(); + Map<String, Map<Integer, GroupOffsetInfo>> topicPartMap = entry.getValue(); sBuilder.append("{\"groupName\":\"").append(entry.getKey()) .append("\",\"subInfo\":["); int topicCnt = 0; - for (Map.Entry<String, Map<Integer, Long>> entry1 : topicPartMap.entrySet()) { + for (Map.Entry<String, Map<Integer, GroupOffsetInfo>> entry1 : topicPartMap.entrySet()) { if (topicCnt++ > 0) { sBuilder.append(","); } - Map<Integer, Long> partOffMap = entry1.getValue(); + Map<Integer, GroupOffsetInfo> partOffMap = entry1.getValue(); sBuilder.append("{\"topicName\":\"").append(entry1.getKey()) .append("\",\"offsets\":["); int partCnt = 0; - for (Map.Entry<Integer, Long> entry2 : partOffMap.entrySet()) { + for (Map.Entry<Integer, GroupOffsetInfo> entry2 : partOffMap.entrySet()) { if (partCnt++ > 0) { sBuilder.append(","); } - sBuilder.append("{\"").append(this.broker.getTubeConfig().getBrokerId()) - .append(TokenConstants.ATTR_SEP).append(entry1.getKey()) - .append(TokenConstants.ATTR_SEP).append(entry2.getKey()) - .append("\":").append(entry2.getValue()).append("}"); + GroupOffsetInfo offsetInfo = entry2.getValue(); + offsetInfo.buildOffsetInfo(sBuilder); } sBuilder.append("],\"partCount\":").append(partCnt).append("}"); } @@ -777,7 +772,7 @@ public class BrokerAdminServlet extends AbstractWebHandler { return; } // query offset from source group - Map<String, Map<Integer, Long>> srcGroupOffsets = + Map<String, Map<Integer, Tuple2<Long, Long>>> srcGroupOffsets = broker.getOffsetManager().queryGroupOffset(srcGroupName, topicPartMap); boolean changed = broker.getOffsetManager().modifyGroupOffset( broker.getStoreManager(), tgtGroupNameSet, srcGroupOffsets, modifier); @@ -785,6 +780,48 @@ public class BrokerAdminServlet extends AbstractWebHandler { sBuilder.append("{\"result\":true,\"errCode\":0,\"errMsg\":\"OK\"}"); } + // builder group's offset info + private Map<String, Map<String, Map<Integer, GroupOffsetInfo>>> getGroupOffsetInfo( + Set<String> groupSet, Set<String> topicSet) { + long curReadDataOffset = -2; + long curDataLag = -2; + Map<String, Map<String, Map<Integer, GroupOffsetInfo>>> groupOffsetMaps = new HashMap<>(); + for (String group : groupSet) { + Map<String, Map<Integer, GroupOffsetInfo>> topicOffsetRet = new HashMap<>(); + // valid and get topic's partitionIds + Map<String, Set<Integer>> topicPartMap = validAndGetPartitions(group, topicSet); + // get topic's publish info + Map<String, Map<Integer, TopicPubStoreInfo>> topicStorePubInfoMap = + broker.getStoreManager().getTopicPublishInfos(topicPartMap.keySet()); + // get group's booked offset info + Map<String, Map<Integer, Tuple2<Long, Long>>> groupOffsetMap = + broker.getOffsetManager().queryGroupOffset(group, topicPartMap); + // get offset info array + for (Map.Entry<String, Set<Integer>> entry : topicPartMap.entrySet()) { + String topic = entry.getKey(); + Map<Integer, GroupOffsetInfo> partOffsetRet = new HashMap<>(); + Map<Integer, TopicPubStoreInfo> storeInfoMap = topicStorePubInfoMap.get(topic); + Map<Integer, Tuple2<Long, Long>> partBookedMap = groupOffsetMap.get(topic); + for (Integer partitionId : entry.getValue()) { + GroupOffsetInfo offsetInfo = new GroupOffsetInfo(partitionId); + offsetInfo.setPartPubStoreInfo(storeInfoMap.get(partitionId)); + offsetInfo.setConsumeOffsetInfo(partBookedMap.get(partitionId)); + String queryKey = buildQueryID(group, topic, partitionId); + ConsumerNodeInfo nodeInfo = broker.getConsumerNodeInfo(queryKey); + if (nodeInfo != null) { + offsetInfo.setConsumeDataOffsetInfo(nodeInfo.getLastDataRdOffset()); + } + offsetInfo.calculateLag(); + partOffsetRet.put(partitionId, offsetInfo); + } + topicOffsetRet.put(topic, partOffsetRet); + } + groupOffsetMaps.put(group, topicOffsetRet); + } + return groupOffsetMaps; + } + + private Map<String, Set<Integer>> validAndGetPartitions(String group, Set<String> topicSet) { Map<String, Set<Integer>> topicPartMap = new HashMap<>(); // query stored topic set stored in memory or zk @@ -807,4 +844,10 @@ public class BrokerAdminServlet extends AbstractWebHandler { return topicPartMap; } + private String buildQueryID(String group, String topic, int partitionId) { + return new StringBuilder(512).append(group) + .append(TokenConstants.ATTR_SEP).append(topic) + .append(TokenConstants.ATTR_SEP).append(partitionId).toString(); + } + }
