This is an automated email from the ASF dual-hosted git repository. yuanbo pushed a commit to branch TUBEMQ-421 in repository https://gitbox.apache.org/repos/asf/incubator-tubemq.git
commit 8e0b01de8aee18f7d4899fc772a4d809a915f9fa Author: gosonzhang <[email protected]> AuthorDate: Tue Dec 22 18:59:39 2020 +0800 [TUBEMQ-470] Add query API of TopicName and BrokerId collection --- .../tubemq/server/common/fielddef/WebFieldDef.java | 10 ++- .../server/common/utils/WebParameterUtils.java | 53 ++++++++++- .../nodemanage/nodebroker/BrokerConfManager.java | 69 ++++++++++++++ .../web/handler/WebBrokerTopicConfHandler.java | 100 +++++++++++++++++++++ 4 files changed, 226 insertions(+), 6 deletions(-) diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/common/fielddef/WebFieldDef.java b/tubemq-server/src/main/java/org/apache/tubemq/server/common/fielddef/WebFieldDef.java index 44a6f81..f73959e 100644 --- a/tubemq-server/src/main/java/org/apache/tubemq/server/common/fielddef/WebFieldDef.java +++ b/tubemq-server/src/main/java/org/apache/tubemq/server/common/fielddef/WebFieldDef.java @@ -64,9 +64,13 @@ public enum WebFieldDef { COMPSPARTITIONID(12, "partitionId", "pid", WebFieldType.COMPINT, "Partition id", RegexDef.TMP_NUMBER), CALLERIP(13, "callerIp", "cip", WebFieldType.STRING, - "Caller ip address", TBaseConstants.META_MAX_CLIENT_HOSTNAME_LENGTH); - - + "Caller ip address", TBaseConstants.META_MAX_CLIENT_HOSTNAME_LENGTH), + BROKERID(14, "brokerId", "brokerId", WebFieldType.INT, + "Broker ID", RegexDef.TMP_NUMBER), + COMPSBROKERID(15, "brokerId", "brokerId", WebFieldType.COMPINT, + "Broker ID", RegexDef.TMP_NUMBER), + WITHIP(16, "withIp", "ip", WebFieldType.BOOLEAN, + "Require return ip information, default is false"); diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/common/utils/WebParameterUtils.java b/tubemq-server/src/main/java/org/apache/tubemq/server/common/utils/WebParameterUtils.java index b7d5d41..1202d33 100644 --- a/tubemq-server/src/main/java/org/apache/tubemq/server/common/utils/WebParameterUtils.java +++ b/tubemq-server/src/main/java/org/apache/tubemq/server/common/utils/WebParameterUtils.java @@ -283,6 +283,51 
@@ public class WebParameterUtils { * @param req Http Servlet Request * @param fieldDef the parameter field definition * @param required a boolean value represent whether the parameter is must required + * @return on success, retData1 holds a {@code Set<Integer>} of the parsed values (an empty set when the raw value is null or the item set is empty); otherwise the failed result + */ + public static ProcessResult getIntParamValue(HttpServletRequest req, + WebFieldDef fieldDef, + boolean required) { + ProcessResult procResult = + getStringParamValue(req, fieldDef, required, null); + if (!procResult.success) { + return procResult; + } + ProcessResult procRet = new ProcessResult(); + Set<Integer> tgtValueSet = new HashSet<Integer>(); + if (fieldDef.isCompFieldType()) { + Set<String> valItemSet = (Set<String>) procResult.retData1; + if (valItemSet.isEmpty()) { + procResult.setSuccResult(tgtValueSet); + return procResult; + } + for (String itemVal : valItemSet) { + if (!checkIntValueNorms(procRet, fieldDef, itemVal, false, -1)) { + return procRet; + } + tgtValueSet.add((Integer) procRet.retData1); + } + } else { + String paramValue = (String) procResult.retData1; + if (paramValue == null) { + procResult.setSuccResult(tgtValueSet); + return procResult; + } + if (!checkIntValueNorms(procRet, fieldDef, paramValue, false, -1)) { + return procRet; + } + tgtValueSet.add((Integer) procRet.retData1); + } + procResult.setSuccResult(tgtValueSet); + return procResult; + } + + /** + * Parse the parameter value from an object value to a integer value + * + * @param req Http Servlet Request + * @param fieldDef the parameter field definition + * @param required a boolean value represent whether the parameter is must required * @param defValue a default value returned if failed to parse value from the given object * @param minValue min value required * @return valid result for the parameter value @@ -307,7 +352,7 @@ public class WebParameterUtils { } ProcessResult procRet = new ProcessResult(); for (String itemVal : valItemSet) { - if (!checkIntValueNorms(procRet, fieldDef, itemVal, minValue)) { + if (!checkIntValueNorms(procRet, 
fieldDef, itemVal, true, minValue)) { return procRet; } tgtValueSet.add((Integer) procRet.retData1); @@ -319,7 +364,7 @@ public class WebParameterUtils { procResult.setSuccResult(defValue); return procResult; } - checkIntValueNorms(procResult, fieldDef, paramValue, minValue); + checkIntValueNorms(procResult, fieldDef, paramValue, true, minValue); } return procResult; } @@ -501,16 +546,18 @@ public class WebParameterUtils { * @param procResult process result * @param fieldDef the parameter field definition * @param paramValue the parameter value + * @param hasMinVal whether there is a minimum * param minValue the parameter min value * @return check result for string value of parameter */ private static boolean checkIntValueNorms(ProcessResult procResult, WebFieldDef fieldDef, String paramValue, + boolean hasMinVal, int minValue) { try { int paramIntVal = Integer.parseInt(paramValue); - if (paramIntVal < minValue) { + if (hasMinVal && paramIntVal < minValue) { procResult.setFailResult(400, new StringBuilder(512).append("Parameter ") .append(fieldDef.name).append(" value must >= ") diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/nodemanage/nodebroker/BrokerConfManager.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/nodemanage/nodebroker/BrokerConfManager.java index 61e1fe3..698f0d3 100644 --- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/nodemanage/nodebroker/BrokerConfManager.java +++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/nodemanage/nodebroker/BrokerConfManager.java @@ -721,6 +721,75 @@ public class BrokerConfManager implements Server { return true; } + /** + * Get the topic names configured on each broker. + * If brokerIdSet is empty, the topic names of every broker in the store are returned. + * + * @param brokerIdSet the broker ids to query; an empty set selects all stored brokers + * @return map from broker id to that broker's topic-name set (an empty set when none are stored for it) + */ + public Map<Integer, Set<String>> getBrokerTopicConfigInfo(Set<Integer> brokerIdSet) { + Map<Integer, Set<String>> result = new HashMap<>(); + if (brokerIdSet.isEmpty()) 
{ + for (ConcurrentHashMap.Entry<Integer, + ConcurrentHashMap<String, BdbTopicConfEntity>> + entry : brokerTopicEntityStoreMap.entrySet()) { + Set<String> topicSet = new HashSet<>(); + if (entry.getValue() != null) { + topicSet.addAll(entry.getValue().keySet()); + } + result.put(entry.getKey(), topicSet); + } + } else { + for (Integer brokerId : brokerIdSet) { + ConcurrentHashMap<String, BdbTopicConfEntity> topicConfigMap = + brokerTopicEntityStoreMap.get(brokerId); + Set<String> topicSet = new HashSet<>(); + if (topicConfigMap != null && !topicConfigMap.isEmpty()) { + topicSet.addAll(topicConfigMap.keySet()); + } + result.put(brokerId, topicSet); + } + } + return result; + } + + /** + * Get, for each topic, the brokers that hold a configuration entry for it. + * If topicNameSet is empty, every topic found in the store is returned. + * + * @param topicNameSet the topic names to query; an empty set selects all stored topics + * @return map from topic name to a map of broker id to broker ip + */ + public Map<String, Map<Integer, String>> getTopicBrokerConfigInfo(Set<String> topicNameSet) { + Map<String, Map<Integer, String>> result = new HashMap<>(); + if (topicNameSet.isEmpty()) { + for (Map<String, BdbTopicConfEntity> topicConfigMap + : brokerTopicEntityStoreMap.values()) { + for (Map.Entry<String, BdbTopicConfEntity> entry + : topicConfigMap.entrySet()) { + Map<Integer, String> brokerInfos = + result.computeIfAbsent(entry.getKey(), k -> new HashMap<>()); + brokerInfos.put(entry.getValue().getBrokerId(), + entry.getValue().getBrokerIp()); + } + } + } else { + for (Map<String, BdbTopicConfEntity> topicConfigMap + : brokerTopicEntityStoreMap.values()) { + for (String topic : topicNameSet) { + Map<Integer, String> brokerInfos = + result.computeIfAbsent(topic, k -> new HashMap<>()); + BdbTopicConfEntity topicConfig = topicConfigMap.get(topic); + if (topicConfig != null) { + brokerInfos.put(topicConfig.getBrokerId(), topicConfig.getBrokerIp()); + } + } + } + } + return result; + } + public Set<String> getTotalConfiguredTopicNames() { Set<String> totalTopics = new HashSet<>(50); for (ConcurrentHashMap<String, 
BdbTopicConfEntity> tmpTopicCfgMap diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebBrokerTopicConfHandler.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebBrokerTopicConfHandler.java index bc14775..3bdfe16 100644 --- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebBrokerTopicConfHandler.java +++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebBrokerTopicConfHandler.java @@ -33,6 +33,8 @@ import org.apache.tubemq.corebase.cluster.TopicInfo; import org.apache.tubemq.corebase.utils.TStringUtils; import org.apache.tubemq.server.common.TServerConstants; import org.apache.tubemq.server.common.TStatusConstants; +import org.apache.tubemq.server.common.fielddef.WebFieldDef; +import org.apache.tubemq.server.common.utils.ProcessResult; import org.apache.tubemq.server.common.utils.WebParameterUtils; import org.apache.tubemq.server.master.TMaster; import org.apache.tubemq.server.master.bdbstore.bdbentitys.BdbBrokerConfEntity; @@ -66,6 +68,10 @@ public class WebBrokerTopicConfHandler extends AbstractWebHandler { "adminQueryTopicCfgEntityAndRunInfo"); registerQueryWebMethod("admin_query_broker_topic_config_info", "adminQueryBrokerTopicCfgAndRunInfo"); + registerQueryWebMethod("admin_query_topicName", + "adminQuerySimpleTopicName"); + registerQueryWebMethod("admin_query_brokerId", + "adminQuerySimpleBrokerId"); // register modify method registerModifyWebMethod("admin_add_new_topic_record", "adminAddTopicEntityInfo"); @@ -702,6 +708,100 @@ public class WebBrokerTopicConfHandler extends AbstractWebHandler { } /** + * Query the topic names configured on the requested brokers + * + * @param req the HTTP request; the broker-id set (COMPSBROKERID) is read from it as a non-required parameter + * @return the response text listing each broker's topic names and counts, or the text written by buildFailResult when the parameter is rejected + */ + public StringBuilder adminQuerySimpleTopicName(HttpServletRequest req) { + StringBuilder strBuffer = new StringBuilder(512); + ProcessResult result = WebParameterUtils.getIntParamValue(req, + WebFieldDef.COMPSBROKERID, false); + if (!result.success) { + 
WebParameterUtils.buildFailResult(strBuffer, result.errInfo); + return strBuffer; + } + Set<Integer> brokerIds = (Set<Integer>) result.retData1; + strBuffer.append("{\"result\":true,\"errCode\":0,\"errMsg\":\"OK\",\"data\":["); + Map<Integer, Set<String>> brokerTopicConfigMap = + brokerConfManager.getBrokerTopicConfigInfo(brokerIds); + int dataCount = 0; + for (Map.Entry<Integer, Set<String>> entry : brokerTopicConfigMap.entrySet()) { + if (dataCount++ > 0) { + strBuffer.append(","); + } + strBuffer.append("{\"brokerId\":").append(entry.getKey()).append(",\"topicName\":["); + int topicCnt = 0; + Set<String> topicSet = entry.getValue(); + for (String topic : topicSet) { + if (topicCnt++ > 0) { + strBuffer.append(","); + } + strBuffer.append("\"").append(topic).append("\""); + } + strBuffer.append("],\"topicCount\":").append(topicCnt).append("}"); + } + strBuffer.append("],\"dataCount\":").append(dataCount).append("}"); + return strBuffer; + } + + /** + * Query the broker ids (optionally with broker ips) recorded for each requested topic + * + * @param req the HTTP request; the topic-name set (COMPSTOPICNAME) and the withIp flag are both read as non-required parameters + * @return the response text listing each topic's brokers and counts, or the text written by buildFailResult when a parameter is rejected + */ + public StringBuilder adminQuerySimpleBrokerId(HttpServletRequest req) { + StringBuilder strBuffer = new StringBuilder(512); + ProcessResult result = WebParameterUtils.getStringParamValue(req, + WebFieldDef.COMPSTOPICNAME, false, null); + if (!result.success) { + WebParameterUtils.buildFailResult(strBuffer, result.errInfo); + return strBuffer; + } + Set<String> topicNameSet = (Set<String>) result.retData1; + result = WebParameterUtils.getBooleanParamValue(req, + WebFieldDef.WITHIP, false, false); + if (!result.success) { + WebParameterUtils.buildFailResult(strBuffer, result.errInfo); + return strBuffer; + } + boolean withIp = (Boolean) result.retData1; + // build the response body + strBuffer.append("{\"result\":true,\"errCode\":0,\"errMsg\":\"OK\",\"data\":["); + Map<String, Map<Integer, String>> topicBrokerConfigMap = + brokerConfManager.getTopicBrokerConfigInfo(topicNameSet); + int dataCount = 0; + for (Map.Entry<String, Map<Integer, String>> entry : 
topicBrokerConfigMap.entrySet()) { + if (dataCount++ > 0) { + strBuffer.append(","); + } + strBuffer.append("{\"topicName\":\"").append(entry.getKey()).append("\",\"brokerInfo\":["); + int brokerCnt = 0; + Map<Integer, String> brokerMap = entry.getValue(); + if (withIp) { + for (Map.Entry<Integer, String> entry1 : brokerMap.entrySet()) { + if (brokerCnt++ > 0) { + strBuffer.append(","); + } + strBuffer.append("{\"brokerId\":").append(entry1.getKey()) + .append(",\"brokerIp\":\"").append(entry1.getValue()).append("\"}"); + } + } else { + for (Map.Entry<Integer, String> entry1 : brokerMap.entrySet()) { + if (brokerCnt++ > 0) { + strBuffer.append(","); + } + strBuffer.append(entry1.getKey()); + } + } + strBuffer.append("],\"brokerCnt\":").append(brokerCnt).append("}"); + } + strBuffer.append("],\"dataCount\":").append(dataCount).append("}"); + return strBuffer; + } + + /** * Delete topic info * * @param req
