This is an automated email from the ASF dual-hosted git repository. yuanbo pushed a commit to branch TUBEMQ-421 in repository https://gitbox.apache.org/repos/asf/incubator-tubemq.git
commit 868b04c4ae18bb800864fba142e4a116826b4a6c Author: gosonzhang <[email protected]> AuthorDate: Fri Dec 25 18:48:58 2020 +0800 [TUBEMQ-484]Add query API for topic publication information --- .../server/broker/utils/GroupOffsetInfo.java | 8 ++-- .../server/broker/utils/TopicPubStoreInfo.java | 28 ++++++++---- .../server/broker/web/BrokerAdminServlet.java | 51 +++++++++++++++++++++- 3 files changed, 72 insertions(+), 15 deletions(-) diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/utils/GroupOffsetInfo.java b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/utils/GroupOffsetInfo.java index 9a4abe3..a0c7215 100644 --- a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/utils/GroupOffsetInfo.java +++ b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/utils/GroupOffsetInfo.java @@ -39,10 +39,10 @@ public class GroupOffsetInfo { public void setPartPubStoreInfo(TopicPubStoreInfo pubStoreInfo) { if (pubStoreInfo != null) { - this.offsetMin = pubStoreInfo.indexStart; - this.offsetMax = pubStoreInfo.indexEnd; - this.dataMin = pubStoreInfo.dataStart; - this.dataMax = pubStoreInfo.dataEnd; + this.offsetMin = pubStoreInfo.offsetMin; + this.offsetMax = pubStoreInfo.offsetMax; + this.dataMin = pubStoreInfo.dataMin; + this.dataMax = pubStoreInfo.dataMax; } } diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/utils/TopicPubStoreInfo.java b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/utils/TopicPubStoreInfo.java index b2257dd..73b9d69 100644 --- a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/utils/TopicPubStoreInfo.java +++ b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/utils/TopicPubStoreInfo.java @@ -26,20 +26,30 @@ public class TopicPubStoreInfo { public String topicName = null; public int storeId = TBaseConstants.META_VALUE_UNDEFINED; public int partitionId = TBaseConstants.META_VALUE_UNDEFINED; - public long indexStart = 0L; - public long indexEnd = 0L; - public long dataStart = 0L; - public long dataEnd = 0L; + public long offsetMin = 0L; + public long offsetMax = 0L; + public long dataMin = 0L; + public long dataMax = 0L; public TopicPubStoreInfo(String topicName, int storeId, int partitionId, - long indexStart, long indexEnd, long dataStart, long dataEnd) { + long offsetMin, long offsetMax, long dataMin, long dataMax) { this.topicName = topicName; this.storeId = storeId; this.partitionId = partitionId; - this.indexStart = indexStart; - this.indexEnd = indexEnd; - this.dataStart = dataStart; - this.dataEnd = dataEnd; + this.offsetMin = offsetMin; + this.offsetMax = offsetMax; + this.dataMin = dataMin; + this.dataMax = dataMax; + } + + public StringBuilder buildPubStoreInfo(StringBuilder sBuilder) { + sBuilder.append("{\"partitionId\":").append(partitionId) + .append(",\"offsetMin\":").append(offsetMin) + .append(",\"offsetMax\":").append(offsetMax) + .append(",\"dataMin\":").append(dataMin) + .append(",\"dataMax\":").append(dataMax) + .append("}"); + return sBuilder; } } diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/web/BrokerAdminServlet.java b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/web/BrokerAdminServlet.java index 9a0506e..d8f85d4 100644 --- a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/web/BrokerAdminServlet.java +++ b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/web/BrokerAdminServlet.java @@ -77,6 +77,9 @@ public class BrokerAdminServlet extends AbstractWebHandler { // get all registered methods innRegisterWebMethod("admin_get_methods", "adminQueryAllMethods"); + // query topic's publish info + innRegisterWebMethod("admin_query_pubinfo", + "adminQueryPubInfo"); // Query all consumer groups booked on the Broker. innRegisterWebMethod("admin_query_group", "adminQueryBookedGroup"); @@ -467,7 +470,7 @@ public class BrokerAdminServlet extends AbstractWebHandler { * @throws Exception */ public void adminQueryCurrentGroupOffSet(HttpServletRequest req, - StringBuilder sBuilder) throws Exception { + StringBuilder sBuilder) { ProcessResult result = WebParameterUtils.getStringParamValue(req, WebFieldDef.TOPICNAME, true, null); if (!result.success) { @@ -576,6 +579,50 @@ public class BrokerAdminServlet extends AbstractWebHandler { } /*** + * Query topic's publish info on the Broker. + * + * @param req + * @param sBuilder process result + */ + public void adminQueryPubInfo(HttpServletRequest req, + StringBuilder sBuilder) { + // get the topic set to be queried + ProcessResult result = WebParameterUtils.getStringParamValue(req, + WebFieldDef.COMPSTOPICNAME, false, null); + if (!result.success) { + WebParameterUtils.buildFailResult(sBuilder, result.errInfo); + return; + } + // get target consume group name + Set<String> topicSet = (Set<String>) result.retData1; + // get topic's publish info + Map<String, Map<Integer, TopicPubStoreInfo>> topicStorePubInfoMap = + broker.getStoreManager().getTopicPublishInfos(topicSet); + // builder result + int totalCnt = 0; + sBuilder.append("{\"result\":true,\"errCode\":0,\"errMsg\":\"Success!\",\"dataSet\":["); + for (Map.Entry<String, Map<Integer, TopicPubStoreInfo>> entry + : topicStorePubInfoMap.entrySet()) { + if (totalCnt++ > 0) { + sBuilder.append(","); + } + sBuilder.append("{\"topicName\":\"").append(entry.getKey()) + .append("\",\"offsetInfo\":["); + Map<Integer, TopicPubStoreInfo> storeInfoMap = entry.getValue(); + int itemCnt = 0; + for (Map.Entry<Integer, TopicPubStoreInfo> entry1 : storeInfoMap.entrySet()) { + if (itemCnt++ > 0) { + sBuilder.append(","); + } + TopicPubStoreInfo pubStoreInfo = entry1.getValue(); + pubStoreInfo.buildPubStoreInfo(sBuilder); + } + sBuilder.append("],\"itemCount\":").append(itemCnt).append("}"); + } + sBuilder.append("],\"dataCount\":").append(totalCnt).append("}"); + } + + /*** * Query all consumer groups booked on the Broker. * * @param req @@ -583,7 +630,7 @@ public class BrokerAdminServlet extends AbstractWebHandler { */ public void adminQueryBookedGroup(HttpServletRequest req, StringBuilder sBuilder) { - // get group list + // get divide info ProcessResult result = WebParameterUtils.getBooleanParamValue(req, WebFieldDef.WITHDIVIDE, false, false); if (!result.success) {
