This is an automated email from the ASF dual-hosted git repository. yuanbo pushed a commit to branch TUBEMQ-421 in repository https://gitbox.apache.org/repos/asf/incubator-tubemq.git
commit 9598a42a8675657cdf9190d6c75b1a21b1be70e0 Author: gosonzhang <[email protected]> AuthorDate: Fri Dec 4 10:16:15 2020 +0800 [TUBEMQ-430]Optimizing the implementation of HTTP API for broker (#338) --- .../tubemq/client/config/ConsumerConfig.java | 12 + .../client/consumer/PullMessageConsumer.java | 7 + .../org/apache/tubemq/corebase/utils/RegexDef.java | 60 ++++ .../server/broker/web/AbstractWebHandler.java | 83 +++++ .../server/broker/web/BrokerAdminServlet.java | 333 ++++++++++----------- .../tubemq/server/common/fielddef/CliArgDef.java | 111 +++++++ .../tubemq/server/common/fielddef/WebFieldDef.java | 159 ++++++++++ .../tubemq/server/common/utils/ProcessResult.java | 53 ++++ .../server/common/utils/WebParameterUtils.java | 320 +++++++++++++++++++- .../tubemq/server/common/webbase/WebFieldType.java | 60 ++++ .../webbase/WebMethodMapper.java} | 12 +- .../server/master/web/action/screen/Webapi.java | 6 +- .../master/web/handler/AbstractWebHandler.java | 2 +- 13 files changed, 1037 insertions(+), 181 deletions(-) diff --git a/tubemq-client/src/main/java/org/apache/tubemq/client/config/ConsumerConfig.java b/tubemq-client/src/main/java/org/apache/tubemq/client/config/ConsumerConfig.java index d8b63fb..184da79 100644 --- a/tubemq-client/src/main/java/org/apache/tubemq/client/config/ConsumerConfig.java +++ b/tubemq-client/src/main/java/org/apache/tubemq/client/config/ConsumerConfig.java @@ -124,6 +124,18 @@ public class ConsumerConfig extends TubeClientConfig { return pullConsumeReadyWaitPeriodMs; } + // setPullConsumeReadyWaitPeriodMs() use note: + // The value range is [negative value, 0, positive value] and the value directly determines + // the behavior of the PullMessageConsumer.GetMessage() function: + // 1. if it is set to a negative value, it means that the GetMessage() calling thread will + // be blocked forever and will not return until the consumption conditions are met; + // 2. if If it is set to 0, it means that the GetMessage() calling thread will only block + // the ConsumerConfig.getPullConsumeReadyChkSliceMs() interval when the consumption + // conditions are not met and then return; + // 3. if it is set to a positive number, it will not meet the current user usage (including + // unused partitions or allocated partitions, but these partitions do not meet the usage + // conditions), the GetMessage() calling thread will be blocked until the total time of + // ConsumerConfig.getPullConsumeReadyWaitPeriodMs expires public void setPullConsumeReadyWaitPeriodMs(long pullConsumeReadyWaitPeriodMs) { this.pullConsumeReadyWaitPeriodMs = pullConsumeReadyWaitPeriodMs; } diff --git a/tubemq-client/src/main/java/org/apache/tubemq/client/consumer/PullMessageConsumer.java b/tubemq-client/src/main/java/org/apache/tubemq/client/consumer/PullMessageConsumer.java index af5d50f..d9c3baf 100644 --- a/tubemq-client/src/main/java/org/apache/tubemq/client/consumer/PullMessageConsumer.java +++ b/tubemq-client/src/main/java/org/apache/tubemq/client/consumer/PullMessageConsumer.java @@ -28,6 +28,13 @@ public interface PullMessageConsumer extends MessageConsumer { PullMessageConsumer subscribe(String topic, TreeSet<String> filterConds) throws TubeClientException; + // getMessage() use note: + // This getMessage have a blocking situation: when the current + // consumer consumption situation is not satisfied (including + // without partitions to consumption, or allocated partitions but + // the partitions do not meet the consumption situation), + // the call will sleep at intervals of ConsumerConfig.getPullConsumeReadyChkSliceMs(), + // until the total time of ConsumerConfig.getPullConsumeReadyWaitPeriodMs ConsumerResult getMessage() throws TubeClientException; ConsumerResult confirmConsume(final String confirmContext, diff --git a/tubemq-core/src/main/java/org/apache/tubemq/corebase/utils/RegexDef.java b/tubemq-core/src/main/java/org/apache/tubemq/corebase/utils/RegexDef.java new file mode 100644 index 0000000..842c6a9 --- /dev/null +++ b/tubemq-core/src/main/java/org/apache/tubemq/corebase/utils/RegexDef.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tubemq.corebase.utils; + + + +public enum RegexDef { + + TMP_FILTER(0, "^[_A-Za-z0-9]+$", + "must only contain characters,numbers,and underscores"), + TMP_STRING(1, "^[a-zA-Z]\\w+$", + "must begin with a letter,can only contain characters,numbers,and underscores"), + TMP_NUMBER(2, "^-?[0-9]\\d*$", "must only contain numbers"), + TMP_GROUP(3, "^[a-zA-Z][\\w-]+$", + "must begin with a letter,can only contain characters,numbers,hyphen,and underscores"), + TMP_CONSUMERID(4, "^[_A-Za-z0-9\\.\\-]+$", + "must begin with a letter,can only contain characters,numbers,dot,scores,and underscores"), + TMP_IPV4ADDRESS(5, + "((?:(?:25[0-5]|2[0-4]\\d|((1\\d{2})|([1-9]?\\d)))\\.){3}(?:25[0-5]|2[0-4]\\d|((1\\d{2})|([1-9]?\\d))))", + "must matches the IP V4 address regulation"); + + + private final int id; + private final String pattern; + private final String errMsgTemp; + + + RegexDef(int id, String pattern, String errMsgTemp) { + this.id = id; + this.pattern = pattern; + this.errMsgTemp = errMsgTemp; + } + + public int getId() { + return id; + } + + public String getPattern() { + return pattern; + } + + public String getErrMsgTemp() { + return errMsgTemp; + } +} diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/web/AbstractWebHandler.java b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/web/AbstractWebHandler.java new file mode 100644 index 0000000..b44d88c --- /dev/null +++ b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/web/AbstractWebHandler.java @@ -0,0 +1,83 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tubemq.server.broker.web; + +import static org.apache.tubemq.server.common.webbase.WebMethodMapper.getWebApiRegInfo; +import static org.apache.tubemq.server.common.webbase.WebMethodMapper.registerWebMethod; +import java.io.IOException; +import javax.servlet.http.HttpServlet; +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; +import org.apache.tubemq.server.broker.TubeBroker; +import org.apache.tubemq.server.common.webbase.WebMethodMapper.WebApiRegInfo; + + + +public abstract class AbstractWebHandler extends HttpServlet { + + protected final TubeBroker broker; + + public AbstractWebHandler(TubeBroker broker) { + this.broker = broker; + } + + @Override + protected void doGet(HttpServletRequest req, + HttpServletResponse resp) throws IOException { + doPost(req, resp); + } + + @Override + protected void doPost(HttpServletRequest req, + HttpServletResponse resp) throws IOException { + StringBuilder strBuffer = new StringBuilder(1024); + + try { + String method = req.getParameter("method"); + if (method == null) { + strBuffer.append("{\"result\":false,\"errCode\":400,\"errMsg\":\"") + .append("Please take with method parameter! \"}"); + } else { + WebApiRegInfo webApiRegInfo = getWebApiRegInfo(true, method); + if (webApiRegInfo == null) { + strBuffer.append("{\"result\":false,\"errCode\":400,\"errMsg\":\"") + .append("Unsupported method ").append(method).append("}"); + } else { + strBuffer = (StringBuilder) webApiRegInfo.method.invoke(webApiRegInfo.webHandler, req); + } + } + } catch (Throwable e) { + strBuffer.append("{\"result\":false,\"errCode\":400,\"errMsg\":\"") + .append("Bad request from server: ") + .append(e.getMessage()) + .append("\"}"); + } + resp.getWriter().write(strBuffer.toString()); + resp.setCharacterEncoding(req.getCharacterEncoding()); + resp.setStatus(HttpServletResponse.SC_OK); + resp.flushBuffer(); + } + + public abstract void registerWebApiMethod(); + + protected void innRegisterWebMethod(String webMethodName, + String clsMethodName) { + registerWebMethod(true, webMethodName, clsMethodName, this); + } + +} diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/web/BrokerAdminServlet.java b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/web/BrokerAdminServlet.java index e91ae79..ab43c2e 100644 --- a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/web/BrokerAdminServlet.java +++ b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/web/BrokerAdminServlet.java @@ -17,18 +17,12 @@ package org.apache.tubemq.server.broker.web; -import java.io.IOException; -import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; -import javax.servlet.ServletException; -import javax.servlet.http.HttpServlet; import javax.servlet.http.HttpServletRequest; -import javax.servlet.http.HttpServletResponse; -import org.apache.tubemq.corebase.TBaseConstants; import org.apache.tubemq.corebase.TokenConstants; import org.apache.tubemq.corebase.utils.TStringUtils; import org.apache.tubemq.server.broker.TubeBroker; @@ -36,83 +30,44 @@ import org.apache.tubemq.server.broker.msgstore.MessageStore; import org.apache.tubemq.server.broker.msgstore.MessageStoreManager; import org.apache.tubemq.server.broker.nodeinfo.ConsumerNodeInfo; import org.apache.tubemq.server.broker.offset.OffsetService; +import org.apache.tubemq.server.common.fielddef.WebFieldDef; +import org.apache.tubemq.server.common.utils.ProcessResult; import org.apache.tubemq.server.common.utils.WebParameterUtils; /*** * Broker's web servlet. Used for admin operation, like query consumer's status etc. */ -public class BrokerAdminServlet extends HttpServlet { - private final TubeBroker broker; +public class BrokerAdminServlet extends AbstractWebHandler { - public BrokerAdminServlet(TubeBroker broker) { - this.broker = broker; - } - @Override - protected void doGet(HttpServletRequest req, - HttpServletResponse resp) throws ServletException, IOException { - doPost(req, resp); + public BrokerAdminServlet(TubeBroker broker) { + super(broker); + registerWebApiMethod(); } @Override - protected void doPost(HttpServletRequest req, - HttpServletResponse resp) throws ServletException, IOException { - StringBuilder sBuilder = new StringBuilder(1024); - try { - String method = req.getParameter("method"); - if ("admin_manual_set_current_offset".equals(method)) { - // manual set offset - sBuilder = this.adminManualSetCurrentOffSet(req); - } else if ("admin_query_group_offset".equals(method)) { - // query consumer group's offset - sBuilder = this.adminQueryCurrentGroupOffSet(req); - } else if ("admin_snapshot_message".equals(method)) { - // query snapshot message - sBuilder = this.adminQuerySnapshotMessageSet(req); - } else if ("admin_query_broker_all_consumer_info".equals(method)) { - // query broker's all consumer info - sBuilder = this.adminQueryBrokerAllConsumerInfo(req); - } else if ("admin_query_broker_memstore_info".equals(method)) { - // get memory store status info - sBuilder = this.adminGetMemStoreStatisInfo(req); - } else if ("admin_query_broker_all_store_info".equals(method)) { - // query broker's all message store info - sBuilder = this.adminQueryBrokerAllMessageStoreInfo(req); - } else if ("admin_query_consumer_regmap".equals(method)) { - Map<String, ConsumerNodeInfo> map = - broker.getBrokerServiceServer().getConsumerRegisterMap(); - int totalCnt = 0; - sBuilder.append("{\"result\":true,\"errCode\":0,\"errMsg\":\"Success!\",") - .append(",\"dataSet\":["); - for (Entry<String, ConsumerNodeInfo> entry : map.entrySet()) { - if (entry.getKey() == null || entry.getValue() == null) { - continue; - } - if (totalCnt > 0) { - sBuilder.append(","); - } - sBuilder.append("{\"Partition\":\"").append(entry.getKey()) - .append("\",\"Consumer\":\"") - .append(entry.getValue().getConsumerId()) - .append("\",\"index\":").append(++totalCnt).append("}"); - } - sBuilder.append("],\"totalCnt\":").append(totalCnt).append("}"); - } else { - sBuilder.append("{\"result\":false,\"errCode\":400,\"errMsg\":\"") - .append("Invalid request: Unsupported method!") - .append("\"}"); - } - - } catch (Exception e) { - sBuilder.append("{\"result\":false,\"errCode\":400,\"errMsg\":\"") - .append("Bad request from server: ") - .append(e.getMessage()) - .append("\"}"); - } - resp.getWriter().write(sBuilder.toString()); - resp.setCharacterEncoding(req.getCharacterEncoding()); - resp.setStatus(HttpServletResponse.SC_OK); - resp.flushBuffer(); + public void registerWebApiMethod() { + // query consumer group's offset + innRegisterWebMethod("admin_query_group_offset", + "adminQueryCurrentGroupOffSet"); + // query snapshot message + innRegisterWebMethod("admin_snapshot_message", + "adminQuerySnapshotMessageSet"); + // query broker's all consumer info + innRegisterWebMethod("admin_query_broker_all_consumer_info", + "adminQueryBrokerAllConsumerInfo"); + // get memory store status info + innRegisterWebMethod("admin_query_broker_memstore_info", + "adminGetMemStoreStatisInfo"); + // query broker's all message store info + innRegisterWebMethod("admin_query_broker_all_store_info", + "adminQueryBrokerAllMessageStoreInfo"); + // query consumer register info + innRegisterWebMethod("admin_query_consumer_regmap", + "adminQueryConsumerRegisterInfo"); + // manual set offset + innRegisterWebMethod("admin_manual_set_current_offset", + "adminManualSetCurrentOffSet"); } /*** @@ -122,13 +77,16 @@ public class BrokerAdminServlet extends HttpServlet { * @return * @throws Exception */ - private StringBuilder adminQueryBrokerAllConsumerInfo(HttpServletRequest req) throws Exception { + public StringBuilder adminQueryBrokerAllConsumerInfo(HttpServletRequest req) throws Exception { int index = 0; StringBuilder sBuilder = new StringBuilder(1024); - String groupNameInput = - WebParameterUtils.validGroupParameter("groupName", - req.getParameter("groupName"), - TBaseConstants.META_MAX_GROUPNAME_LENGTH, false, null); + ProcessResult result = WebParameterUtils.getStringParamValue(req, + WebFieldDef.COMPSGROUPNAME, false, null); + if (!result.success) { + return WebParameterUtils.buildFailResult(sBuilder, result.errInfo); + } + Set<String> groupNameSet = (Set<String>) result.retData1; + sBuilder.append("{\"result\":true,\"errCode\":0,\"errMsg\":\"Success!\",\"dataSet\":["); Map<String, ConsumerNodeInfo> map = broker.getBrokerServiceServer().getConsumerRegisterMap(); @@ -139,7 +97,7 @@ public class BrokerAdminServlet extends HttpServlet { String[] partitionIdArr = entry.getKey().split(TokenConstants.ATTR_SEP); String groupName = partitionIdArr[0]; - if (!TStringUtils.isBlank(groupNameInput) && (!groupNameInput.equals(groupName))) { + if (!groupNameSet.isEmpty() && !groupNameSet.contains(groupName)) { continue; } String topicName = partitionIdArr[1]; @@ -209,22 +167,23 @@ public class BrokerAdminServlet extends HttpServlet { * @return * @throws Exception */ - private StringBuilder adminQueryBrokerAllMessageStoreInfo(HttpServletRequest req) + public StringBuilder adminQueryBrokerAllMessageStoreInfo(HttpServletRequest req) throws Exception { StringBuilder sBuilder = new StringBuilder(1024); + ProcessResult result = WebParameterUtils.getStringParamValue(req, + WebFieldDef.COMPSTOPICNAME, false, null); + if (!result.success) { + return WebParameterUtils.buildFailResult(sBuilder, result.errInfo); + } + Set<String> topicNameSet = (Set<String>) result.retData1; sBuilder.append("{\"result\":true,\"errCode\":0,\"errMsg\":\"Success!\",\"dataSet\":["); - String topicNameInput = - WebParameterUtils.validStringParameter("topicName", - req.getParameter("topicName"), - TBaseConstants.META_MAX_TOPICNAME_LENGTH, false, null); Map<String, ConcurrentHashMap<Integer, MessageStore>> messageTopicStores = broker.getStoreManager().getMessageStores(); int index = 0; int recordId = 0; for (Map.Entry<String, ConcurrentHashMap<Integer, MessageStore>> entry : messageTopicStores.entrySet()) { - if (TStringUtils.isBlank(entry.getKey()) || - (TStringUtils.isNotBlank(topicNameInput) - && !topicNameInput.equals(entry.getKey()))) { + if (TStringUtils.isBlank(entry.getKey()) + || (!topicNameSet.isEmpty() && !topicNameSet.contains(entry.getKey()))) { continue; } if (recordId > 0) { @@ -276,47 +235,27 @@ public class BrokerAdminServlet extends HttpServlet { * @return * @throws Exception */ - private StringBuilder adminGetMemStoreStatisInfo(HttpServletRequest req) throws Exception { + public StringBuilder adminGetMemStoreStatisInfo(HttpServletRequest req) throws Exception { StringBuilder sBuilder = new StringBuilder(1024); - Set<String> batchTopicNames = new HashSet<>(); - String inputTopicName = req.getParameter("topicName"); - if (TStringUtils.isNotBlank(inputTopicName)) { - inputTopicName = inputTopicName.trim(); - String[] strTopicNames = - inputTopicName.split(TokenConstants.ARRAY_SEP); - for (int i = 0; i < strTopicNames.length; i++) { - if (TStringUtils.isBlank(strTopicNames[i])) { - continue; - } - String topicName = strTopicNames[i].trim(); - if (topicName.length() > TBaseConstants.META_MAX_TOPICNAME_LENGTH) { - sBuilder.append("{\"result\":false,\"errCode\":400,\"errMsg\":\"") - .append("Invalid parameter: the max length of ") - .append(topicName).append(" in topicName parameter over ") - .append(TBaseConstants.META_MAX_TOPICNAME_LENGTH) - .append(" characters\"}"); - return sBuilder; - } - if (!topicName.matches(TBaseConstants.META_TMP_STRING_VALUE)) { - sBuilder.append("{\"result\":false,\"errCode\":400,\"errMsg\":\"") - .append("Invalid parameter: the value of ").append(topicName) - .append(" in topicName parameter must begin with a letter,") - .append(" can only contain characters,numbers,and underscores!\"}"); - return sBuilder; - } - batchTopicNames.add(topicName); - } + ProcessResult result = WebParameterUtils.getStringParamValue(req, + WebFieldDef.COMPSTOPICNAME, false, null); + if (!result.success) { + return WebParameterUtils.buildFailResult(sBuilder, result.errInfo); + } + Set<String> topicNameSet = (Set<String>) result.retData1; + result = WebParameterUtils.getBooleanParamValue(req, + WebFieldDef.NEEDREFRESH, false, false); + if (!result.success) { + return WebParameterUtils.buildFailResult(sBuilder, result.errInfo); } - boolean requireRefresh = - WebParameterUtils.validBooleanDataParameter("needRefresh", - req.getParameter("needRefresh"), false, false); + boolean requireRefresh = (boolean) result.retData1; sBuilder.append("{\"result\":true,\"errCode\":0,\"errMsg\":\"Success!\",\"detail\":["); Map<String, ConcurrentHashMap<Integer, MessageStore>> messageTopicStores = broker.getStoreManager().getMessageStores(); int recordId = 0, index = 0; for (Map.Entry<String, ConcurrentHashMap<Integer, MessageStore>> entry : messageTopicStores.entrySet()) { if (TStringUtils.isBlank(entry.getKey()) - || (!batchTopicNames.isEmpty() && !batchTopicNames.contains(entry.getKey()))) { + || (!topicNameSet.isEmpty() && !topicNameSet.contains(entry.getKey()))) { continue; } String topicName = entry.getKey(); @@ -354,25 +293,38 @@ public class BrokerAdminServlet extends HttpServlet { * @return * @throws Exception */ - private StringBuilder adminManualSetCurrentOffSet(HttpServletRequest req) throws Exception { + public StringBuilder adminManualSetCurrentOffSet(HttpServletRequest req) throws Exception { StringBuilder sBuilder = new StringBuilder(512); - final String topicName = - WebParameterUtils.validStringParameter("topicName", - req.getParameter("topicName"), - TBaseConstants.META_MAX_TOPICNAME_LENGTH, true, ""); - final String groupName = - WebParameterUtils.validGroupParameter("groupName", - req.getParameter("groupName"), - TBaseConstants.META_MAX_GROUPNAME_LENGTH, true, ""); - final String modifyUser = - WebParameterUtils.validStringParameter("modifyUser", - req.getParameter("modifyUser"), 64, true, ""); - int partitionId = - WebParameterUtils.validIntDataParameter("partitionId", - req.getParameter("partitionId"), true, -1, 0); - long manualOffset = - WebParameterUtils.validLongDataParameter("manualOffset", - req.getParameter("manualOffset"), true, -1); + ProcessResult result = WebParameterUtils.getStringParamValue(req, + WebFieldDef.TOPICNAME, true, null); + if (!result.success) { + return WebParameterUtils.buildFailResult(sBuilder, result.errInfo); + } + final String topicName = (String) result.retData1; + result = WebParameterUtils.getStringParamValue(req, + WebFieldDef.GROUPNAME, true, null); + if (!result.success) { + return WebParameterUtils.buildFailResult(sBuilder, result.errInfo); + } + final String groupName = (String) result.retData1; + result = WebParameterUtils.getStringParamValue(req, + WebFieldDef.MODIFYUSER, true, null); + if (!result.success) { + return WebParameterUtils.buildFailResult(sBuilder, result.errInfo); + } + final String modifyUser = (String) result.retData1; + result = WebParameterUtils.getIntParamValue(req, + WebFieldDef.PARTITIONID, true, -1, 0); + if (!result.success) { + return WebParameterUtils.buildFailResult(sBuilder, result.errInfo); + } + int partitionId = (Integer) result.retData1; + result = WebParameterUtils.getLongParamValue(req, + WebFieldDef.MANUALOFFSET, true, -1); + if (!result.success) { + return WebParameterUtils.buildFailResult(sBuilder, result.errInfo); + } + long manualOffset = (Long) result.retData1; List<String> topicList = broker.getMetadataManager().getTopics(); if (!topicList.contains(topicName)) { sBuilder.append("{\"result\":false,\"errCode\":400,\"errMsg\":\"") @@ -430,28 +382,39 @@ public class BrokerAdminServlet extends HttpServlet { * @return * @throws Exception */ - private StringBuilder adminQuerySnapshotMessageSet(HttpServletRequest req) throws Exception { + public StringBuilder adminQuerySnapshotMessageSet(HttpServletRequest req) throws Exception { StringBuilder sBuilder = new StringBuilder(1024); - final String topicName = - WebParameterUtils.validStringParameter("topicName", - req.getParameter("topicName"), - TBaseConstants.META_MAX_TOPICNAME_LENGTH, true, ""); - final int partitionId = - WebParameterUtils.validIntDataParameter("partitionId", - req.getParameter("partitionId"), false, -1, 0); - int msgCount = - WebParameterUtils.validIntDataParameter("msgCount", - req.getParameter("msgCount"), false, 3, 3); - msgCount = msgCount < 1 ? 1 : msgCount; + ProcessResult result = WebParameterUtils.getStringParamValue(req, + WebFieldDef.TOPICNAME, true, null); + if (!result.success) { + return WebParameterUtils.buildFailResult(sBuilder, result.errInfo); + } + final String topicName = (String) result.retData1; + result = WebParameterUtils.getIntParamValue(req, + WebFieldDef.PARTITIONID, true, -1, 0); + if (!result.success) { + return WebParameterUtils.buildFailResult(sBuilder, result.errInfo); + } + int partitionId = (Integer) result.retData1; + result = WebParameterUtils.getIntParamValue(req, + WebFieldDef.MSGCOUNT, false, 3, 3); + if (!result.success) { + return WebParameterUtils.buildFailResult(sBuilder, result.errInfo); + } + int msgCount = (Integer) result.retData1; + msgCount = Math.max(msgCount, 1); if (msgCount > 50) { sBuilder.append("{\"result\":false,\"errCode\":400,\"errMsg\":\"") .append("Over max allowed msgCount value, allowed count is 50!") .append("\"}"); return sBuilder; } - Set<String> filterCondStrSet = - WebParameterUtils.checkAndGetFilterCondSet(req.getParameter("filterConds"), - false, true, sBuilder); + result = WebParameterUtils.getStringParamValue(req, + WebFieldDef.FILTERCONDS, false, null); + if (!result.success) { + return WebParameterUtils.buildFailResult(sBuilder, result.errInfo); + } + Set<String> filterCondStrSet = (Set<String>) result.retData1; sBuilder = broker.getBrokerServiceServer() .getMessageSnapshot(topicName, partitionId, msgCount, filterCondStrSet, sBuilder); return sBuilder; @@ -464,20 +427,34 @@ public class BrokerAdminServlet extends HttpServlet { * @return * @throws Exception */ - private StringBuilder adminQueryCurrentGroupOffSet(HttpServletRequest req) + public StringBuilder adminQueryCurrentGroupOffSet(HttpServletRequest req) throws Exception { StringBuilder sBuilder = new StringBuilder(1024); - String topicName = - WebParameterUtils.validStringParameter("topicName", - req.getParameter("topicName"), - TBaseConstants.META_MAX_TOPICNAME_LENGTH, true, ""); - String groupName = - WebParameterUtils.validGroupParameter("groupName", - req.getParameter("groupName"), - TBaseConstants.META_MAX_GROUPNAME_LENGTH, true, ""); - int partitionId = - WebParameterUtils.validIntDataParameter("partitionId", - req.getParameter("partitionId"), true, -1, 0); + ProcessResult result = WebParameterUtils.getStringParamValue(req, + WebFieldDef.TOPICNAME, true, null); + if (!result.success) { + return WebParameterUtils.buildFailResult(sBuilder, result.errInfo); + } + final String topicName = (String) result.retData1; + result = WebParameterUtils.getStringParamValue(req, + WebFieldDef.GROUPNAME, true, null); + if (!result.success) { + return WebParameterUtils.buildFailResult(sBuilder, result.errInfo); + } + final String groupName = (String) result.retData1; + result = WebParameterUtils.getIntParamValue(req, + WebFieldDef.PARTITIONID, true, -1, 0); + if (!result.success) { + return WebParameterUtils.buildFailResult(sBuilder, result.errInfo); + } + int partitionId = (Integer) result.retData1; + + result = WebParameterUtils.getBooleanParamValue(req, + WebFieldDef.REQUIREREALOFFSET, false, false); + if (!result.success) { + return WebParameterUtils.buildFailResult(sBuilder, result.errInfo); + } + boolean requireRealOffset = (Boolean) result.retData1; List<String> topicList = broker.getMetadataManager().getTopics(); if (!topicList.contains(topicName)) { sBuilder.append("{\"result\":false,\"errCode\":400,\"errMsg\":\"") @@ -499,9 +476,6 @@ public class BrokerAdminServlet extends HttpServlet { .append("\"}"); return sBuilder; } - boolean requireRealOffset = - WebParameterUtils.validBooleanDataParameter("requireRealOffset", - req.getParameter("requireRealOffset"), false, false); long tmpOffset = offsetService.getTmpOffset(groupName, topicName, partitionId); long minDataOffset = store.getDataMinOffset(); long maxDataOffset = store.getDataMaxOffset(); @@ -538,5 +512,28 @@ public class BrokerAdminServlet extends HttpServlet { return sBuilder; } + public StringBuilder adminQueryConsumerRegisterInfo(HttpServletRequest req) { + StringBuilder sBuilder = new StringBuilder(1024); + Map<String, ConsumerNodeInfo> map = + broker.getBrokerServiceServer().getConsumerRegisterMap(); + int totalCnt = 0; + sBuilder.append("{\"result\":true,\"errCode\":0,\"errMsg\":\"Success!\",") + .append(",\"dataSet\":["); + for (Entry<String, ConsumerNodeInfo> entry : map.entrySet()) { + if (entry.getKey() == null || entry.getValue() == null) { + continue; + } + if (totalCnt > 0) { + sBuilder.append(","); + } + sBuilder.append("{\"Partition\":\"").append(entry.getKey()) + .append("\",\"Consumer\":\"") + .append(entry.getValue().getConsumerId()) + .append("\",\"index\":").append(++totalCnt).append("}"); + } + sBuilder.append("],\"totalCnt\":").append(totalCnt).append("}"); + return sBuilder; + } + } diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/common/fielddef/CliArgDef.java b/tubemq-server/src/main/java/org/apache/tubemq/server/common/fielddef/CliArgDef.java new file mode 100644 index 0000000..b2d9327 --- /dev/null +++ b/tubemq-server/src/main/java/org/apache/tubemq/server/common/fielddef/CliArgDef.java @@ -0,0 +1,111 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tubemq.server.common.fielddef; + + + +public enum CliArgDef { + + // Note: Due to compatibility considerations, + // the defined fields in the scheme are forbidden to be modified, + // only new fields can be added + + HELP("h", "help", "Print usage information."), + VERSION("v", "version", "Display TubeMQ version."), + MASTERSERVER("master-servers", "master-servers", + "String: format is master1_ip:port[,master2_ip:port]", + "The master address(es) to connect to."), + MASTERURL("master-url", "master-url", + "String: format is http://master_ip:master_webport/", + "Master Service URL to which to connect.(default: http://localhost:8080/)"), + BROKERURL("broker-url", "broker-url", + "String: format is http://broker_ip:broker_webport/", + "Broker Service URL to which to connect.(default: http://localhost:8081/)"), + MESSAGES("messages", "messages", + "Long: count", + "The number of messages to send or consume, If not set, production or consumption is continual."), + MSGDATASIZE("msg-data-size", "message-data-size", + "Int: message size", + "message's data size in bytes. Note that you must provide exactly" + + " one of --msg-data-size or --payload-file."), + PAYLOADFILE("payload-file", "payload-file", + "String: payload file path", + "file to read the message payloads from. This works only for" + + " UTF-8 encoded text files. Payloads will be read from this" + + " file and a payload will be randomly selected when sending" + + " messages. Note that you must provide exactly one" + + " of --msg-data-size or --payload-file."), + PAYLOADDELIM("payload-delimiter", "payload-delimiter", + "String: payload data's delimiter", + "provides delimiter to be used when --payload-file is provided." + + " Defaults to new line. Note that this parameter will be" + + " ignored if --payload-file is not provided. (default: \\n)"), + PRDTOPIC("topic", "topicName", + "String: topic, format is topic_1[,topic_2[:filterCond_2.1[;filterCond_2.2]]]", + "The topic(s) to produce messages to."), + CNSTOPIC("topic", "topicName", + "String: topic, format is topic_1[[:filterCond_1.1[;filterCond_1.2]][,topic_2]]", + "The topic(s) to consume on."), + RPCTIMEOUT("timeout", "timeout", + "Long: milliseconds", + "The maximum duration between request and response in milliseconds. (default: 10000)"), + GROUP("group", "groupName", + "String: consumer group", + "The consumer group name of the consumer."), + CLIENTCOUNT("client-num", "client-num", + "Int: client count", + "Number of consumers to started."), + PULLMODEL("pull-model", "pull-model", + "Pull consumption model."), + PUSHMODEL("push-model", "push-model", + "Push consumption model."), + FETCHTHREADS("num-fetch-threads", "num-fetch-threads", + "Integer: count", + "Number of fetch threads, default: num of cpu count."), + FROMLATEST("from-latest", "from-latest", + "Start to consume from the latest message present in the log."), + FROMBEGINNING("from-beginning", "from-beginning", + "If the consumer does not already have an established offset to consume from," + + " start with the earliest message present in the log rather than the latest message."), + OUTPUTINTERVAL("output-interval", "output-interval", + "Integer: interval_ms", + "Interval in milliseconds at which to print progress info. (default: 5000)"); + + + CliArgDef(String opt, String longOpt, String optDesc) { + this(opt, longOpt, false, "", optDesc); + } + + CliArgDef(String opt, String longOpt, String argDesc, String optDesc) { + this(opt, longOpt, true, argDesc, optDesc); + } + + CliArgDef(String opt, String longOpt, boolean hasArg, String argDesc, String optDesc) { + this.opt = opt; + this.longOpt = longOpt; + this.hasArg = hasArg; + this.argDesc = argDesc; + this.optDesc = optDesc; + } + + public final String opt; + public final String longOpt; + public final boolean hasArg; + public final String argDesc; + public final String optDesc; +} diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/common/fielddef/WebFieldDef.java b/tubemq-server/src/main/java/org/apache/tubemq/server/common/fielddef/WebFieldDef.java new file mode 100644 index 0000000..1025ba0 --- /dev/null +++ b/tubemq-server/src/main/java/org/apache/tubemq/server/common/fielddef/WebFieldDef.java @@ -0,0 +1,159 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tubemq.server.common.fielddef; + +import org.apache.tubemq.corebase.TBaseConstants; +import org.apache.tubemq.corebase.TokenConstants; +import org.apache.tubemq.corebase.utils.RegexDef; +import org.apache.tubemq.server.common.TServerConstants; +import org.apache.tubemq.server.common.webbase.WebFieldType; + + +public enum WebFieldDef { + + // Note: Due to compatibility considerations, + // the defined fields in the scheme are forbidden to be modified, + // only new fields can be added + + TOPICNAME(0, "topicName", "topic", WebFieldType.STRING, + "Topic name", TBaseConstants.META_MAX_TOPICNAME_LENGTH, + RegexDef.TMP_STRING), + GROUPNAME(1, "groupName", "group", WebFieldType.STRING, + "Group name", TBaseConstants.META_MAX_GROUPNAME_LENGTH, + RegexDef.TMP_GROUP), + PARTITIONID(2, "partitionId", "pid", WebFieldType.INT, + "Partition id", RegexDef.TMP_NUMBER), + CREATEUSER(3, "createUser", "cur", WebFieldType.STRING, + "Record creator", TBaseConstants.META_MAX_USERNAME_LENGTH, + RegexDef.TMP_STRING), + MODIFYUSER(4, "modifyUser", "mur", WebFieldType.STRING, + "Record modifier", TBaseConstants.META_MAX_USERNAME_LENGTH, + RegexDef.TMP_STRING), + MANUALOFFSET(5, "manualOffset", "offset", WebFieldType.LONG, + "Reset offset value", RegexDef.TMP_NUMBER), + MSGCOUNT(6, "msgCount", "cnt", WebFieldType.INT, + "Number of returned messages", RegexDef.TMP_NUMBER), + FILTERCONDS(7, "filterConds", "flts", WebFieldType.COMPSTRING, + "Filter condition items", TBaseConstants.CFG_FLT_MAX_FILTER_ITEM_LENGTH, + TBaseConstants.CFG_FLT_MAX_FILTER_ITEM_COUNT, RegexDef.TMP_FILTER), + REQUIREREALOFFSET(8, "requireRealOffset", "dko", WebFieldType.BOOLEAN, + "Require return disk offset details"), + NEEDREFRESH(9, "needRefresh", "nrf", WebFieldType.BOOLEAN, + "Require refresh data"), + COMPSGROUPNAME(10, "groupName", "group", WebFieldType.COMPSTRING, + "Group name", TBaseConstants.META_MAX_GROUPNAME_LENGTH, + RegexDef.TMP_GROUP), + COMPSTOPICNAME(11, "topicName", "topic", WebFieldType.COMPSTRING, + "Topic name", TBaseConstants.META_MAX_TOPICNAME_LENGTH, + RegexDef.TMP_STRING), + COMPSPARTITIONID(12, "partitionId", "pid", WebFieldType.COMPINT, + "Partition id", RegexDef.TMP_NUMBER); + + + + + public final int id; + public final String name; + public final String shortName; + public final WebFieldType type; + public final String desc; + public final boolean compVal; + public final String splitToken; + public final int itemMaxCnt; + public final int valMaxLen; + public final boolean regexCheck; + public final RegexDef regexDef; + + + WebFieldDef(int id, String name, String shortName, WebFieldType type, String desc) { + this(id, name, shortName, type, desc, TBaseConstants.META_VALUE_UNDEFINED, + TBaseConstants.META_VALUE_UNDEFINED, false, null); + } + + WebFieldDef(int id, String name, String shortName, WebFieldType type, + String desc, RegexDef regexDef) { + this(id, name, shortName, type, desc, + TBaseConstants.META_VALUE_UNDEFINED, regexDef); + } + + WebFieldDef(int id, String name, String shortName, WebFieldType type, + String desc, int valMaxLen, RegexDef regexDef) { + this(id, name, shortName, type, desc, valMaxLen, + TServerConstants.CFG_BATCH_RECORD_OPERATE_MAX_COUNT, + true, regexDef); + } + + WebFieldDef(int id, String name, String shortName, WebFieldType type, + String desc, int valMaxLen, int itemMaxCnt, RegexDef regexDef) { + this(id, name, shortName, type, desc, valMaxLen, + itemMaxCnt, true, regexDef); + } + + WebFieldDef(int id, String name, String shortName, WebFieldType type, + String desc, int valMaxLen, int itemMaxCnt, + boolean regexChk, RegexDef regexDef) { + this.id = id; + this.name = name; + this.shortName = shortName; + this.type = type; + this.desc = desc; + if (isCompFieldType()) { + this.compVal = true; + this.splitToken = TokenConstants.ARRAY_SEP; + this.itemMaxCnt = itemMaxCnt; + } else { + this.compVal = false; + this.splitToken = ""; + this.itemMaxCnt = TBaseConstants.META_VALUE_UNDEFINED; + } + this.valMaxLen = valMaxLen; + this.regexCheck = regexChk; + this.regexDef = regexDef; + } + + public boolean isCompFieldType() { + return (this.type == WebFieldType.COMPINT + || this.type == WebFieldType.COMPSTRING); + } + + private static final WebFieldDef[] WEB_FIELD_DEFS; + private static final int MIN_FIELD_ID = 0; + public static final int MAX_FIELD_ID; + + static { + int maxId = -1; + for (WebFieldDef fieldDef : WebFieldDef.values()) { + maxId = Math.max(maxId, fieldDef.id); + } + WebFieldDef[] idToType = new WebFieldDef[maxId + 1]; + for (WebFieldDef fieldDef : WebFieldDef.values()) { + idToType[fieldDef.id] = fieldDef; + } + WEB_FIELD_DEFS = idToType; + MAX_FIELD_ID = maxId; + } + + public static WebFieldDef valueOf(int fieldId) { + if (fieldId >= MIN_FIELD_ID && fieldId <= MAX_FIELD_ID) { + return WEB_FIELD_DEFS[fieldId]; + } + throw new IllegalArgumentException( + String.format("Unexpected WebFieldDef id `%s`, it should be between `%s` " + + "and `%s` (inclusive)", fieldId, MIN_FIELD_ID, MAX_FIELD_ID)); + } +} diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/common/utils/ProcessResult.java b/tubemq-server/src/main/java/org/apache/tubemq/server/common/utils/ProcessResult.java new file mode 100644 index 0000000..3688b1c --- /dev/null +++ b/tubemq-server/src/main/java/org/apache/tubemq/server/common/utils/ProcessResult.java @@ -0,0 +1,53 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tubemq.server.common.utils; + +import org.apache.tubemq.corebase.TErrCodeConstants; + +public class ProcessResult { + public boolean success = true; + public int errCode = TErrCodeConstants.SUCCESS; + public String errInfo = ""; + public Object retData1 = null; + + public ProcessResult() { + + } + + public ProcessResult(Object retData) { + this.success = true; + this.retData1 = retData; + } + + public ProcessResult(int errCode, String errInfo) { + this.success = false; + this.errCode = errCode; + this.errInfo = errInfo; + } + + public void setFailResult(int errCode, final String errMsg) { + this.success = false; + this.errCode = errCode; + this.errInfo = errMsg; + } + + public void setSuccResult(Object retData) { + this.success = true; + this.retData1 = retData; + } +} diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/common/utils/WebParameterUtils.java b/tubemq-server/src/main/java/org/apache/tubemq/server/common/utils/WebParameterUtils.java index f4ce5bf..b7d5d41 100644 --- a/tubemq-server/src/main/java/org/apache/tubemq/server/common/utils/WebParameterUtils.java +++ b/tubemq-server/src/main/java/org/apache/tubemq/server/common/utils/WebParameterUtils.java @@ -30,18 +30,21 @@ import java.util.List; import java.util.Map; import java.util.Set; import java.util.TreeSet; +import javax.servlet.http.HttpServletRequest; import org.apache.tubemq.corebase.TBaseConstants; import org.apache.tubemq.corebase.TokenConstants; import org.apache.tubemq.corebase.utils.TStringUtils; import org.apache.tubemq.server.broker.utils.DataStoreUtils; import org.apache.tubemq.server.common.TServerConstants; import org.apache.tubemq.server.common.TStatusConstants; +import org.apache.tubemq.server.common.fielddef.WebFieldDef; import org.apache.tubemq.server.master.TMaster; import org.apache.tubemq.server.master.bdbstore.bdbentitys.BdbBrokerConfEntity; import org.apache.tubemq.server.master.nodemanage.nodebroker.BrokerConfManager; import org.apache.tubemq.server.master.nodemanage.nodebroker.BrokerSyncStatusInfo; + public class WebParameterUtils { private static final List<String> allowedDelUnits = Arrays.asList("s", "m", "h"); @@ -153,6 +156,24 @@ public class WebParameterUtils { /** * Parse the parameter value from an object value to string value * + * @param req http servlet request + * @param paramName the parameter name + * @param paramMaxLen the max length of string to return + * @param required a boolean value represent whether the parameter is must required + * @param defaultValue a default value returned if failed to parse value from the given object + * @return a string value of parameter + * @throws Exception if failed to parse the object + */ + public static String validStringParameter(HttpServletRequest req, String paramName, + int paramMaxLen, boolean required, + String defaultValue) throws Exception { + return validStringParameter(paramName, + req.getParameter(paramName), paramMaxLen, required, defaultValue); + } + + /** + * Parse the parameter value from an object value to string value + * * @param paramName the parameter name * @param paramValue the parameter value which is an object for parsing * @param paramMaxLen the max length of string to return @@ -161,9 +182,11 @@ public class WebParameterUtils { * @return a string value of parameter * @throws Exception if failed to parse the object */ - public static String validStringParameter(String paramName, String paramValue, int paramMaxLen, - boolean required, String defaultValue) throws Exception { - String tmpParamValue = checkParamCommonRequires(paramName, paramValue, required); + public static String validStringParameter(String paramName, String paramValue, + int paramMaxLen, boolean required, + String defaultValue) throws Exception { + String tmpParamValue = + checkParamCommonRequires(paramName, paramValue, required); if (TStringUtils.isBlank(tmpParamValue)) { return defaultValue; } @@ -214,6 +237,297 @@ public class WebParameterUtils { return tmpParamValue; } + public static StringBuilder buildFailResult(StringBuilder strBuffer, String errMsg) { + return strBuffer.append("{\"result\":false,\"errCode\":400,\"errMsg\":\"") + .append(errMsg).append("\"}"); + } + + /** + * Parse the parameter value from an object value to a long value + * + * @param req Http Servlet Request + * @param fieldDef the parameter field definition + * @param required a boolean value represent whether the parameter is must required + * @param defValue a default value returned if failed to parse value from the given object + * @return valid result for the parameter value + */ + public static ProcessResult getLongParamValue(HttpServletRequest req, + WebFieldDef fieldDef, + boolean required, + long defValue) { + ProcessResult procResult = + getStringParamValue(req, fieldDef, required, null); + if (!procResult.success) { + return procResult; + } + String paramValue = (String) procResult.retData1; + if (paramValue == null) { + procResult.setSuccResult(defValue); + return procResult; + } + try { + long paramIntVal = Long.parseLong(paramValue); + procResult.setSuccResult(paramIntVal); + } catch (Throwable e) { + procResult.setFailResult(400, + new StringBuilder(512).append("Parameter ") + .append(fieldDef.name).append(" parse error: ") + .append(e.getMessage()).toString()); + } + return procResult; + } + + /** + * Parse the parameter value from an object value to a integer value + * + * @param req Http Servlet Request + * @param fieldDef the parameter field definition + * @param required a boolean value represent whether the parameter is must required + * @param defValue a default value returned if failed to parse value from the given object + * @param minValue min value required + * @return valid result for the parameter value + */ + public static ProcessResult getIntParamValue(HttpServletRequest req, + WebFieldDef fieldDef, + boolean required, + int defValue, + int minValue) { + ProcessResult procResult = + getStringParamValue(req, fieldDef, required, null); + if (!procResult.success) { + return procResult; + } + if (fieldDef.isCompFieldType()) { + Set<Integer> tgtValueSet = new HashSet<Integer>(); + Set<String> valItemSet = (Set<String>) procResult.retData1; + if (valItemSet.isEmpty()) { + tgtValueSet.add(defValue); + procResult.setSuccResult(tgtValueSet); + return procResult; + } + ProcessResult procRet = new ProcessResult(); + for (String itemVal : valItemSet) { + if (!checkIntValueNorms(procRet, fieldDef, itemVal, minValue)) { + return procRet; + } + tgtValueSet.add((Integer) procRet.retData1); + } + procResult.setSuccResult(tgtValueSet); + } else { + String paramValue = (String) procResult.retData1; + if (paramValue == null) { + procResult.setSuccResult(defValue); + return procResult; + } + checkIntValueNorms(procResult, fieldDef, paramValue, minValue); + } + return procResult; + } + + /** + * Parse the parameter value from an object value to a boolean value + * + * @param req Http Servlet Request + * @param fieldDef the parameter field definition + * @param required a boolean value represent whether the parameter is must required + * @param defValue a default value returned if failed to parse value from the given object + * @return valid result for the parameter value + */ + public static ProcessResult getBooleanParamValue(HttpServletRequest req, + WebFieldDef fieldDef, + boolean required, + boolean defValue) { + ProcessResult procResult = + getStringParamValue(req, fieldDef, required, null); + if (!procResult.success) { + return procResult; + } + String paramValue = (String) procResult.retData1; + if (paramValue == null) { + procResult.setSuccResult(defValue); + return procResult; + } + procResult.setSuccResult(Boolean.parseBoolean(paramValue)); + return procResult; + } + + /** + * Parse the parameter value from an object value + * + * @param req Http Servlet Request + * @param fieldDef the parameter field definition + * @param required a boolean value represent whether the parameter is must required + * @param defValue a default value returned if failed to parse value from the given object + * @return valid result for the parameter value + */ + public static ProcessResult getStringParamValue(HttpServletRequest req, + WebFieldDef fieldDef, + boolean required, + String defValue) { + ProcessResult procResult = new ProcessResult(); + // get parameter value + String paramValue = req.getParameter(fieldDef.name); + if (paramValue == null) { + paramValue = req.getParameter(fieldDef.shortName); + } + if (TStringUtils.isNotBlank(paramValue)) { + // Cleanup value extra characters + paramValue = escDoubleQuotes(paramValue.trim()); + } + // Check if the parameter exists + if (TStringUtils.isBlank(paramValue)) { + if (required) { + procResult.setFailResult(fieldDef.id, + new StringBuilder(512).append("Parameter ") + .append(fieldDef.name) + .append(" is missing or value is null or blank!").toString()); + } else { + procStringDefValue(procResult, fieldDef.isCompFieldType(), defValue); + } + return procResult; + } + // check if value is norm; + if (fieldDef.isCompFieldType()) { + // split original value to items + Set<String> valItemSet = new HashSet<>(); + String[] strParamValueItems = paramValue.split(fieldDef.splitToken); + for (String strParamValueItem : strParamValueItems) { + if (TStringUtils.isBlank(strParamValueItem)) { + continue; + } + if (!checkStrValueNorms(procResult, fieldDef, strParamValueItem)) { + return procResult; + } + valItemSet.add((String) procResult.retData1); + } + // check if is empty result + if (valItemSet.isEmpty()) { + if (required) { + procResult.setFailResult(fieldDef.id, + new StringBuilder(512).append("Parameter ") + .append(fieldDef.name) + .append(" is missing or value is null or blank!").toString()); + } else { + procStringDefValue(procResult, fieldDef.isCompFieldType(), defValue); + } + return procResult; + } + // check max item count + if (fieldDef.itemMaxCnt != TBaseConstants.META_VALUE_UNDEFINED) { + if (valItemSet.size() > fieldDef.itemMaxCnt) { + procResult.setFailResult(fieldDef.id, + new StringBuilder(512).append("Parameter ") + .append(fieldDef.name) + .append("'s item count over max allowed count (") + .append(fieldDef.itemMaxCnt).append(")!").toString()); + } + } + procResult.setSuccResult(valItemSet); + } else { + if (!checkStrValueNorms(procResult, fieldDef, paramValue)) { + return procResult; + } + procResult.setSuccResult(paramValue); + } + return procResult; + } + + /** + * process string default value + * + * @param procResult process result + * @param isCompFieldType the parameter if compound field type + * @param defValue the parameter default value + * @return process result for default value of parameter + */ + private static ProcessResult procStringDefValue(ProcessResult procResult, + boolean isCompFieldType, + String defValue) { + if (isCompFieldType) { + Set<String> valItemSet = new HashSet<>(); + if (TStringUtils.isNotBlank(defValue)) { + valItemSet.add(defValue); + } + procResult.setSuccResult(valItemSet); + } else { + procResult.setSuccResult(defValue); + } + return procResult; + } + + /** + * Parse the parameter string value by regex define + * + * @param procResult process result + * @param fieldDef the parameter field definition + * @param paramVal the parameter value + * @return check result for string value of parameter + */ + private static boolean checkStrValueNorms(ProcessResult procResult, + WebFieldDef fieldDef, + String paramVal) { + paramVal = paramVal.trim(); + if (TStringUtils.isBlank(paramVal)) { + procResult.setSuccResult(null); + return true; + } + // check value's max length + if (fieldDef.valMaxLen != TBaseConstants.META_VALUE_UNDEFINED) { + if (paramVal.length() > fieldDef.valMaxLen) { + procResult.setFailResult(fieldDef.id, + new StringBuilder(512).append("over max length for ") + .append(fieldDef.name).append(", only allow ") + .append(fieldDef.valMaxLen).append(" length").toString()); + return false; + } + } + // check value's pattern + if (fieldDef.regexCheck) { + if (!paramVal.matches(fieldDef.regexDef.getPattern())) { + procResult.setFailResult(fieldDef.id, + new StringBuilder(512).append("illegal value for ") + .append(fieldDef.name).append(", value ") + .append(fieldDef.regexDef.getErrMsgTemp()).toString()); + return false; + } + } + procResult.setSuccResult(paramVal); + return true; + } + + /** + * Parse the parameter string value by regex define + * + * @param procResult process result + * @param fieldDef the parameter field definition + * @param paramValue the parameter value + * param minValue the parameter min value + * @return check result for string value of parameter + */ + private static boolean checkIntValueNorms(ProcessResult procResult, + WebFieldDef fieldDef, + String paramValue, + int minValue) { + try { + int paramIntVal = Integer.parseInt(paramValue); + if (paramIntVal < minValue) { + procResult.setFailResult(400, + new StringBuilder(512).append("Parameter ") + .append(fieldDef.name).append(" value must >= ") + .append(minValue).toString()); + return false; + } + procResult.setSuccResult(paramIntVal); + } catch (Throwable e) { + procResult.setFailResult(400, + new StringBuilder(512).append("Parameter ") + .append(fieldDef.name).append(" parse error: ") + .append(e.getMessage()).toString()); + return false; + } + return true; + } + /** * Parse the parameter value from an object value to ip address of string value * diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/common/webbase/WebFieldType.java b/tubemq-server/src/main/java/org/apache/tubemq/server/common/webbase/WebFieldType.java new file mode 100644 index 0000000..b83a966 --- /dev/null +++ b/tubemq-server/src/main/java/org/apache/tubemq/server/common/webbase/WebFieldType.java @@ -0,0 +1,60 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tubemq.server.common.webbase; + + + +public enum WebFieldType { + + UNKNOWN(-1, "Unknown field type"), + STRING(1, "String"), + INT(2, "int"), + LONG(3, "long"), + BOOLEAN(4, "Boolean"), + DATE(5, "Date"), + COMPSTRING(6, "Compound string"), + COMPINT(7, "Compound integer"); + + + public int value; + public String desc; + + WebFieldType(int value, String desc) { + this.value = value; + this.desc = desc; + } + + public static WebFieldType valueOf(int value) { + for (WebFieldType fieldType : WebFieldType.values()) { + if (fieldType.getValue() == value) { + return fieldType; + } + } + + return UNKNOWN; + } + + public int getValue() { + return value; + } + + public String getDesc(){ + return desc; + } + +} diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebApiMapper.java b/tubemq-server/src/main/java/org/apache/tubemq/server/common/webbase/WebMethodMapper.java similarity index 87% rename from tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebApiMapper.java rename to tubemq-server/src/main/java/org/apache/tubemq/server/common/webbase/WebMethodMapper.java index a71ba03..a856014 100644 --- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebApiMapper.java +++ b/tubemq-server/src/main/java/org/apache/tubemq/server/common/webbase/WebMethodMapper.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.tubemq.server.master.web.handler; +package org.apache.tubemq.server.common.webbase; import java.lang.reflect.Method; import java.util.HashMap; @@ -24,10 +24,10 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class WebApiMapper { +public class WebMethodMapper { // log printer private static final Logger logger = - LoggerFactory.getLogger(WebApiMapper.class); + LoggerFactory.getLogger(WebMethodMapper.class); // The query methods map public static final Map<String, WebApiRegInfo> WEB_QRY_METHOD_MAP = new HashMap<>(); @@ -47,7 +47,7 @@ public class WebApiMapper { public static void registerWebMethod(boolean isQryApi, String webMethodName, String clsMethodName, - AbstractWebHandler webHandler) { + Object webHandler) { Method[] methods = webHandler.getClass().getMethods(); for (Method item : methods) { if (item.getName().equals(clsMethodName)) { @@ -71,10 +71,10 @@ public class WebApiMapper { public static class WebApiRegInfo { public Method method; - public AbstractWebHandler webHandler; + public Object webHandler; public WebApiRegInfo(Method method, - AbstractWebHandler webHandler) { + Object webHandler) { this.method = method; this.webHandler = webHandler; } diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/action/screen/Webapi.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/action/screen/Webapi.java index 326f690..5d4de04 100644 --- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/action/screen/Webapi.java +++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/action/screen/Webapi.java @@ -17,7 +17,7 @@ package org.apache.tubemq.server.master.web.action.screen; -import static org.apache.tubemq.server.master.web.handler.WebApiMapper.getWebApiRegInfo; +import static org.apache.tubemq.server.common.webbase.WebMethodMapper.getWebApiRegInfo; import java.util.Arrays; import java.util.List; @@ -25,13 +25,13 @@ import javax.servlet.http.HttpServletRequest; import org.apache.tubemq.corebase.TBaseConstants; import org.apache.tubemq.corebase.utils.TStringUtils; import org.apache.tubemq.corerpc.exception.StandbyException; +import org.apache.tubemq.server.common.webbase.WebMethodMapper; import org.apache.tubemq.server.master.TMaster; import org.apache.tubemq.server.master.nodemanage.nodebroker.BrokerConfManager; import org.apache.tubemq.server.master.web.handler.AbstractWebHandler; import org.apache.tubemq.server.master.web.handler.WebAdminFlowRuleHandler; import org.apache.tubemq.server.master.web.handler.WebAdminGroupCtrlHandler; import org.apache.tubemq.server.master.web.handler.WebAdminTopicAuthHandler; -import org.apache.tubemq.server.master.web.handler.WebApiMapper; import org.apache.tubemq.server.master.web.handler.WebBrokerDefConfHandler; import org.apache.tubemq.server.master.web.handler.WebBrokerTopicConfHandler; import org.apache.tubemq.server.master.web.handler.WebMasterInfoHandler; @@ -102,7 +102,7 @@ public class Webapi implements Action { "DesignatedPrimary happened...please check if the other member is down"); } } - WebApiMapper.WebApiRegInfo webApiRegInfo = getWebApiRegInfo(isQuery, method); + WebMethodMapper.WebApiRegInfo webApiRegInfo = getWebApiRegInfo(isQuery, method); if (webApiRegInfo == null) { strBuffer.append("{\"result\":false,\"errCode\":400,\"errMsg\":\"Unsupported method: ") .append(method).append("}"); diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/AbstractWebHandler.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/AbstractWebHandler.java index 09b11eb..1b1bfdc 100644 --- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/AbstractWebHandler.java +++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/AbstractWebHandler.java @@ -17,7 +17,7 @@ package org.apache.tubemq.server.master.web.handler; -import static org.apache.tubemq.server.master.web.handler.WebApiMapper.registerWebMethod; +import static org.apache.tubemq.server.common.webbase.WebMethodMapper.registerWebMethod; import org.apache.tubemq.server.master.TMaster; import org.apache.tubemq.server.master.nodemanage.nodebroker.BrokerConfManager;
