This is an automated email from the ASF dual-hosted git repository. yuanbo pushed a commit to branch TUBEMQ-469 in repository https://gitbox.apache.org/repos/asf/incubator-tubemq.git
commit f25db3019f374189b5012534843d5166d9733258 Author: gosonzhang <[email protected]> AuthorDate: Sat Jan 16 17:47:22 2021 +0800 [TUBEMQ-515]Add cluster Topic view web api --- resources/assets/scripts/topicList.js | 8 +- .../server/common/utils/WebParameterUtils.java | 19 ++++ .../web/handler/WebBrokerTopicConfHandler.java | 33 ++---- .../master/web/handler/WebMasterInfoHandler.java | 115 ++++++++++++++++++++- 4 files changed, 146 insertions(+), 29 deletions(-) diff --git a/resources/assets/scripts/topicList.js b/resources/assets/scripts/topicList.js index b755a94..7dc000a 100644 --- a/resources/assets/scripts/topicList.js +++ b/resources/assets/scripts/topicList.js @@ -102,7 +102,7 @@ 'false': '否', '-': '-' }; - var url = G_CONFIG.HOST + "?type=op_query&method=admin_query_topic_info&" + $.param( + var url = G_CONFIG.HOST + "?type=op_query&method=admin_query_cluster_topic_view&" + $.param( opts); if (!this.$topicListDataTable) { @@ -126,7 +126,7 @@ return html; } }, { - "data": "infoCount" + "data": "totalCfgBrokerCnt" }, { "data": "totalCfgNumPart" }, { @@ -166,13 +166,13 @@ + '"><input type="checkbox" checked></span>'; } }, { - "data": "authData", + "data": "enableAuthControl", "orderable": false, "render": function (data, type, full, meta) { - var checked = data.enableAuthControl + var checked = data === true ? ' checked' : ''; diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/common/utils/WebParameterUtils.java b/tubemq-server/src/main/java/org/apache/tubemq/server/common/utils/WebParameterUtils.java index 27e61e1..161966d 100644 --- a/tubemq-server/src/main/java/org/apache/tubemq/server/common/utils/WebParameterUtils.java +++ b/tubemq-server/src/main/java/org/apache/tubemq/server/common/utils/WebParameterUtils.java @@ -34,6 +34,7 @@ import javax.servlet.http.HttpServletRequest; import org.apache.tubemq.corebase.TBaseConstants; import org.apache.tubemq.corebase.TokenConstants; import org.apache.tubemq.corebase.utils.TStringUtils; +import org.apache.tubemq.corebase.utils.Tuple2; import org.apache.tubemq.server.broker.utils.DataStoreUtils; import org.apache.tubemq.server.common.TServerConstants; import org.apache.tubemq.server.common.TStatusConstants; @@ -1401,6 +1402,24 @@ public class WebParameterUtils { return strManageStatus; } + public static Tuple2<Boolean, Boolean> getPubSubStatusByManageStatus(int manageStatus) { + boolean isAcceptPublish = false; + boolean isAcceptSubscribe = false; + if (manageStatus >= TStatusConstants.STATUS_MANAGE_ONLINE) { + if (manageStatus == TStatusConstants.STATUS_MANAGE_ONLINE) { + isAcceptPublish = true; + isAcceptSubscribe = true; + } else if (manageStatus == TStatusConstants.STATUS_MANAGE_ONLINE_NOT_WRITE) { + isAcceptPublish = false; + isAcceptSubscribe = true; + } else if (manageStatus == TStatusConstants.STATUS_MANAGE_ONLINE_NOT_READ) { + isAcceptPublish = true; + isAcceptSubscribe = false; + } + } + return new Tuple2<>(isAcceptPublish, isAcceptSubscribe); + } + public static String date2yyyyMMddHHmmss(Date date) { SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMddHHmmss"); return sdf.format(date); diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebBrokerTopicConfHandler.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebBrokerTopicConfHandler.java index 9a4cf04..74a4ed0 100644 --- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebBrokerTopicConfHandler.java +++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebBrokerTopicConfHandler.java @@ -33,6 +33,7 @@ import org.apache.tubemq.corebase.cluster.BrokerInfo; import org.apache.tubemq.corebase.cluster.TopicInfo; import org.apache.tubemq.corebase.utils.SettingValidUtils; import org.apache.tubemq.corebase.utils.TStringUtils; +import org.apache.tubemq.corebase.utils.Tuple2; import org.apache.tubemq.server.common.TServerConstants; import org.apache.tubemq.server.common.TStatusConstants; import org.apache.tubemq.server.common.fielddef.WebFieldDef; @@ -628,18 +629,10 @@ public class WebBrokerTopicConfHandler extends AbstractWebHandler { if (brokerConfEntity != null) { int manageStatus = brokerConfEntity.getManageStatus(); strManageStatus = WebParameterUtils.getBrokerManageStatusStr(manageStatus); - if (manageStatus >= TStatusConstants.STATUS_MANAGE_ONLINE) { - if (manageStatus == TStatusConstants.STATUS_MANAGE_ONLINE) { - isAcceptPublish = true; - isAcceptSubscribe = true; - } else if (manageStatus == TStatusConstants.STATUS_MANAGE_ONLINE_NOT_WRITE) { - isAcceptPublish = false; - isAcceptSubscribe = true; - } else if (manageStatus == TStatusConstants.STATUS_MANAGE_ONLINE_NOT_READ) { - isAcceptPublish = true; - isAcceptSubscribe = false; - } - } + Tuple2<Boolean, Boolean> pubSubStatus = + WebParameterUtils.getPubSubStatusByManageStatus(manageStatus); + isAcceptPublish = pubSubStatus.getF0(); + isAcceptSubscribe = pubSubStatus.getF1(); } BrokerInfo broker = new BrokerInfo(entity.getBrokerId(), entity.getBrokerIp(), entity.getBrokerPort()); @@ -1243,18 +1236,10 @@ public class WebBrokerTopicConfHandler extends AbstractWebHandler { if (brokerConfEntity != null) { int manageStatus = brokerConfEntity.getManageStatus(); strManageStatus = WebParameterUtils.getBrokerManageStatusStr(manageStatus); - if (manageStatus >= TStatusConstants.STATUS_MANAGE_ONLINE) { - if (manageStatus == TStatusConstants.STATUS_MANAGE_ONLINE) { - isAcceptPublish = true; - isAcceptSubscribe = true; - } else if (manageStatus == TStatusConstants.STATUS_MANAGE_ONLINE_NOT_WRITE) { - isAcceptPublish = false; - isAcceptSubscribe = true; - } else if (manageStatus == TStatusConstants.STATUS_MANAGE_ONLINE_NOT_READ) { - isAcceptPublish = true; - isAcceptSubscribe = false; - } - } + Tuple2<Boolean, Boolean> pubSubStatus = + WebParameterUtils.getPubSubStatusByManageStatus(manageStatus); + isAcceptPublish = pubSubStatus.getF0(); + isAcceptSubscribe = pubSubStatus.getF1(); } BrokerSyncStatusInfo brokerSyncStatusInfo = brokerSyncStatusInfoMap.get(brokerEntity.getBrokerId()); diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebMasterInfoHandler.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebMasterInfoHandler.java index f2fece8..0e58fbb 100644 --- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebMasterInfoHandler.java +++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebMasterInfoHandler.java @@ -19,14 +19,25 @@ package org.apache.tubemq.server.master.web.handler; import java.util.Date; import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; + import javax.servlet.http.HttpServletRequest; import org.apache.tubemq.corebase.TBaseConstants; +import org.apache.tubemq.corebase.cluster.BrokerInfo; +import org.apache.tubemq.corebase.cluster.TopicInfo; import org.apache.tubemq.corebase.utils.SettingValidUtils; +import org.apache.tubemq.corebase.utils.Tuple2; import org.apache.tubemq.server.common.fielddef.WebFieldDef; import org.apache.tubemq.server.common.utils.ProcessResult; import org.apache.tubemq.server.common.utils.WebParameterUtils; import org.apache.tubemq.server.master.TMaster; +import org.apache.tubemq.server.master.bdbstore.bdbentitys.BdbBrokerConfEntity; import org.apache.tubemq.server.master.bdbstore.bdbentitys.BdbClusterSettingEntity; +import org.apache.tubemq.server.master.bdbstore.bdbentitys.BdbTopicAuthControlEntity; +import org.apache.tubemq.server.master.bdbstore.bdbentitys.BdbTopicConfEntity; +import org.apache.tubemq.server.master.nodemanage.nodebroker.TopicPSInfoManager; import org.apache.tubemq.server.master.web.model.ClusterGroupVO; import org.apache.tubemq.server.master.web.model.ClusterNodeVO; @@ -51,6 +62,9 @@ public class WebMasterInfoHandler extends AbstractWebHandler { "getGroupAddressStrInfo"); registerQueryWebMethod("admin_query_cluster_default_setting", "adminQueryClusterDefSetting"); + registerQueryWebMethod("admin_query_cluster_topic_view", + "adminQueryClusterTopicView"); + // register modify method registerModifyWebMethod("admin_transfer_current_master", "transferCurrentMaster"); @@ -210,7 +224,106 @@ public class WebMasterInfoHandler extends AbstractWebHandler { return sBuilder; } - + /** + * Query cluster topic overall view + * + * @param req + * @return + */ + public StringBuilder adminQueryClusterTopicView(HttpServletRequest req) { + ProcessResult result = new ProcessResult(); + StringBuilder sBuilder = new StringBuilder(512); + // check and get brokerId field + if (!WebParameterUtils.getIntParamValue(req, + WebFieldDef.COMPSBROKERID, false, result)) { + WebParameterUtils.buildFailResult(sBuilder, result.errInfo); + return sBuilder; + } + Set<Integer> brokerIds = (Set<Integer>) result.retData1; + // check and get topicName field + if (!WebParameterUtils.getStringParamValue(req, + WebFieldDef.COMPSTOPICNAME, false, null, result)) { + WebParameterUtils.buildFailResult(sBuilder, result.errInfo); + return sBuilder; + } + Set<String> topicNameSet = (Set<String>) result.retData1; + // query topic configure info + ConcurrentHashMap<String, List<BdbTopicConfEntity>> topicConfigMap = + brokerConfManager.getBdbTopicEntityMap(null); + TopicPSInfoManager topicPSInfoManager = master.getTopicPSInfoManager(); + int totalCount = 0; + int brokerCount = 0; + int totalCfgNumPartCount = 0; + int totalRunNumPartCount = 0; + boolean isSrvAcceptPublish = false; + boolean isSrvAcceptSubscribe = false; + boolean isAcceptPublish = false; + boolean isAcceptSubscribe = false; + boolean enableAuthControl = false; + sBuilder.append("{\"result\":true,\"errCode\":0,\"errMsg\":\"OK\",\"data\":["); + for (Map.Entry<String, List<BdbTopicConfEntity>> entry : topicConfigMap.entrySet()) { + if (!topicNameSet.isEmpty() && !topicNameSet.contains(entry.getKey())) { + continue; + } + if (totalCount++ > 0) { + sBuilder.append(","); + } + brokerCount = 0; + totalCfgNumPartCount = 0; + totalRunNumPartCount = 0; + isSrvAcceptPublish = false; + isSrvAcceptSubscribe = false; + enableAuthControl = false; + isAcceptPublish = false; + isAcceptSubscribe = false; + for (BdbTopicConfEntity entity : entry.getValue()) { + if ((!brokerIds.isEmpty()) && (!brokerIds.contains(entity.getBrokerId()))) { + continue; + } + brokerCount++; + totalCfgNumPartCount += entity.getNumPartitions() * entity.getNumTopicStores(); + BdbBrokerConfEntity brokerConfEntity = + brokerConfManager.getBrokerDefaultConfigStoreInfo(entity.getBrokerId()); + if (brokerConfEntity != null) { + Tuple2<Boolean, Boolean> pubSubStatus = + WebParameterUtils.getPubSubStatusByManageStatus( + brokerConfEntity.getManageStatus()); + isAcceptPublish = pubSubStatus.getF0(); + isAcceptSubscribe = pubSubStatus.getF1(); + } + BrokerInfo broker = + new BrokerInfo(entity.getBrokerId(), + entity.getBrokerIp(), entity.getBrokerPort()); + TopicInfo topicInfo = topicPSInfoManager.getTopicInfo( + entity.getTopicName(), broker); + if (topicInfo != null) { + if (isAcceptPublish && topicInfo.isAcceptPublish()) { + isSrvAcceptPublish = true; + } + if (isAcceptSubscribe && topicInfo.isAcceptSubscribe()) { + isSrvAcceptSubscribe = true; + } + totalRunNumPartCount += + topicInfo.getPartitionNum() * topicInfo.getTopicStoreNum(); + } + } + BdbTopicAuthControlEntity authEntity = + brokerConfManager.getBdbEnableAuthControlByTopicName(entry.getKey()); + if (authEntity != null) { + enableAuthControl = authEntity.isEnableAuthControl(); + } + sBuilder.append("{\"topicName\":\"").append(entry.getKey()) + .append("\",\"totalCfgBrokerCnt\":").append(brokerCount) + .append(",\"totalCfgNumPart\":").append(totalCfgNumPartCount) + .append(",\"totalRunNumPartCount\":").append(totalRunNumPartCount) + .append(",\"isSrvAcceptPublish\":").append(isSrvAcceptPublish) + .append(",\"isSrvAcceptSubscribe\":").append(isSrvAcceptSubscribe) + .append(",\"enableAuthControl\":").append(enableAuthControl) + .append("}"); + } + sBuilder.append("],\"dataCount\":").append(totalCount).append("}"); + return sBuilder; + } }
