This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch TUBEMQ-570
in repository https://gitbox.apache.org/repos/asf/incubator-tubemq.git
The following commit(s) were added to refs/heads/TUBEMQ-570 by this push:
new a278069 [TUBEMQ-584] Adjust WebMasterInfoHandler class implementation
a278069 is described below
commit a278069c26c975767f8d90112f7c861cf9785088
Author: gosonzhang <[email protected]>
AuthorDate: Tue Mar 30 20:26:41 2021 +0800
[TUBEMQ-584] Adjust WebMasterInfoHandler class implementation
---
.../org/apache/tubemq/corebase/TBaseConstants.java | 1 +
.../tubemq/server/common/TServerConstants.java | 31 +
.../tubemq/server/common/fielddef/WebFieldDef.java | 77 +-
.../server/common/statusdef/EnableStatus.java | 8 +-
.../server/common/statusdef/TopicStatus.java | 2 +-
.../server/common/utils/WebParameterUtils.java | 840 ++++++++++++++++++++-
.../org/apache/tubemq/server/master/TMaster.java | 12 +
.../bdbentitys/BdbTopicAuthControlEntity.java | 18 +
.../server/master/metamanage/DataOpErrCode.java | 1 +
.../server/master/metamanage/MetaDataManager.java | 40 +-
.../metastore/dao/entity/BaseEntity.java | 57 +-
.../metastore/dao/entity/BrokerConfEntity.java | 62 +-
.../metastore/dao/entity/ClusterSettingEntity.java | 66 +-
.../metastore/dao/entity/GroupBaseCtrlEntity.java | 46 +-
.../metastore/dao/entity/GroupBlackListEntity.java | 37 +-
.../dao/entity/GroupConsumeCtrlEntity.java | 45 +-
.../metastore/dao/entity/TopicConfEntity.java | 74 +-
.../metastore/dao/entity/TopicCtrlEntity.java | 77 +-
.../metastore/dao/entity/TopicPropGroup.java | 82 +-
.../master/web/handler/AbstractWebHandler.java | 3 +
.../master/web/handler/WebMasterInfoHandler.java | 196 ++++-
21 files changed, 1636 insertions(+), 139 deletions(-)
diff --git
a/tubemq-core/src/main/java/org/apache/tubemq/corebase/TBaseConstants.java
b/tubemq-core/src/main/java/org/apache/tubemq/corebase/TBaseConstants.java
index 35aaffd..f6ab81a 100644
--- a/tubemq-core/src/main/java/org/apache/tubemq/corebase/TBaseConstants.java
+++ b/tubemq-core/src/main/java/org/apache/tubemq/corebase/TBaseConstants.java
@@ -27,6 +27,7 @@ public class TBaseConstants {
public static final int META_DEFAULT_MASTER_TLS_PORT = 8716;
public static final int META_DEFAULT_BROKER_PORT = 8123;
public static final int META_DEFAULT_BROKER_TLS_PORT = 8124;
+ public static final int META_DEFAULT_BROKER_WEB_PORT = 8081;
public static final int META_STORE_INS_BASE = 10000;
public static final String META_DEFAULT_CHARSET_NAME = "UTF-8";
diff --git
a/tubemq-server/src/main/java/org/apache/tubemq/server/common/TServerConstants.java
b/tubemq-server/src/main/java/org/apache/tubemq/server/common/TServerConstants.java
index 09b115f..411c797 100644
---
a/tubemq-server/src/main/java/org/apache/tubemq/server/common/TServerConstants.java
+++
b/tubemq-server/src/main/java/org/apache/tubemq/server/common/TServerConstants.java
@@ -28,8 +28,39 @@ public final class TServerConstants {
public static final long DEFAULT_DATA_VERSION = 0L;
+
+ public static final String BLANK_FLOWCTRL_RULES = "[]";
public static final String BLANK_FILTER_ITEM_STR = ",,";
+ public static final int QRY_PRIORITY_DEF_VALUE = 301;
+ public static final int QRY_PRIORITY_MIN_VALUE = 101;
+ public static final int QRY_PRIORITY_MAX_VALUE = 303;
+
+ public static final int TOPIC_STOREBLOCK_NUM_MIN = 1;
+
+ public static final int TOPIC_PARTITION_NUM_MIN = 1;
+
+ public static final int TOPIC_DSK_UNFLUSHTHRESHOLD_MIN = 0;
+ public static final int TOPIC_DSK_UNFLUSHTHRESHOLD_DEF = 1000;
+
+ public static final int TOPIC_DSK_UNFLUSHINTERVAL_MIN = 1;
+ public static final int TOPIC_DSK_UNFLUSHINTERVAL_DEF = 10000;
+
+ public static final int TOPIC_DSK_UNFLUSHDATAHOLD_MIN = 0;
+
+ public static final int TOPIC_CACHESIZE_MB_MIN = 2;
+ public static final int TOPIC_CACHESIZE_MB_DEF = 3;
+ public static final int TOPIC_CACHESIZE_MB_MAX = 2048;
+
+ public static final int TOPIC_CACHEINTVL_MIN = 4000;
+ public static final int TOPIC_CACHEINTVL_DEF = 20000;
+
+ public static final int TOPIC_CACHECNT_INK_MIN = 1;
+ public static final int TOPIC_CACHECNT_INK_DEF = 10;
+
+ public static final String TOPIC_POLICY_DEF = "delete,168h";
+
+
public static final int CFG_MODAUTHTOKEN_MAX_LENGTH = 128;
public static final int CFG_ROWLOCK_DEFAULT_DURATION = 30000;
public static final int CFG_ZK_COMMIT_DEFAULT_RETRIES = 10;
diff --git
a/tubemq-server/src/main/java/org/apache/tubemq/server/common/fielddef/WebFieldDef.java
b/tubemq-server/src/main/java/org/apache/tubemq/server/common/fielddef/WebFieldDef.java
index bfae7c2..5cb2d5e 100644
---
a/tubemq-server/src/main/java/org/apache/tubemq/server/common/fielddef/WebFieldDef.java
+++
b/tubemq-server/src/main/java/org/apache/tubemq/server/common/fielddef/WebFieldDef.java
@@ -48,7 +48,7 @@ public enum WebFieldDef {
"Reset offset value", RegexDef.TMP_NUMBER),
MSGCOUNT(6, "msgCount", "cnt", WebFieldType.INT,
"Number of returned messages", RegexDef.TMP_NUMBER),
- FILTERCONDS(7, "filterConds", "flts", WebFieldType.COMPSTRING,
+ FILTERCONDS(7, "filterConds", "fltRls", WebFieldType.COMPSTRING,
"Filter condition items",
TBaseConstants.CFG_FLT_MAX_FILTER_ITEM_LENGTH,
TBaseConstants.CFG_FLT_MAX_FILTER_ITEM_COUNT, RegexDef.TMP_FILTER),
REQUIREREALOFFSET(8, "requireRealOffset", "dko", WebFieldType.BOOLEAN,
@@ -66,10 +66,10 @@ public enum WebFieldDef {
"Partition id", RegexDef.TMP_NUMBER),
CALLERIP(13, "callerIp", "cip", WebFieldType.STRING,
"Caller ip address",
TBaseConstants.META_MAX_CLIENT_HOSTNAME_LENGTH),
- BROKERID(14, "brokerId", "brokerId", WebFieldType.INT,
+ BROKERID(14, "brokerId", "bId", WebFieldType.INT,
"Broker ID", RegexDef.TMP_NUMBER),
- COMPSBROKERID(15, "brokerId", "brokerId", WebFieldType.COMPINT,
+ COMPSBROKERID(15, "brokerId", "bId", WebFieldType.COMPINT,
"Broker ID", RegexDef.TMP_NUMBER),
WITHIP(16, "withIp", "ip", WebFieldType.BOOLEAN,
"Require return ip information, default is false"),
@@ -91,8 +91,9 @@ public enum WebFieldDef {
ADMINAUTHTOKEN(23, "confModAuthToken", "authToken", WebFieldType.STRING,
"Admin api operation authorization code",
TServerConstants.CFG_MODAUTHTOKEN_MAX_LENGTH),
- MAXMSGSIZE(24, "maxMsgSizeInMB", "maxMsgSizeInMB", WebFieldType.INT,
- "Max allowed message size, unit MB", RegexDef.TMP_NUMBER),
+ BROKERWEBPORT(24, "brokerWebPort", "bWebPort", WebFieldType.INT,
+ "Broker web port", RegexDef.TMP_NUMBER),
+
CREATEDATE(25, "createDate", "cDate", WebFieldType.STRING,
"Record creation date", TBaseConstants.META_MAX_DATEVALUE_LENGTH),
@@ -111,11 +112,11 @@ public enum WebFieldDef {
COMPSCONSUMEGROUP(30, "consumeGroup", "group", WebFieldType.COMPSTRING,
"Group name", TBaseConstants.META_MAX_GROUPNAME_LENGTH,
RegexDef.TMP_GROUP),
- REGIONID(31, "regionId", "regionId", WebFieldType.INT,
+ REGIONID(31, "regionId", "rId", WebFieldType.INT,
"Region id", RegexDef.TMP_NUMBER),
- COMPREGIONID(32, "regionId", "regionId", WebFieldType.COMPINT,
+ COMPREGIONID(32, "regionId", "rId", WebFieldType.COMPINT,
"Region id", RegexDef.TMP_NUMBER),
- DATAVERSIONID(33, "dataVersionId", "dataVerId", WebFieldType.LONG,
+ DATAVERSIONID(33, "dataVersionId", "dVerId", WebFieldType.LONG,
"Data version id", RegexDef.TMP_NUMBER),
TOPICNAMEID(34, "topicNameId", "topicId", WebFieldType.LONG,
"Topic name id", RegexDef.TMP_NUMBER),
@@ -131,50 +132,78 @@ public enum WebFieldDef {
UNFLUSHINTERVAL(39, "unflushInterval", "unfDskInt", WebFieldType.INT,
"Maximum allowed disk unflushing interval", RegexDef.TMP_NUMBER),
- UNFLUSHDATAHOLD(40, "unflushDataHold", "unfDskDataSize", WebFieldType.INT,
+ UNFLUSHDATAHOLD(40, "unflushDataHold", "unfDskDataSz", WebFieldType.INT,
"Maximum allowed disk unflushing data size", RegexDef.TMP_NUMBER),
- MCACHESIZEINMB(41, "memCacheMsgSizeInMB", "cacheSizeInMB",
WebFieldType.INT,
+ MCACHESIZEINMB(41, "memCacheMsgSizeInMB", "cacheInMB", WebFieldType.INT,
"Maximum allowed memory cache size in MB", RegexDef.TMP_NUMBER),
UNFMCACHECNTINK(42, "memCacheMsgCntInK", "unfMemMsgCnt", WebFieldType.INT,
"Maximum allowed memory cache unflushing message count",
RegexDef.TMP_NUMBER),
UNFMCACHEINTERVAL(43, "memCacheFlushIntvl", "unfMemInt", WebFieldType.INT,
"Maximum allowed disk unflushing data size", RegexDef.TMP_NUMBER),
- MAXMSGSIZEINMB(44, "maxMsgSizeInMB", "maxMsgSizeInMB", WebFieldType.INT,
- "Maximum allowed message length", RegexDef.TMP_NUMBER),
+ MAXMSGSIZEINMB(44, "maxMsgSizeInMB", "mxMsgInMB", WebFieldType.INT,
+ "Maximum allowed message length, unit MB", RegexDef.TMP_NUMBER),
ACCEPTPUBLISH(45, "acceptPublish", "accPub", WebFieldType.BOOLEAN,
"Enable publishing"),
ACCEPTSUBSCRIBE(46, "acceptSubscribe", "accSub", WebFieldType.BOOLEAN,
"Enable subscription"),
- DELETEPOLICY(47, "deletePolicy", "delPolicy",
- WebFieldType.DELPOLICY, "File aging strategy"),
+ DELETEPOLICY(47, "deletePolicy", "delPol", WebFieldType.DELPOLICY,
+ "File aging strategy",
TServerConstants.CFG_DELETEPOLICY_MAX_LENGTH),
TOPICJSONSET(48, "topicJsonSet", "topicSet",
WebFieldType.JSONSET, "The topic info set that needs to be added
or modified"),
- BROKERIP(49, "brokerIp", "brokerIp", WebFieldType.STRING,
+ BROKERIP(49, "brokerIp", "bIp", WebFieldType.STRING,
"Broker ip", TBaseConstants.META_MAX_BROKER_IP_LENGTH,
RegexDef.TMP_IPV4ADDRESS),
- BROKERPORT(50, "brokerPort", "brokerPort", WebFieldType.INT,
+ BROKERPORT(50, "brokerPort", "bPort", WebFieldType.INT,
"Broker port", RegexDef.TMP_NUMBER),
- BROKERTLSPORT(51, "brokerTLSPort", "brokerTLSPort", WebFieldType.INT,
+ BROKERTLSPORT(51, "brokerTLSPort", "bTlsPort", WebFieldType.INT,
"Broker tls port", RegexDef.TMP_NUMBER),
BROKERJSONSET(52, "brokerJsonSet", "brokerSet",
WebFieldType.JSONSET, "The broker info set that needs to be added
or modified"),
- STATUSID(53, "statusId", "statusId", WebFieldType.INT,
+ STATUSID(53, "statusId", "stsId", WebFieldType.INT,
"Status id", RegexDef.TMP_NUMBER),
QRYPRIORITYID(54, "qryPriorityId", "qryPriId", WebFieldType.INT,
"Query priority id", RegexDef.TMP_NUMBER),
- FLOWCTRLSET(55, "flowCtrlInfo", "flowCtrlSet",
+ FLOWCTRLSET(55, "flowCtrlInfo", "fCtrlInfo",
WebFieldType.JSONSET,
"The flow control info set that needs to be added or modified"),
- CONDSTATUS(56, "condStatus", "condStatus", WebFieldType.INT,
+ CONDSTATUS(56, "condStatus", "condSts", WebFieldType.INT,
"Group control rule status id", RegexDef.TMP_NUMBER),
FILTERJSONSET(57, "filterCondJsonSet", "filterJsonSet",
- WebFieldType.JSONSET, "The batch filter condition configure json
array");
-
-
-
+ WebFieldType.JSONSET, "The batch filter condition configure json
array"),
+ DATASTORETYPE(58, "dataStoreType", "dStType", WebFieldType.INT,
+ "Data store type", RegexDef.TMP_NUMBER),
+ DATAPATH(59, "dataPath", "dPath",
+ WebFieldType.STRING, "Data path"),
+
+ ATTRIBUTES(60, "attributes", "attrs",
+ WebFieldType.STRING, "Attributes"),
+ RECORDKEY(61, "recordKey", "recKey",
+ WebFieldType.STRING, "Record key"),
+ FLOWCTRLENABLE(62, "flowCtrlEnable", "fCtrlEn",
+ WebFieldType.BOOLEAN, "Flow control enable status"),
+ FLOWCTRLRULECOUNT(63, "flowCtrlRuleCount", "fCtrlCnt", WebFieldType.INT,
+ "The count of flow control info set", RegexDef.TMP_NUMBER),
+ RESCHECKENABLE(64, "resCheckEnable", "resChkEn",
+ WebFieldType.BOOLEAN, "Resource check enable status"),
+
+ ALWDBCRATE(65, "alwdBrokerClientRate", "abcr", WebFieldType.INT,
+ "Allowed broker client rate", RegexDef.TMP_NUMBER),
+ REASON(66, "reason", "rsn", WebFieldType.STRING,
+ "Reason", TBaseConstants.META_MAX_OPREASON_LENGTH,
RegexDef.TMP_STRING),
+ FILTERENABLE(67, "filterEnable", "fltEn",
+ WebFieldType.BOOLEAN, "Filter consume enable status"),
+ MANAGESTATUS(68, "manageStatus", "mSts",
+ WebFieldType.STRING, "Broker manage status"),
+ GROUPID(69, "groupId", "gId",
+ WebFieldType.INT, "Group id", RegexDef.TMP_NUMBER),
+
+ TOPICSTATUSID(70, "topicStatusId", "tStsId", WebFieldType.INT,
+ "Status id", RegexDef.TMP_NUMBER),
+ AUTHCTRLENABLE(71, "enableAuthControl", "acEn",
+ WebFieldType.BOOLEAN, "Group authenticate control enable
status");
public final int id;
diff --git
a/tubemq-server/src/main/java/org/apache/tubemq/server/common/statusdef/EnableStatus.java
b/tubemq-server/src/main/java/org/apache/tubemq/server/common/statusdef/EnableStatus.java
index c7b86a1..b158572 100644
---
a/tubemq-server/src/main/java/org/apache/tubemq/server/common/statusdef/EnableStatus.java
+++
b/tubemq-server/src/main/java/org/apache/tubemq/server/common/statusdef/EnableStatus.java
@@ -20,8 +20,8 @@ package org.apache.tubemq.server.common.statusdef;
public enum EnableStatus {
STATUS_UNDEFINE(-2, "Undefined."),
- STATUS_ENABLE(1, "Enable."),
- STATUS_DISABLE(0, "Disable.");
+ STATUS_DISABLE(0, "Disable."),
+ STATUS_ENABLE(1, "Enable.");
private int code;
private String description;
@@ -36,6 +36,10 @@ public enum EnableStatus {
return code;
}
+ public boolean isEnable() {
+ return this == EnableStatus.STATUS_ENABLE;
+ }
+
public String getDescription() {
return description;
}
diff --git
a/tubemq-server/src/main/java/org/apache/tubemq/server/common/statusdef/TopicStatus.java
b/tubemq-server/src/main/java/org/apache/tubemq/server/common/statusdef/TopicStatus.java
index 13384a4..6ebea57 100644
---
a/tubemq-server/src/main/java/org/apache/tubemq/server/common/statusdef/TopicStatus.java
+++
b/tubemq-server/src/main/java/org/apache/tubemq/server/common/statusdef/TopicStatus.java
@@ -18,7 +18,7 @@
package org.apache.tubemq.server.common.statusdef;
public enum TopicStatus {
- STATUS_TOPIC_UNDEFINED(-2, "Undefined."),
+ STATUS_TOPIC_UNDEFINED(-2, "Undefined"),
STATUS_TOPIC_OK(0, "Normal"),
STATUS_TOPIC_SOFT_DELETE(1, "Soft deleted"),
STATUS_TOPIC_SOFT_REMOVE(2, "Soft removed"),
diff --git
a/tubemq-server/src/main/java/org/apache/tubemq/server/common/utils/WebParameterUtils.java
b/tubemq-server/src/main/java/org/apache/tubemq/server/common/utils/WebParameterUtils.java
index e48583e..f9ad424 100644
---
a/tubemq-server/src/main/java/org/apache/tubemq/server/common/utils/WebParameterUtils.java
+++
b/tubemq-server/src/main/java/org/apache/tubemq/server/common/utils/WebParameterUtils.java
@@ -23,6 +23,7 @@ import java.io.UnsupportedEncodingException;
import java.net.URLDecoder;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.Date;
import java.util.HashSet;
@@ -33,22 +34,449 @@ import java.util.TreeSet;
import javax.servlet.http.HttpServletRequest;
import org.apache.tubemq.corebase.TBaseConstants;
import org.apache.tubemq.corebase.TokenConstants;
+import org.apache.tubemq.corebase.policies.FlowCtrlItem;
+import org.apache.tubemq.corebase.policies.FlowCtrlRuleHandler;
import org.apache.tubemq.corebase.utils.TStringUtils;
import org.apache.tubemq.corebase.utils.Tuple2;
+import org.apache.tubemq.corebase.utils.Tuple3;
import org.apache.tubemq.server.broker.utils.DataStoreUtils;
import org.apache.tubemq.server.common.TServerConstants;
import org.apache.tubemq.server.common.TStatusConstants;
import org.apache.tubemq.server.common.fielddef.WebFieldDef;
import org.apache.tubemq.server.master.TMaster;
import org.apache.tubemq.server.master.bdbstore.bdbentitys.BdbBrokerConfEntity;
+import org.apache.tubemq.server.master.metamanage.DataOpErrCode;
+import
org.apache.tubemq.server.master.metamanage.metastore.dao.entity.TopicPropGroup;
import org.apache.tubemq.server.master.nodemanage.nodebroker.BrokerConfManager;
import
org.apache.tubemq.server.master.nodemanage.nodebroker.BrokerSyncStatusInfo;
-
public class WebParameterUtils {
private static final List<String> allowedDelUnits = Arrays.asList("s",
"m", "h");
+ private static final List<Integer> allowedPriorityVal = Arrays.asList(1,
2, 3);
+
+
+ public static StringBuilder buildFailResult(StringBuilder strBuffer,
String errMsg) {
+ return
strBuffer.append("{\"result\":false,\"errCode\":400,\"errMsg\":\"")
+ .append(errMsg).append("\"}");
+ }
+
+ public static StringBuilder buildFailResultWithBlankData(String errMsg,
+ StringBuilder
strBuffer) {
+ return buildFailResultWithBlankData(400, errMsg, strBuffer);
+ }
+
+ public static StringBuilder buildFailResultWithBlankData(int errcode,
String errMsg,
+ StringBuilder
strBuffer) {
+ return
strBuffer.append("{\"result\":false,\"errCode\":").append(errcode)
+
.append(",\"errMsg\":\"").append(errMsg).append("\",\"data\":[]}");
+ }
+
+ public static StringBuilder buildSuccessResult(StringBuilder strBuffer) {
+ return
strBuffer.append("{\"result\":true,\"errCode\":0,\"errMsg\":\"OK\"}");
+ }
+
+ public static StringBuilder buildSuccessResult(StringBuilder strBuffer,
String appendInfo) {
+ return
strBuffer.append("{\"result\":true,\"errCode\":0,\"errMsg\":\"").
+ append(appendInfo).append("\"}");
+ }
+
+ public static StringBuilder buildSuccessWithDataRetBegin(StringBuilder
strBuffer) {
+ return
strBuffer.append("{\"result\":true,\"errCode\":0,\"errMsg\":\"OK\",\"data\":[");
+ }
+
+ public static StringBuilder buildSuccessWithDataRetEnd(
+ StringBuilder strBuffer, int totalCnt) {
+ return strBuffer.append("],\"count\":").append(totalCnt).append("}");
+ }
+
+ public static StringBuilder buildSuccWithData(long dataVerId,
+ StringBuilder strBuffer) {
+ List<Long> dataVerIds = new ArrayList<>(1);
+ dataVerIds.add(dataVerId);
+ return buildSuccWithData("Ok", dataVerIds, strBuffer);
+ }
+
+ public static StringBuilder buildSuccWithData(String errMsg,
+ List<Long> dataVerIds,
+ StringBuilder strBuffer) {
+ int count = 0;
+ strBuffer.append("{\"result\":true,\"errCode\":0,\"errMsg\":\"")
+ .append(errMsg).append("\",\"data\":[");
+ for (Long dataVerId : dataVerIds) {
+ if (dataVerId == null) {
+ continue;
+ }
+ if (count++ > 0) {
+ strBuffer.append(",");
+ }
+ strBuffer.append("{\"").append(WebFieldDef.DATAVERSIONID.name)
+ .append("\":").append(dataVerId).append("}");
+ }
+ strBuffer.append("],\"count\":").append(count).append("}");
+ return strBuffer;
+ }
+
+ /**
+ * Parse the parameter value required for add, update, delete record
+ *
+ * @param req Http Servlet Request
+ * @param isAdd if add commend
+ * @param result process result of parameter value, include a
+ * tuple3 object(dataVersionId, operator, opData) info
+ * @return process result
+ */
+ public static boolean getAUDBaseInfo(HttpServletRequest req,
+ boolean isAdd, ProcessResult result) {
+ // check and get data version id
+ if (!WebParameterUtils.getLongParamValue(req,
WebFieldDef.DATAVERSIONID,
+ false, TServerConstants.DEFAULT_DATA_VERSION, result)) {
+ return result.isSuccess();
+ }
+ Long dataVerId = (Long) result.retData1;
+ // check and get createUser or modifyUser
+ String operator = null;
+ Date opDate = null;
+ if (isAdd) {
+ // check create user field
+ if (!WebParameterUtils.getStringParamValue(req,
+ WebFieldDef.CREATEUSER, true, null, result)) {
+ return result.isSuccess();
+ }
+ operator = (String) result.retData1;
+ // check and get create date
+ if (!WebParameterUtils.getDateParameter(req,
+ WebFieldDef.CREATEDATE, false, new Date(), result)) {
+ return result.isSuccess();
+ }
+ opDate = (Date) result.retData1;
+
+ } else {
+ // check modify user field
+ if (!WebParameterUtils.getStringParamValue(req,
+ WebFieldDef.MODIFYUSER, true, null, result)) {
+ return result.isSuccess();
+ }
+ operator = (String) result.retData1;
+ // check and get modify date
+ if (!WebParameterUtils.getDateParameter(req,
+ WebFieldDef.MODIFYDATE, false, new Date(), result)) {
+ return result.isSuccess();
+ }
+ opDate = (Date) result.retData1;
+ }
+ result.setSuccResult(new Tuple3<Long, String, Date>(
+ dataVerId, operator, opDate));
+ return result.isSuccess();
+ }
+
+ public static boolean getQryPriorityIdParameter(HttpServletRequest req,
+ boolean required, int
defValue,
+ int minValue,
ProcessResult result) {
+ if (!getIntParamValue(req, WebFieldDef.QRYPRIORITYID,
+ required, defValue, minValue, result)) {
+ return result.success;
+ }
+ int qryPriorityId = (int) result.retData1;
+ if (qryPriorityId > 303 || qryPriorityId < 101) {
+ result.setFailResult(new StringBuilder(512)
+ .append("Illegal value in
").append(WebFieldDef.QRYPRIORITYID.name)
+ .append(" parameter:
").append(WebFieldDef.QRYPRIORITYID.name)
+ .append(" value must be greater than or equal")
+ .append(" to 101 and less than or equal to
303!").toString());
+ return false;
+ }
+ if (!allowedPriorityVal.contains(qryPriorityId % 100)) {
+ result.setFailResult(new StringBuilder(512)
+ .append("Illegal value in
").append(WebFieldDef.QRYPRIORITYID.name)
+ .append(" parameter: the units of
").append(WebFieldDef.QRYPRIORITYID.name)
+ .append(" must in
").append(allowedPriorityVal).toString());
+ return false;
+ }
+ if (!allowedPriorityVal.contains(qryPriorityId / 100)) {
+ result.setFailResult(new StringBuilder(512)
+ .append("Illegal value in
").append(WebFieldDef.QRYPRIORITYID.name)
+ .append(" parameter: the hundreds of
").append(WebFieldDef.QRYPRIORITYID.name)
+ .append(" must in
").append(allowedPriorityVal).toString());
+ return false;
+ }
+ return true;
+ }
+
+ /**
+ * Decode the deletePolicy parameter value from an object value
+ * the value must like {method},{digital}[s|m|h]
+ *
+ * @param req Http Servlet Request
+ * @param required a boolean value represent whether the parameter is
must required
+ * @param defValue a default value returned if failed to parse value
from the given object
+ * @param result process result of parameter value
+ * @return the process result
+ */
+ public static boolean getDeletePolicyParameter(HttpServletRequest req,
+ boolean required, String
defValue,
+ ProcessResult result) {
+ if (!WebParameterUtils.getStringParamValue(req,
+ WebFieldDef.DELETEPOLICY, required, defValue, result)) {
+ return result.isSuccess();
+ }
+ String delPolicy = (String) result.retData1;
+ if (TStringUtils.isBlank(delPolicy)) {
+ return result.isSuccess();
+ }
+ // check value format
+ String[] tmpStrs = delPolicy.split(",");
+ if (tmpStrs.length != 2) {
+ result.setFailResult(DataOpErrCode.DERR_ILLEGAL_VALUE.getCode(),
+ new StringBuilder(512)
+ .append("Value must include one and only one comma
character,")
+ .append(" the format of
").append(WebFieldDef.DELETEPOLICY.name())
+ .append(" must like
{method},{digital}[m|s|h]").toString());
+ return result.isSuccess();
+ }
+ if (TStringUtils.isBlank(tmpStrs[0])) {
+ result.setFailResult(DataOpErrCode.DERR_ILLEGAL_VALUE.getCode(),
+ new StringBuilder(512)
+ .append("Method value must not be blank")
+ .append(" the format of
").append(WebFieldDef.DELETEPOLICY.name())
+ .append(" must like
{method},{digital}[m|s|h]").toString());
+ return result.isSuccess();
+ }
+ if (!"delete".equalsIgnoreCase(tmpStrs[0].trim())) {
+ result.setFailResult(DataOpErrCode.DERR_ILLEGAL_VALUE.getCode(),
+ new StringBuilder(512).append("Field ")
+ .append(WebFieldDef.DELETEPOLICY.name())
+ .append(" only support delete method
now!").toString());
+ return result.isSuccess();
+ }
+ String validValStr = tmpStrs[1];
+ String timeUnit = validValStr.substring(validValStr.length() -
1).toLowerCase();
+ if (Character.isLetter(timeUnit.charAt(0))) {
+ if (!allowedDelUnits.contains(timeUnit)) {
+
result.setFailResult(DataOpErrCode.DERR_ILLEGAL_VALUE.getCode(),
+ new StringBuilder(512).append("Field ")
+ .append(WebFieldDef.DELETEPOLICY.name())
+ .append(" only support [s|m|h]
unit!").toString());
+ return result.isSuccess();
+ }
+ }
+ long validDuration = 0;
+ try {
+ if (timeUnit.endsWith("s")) {
+ validDuration = Long.parseLong(validValStr.substring(0,
validValStr.length() - 1)) * 1000;
+ } else if (timeUnit.endsWith("m")) {
+ validDuration = Long.parseLong(validValStr.substring(0,
validValStr.length() - 1)) * 60000;
+ } else if (timeUnit.endsWith("h")) {
+ validDuration = Long.parseLong(validValStr.substring(0,
validValStr.length() - 1)) * 3600000;
+ } else {
+ validDuration = Long.parseLong(validValStr) * 3600000;
+ }
+ } catch (Throwable e) {
+ result.setFailResult(DataOpErrCode.DERR_ILLEGAL_VALUE.getCode(),
+ new StringBuilder(512).append("The value of field ")
+ .append(WebFieldDef.DELETEPOLICY.name())
+ .append("'s valid duration must
digits!").toString());
+ return result.isSuccess();
+ }
+ if (validDuration <= 0 || validDuration >
DataStoreUtils.MAX_FILE_VALID_DURATION) {
+ result.setFailResult(DataOpErrCode.DERR_ILLEGAL_VALUE.getCode(),
+ new StringBuilder(512).append("The value of field ")
+ .append(WebFieldDef.DELETEPOLICY.name())
+ .append(" must be greater than 0 and less than or
equal to")
+ .append(DataStoreUtils.MAX_FILE_VALID_DURATION)
+ .append(" seconds!").toString());
+ return result.isSuccess();
+ }
+ if (Character.isLetter(timeUnit.charAt(0))) {
+ result.setSuccResult(new StringBuilder(512).append("delete,")
+ .append(validValStr.substring(0, validValStr.length() - 1))
+ .append(timeUnit).toString());
+ } else {
+ result.setSuccResult(new StringBuilder(512).append("delete,")
+ .append(validValStr).append("h").toString());
+ }
+ return result.isSuccess();
+ }
+
+ /**
+ * Parse the parameter value for TopicPropGroup class
+ *
+ * @param req Http Servlet Request
+ * @param defVal default value
+ * @param result process result of parameter value
+ * @return process result
+ */
+ public static boolean getTopicPropInfo(HttpServletRequest req,
+ TopicPropGroup defVal,
+ ProcessResult result) {
+ TopicPropGroup newConf = new TopicPropGroup();
+ // get numTopicStores parameter value
+ if (!WebParameterUtils.getIntParamValue(req,
WebFieldDef.NUMTOPICSTORES,
+ false, TBaseConstants.META_VALUE_UNDEFINED,
+ TServerConstants.TOPIC_STOREBLOCK_NUM_MIN, result)) {
+ return result.isSuccess();
+ }
+ int numTopicStores = (int) result.retData1;
+ if (numTopicStores == TBaseConstants.META_VALUE_UNDEFINED) {
+ if (defVal == null) {
+ numTopicStores = TServerConstants.TOPIC_STOREBLOCK_NUM_MIN;
+ } else {
+ numTopicStores = defVal.getNumTopicStores();
+ }
+ }
+ newConf.setNumTopicStores(numTopicStores);
+ // get numPartitions parameter value
+ if (!WebParameterUtils.getIntParamValue(req, WebFieldDef.NUMPARTITIONS,
+ false, TBaseConstants.META_VALUE_UNDEFINED,
+ TServerConstants.TOPIC_PARTITION_NUM_MIN, result)) {
+ return result.isSuccess();
+ }
+ int numPartitions = (int) result.retData1;
+ if (numPartitions == TBaseConstants.META_VALUE_UNDEFINED) {
+ if (defVal == null) {
+ numPartitions = TServerConstants.TOPIC_PARTITION_NUM_MIN;
+ } else {
+ numPartitions = defVal.getNumPartitions();
+ }
+ }
+ newConf.setNumPartitions(numPartitions);
+ // get unflushThreshold parameter value
+ if (!WebParameterUtils.getIntParamValue(req,
WebFieldDef.UNFLUSHTHRESHOLD,
+ false, TBaseConstants.META_VALUE_UNDEFINED,
+ TServerConstants.TOPIC_DSK_UNFLUSHTHRESHOLD_MIN, result)) {
+ return result.isSuccess();
+ }
+ int unflushThreshold = (int) result.retData1;
+ if (unflushThreshold == TBaseConstants.META_VALUE_UNDEFINED) {
+ if (defVal == null) {
+ unflushThreshold =
TServerConstants.TOPIC_DSK_UNFLUSHTHRESHOLD_DEF;
+ } else {
+ unflushThreshold = defVal.getUnflushThreshold();
+ }
+ }
+ newConf.setUnflushThreshold(unflushThreshold);
+ // get unflushInterval parameter value
+ if (!WebParameterUtils.getIntParamValue(req,
WebFieldDef.UNFLUSHINTERVAL,
+ false, TBaseConstants.META_VALUE_UNDEFINED,
+ TServerConstants.TOPIC_DSK_UNFLUSHINTERVAL_MIN, result)) {
+ return result.isSuccess();
+ }
+ int unflushInterval = (int) result.retData1;
+ if (unflushInterval == TBaseConstants.META_VALUE_UNDEFINED) {
+ if (defVal == null) {
+ unflushInterval =
TServerConstants.TOPIC_DSK_UNFLUSHINTERVAL_DEF;
+ } else {
+ unflushInterval = defVal.getUnflushInterval();
+ }
+ }
+ newConf.setUnflushInterval(unflushInterval);
+ // get unflushDataHold parameter value
+ if (!WebParameterUtils.getIntParamValue(req,
WebFieldDef.UNFLUSHINTERVAL,
+ false, TBaseConstants.META_VALUE_UNDEFINED,
+ TServerConstants.TOPIC_DSK_UNFLUSHDATAHOLD_MIN, result)) {
+ return result.isSuccess();
+ }
+ int unflushDataHold = (int) result.retData1;
+ if (unflushDataHold == TBaseConstants.META_VALUE_UNDEFINED) {
+ if (defVal == null) {
+ unflushDataHold =
TServerConstants.TOPIC_DSK_UNFLUSHDATAHOLD_MIN;
+ } else {
+ unflushDataHold = defVal.getUnflushDataHold();
+ }
+ }
+ newConf.setUnflushDataHold(unflushDataHold);
+ // get memCacheMsgSizeInMB parameter value
+ if (!WebParameterUtils.getIntParamValue(req,
WebFieldDef.MCACHESIZEINMB,
+ false, TBaseConstants.META_VALUE_UNDEFINED,
+ TServerConstants.TOPIC_CACHESIZE_MB_MIN,
+ TServerConstants.TOPIC_CACHESIZE_MB_MAX, result)) {
+ return result.isSuccess();
+ }
+ int cacheMsgSizeInMB = (int) result.retData1;
+ if (cacheMsgSizeInMB == TBaseConstants.META_VALUE_UNDEFINED) {
+ if (defVal == null) {
+ cacheMsgSizeInMB = TServerConstants.TOPIC_CACHESIZE_MB_DEF;
+ } else {
+ cacheMsgSizeInMB = defVal.getMemCacheMsgSizeInMB();
+ }
+ }
+ newConf.setMemCacheMsgSizeInMB(cacheMsgSizeInMB);
+ // get memCacheFlushIntvl parameter value
+ if (!WebParameterUtils.getIntParamValue(req,
WebFieldDef.UNFMCACHEINTERVAL,
+ false, TBaseConstants.META_VALUE_UNDEFINED,
+ TServerConstants.TOPIC_CACHEINTVL_MIN, result)) {
+ return result.isSuccess();
+ }
+ int cacheFlushIntvl = (int) result.retData1;
+ if (cacheFlushIntvl == TBaseConstants.META_VALUE_UNDEFINED) {
+ if (defVal == null) {
+ cacheFlushIntvl = TServerConstants.TOPIC_CACHEINTVL_DEF;
+ } else {
+ cacheFlushIntvl = defVal.getMemCacheFlushIntvl();
+ }
+ }
+ newConf.setMemCacheFlushIntvl(cacheFlushIntvl);
+ // get memCacheMsgCntInK parameter value
+ if (!WebParameterUtils.getIntParamValue(req,
WebFieldDef.UNFMCACHECNTINK,
+ false, TBaseConstants.META_VALUE_UNDEFINED,
+ TServerConstants.TOPIC_CACHECNT_INK_MIN, result)) {
+ return result.isSuccess();
+ }
+ int cacheMsgCntInK = (int) result.retData1;
+ if (cacheMsgCntInK == TBaseConstants.META_VALUE_UNDEFINED) {
+ if (defVal == null) {
+ cacheMsgCntInK = TServerConstants.TOPIC_CACHECNT_INK_DEF;
+ } else {
+ cacheMsgCntInK = defVal.getMemCacheMsgCntInK();
+ }
+ }
+ newConf.setMemCacheMsgCntInK(cacheMsgCntInK);
+ // get deletePolicy parameter value
+ if (!WebParameterUtils.getDeletePolicyParameter(req,
+ false, null, result)) {
+ return result.isSuccess();
+ }
+ String deletePolicy = (String) result.retData1;
+ if (deletePolicy == null) {
+ if (defVal == null) {
+ deletePolicy = TServerConstants.TOPIC_POLICY_DEF;
+ } else {
+ deletePolicy = defVal.getDeletePolicy();
+ }
+ }
+ newConf.setDeletePolicy(deletePolicy);
+ // get acceptPublish parameter value
+ if (!WebParameterUtils.getBooleanParamValue(req,
+ WebFieldDef.ACCEPTPUBLISH, false, null, result)) {
+ return result.isSuccess();
+ }
+ Boolean acceptPublish = (Boolean) result.retData1;
+ if (acceptPublish == null) {
+ if (defVal == null) {
+ acceptPublish = true;
+ } else {
+ acceptPublish = defVal.getAcceptPublish();
+ }
+ }
+ newConf.setAcceptPublish(acceptPublish);
+ // get acceptSubscribe parameter value
+ if (!WebParameterUtils.getBooleanParamValue(req,
+ WebFieldDef.ACCEPTSUBSCRIBE, false, null, result)) {
+ return result.isSuccess();
+ }
+ Boolean acceptSubscribe = (Boolean) result.retData1;
+ if (acceptSubscribe == null) {
+ if (defVal == null) {
+ acceptSubscribe = true;
+ } else {
+ acceptSubscribe = defVal.getAcceptSubscribe();
+ }
+ }
+ newConf.setAcceptSubscribe(acceptSubscribe);
+ result.setSuccResult(newConf);
+ return result.isSuccess();
+ }
/**
* Parse the parameter value from an object value to a long value
@@ -238,28 +666,6 @@ public class WebParameterUtils {
return tmpParamValue;
}
- public static StringBuilder buildFailResult(StringBuilder strBuffer,
String errMsg) {
- return
strBuffer.append("{\"result\":false,\"errCode\":400,\"errMsg\":\"")
- .append(errMsg).append("\"}");
- }
-
- public static StringBuilder buildSuccessResult(StringBuilder strBuffer) {
- return
strBuffer.append("{\"result\":true,\"errCode\":0,\"errMsg\":\"OK\"}");
- }
-
- public static StringBuilder buildSuccessResult(StringBuilder strBuffer,
String appendInfo) {
- return
strBuffer.append("{\"result\":true,\"errCode\":0,\"errMsg\":\"").
- append(appendInfo).append("\"}");
- }
-
- public static StringBuilder buildSuccessWithDataRetBegin(StringBuilder
strBuffer) {
- return
strBuffer.append("{\"result\":true,\"errCode\":0,\"errMsg\":\"OK\",\"data\":[");
- }
- public static StringBuilder buildSuccessWithDataRetEnd(
- StringBuilder strBuffer, int totalCnt) {
- return strBuffer.append("],\"count\":").append(totalCnt).append("}");
- }
-
/**
* Parse the parameter value from an object value to a long value
*
@@ -338,6 +744,27 @@ public class WebParameterUtils {
/**
* Parse the parameter value from an object value to a integer value
*
+ * @param keyValueMap parameter key value map
+ * @param fieldDef the parameter field definition
+ * @param required a boolean value represent whether the parameter is
must required
+ * @param defValue a default value returned if the field not exist
+ * @param minValue min value required
+ * @param result process result of parameter value
+ * @return process result
+ */
+ public static boolean getIntParamValue(Map<String, String> keyValueMap,
+ WebFieldDef fieldDef,
+ boolean required,
+ int defValue,
+ int minValue,
+ ProcessResult result) {
+ return getIntParamValue(keyValueMap, fieldDef, required, true,
defValue,
+ true, minValue, false, TBaseConstants.META_VALUE_UNDEFINED,
result);
+ }
+
+ /**
+ * Parse the parameter value from an object value to a integer value
+ *
* @param req Http Servlet Request
* @param fieldDef the parameter field definition
* @param required a boolean value represent whether the parameter is
must required
@@ -371,6 +798,36 @@ public class WebParameterUtils {
if (!getStringParamValue(req, fieldDef, required, null, result)) {
return result.success;
}
+ return checkIntParamValue(fieldDef, hasDefVal, defValue,
+ hasMinVal, minValue, hasMaxVal, maxValue, result);
+ }
+
+ private static boolean getIntParamValue(Map<String, String> keyValueMap,
+ WebFieldDef fieldDef,
+ boolean required,
+ boolean hasDefVal,
+ int defValue,
+ boolean hasMinVal,
+ int minValue,
+ boolean hasMaxVal,
+ int maxValue,
+ ProcessResult result) {
+ if (!getStringParamValue(keyValueMap, fieldDef, required, null,
result)) {
+ return result.success;
+ }
+ return checkIntParamValue(fieldDef, hasDefVal, defValue,
+ hasMinVal, minValue, hasMaxVal, maxValue, result);
+ }
+
+ private static boolean checkIntParamValue(WebFieldDef fieldDef,
+ boolean hasDefVal,
+ int defValue,
+ boolean hasMinVal,
+ int minValue,
+ boolean hasMaxVal,
+ int maxValue,
+ ProcessResult result) {
+
if (fieldDef.isCompFieldType()) {
Set<Integer> tgtValueSet = new HashSet<Integer>();
Set<String> valItemSet = (Set<String>) result.retData1;
@@ -402,7 +859,6 @@ public class WebParameterUtils {
}
return result.success;
}
-
/**
* Parse the parameter value from an object value to a boolean value
*
@@ -416,7 +872,7 @@ public class WebParameterUtils {
public static boolean getBooleanParamValue(HttpServletRequest req,
WebFieldDef fieldDef,
boolean required,
- boolean defValue,
+ Boolean defValue,
ProcessResult result) {
if (!getStringParamValue(req, fieldDef, required, null, result)) {
return result.success;
@@ -433,6 +889,30 @@ public class WebParameterUtils {
/**
* Parse the parameter value from an object value
*
+ * @param keyValueMap Http Servlet Request
+ * @param fieldDef the parameter field definition
+ * @param required a boolean value represent whether the parameter is
must required
+ * @param defValue a default value returned if the field not exist
+ * @param result process result
+ * @return valid result for the parameter value
+ */
+ public static boolean getStringParamValue(Map<String, String> keyValueMap,
+ WebFieldDef fieldDef,
+ boolean required,
+ String defValue,
+ ProcessResult result) {
+ // get parameter value
+ String paramValue = keyValueMap.get(fieldDef.name);
+ if (paramValue == null) {
+ paramValue = keyValueMap.get(fieldDef.shortName);
+ }
+ return checkStrParamValue(paramValue,
+ fieldDef, required, defValue, result);
+ }
+
+ /**
+ * Parse the parameter value from an HttpServletRequest
+ *
* @param req Http Servlet Request
* @param fieldDef the parameter field definition
* @param required a boolean value represent whether the parameter is
must required
@@ -450,6 +930,25 @@ public class WebParameterUtils {
if (paramValue == null) {
paramValue = req.getParameter(fieldDef.shortName);
}
+ return checkStrParamValue(paramValue,
+ fieldDef, required, defValue, result);
+ }
+
+ /**
+ * Check the parameter value
+ *
+ * @param paramValue parameter value
+ * @param fieldDef the parameter field definition
+ * @param required a boolean value represent whether the parameter is
must required
+ * @param defValue a default value returned if the field not exist
+ * @param result process result
+ * @return valid result for the parameter value
+ */
+ private static boolean checkStrParamValue(String paramValue,
+ WebFieldDef fieldDef,
+ boolean required,
+ String defValue,
+ ProcessResult result) {
if (TStringUtils.isNotBlank(paramValue)) {
// Cleanup value extra characters
paramValue = escDoubleQuotes(paramValue.trim());
@@ -510,6 +1009,165 @@ public class WebParameterUtils {
}
/**
+ * Get and valid topicName value
+ *
+ * @param req Http Servlet Request
+ * @param confManager configure manager
+ * @param required a boolean value represent whether the parameter is
must required
+ * @param defValue a default value returned if the field not exist
+ * @param result process result of parameter value
+ * @return process result
+ */
+ public static boolean getAndValidTopicNameInfo(HttpServletRequest req,
+ BrokerConfManager
confManager,
+ boolean required,
+ String defValue,
+ ProcessResult result) {
+ if (!WebParameterUtils.getStringParamValue(req,
+ WebFieldDef.COMPSTOPICNAME, required, defValue, result)) {
+ return result.success;
+ }
+ Set<String> topicNameSet = (Set<String>) result.retData1;
+ Set<String> existedTopicSet =
+ confManager.getTotalConfiguredTopicNames();
+ for (String topic : topicNameSet) {
+ if (!existedTopicSet.contains(topic)) {
+ result.setFailResult(new StringBuilder(512)
+ .append(WebFieldDef.COMPSTOPICNAME.name)
+ .append(" value ").append(topic)
+ .append(" is not configure, please configure
first!").toString());
+ break;
+ }
+ }
+ return result.success;
+ }
+
+ /**
+ * check the filter conditions and get them in a String
+ *
+ * @param req Http Servlet Request
+ * @param required denote whether it translate blank condition
+ * @param transBlank whether to translate condition item
+ * @param result process result of parameter value
+ * @return process result
+ */
+ public static boolean getFilterCondString(HttpServletRequest req,
+ boolean required,
+ boolean transBlank,
+ ProcessResult result) {
+ if (!getFilterCondSet(req, required, false, result)) {
+ return result.success;
+ }
+ return transCondItemsToStr(transBlank, result);
+ }
+
+ /**
+ * check the filter conditions and get them in a String
+ *
+ * @param keyValueMap parameter key value map
+ * @param required denote whether it translate blank condition
+ * @param transBlank whether to translate condition item
+ * @param result process result of parameter value
+ * @return process result
+ */
+ public static boolean getFilterCondString(Map<String, String> keyValueMap,
+ boolean required,
+ boolean transBlank,
+ ProcessResult result) {
+ if (!getFilterCondSet(keyValueMap, required, false, result)) {
+ return result.success;
+ }
+ return transCondItemsToStr(transBlank, result);
+ }
+
+
+ /**
+ * check the filter conditions and get them in a set
+ *
+ * @param req Http Servlet Request
+ * @param required a boolean value represent whether the parameter is
must required
+ * @param transCondItem whether to translate condition item
+ * @param result process result of parameter value
+ * @return process result
+ */
+ public static boolean getFilterCondSet(HttpServletRequest req,
+ boolean required,
+ boolean transCondItem,
+ ProcessResult result) {
+ if (!WebParameterUtils.getStringParamValue(req,
+ WebFieldDef.FILTERCONDS, required, null, result)) {
+ return result.success;
+ }
+ if (transCondItem) {
+ return transCondItems(result);
+ }
+ return result.success;
+ }
+
+ /**
+ * check the filter conditions and get them in a set
+ *
+ * @param keyValueMap Http Servlet Request
+ * @param required a boolean value represent whether the parameter is
must required
+ * @param transCondItem whether to translate condition item
+ * @param result process result of parameter value
+ * @return process result
+ */
+ private static boolean getFilterCondSet(Map<String, String> keyValueMap,
+ boolean required,
+ boolean transCondItem,
+ ProcessResult result) {
+ if (!WebParameterUtils.getStringParamValue(keyValueMap,
+ WebFieldDef.FILTERCONDS, required, null, result)) {
+ return result.success;
+ }
+ if (transCondItem) {
+ return transCondItems(result);
+ }
+ return result.success;
+ }
+
+ /**
+ * translate filter condition item with "''"
+ */
+ private static boolean transCondItems(ProcessResult result) {
+ Set<String> filterCondSet = (Set<String>) result.retData1;
+ if (!filterCondSet.isEmpty()) {
+ Set<String> newFilterCondSet = new HashSet<>(filterCondSet.size());
+ StringBuilder sBuilder = new StringBuilder(512);
+ for (String filterCond : filterCondSet) {
+ newFilterCondSet.add(sBuilder.append(TokenConstants.ARRAY_SEP)
+
.append(filterCond).append(TokenConstants.ARRAY_SEP).toString());
+ sBuilder.delete(0, sBuilder.length());
+ }
+ result.setSuccResult(newFilterCondSet);
+ }
+
+ return result.success;
+ }
+
+ /**
+ * translate filter condition item to String
+ */
+ private static boolean transCondItemsToStr(boolean transBlank,
+ ProcessResult result) {
+ StringBuilder sBuffer = new StringBuilder(512);
+ Set<String> filterCondSet = (Set<String>) result.retData1;
+ if (filterCondSet.isEmpty()) {
+ if (transBlank) {
+ sBuffer.append(TServerConstants.BLANK_FILTER_ITEM_STR);
+ }
+ } else {
+ sBuffer.append(TokenConstants.ARRAY_SEP);
+ for (String filterCond : filterCondSet) {
+ sBuffer.append(filterCond).append(TokenConstants.ARRAY_SEP);
+ }
+ }
+ result.setSuccResult(sBuffer.toString());
+ return result.success;
+ }
+
+ /**
* Parse the parameter value from an json dict
*
* @param req Http Servlet Request
@@ -586,6 +1244,82 @@ public class WebParameterUtils {
}
/**
+ * Parse the parameter value from an json array
+ *
+ * @param req Http Servlet Request
+ * @param fieldDef the parameter field definition
+ * @param required a boolean value represent whether the parameter is
must required
+ * @param defValue a default value returned if the field not exist
+ * @param result process result
+ * @return valid result for the parameter value
+ */
+ public static boolean getJsonArrayParamValue(HttpServletRequest req,
+ WebFieldDef fieldDef,
+ boolean required,
+ List<Map<String, String>>
defValue,
+ ProcessResult result) {
+ // get parameter value
+ String paramValue = req.getParameter(fieldDef.name);
+ if (paramValue == null) {
+ paramValue = req.getParameter(fieldDef.shortName);
+ }
+ if (TStringUtils.isNotBlank(paramValue)) {
+ // Cleanup value extra characters
+ paramValue = escDoubleQuotes(paramValue.trim());
+ }
+ // Check if the parameter exists
+ if (TStringUtils.isBlank(paramValue)) {
+ if (required) {
+ result.setFailResult(new StringBuilder(512)
+ .append("Parameter ").append(fieldDef.name)
+ .append(" is missing or value is null or
blank!").toString());
+ } else {
+ result.setSuccResult(defValue);
+ }
+ return result.success;
+ }
+ try {
+ paramValue = URLDecoder.decode(paramValue,
+ TBaseConstants.META_DEFAULT_CHARSET_NAME);
+ } catch (UnsupportedEncodingException e) {
+ result.setFailResult(new StringBuilder(512)
+ .append("Parameter ").append(fieldDef.name)
+ .append(" decode error, exception is ")
+ .append(e.toString()).toString());
+ }
+ if (TStringUtils.isBlank(paramValue)) {
+ if (required) {
+ result.setFailResult(new StringBuilder(512).append("Parameter
")
+ .append(fieldDef.name).append("'s value is
blank!").toString());
+ } else {
+ result.setSuccResult(defValue);
+ }
+ return result.success;
+ }
+ if (fieldDef.valMaxLen != TBaseConstants.META_VALUE_UNDEFINED) {
+ if (paramValue.length() > fieldDef.valMaxLen) {
+ result.setFailResult(new StringBuilder(512)
+ .append("Parameter ").append(fieldDef.name)
+ .append("'s length over max allowed length (")
+ .append(fieldDef.valMaxLen).append(")!").toString());
+ return result.success;
+ }
+ }
+ // parse data
+ try {
+ List<Map<String, String>> arrayValue = new
Gson().fromJson(paramValue,
+ new TypeToken<List<Map<String, String>>>(){}.getType());
+ result.setSuccResult(arrayValue);
+ } catch (Throwable e) {
+ result.setFailResult(new StringBuilder(512)
+ .append("Parameter ").append(fieldDef.name)
+ .append(" value parse failure, error is ")
+ .append(e.getMessage()).append("!").toString());
+ }
+ return result.success;
+ }
+
+ /**
* Parse the parameter value from an string value to Date value
*
* @param req Http Servlet Request
@@ -1469,4 +2203,60 @@ public class WebParameterUtils {
}
return inPutStr;
}
+
+ // translate rule info to json format string
+ public static int getAndCheckFlowRules(HttpServletRequest req,
+ String defValue,
+ ProcessResult result) {
+ int ruleCnt = 0;
+ StringBuilder strBuffer = new StringBuilder(512);
+ // get parameter value
+ String paramValue = req.getParameter(WebFieldDef.FLOWCTRLSET.name);
+ if (paramValue == null) {
+ paramValue = req.getParameter(WebFieldDef.FLOWCTRLSET.shortName);
+ }
+ if (TStringUtils.isBlank(paramValue)) {
+ result.setSuccResult(defValue);
+ return ruleCnt;
+ }
+ strBuffer.append("[");
+ paramValue = paramValue.trim();
+ List<Integer> ruleTypes = Arrays.asList(0, 1, 2, 3);
+ FlowCtrlRuleHandler flowCtrlRuleHandler =
+ new FlowCtrlRuleHandler(true);
+ Map<Integer, List<FlowCtrlItem>> flowCtrlItemMap;
+ try {
+ flowCtrlItemMap =
+ flowCtrlRuleHandler.parseFlowCtrlInfo(paramValue);
+ } catch (Throwable e) {
+ result.setFailResult(new StringBuilder(512)
+ .append("Parse parameter
").append(WebFieldDef.FLOWCTRLSET.name)
+ .append(" failure: '").append(e.toString()).toString());
+ return 0;
+ }
+ for (Integer typeId : ruleTypes) {
+ if (typeId != null) {
+ int rules = 0;
+ List<FlowCtrlItem> flowCtrlItems = flowCtrlItemMap.get(typeId);
+ if (flowCtrlItems != null) {
+ if (ruleCnt++ > 0) {
+ strBuffer.append(",");
+ }
+
strBuffer.append("{\"type\":").append(typeId.intValue()).append(",\"rule\":[");
+ for (FlowCtrlItem flowCtrlItem : flowCtrlItems) {
+ if (flowCtrlItem != null) {
+ if (rules++ > 0) {
+ strBuffer.append(",");
+ }
+ strBuffer = flowCtrlItem.toJsonString(strBuffer);
+ }
+ }
+ strBuffer.append("]}");
+ }
+ }
+ }
+ strBuffer.append("]");
+ result.setSuccResult(strBuffer.toString());
+ return ruleCnt;
+ }
}
diff --git
a/tubemq-server/src/main/java/org/apache/tubemq/server/master/TMaster.java
b/tubemq-server/src/main/java/org/apache/tubemq/server/master/TMaster.java
index de510a6..dd4922c 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/TMaster.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/TMaster.java
@@ -104,6 +104,7 @@ import
org.apache.tubemq.server.master.bdbstore.DefaultBdbStoreService;
import org.apache.tubemq.server.master.bdbstore.bdbentitys.BdbBrokerConfEntity;
import
org.apache.tubemq.server.master.bdbstore.bdbentitys.BdbClusterSettingEntity;
import
org.apache.tubemq.server.master.bdbstore.bdbentitys.BdbGroupFlowCtrlEntity;
+import org.apache.tubemq.server.master.metamanage.MetaDataManager;
import org.apache.tubemq.server.master.nodemanage.nodebroker.BrokerConfManager;
import org.apache.tubemq.server.master.nodemanage.nodebroker.BrokerInfoHolder;
import
org.apache.tubemq.server.master.nodemanage.nodebroker.BrokerSyncStatusInfo;
@@ -144,6 +145,7 @@ public class TMaster extends HasThread implements
MasterService, Stoppable {
private final OffsetStorage zkOffsetStorage; //zookeeper
offset manager
private final ShutdownHook shutdownHook; //shutdown hook
private final DefaultBdbStoreService defaultBdbStoreService; //bdb
store service
+ private final MetaDataManager defMetaDataManager;
private final BrokerConfManager defaultBrokerConfManager;
//broker config manager
private final CertificateMasterHandler serverAuthHandler;
//server auth handler
private AtomicBoolean shutdownHooked = new AtomicBoolean(false);
@@ -208,6 +210,11 @@ public class TMaster extends HasThread implements
MasterService, Stoppable {
new ReleaseBroker().run(nodeId);
}
});
+
+ this.defMetaDataManager = new
MetaDataManager(masterConfig.getHostName(),
+ masterConfig.getMetaDataPath(),
masterConfig.getReplicationConfig());
+ this.defMetaDataManager.start();
+
this.defaultBdbStoreService = new DefaultBdbStoreService(masterConfig,
this);
this.defaultBdbStoreService.start();
this.defaultBrokerConfManager = new
BrokerConfManager(this.defaultBdbStoreService);
@@ -282,6 +289,11 @@ public class TMaster extends HasThread implements
MasterService, Stoppable {
return this.defaultBrokerConfManager;
}
+
+ public MetaDataManager getDefMetaDataManager() {
+ return defMetaDataManager;
+ }
+
/**
* Producer register request to master
*
diff --git
a/tubemq-server/src/main/java/org/apache/tubemq/server/master/bdbstore/bdbentitys/BdbTopicAuthControlEntity.java
b/tubemq-server/src/main/java/org/apache/tubemq/server/master/bdbstore/bdbentitys/BdbTopicAuthControlEntity.java
index ec342d1..54aad1b 100644
---
a/tubemq-server/src/main/java/org/apache/tubemq/server/master/bdbstore/bdbentitys/BdbTopicAuthControlEntity.java
+++
b/tubemq-server/src/main/java/org/apache/tubemq/server/master/bdbstore/bdbentitys/BdbTopicAuthControlEntity.java
@@ -134,6 +134,24 @@ public class BdbTopicAuthControlEntity implements
Serializable {
String.valueOf(dataVerId));
}
+ public int getTopicId() {
+ String atrVal =
+ TStringUtils.getAttrValFrmAttributes(this.attributes,
+ TStoreConstants.TOKEN_TOPICNAME_ID);
+ if (atrVal != null) {
+ return Integer.parseInt(atrVal);
+ }
+ return TBaseConstants.META_VALUE_UNDEFINED;
+ }
+
+ public void setTopicId(int topicId) {
+ this.attributes =
+ TStringUtils.setAttrValToAttributes(this.attributes,
+ TStoreConstants.TOKEN_TOPICNAME_ID,
+ String.valueOf(topicId));
+ }
+
+
public int getMaxMsgSize() {
String atrVal =
TStringUtils.getAttrValFrmAttributes(this.attributes,
diff --git
a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/DataOpErrCode.java
b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/DataOpErrCode.java
index c65b5eb..fdd94c8 100644
---
a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/DataOpErrCode.java
+++
b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/DataOpErrCode.java
@@ -26,6 +26,7 @@ public enum DataOpErrCode {
DERR_UNCLEANED(404, "Related configuration is not cleaned up."),
DERR_CONDITION_LACK(405, "The preconditions are not met"),
DERR_ILLEGAL_STATUS(406, "Illegal operate status"),
+ DERR_ILLEGAL_VALUE(407, "Illegal data format or value"),
DERR_STORE_ABNORMAL(501, "Store layer throw exception."),
DERR_STORE_STOPPED(510, "Store stopped."),
DERR_STORE_NOT_MASTER(511, "Store not active master.");
diff --git
a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/MetaDataManager.java
b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/MetaDataManager.java
index c5cf6f4..6232bd3 100644
---
a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/MetaDataManager.java
+++
b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/MetaDataManager.java
@@ -915,7 +915,7 @@ public class MetaDataManager implements Server {
}
strBuffer.delete(0, strBuffer.length());
// add topic control record
- addIfAbsentTopicCtrlConf(entity.getTopicName(),
+ addIfAbsentTopicCtrlConf(entity.getTopicName(), entity.getTopicId(),
entity.getCreateUser(), strBuffer, new ProcessResult());
return result.isSuccess();
}
@@ -950,7 +950,7 @@ public class MetaDataManager implements Server {
}
strBuffer.delete(0, strBuffer.length());
// add topic control record
- addIfAbsentTopicCtrlConf(entity.getTopicName(),
+ addIfAbsentTopicCtrlConf(entity.getTopicName(), entity.getTopicId(),
entity.getCreateUser(), strBuffer, new ProcessResult());
return result.isSuccess();
}
@@ -1119,10 +1119,12 @@ public class MetaDataManager implements Server {
* Add if absent topic control configure info
*
* @param topicName the topic name will be add
+ * @param topicNameId the topic name id will be add
* @param operator operator
* @param strBuffer the print info string buffer
*/
private void addIfAbsentTopicCtrlConf(String topicName,
+ int topicNameId,
String operator,
StringBuilder strBuffer,
ProcessResult result) {
@@ -1131,7 +1133,7 @@ public class MetaDataManager implements Server {
if (curEntity != null) {
return;
}
- curEntity = new TopicCtrlEntity(topicName, operator);
+ curEntity = new TopicCtrlEntity(topicName, topicNameId, operator);
if (metaStoreService.addTopicCtrlConf(curEntity, result)) {
strBuffer.append("[addIfAbsentTopicCtrlConf], ")
.append(curEntity.getCreateUser())
@@ -1256,6 +1258,38 @@ public class MetaDataManager implements Server {
}
/**
+ * Update cluster default setting
+ *
+ * @param entity the cluster default setting entity will be add
+ * @param strBuffer the print info string buffer
+ * @param result the process result return
+ * @return true if success otherwise false
+ * @throws Exception
+ */
+ public boolean confModClusterDefSetting(ClusterSettingEntity entity,
+ StringBuilder strBuffer,
+ ProcessResult result) {
+ if (metaStoreService.updClusterConfig(entity, result)) {
+ ClusterSettingEntity oldEntity =
+ (ClusterSettingEntity) result.getRetData();
+ ClusterSettingEntity curEntity =
+ metaStoreService.getClusterConfig();
+ strBuffer.append("[confModClusterDefSetting], ")
+ .append(entity.getModifyUser())
+ .append(" updated record from
:").append(oldEntity.toString())
+ .append(" to ").append(curEntity.toString());
+ logger.info(strBuffer.toString());
+ } else {
+ strBuffer.append("[confModClusterDefSetting], ")
+ .append("failure to update record : ")
+ .append(result.getErrInfo());
+ logger.warn(strBuffer.toString());
+ }
+ strBuffer.delete(0, strBuffer.length());
+ return result.isSuccess();
+ }
+
+ /**
* Delete cluster default setting
*
* @param operator operator
diff --git
a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/dao/entity/BaseEntity.java
b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/dao/entity/BaseEntity.java
index 488a797..2037fde 100644
---
a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/dao/entity/BaseEntity.java
+++
b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/dao/entity/BaseEntity.java
@@ -26,10 +26,11 @@ import org.apache.commons.lang.builder.ToStringBuilder;
import org.apache.tubemq.corebase.TBaseConstants;
import org.apache.tubemq.corebase.utils.TStringUtils;
import org.apache.tubemq.server.common.TServerConstants;
+import org.apache.tubemq.server.common.utils.WebParameterUtils;
// AbstractEntity: entity's abstract class
-public class BaseEntity implements Serializable {
+public class BaseEntity implements Serializable, Cloneable {
private long dataVersionId =
TServerConstants.DEFAULT_DATA_VERSION; // 0: default version,
other: version
@@ -73,16 +74,15 @@ public class BaseEntity implements Serializable {
this.modifyDate = modifyDate;
}
- public void setCreateUserInfo(String createUser, Date createDate) {
- this.createUser = createUser;
- this.createDate = createDate;
- this.modifyUser = createUser;
- this.modifyDate = createDate;
- }
-
- public void setModifyUserInfo(String modifyUser, Date modifyDate) {
- this.modifyUser = modifyUser;
- this.modifyDate = modifyDate;
+ public void setModifyInfo(long dataVersionId, boolean isAdd,
+ String operator, Date opDate) {
+ this.dataVersionId = dataVersionId;
+ if (isAdd) {
+ this.createUser = operator;
+ this.createDate = opDate;
+ }
+ this.modifyUser = operator;
+ this.modifyDate = opDate;
}
public void setDataVersionId() {
@@ -160,6 +160,36 @@ public class BaseEntity implements Serializable {
return true;
}
+ /**
+ * Serialize field to json format
+ *
+ * @param sBuilder build container
+ * @param isLongName if return field key is long name
+ * @return
+ */
+ StringBuilder toWebJsonStr(StringBuilder sBuilder, boolean isLongName) {
+ if (isLongName) {
+ sBuilder.append(",\"dataVersionId\":").append(dataVersionId)
+
.append(",\"createUser\":\"").append(createUser).append("\"")
+ .append(",\"createDate\":\"")
+
.append(WebParameterUtils.date2yyyyMMddHHmmss(createDate)).append("\"")
+
.append(",\"modifyUser\":\"").append(modifyUser).append("\"")
+ .append(",\"modifyDate\":\"")
+
.append(WebParameterUtils.date2yyyyMMddHHmmss(modifyDate)).append("\"")
+
.append(",\"attributes\":\"").append(attributes).append("\"");
+ } else {
+ sBuilder.append(",\"dVerId\":").append(dataVersionId)
+ .append(",\"cur\":\"").append(createUser).append("\"")
+ .append(",\"cDate\":\"")
+
.append(WebParameterUtils.date2yyyyMMddHHmmss(createDate)).append("\"")
+ .append(",\"mur\":\"").append(modifyUser).append("\"")
+ .append(",\"mDate\":\"")
+
.append(WebParameterUtils.date2yyyyMMddHHmmss(modifyDate)).append("\"")
+ .append(",\"attrs\":\"").append(attributes).append("\"");
+ }
+ return sBuilder;
+ }
+
@Override
public boolean equals(Object o) {
if (this == o) {
@@ -182,4 +212,9 @@ public class BaseEntity implements Serializable {
return Objects.hash(dataVersionId, createUser,
createDate, modifyUser, modifyDate, attributes);
}
+
+ @Override
+ public Object clone() throws CloneNotSupportedException {
+ return super.clone();
+ }
}
diff --git
a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/dao/entity/BrokerConfEntity.java
b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/dao/entity/BrokerConfEntity.java
index 5d30fa1..1713fa8 100644
---
a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/dao/entity/BrokerConfEntity.java
+++
b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/dao/entity/BrokerConfEntity.java
@@ -23,6 +23,7 @@ import org.apache.tubemq.corebase.TBaseConstants;
import org.apache.tubemq.corebase.TokenConstants;
import org.apache.tubemq.corebase.utils.TStringUtils;
import org.apache.tubemq.server.common.statusdef.ManageStatus;
+import org.apache.tubemq.server.common.utils.WebParameterUtils;
import org.apache.tubemq.server.master.bdbstore.bdbentitys.BdbBrokerConfEntity;
@@ -32,7 +33,7 @@ import
org.apache.tubemq.server.master.bdbstore.bdbentitys.BdbBrokerConfEntity;
*
*/
-public class BrokerConfEntity extends BaseEntity {
+public class BrokerConfEntity extends BaseEntity implements Cloneable {
// Primary Key
private int brokerId = TBaseConstants.META_VALUE_UNDEFINED;
private String brokerIp = "";
@@ -46,13 +47,14 @@ public class BrokerConfEntity extends BaseEntity {
private boolean isBrokerLoaded = false; //broker conf load flag
private int regionId = TBaseConstants.META_VALUE_UNDEFINED;
private int groupId = TBaseConstants.META_VALUE_UNDEFINED;
- private TopicPropGroup topicProps = null;
+ private TopicPropGroup topicProps = new TopicPropGroup();
+ // Redundant fields begin
private String brokerAddress = ""; // broker ip:port
private String brokerFullInfo = ""; // broker brokerId:ip:port
private String brokerSimpleInfo = ""; // broker brokerId:ip:
private String brokerTLSSimpleInfo = ""; //tls simple info
private String brokerTLSFullInfo = ""; //tls full info
-
+ // Redundant fields end
public BrokerConfEntity() {
super();
@@ -304,6 +306,46 @@ public class BrokerConfEntity extends BaseEntity {
}
/**
+ * Serialize field to json format
+ *
+ * @param sBuilder build container
+ * @param isLongName if return field key is long name
+ * @return
+ */
+ @Override
+ public StringBuilder toWebJsonStr(StringBuilder sBuilder, boolean
isLongName) {
+ String manageSts =
+
WebParameterUtils.getBrokerManageStatusStr(getManageStatus().getCode());
+ if (isLongName) {
+ sBuilder.append("{\"brokerId\":").append(brokerId)
+ .append(",\"brokerIp\":\"").append(brokerIp).append("\"")
+ .append(",\"brokerPort\":").append(brokerPort)
+ .append(",\"brokerTLSPort\":").append(brokerTLSPort)
+ .append(",\"brokerWebPort\":").append(brokerWebPort)
+
.append(",\"manageStatus\":\"").append(manageSts).append("\"")
+ .append(",\"isConfChanged\":").append(isConfDataUpdated)
+ .append(",\"isConfLoaded\":").append(isBrokerLoaded)
+ .append(",\"regionId\":").append(regionId)
+ .append(",\"groupId\":").append(groupId);
+ } else {
+ sBuilder.append("{\"brkId\":").append(brokerId)
+ .append(",\"bIp\":\"").append(brokerIp).append("\"")
+ .append(",\"bPort\":").append(brokerPort)
+ .append(",\"bTlsPort\":").append(brokerTLSPort)
+ .append(",\"bWebPort\":").append(brokerWebPort)
+ .append(",\"mSts\":\"").append(manageSts).append("\"")
+ .append(",\"isConfChg\":").append(isConfDataUpdated)
+ .append(",\"isConfLd\":").append(isBrokerLoaded)
+ .append(",\"rId\":").append(regionId)
+ .append(",\"gId\":").append(groupId);
+ }
+ topicProps.toWebJsonStr(sBuilder, isLongName);
+ super.toWebJsonStr(sBuilder, isLongName);
+ sBuilder.append("}");
+ return sBuilder;
+ }
+
+ /**
* Get broker config string
*
* @return config string
@@ -362,4 +404,18 @@ public class BrokerConfEntity extends BaseEntity {
brokerTLSSimpleInfo, brokerTLSFullInfo);
}
+ @Override
+ public BrokerConfEntity clone() {
+ try {
+ BrokerConfEntity copy = (BrokerConfEntity) super.clone();
+ copy.setManageStatus(getManageStatus());
+ if (copy.getTopicProps() != null) {
+ copy.setTopicProps(getTopicProps().clone());
+ }
+ return copy;
+ } catch (CloneNotSupportedException e) {
+ return null;
+ }
+ }
+
}
diff --git
a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/dao/entity/ClusterSettingEntity.java
b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/dao/entity/ClusterSettingEntity.java
index 1a0f576..bb58740 100644
---
a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/dao/entity/ClusterSettingEntity.java
+++
b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/dao/entity/ClusterSettingEntity.java
@@ -17,6 +17,7 @@
package org.apache.tubemq.server.master.metamanage.metastore.dao.entity;
+import java.util.Date;
import java.util.Objects;
import org.apache.tubemq.corebase.TBaseConstants;
import org.apache.tubemq.server.common.statusdef.EnableStatus;
@@ -29,7 +30,7 @@ import
org.apache.tubemq.server.master.metamanage.metastore.TStoreConstants;
* store the cluster default setting
*
*/
-public class ClusterSettingEntity extends BaseEntity {
+public class ClusterSettingEntity extends BaseEntity implements Cloneable {
private String recordKey =
TStoreConstants.TOKEN_DEFAULT_CLUSTER_SETTING;
@@ -54,6 +55,10 @@ public class ClusterSettingEntity extends BaseEntity {
super();
}
+ public ClusterSettingEntity(long dataVerId, String createUser, Date
createDate) {
+ super(dataVerId, createUser, createDate);
+ }
+
// Constructor by BdbClusterSettingEntity
public ClusterSettingEntity(BdbClusterSettingEntity bdbEntity) {
super(bdbEntity.getConfigId(), bdbEntity.getModifyUser(),
bdbEntity.getModifyDate());
@@ -152,6 +157,13 @@ public class ClusterSettingEntity extends BaseEntity {
this.qryPriorityId = qryPriorityId;
}
+ public EnableStatus getGloFlowCtrlStatus() {
+ return gloFlowCtrlStatus;
+ }
+
+ public void setGloFlowCtrlStatus(EnableStatus gloFlowCtrlStatus) {
+ this.gloFlowCtrlStatus = gloFlowCtrlStatus;
+ }
public int getGloFlowCtrlRuleCnt() {
return gloFlowCtrlRuleCnt;
@@ -181,6 +193,46 @@ public class ClusterSettingEntity extends BaseEntity {
this.clsDefTopicProps = clsDefTopicProps;
}
+ /**
+ * Serialize field to json format
+ *
+ * @param sBuilder build container
+ * @param isLongName if return field key is long name
+ * @return
+ */
+ @Override
+ public StringBuilder toWebJsonStr(StringBuilder sBuilder, boolean
isLongName) {
+ int tmpMsgSizeInMB = maxMsgSizeInB;
+ if (maxMsgSizeInB != TBaseConstants.META_VALUE_UNDEFINED) {
+ tmpMsgSizeInMB /= TBaseConstants.META_MB_UNIT_SIZE;
+ }
+ if (isLongName) {
+ sBuilder.append("{\"recordKey\":\"").append(recordKey).append("\"")
+ .append(",\"brokerPort\":").append(brokerPort)
+ .append(",\"brokerTLSPort\":").append(brokerTLSPort)
+ .append(",\"brokerWebPort\":").append(brokerWebPort)
+ .append(",\"maxMsgSizeInMB\":").append(tmpMsgSizeInMB)
+ .append(",\"qryPriorityId\":").append(qryPriorityId)
+
.append(",\"flowCtrlEnable\":").append(gloFlowCtrlStatus.isEnable())
+
.append(",\"flowCtrlRuleCount\":").append(gloFlowCtrlRuleCnt)
+ .append(",\"flowCtrlInfo\":").append(gloFlowCtrlRuleInfo);
+ } else {
+ sBuilder.append("{\"recKey\":\"").append(recordKey).append("\"")
+ .append(",\"bPort\":").append(brokerPort)
+ .append(",\"bTlsPort\":").append(brokerTLSPort)
+ .append(",\"bWebPort\":").append(brokerWebPort)
+ .append(",\"mxMsgInMB\":").append(tmpMsgSizeInMB)
+ .append(",\"qryPriId\":").append(qryPriorityId)
+
.append(",\"fCtrlEn\":").append(gloFlowCtrlStatus.isEnable())
+ .append(",\"fCtrlCnt\":").append(gloFlowCtrlRuleCnt)
+ .append(",\"fCtrlInfo\":").append(gloFlowCtrlRuleInfo);
+ }
+ clsDefTopicProps.toWebJsonStr(sBuilder, isLongName);
+ super.toWebJsonStr(sBuilder, isLongName);
+ sBuilder.append("}");
+ return sBuilder;
+ }
+
@Override
public boolean equals(Object o) {
if (this == o) {
@@ -212,4 +264,16 @@ public class ClusterSettingEntity extends BaseEntity {
gloFlowCtrlStatus, gloFlowCtrlRuleCnt, gloFlowCtrlRuleInfo);
}
+ @Override
+ public ClusterSettingEntity clone() {
+ try {
+ ClusterSettingEntity copy = (ClusterSettingEntity) super.clone();
+ copy.setGloFlowCtrlStatus(getGloFlowCtrlStatus());
+ copy.setClsDefTopicProps(getClsDefTopicProps().clone());
+ return copy;
+ } catch (CloneNotSupportedException e) {
+ return null;
+ }
+ }
+
}
diff --git
a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/dao/entity/GroupBaseCtrlEntity.java
b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/dao/entity/GroupBaseCtrlEntity.java
index 04f69ea..da98e26 100644
---
a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/dao/entity/GroupBaseCtrlEntity.java
+++
b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/dao/entity/GroupBaseCtrlEntity.java
@@ -28,7 +28,7 @@ import
org.apache.tubemq.server.master.bdbstore.bdbentitys.BdbGroupFlowCtrlEntit
* store the group control setting
*
*/
-public class GroupBaseCtrlEntity extends BaseEntity {
+public class GroupBaseCtrlEntity extends BaseEntity implements Cloneable {
// group name
private String groupName = "";
// resource check control
@@ -196,6 +196,37 @@ public class GroupBaseCtrlEntity extends BaseEntity {
return true;
}
+ /**
+ * Serialize field to json format
+ *
+ * @param sBuilder build container
+ * @param isLongName if return field key is long name
+ * @return
+ */
+ @Override
+ public StringBuilder toWebJsonStr(StringBuilder sBuilder, boolean
isLongName) {
+ if (isLongName) {
+ sBuilder.append("{\"groupName\":\"").append(groupName).append("\"")
+
.append(",\"resCheckEnable\":").append(resCheckStatus.isEnable())
+
.append(",\"alwdBrokerClientRate\":").append(allowedBrokerClientRate)
+ .append(",\"qryPriorityId\":").append(qryPriorityId)
+
.append(",\"flowCtrlEnable\":").append(flowCtrlStatus.isEnable())
+ .append(",\"flowCtrlRuleCount\":").append(ruleCnt)
+ .append(",\"flowCtrlInfo\":").append(flowCtrlInfo);
+ } else {
+ sBuilder.append("{\"group\":\"").append(groupName).append("\"")
+ .append(",\"resChkEn\":").append(resCheckStatus.isEnable())
+ .append(",\"abcr\":").append(allowedBrokerClientRate)
+ .append(",\"qryPriId\":").append(qryPriorityId)
+ .append(",\"fCtrlEn\":").append(flowCtrlStatus.isEnable())
+ .append(",\"fCtrlCnt\":").append(ruleCnt)
+ .append(",\"fCtrlInfo\":").append(flowCtrlInfo);
+ }
+ super.toWebJsonStr(sBuilder, isLongName);
+ sBuilder.append("}");
+ return sBuilder;
+ }
+
@Override
public boolean equals(Object o) {
if (this == o) {
@@ -223,4 +254,17 @@ public class GroupBaseCtrlEntity extends BaseEntity {
allowedBrokerClientRate, qryPriorityId, flowCtrlStatus,
ruleCnt, flowCtrlInfo);
}
+
+ @Override
+ public GroupBaseCtrlEntity clone() {
+ try {
+ GroupBaseCtrlEntity copy = (GroupBaseCtrlEntity) super.clone();
+ copy.setFlowCtrlStatus(getFlowCtrlStatus());
+ copy.setResCheckStatus(getResCheckStatus());
+ return copy;
+ } catch (CloneNotSupportedException e) {
+ return null;
+ }
+ }
+
}
diff --git
a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/dao/entity/GroupBlackListEntity.java
b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/dao/entity/GroupBlackListEntity.java
index 6fda9ec..0e36909 100644
---
a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/dao/entity/GroupBlackListEntity.java
+++
b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/dao/entity/GroupBlackListEntity.java
@@ -28,7 +28,7 @@ import
org.apache.tubemq.server.master.bdbstore.bdbentitys.BdbBlackGroupEntity;
* store group black list setting
*
*/
-public class GroupBlackListEntity extends BaseEntity {
+public class GroupBlackListEntity extends BaseEntity implements Cloneable {
private String recordKey = "";
private String topicName = "";
@@ -108,6 +108,31 @@ public class GroupBlackListEntity extends BaseEntity {
return true;
}
+ /**
+ * Serialize field to json format
+ *
+ * @param sBuilder build container
+ * @param isLongName if return field key is long name
+ * @return
+ */
+ @Override
+ public StringBuilder toWebJsonStr(StringBuilder sBuilder, boolean
isLongName) {
+ if (isLongName) {
+ sBuilder.append("{\"recordKey\":\"").append(recordKey).append("\"")
+ .append(",\"topicName\":\"").append(topicName).append("\"")
+ .append(",\"groupName\":\"").append(groupName).append("\"")
+ .append(",\"reason\":\"").append(reason).append("\"");
+ } else {
+ sBuilder.append("{\"recKey\":\"").append(recordKey).append("\"")
+ .append(",\"topic\":\"").append(topicName).append("\"")
+ .append(",\"group\":\"").append(groupName).append("\"")
+ .append(",\"rsn\":\"").append(reason).append("\"");
+ }
+ super.toWebJsonStr(sBuilder, isLongName);
+ sBuilder.append("}");
+ return sBuilder;
+ }
+
@Override
public boolean equals(Object o) {
if (this == o) {
@@ -130,4 +155,14 @@ public class GroupBlackListEntity extends BaseEntity {
public int hashCode() {
return Objects.hash(super.hashCode(), recordKey, topicName, groupName,
reason);
}
+
+ @Override
+ public GroupBlackListEntity clone() {
+ try {
+ GroupBlackListEntity copy = (GroupBlackListEntity) super.clone();
+ return copy;
+ } catch (CloneNotSupportedException e) {
+ return null;
+ }
+ }
}
diff --git
a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/dao/entity/GroupConsumeCtrlEntity.java
b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/dao/entity/GroupConsumeCtrlEntity.java
index f44ffba..f5b4db7 100644
---
a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/dao/entity/GroupConsumeCtrlEntity.java
+++
b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/dao/entity/GroupConsumeCtrlEntity.java
@@ -29,7 +29,7 @@ import
org.apache.tubemq.server.master.bdbstore.bdbentitys.BdbGroupFilterCondEnt
* store the group consume control setting
*
*/
-public class GroupConsumeCtrlEntity extends BaseEntity {
+public class GroupConsumeCtrlEntity extends BaseEntity implements Cloneable {
private String recordKey = "";
private String topicName = "";
private String groupName = "";
@@ -103,12 +103,15 @@ public class GroupConsumeCtrlEntity extends BaseEntity {
return groupName;
}
+ public void setFilterConsumeStatus(EnableStatus filterConsumeStatus) {
+ this.filterConsumeStatus = filterConsumeStatus;
+ }
public boolean isEnableFilterConsume() {
return filterConsumeStatus == EnableStatus.STATUS_ENABLE;
}
- public void setFilterConsumeStatus(boolean enableFilterConsume) {
+ public void setEnableFilterConsume(boolean enableFilterConsume) {
if (enableFilterConsume) {
this.filterConsumeStatus = EnableStatus.STATUS_ENABLE;
} else {
@@ -152,6 +155,33 @@ public class GroupConsumeCtrlEntity extends BaseEntity {
return true;
}
+ /**
+ * Serialize field to json format
+ *
+ * @param sBuilder build container
+ * @param isLongName if return field key is long name
+ * @return
+ */
+ @Override
+ public StringBuilder toWebJsonStr(StringBuilder sBuilder, boolean
isLongName) {
+ if (isLongName) {
+ sBuilder.append("{\"recordKey\":\"").append(recordKey).append("\"")
+ .append(",\"topicName\":\"").append(topicName).append("\"")
+ .append(",\"groupName\":\"").append(groupName).append("\"")
+
.append(",\"filterEnable\":").append(filterConsumeStatus.isEnable())
+
.append(",\"filterConds\":\"").append(filterCondStr).append("\"");
+ } else {
+ sBuilder.append("{\"recKey\":\"").append(recordKey).append("\"")
+ .append(",\"topic\":\"").append(topicName).append("\"")
+ .append(",\"group\":\"").append(groupName).append("\"")
+
.append(",\"fltEn\":").append(filterConsumeStatus.isEnable())
+
.append(",\"fltRls\":\"").append(filterCondStr).append("\"");
+ }
+ super.toWebJsonStr(sBuilder, isLongName);
+ sBuilder.append("}");
+ return sBuilder;
+ }
+
@Override
public boolean equals(Object o) {
if (this == o) {
@@ -176,4 +206,15 @@ public class GroupConsumeCtrlEntity extends BaseEntity {
return Objects.hash(super.hashCode(), recordKey,
topicName, groupName, filterConsumeStatus, filterCondStr);
}
+
+ @Override
+ public GroupConsumeCtrlEntity clone() {
+ try {
+ GroupConsumeCtrlEntity copy = (GroupConsumeCtrlEntity)
super.clone();
+ copy.setFilterConsumeStatus(getFilterConsumeStatus());
+ return copy;
+ } catch (CloneNotSupportedException e) {
+ return null;
+ }
+ }
}
diff --git
a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/dao/entity/TopicConfEntity.java
b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/dao/entity/TopicConfEntity.java
index f925325..37984b5 100644
---
a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/dao/entity/TopicConfEntity.java
+++
b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/dao/entity/TopicConfEntity.java
@@ -30,7 +30,7 @@ import
org.apache.tubemq.server.master.bdbstore.bdbentitys.BdbTopicConfEntity;
* store the topic configure setting
*
*/
-public class TopicConfEntity extends BaseEntity {
+public class TopicConfEntity extends BaseEntity implements Cloneable {
private String recordKey = "";
private String topicName = "";
@@ -39,9 +39,9 @@ public class TopicConfEntity extends BaseEntity {
private int brokerPort = TBaseConstants.META_VALUE_UNDEFINED;
private String brokerAddress = "";
// topic id, require globally unique
- private int topicId = TBaseConstants.META_VALUE_UNDEFINED;
+ private int topicNameId = TBaseConstants.META_VALUE_UNDEFINED;
private TopicStatus deployStatus = TopicStatus.STATUS_TOPIC_UNDEFINED; //
topic status
- private TopicPropGroup topicProps = null;
+ private TopicPropGroup topicProps = new TopicPropGroup();
public TopicConfEntity() {
@@ -88,7 +88,7 @@ public class TopicConfEntity extends BaseEntity {
getAttributes(), getCreateUser(), getCreateDate(),
getModifyUser(), getModifyDate());
bdbEntity.setDataVerId(getDataVersionId());
- bdbEntity.setTopicId(topicId);
+ bdbEntity.setTopicId(topicNameId);
bdbEntity.setNumTopicStores(topicProps.getNumTopicStores());
bdbEntity.setMemCacheMsgSizeInMB(topicProps.getMemCacheMsgSizeInMB());
bdbEntity.setMemCacheMsgCntInK(topicProps.getMemCacheMsgCntInK());
@@ -103,7 +103,7 @@ public class TopicConfEntity extends BaseEntity {
this.brokerIp = brokerIp;
this.brokerPort = brokerPort;
this.topicName = topicName;
- this.topicId = topicId;
+ this.topicNameId = topicId;
this.recordKey = KeyBuilderUtils.buildTopicConfRecKey(brokerId,
topicName);
this.brokerAddress = KeyBuilderUtils.buildAddressInfo(brokerIp,
brokerPort);
}
@@ -137,11 +137,11 @@ public class TopicConfEntity extends BaseEntity {
}
public int getTopicId() {
- return topicId;
+ return topicNameId;
}
public void setTopicId(int topicId) {
- this.topicId = topicId;
+ this.topicNameId = topicId;
}
public String getTopicName() {
@@ -169,6 +169,14 @@ public class TopicConfEntity extends BaseEntity {
return this.deployStatus == TopicStatus.STATUS_TOPIC_OK;
}
+ public TopicStatus getDeployStatus() {
+ return deployStatus;
+ }
+
+ public void setDeployStatus(TopicStatus deployStatus) {
+ this.deployStatus = deployStatus;
+ }
+
/**
* Check whether the specified query item value matches
* Allowed query items:
@@ -185,7 +193,7 @@ public class TopicConfEntity extends BaseEntity {
if ((target.getBrokerId() != TBaseConstants.META_VALUE_UNDEFINED
&& target.getBrokerId() != this.brokerId)
|| (target.getTopicId() != TBaseConstants.META_VALUE_UNDEFINED
- && target.getTopicId() != this.topicId)
+ && target.getTopicId() != this.topicNameId)
|| (TStringUtils.isNotBlank(target.getTopicName())
&& !target.getTopicName().equals(this.topicName))
|| (TStringUtils.isNotBlank(target.getBrokerIp())
@@ -197,6 +205,38 @@ public class TopicConfEntity extends BaseEntity {
return true;
}
+ /**
+ * Serialize field to json format
+ *
+ * @param sBuilder build container
+ * @param isLongName if return field key is long name
+ * @return
+ */
+ @Override
+ public StringBuilder toWebJsonStr(StringBuilder sBuilder, boolean
isLongName) {
+ if (isLongName) {
+ sBuilder.append("{\"recordKey\":\"").append(recordKey).append("\"")
+ .append(",\"topicName\":\"").append(topicName).append("\"")
+ .append(",\"brokerId\":").append(brokerId)
+ .append(",\"topicNameId\":").append(topicNameId)
+ .append(",\"brokerIp\":\"").append(brokerIp).append("\"")
+ .append(",\"brokerPort\":").append(brokerPort)
+
.append(",\"topicStatusId\":").append(deployStatus.getCode());
+ } else {
+ sBuilder.append("{\"recKey\":\"").append(recordKey).append("\"")
+ .append(",\"topic\":\"").append(topicName).append("\"")
+ .append(",\"brkId\":").append(brokerId)
+ .append(",\"topicId\":").append(topicNameId)
+ .append(",\"bIp\":\"").append(brokerIp).append("\"")
+ .append(",\"bPort\":").append(brokerPort)
+ .append(",\"tStsId\":").append(deployStatus.getCode());
+ }
+ topicProps.toWebJsonStr(sBuilder, isLongName);
+ super.toWebJsonStr(sBuilder, isLongName);
+ sBuilder.append("}");
+ return sBuilder;
+ }
+
@Override
public boolean equals(Object o) {
if (this == o) {
@@ -211,7 +251,7 @@ public class TopicConfEntity extends BaseEntity {
TopicConfEntity that = (TopicConfEntity) o;
return brokerId == that.brokerId &&
brokerPort == that.brokerPort &&
- topicId == that.topicId &&
+ topicNameId == that.topicNameId &&
recordKey.equals(that.recordKey) &&
topicName.equals(that.topicName) &&
Objects.equals(brokerIp, that.brokerIp) &&
@@ -223,6 +263,20 @@ public class TopicConfEntity extends BaseEntity {
@Override
public int hashCode() {
return Objects.hash(super.hashCode(), recordKey, topicName, brokerId,
- brokerIp, brokerPort, brokerAddress, topicId, deployStatus,
topicProps);
+ brokerIp, brokerPort, brokerAddress, topicNameId,
deployStatus, topicProps);
+ }
+
+ @Override
+ public TopicConfEntity clone() {
+ try {
+ TopicConfEntity copy = (TopicConfEntity) super.clone();
+ if (copy.getTopicProps() != null) {
+ copy.setTopicProps(getTopicProps().clone());
+ }
+ copy.setDeployStatus(getDeployStatus());
+ return copy;
+ } catch (CloneNotSupportedException e) {
+ return null;
+ }
}
}
diff --git
a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/dao/entity/TopicCtrlEntity.java
b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/dao/entity/TopicCtrlEntity.java
index 432cb34..7a21d7c 100644
---
a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/dao/entity/TopicCtrlEntity.java
+++
b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/dao/entity/TopicCtrlEntity.java
@@ -29,9 +29,11 @@ import
org.apache.tubemq.server.master.bdbstore.bdbentitys.BdbTopicAuthControlEn
* store the topic authenticate control setting
*
*/
-public class TopicCtrlEntity extends BaseEntity {
+public class TopicCtrlEntity extends BaseEntity implements Cloneable {
private String topicName = "";
+ // topic id, require globally unique
+ private int topicNameId = TBaseConstants.META_VALUE_UNDEFINED;
private EnableStatus authCtrlStatus = EnableStatus.STATUS_UNDEFINE;
private int maxMsgSizeInB = TBaseConstants.META_VALUE_UNDEFINED;
@@ -40,17 +42,20 @@ public class TopicCtrlEntity extends BaseEntity {
super();
}
- public TopicCtrlEntity(String topicName, String createUser) {
+ public TopicCtrlEntity(String topicName, int topicNameId, String
createUser) {
super(createUser, new Date());
this.topicName = topicName;
+ this.topicNameId = topicNameId;
this.authCtrlStatus = EnableStatus.STATUS_DISABLE;
}
- public TopicCtrlEntity(String topicName, boolean enableAuth, int
maxMsgSizeInB,
+ public TopicCtrlEntity(String topicName, int topicNameId,
+ boolean enableAuth, int maxMsgSizeInB,
long dataVersionId, String createUser,
Date createDate, String modifyUser, Date
modifyDate) {
super(dataVersionId, createUser, createDate, modifyUser, modifyDate);
this.topicName = topicName;
+ this.topicNameId = topicNameId;
this.maxMsgSizeInB = maxMsgSizeInB;
if (enableAuth) {
this.authCtrlStatus = EnableStatus.STATUS_ENABLE;
@@ -63,6 +68,7 @@ public class TopicCtrlEntity extends BaseEntity {
super(bdbEntity.getDataVerId(),
bdbEntity.getCreateUser(), bdbEntity.getCreateDate());
this.topicName = bdbEntity.getTopicName();
+ this.topicNameId = bdbEntity.getTopicId();
this.maxMsgSizeInB = bdbEntity.getMaxMsgSize();
if (bdbEntity.isEnableAuthControl()) {
this.authCtrlStatus = EnableStatus.STATUS_ENABLE;
@@ -76,6 +82,7 @@ public class TopicCtrlEntity extends BaseEntity {
BdbTopicAuthControlEntity bdbEntity =
new BdbTopicAuthControlEntity(topicName, isAuthCtrlEnable(),
getAttributes(), getCreateUser(), getCreateDate());
+ bdbEntity.setTopicId(topicNameId);
bdbEntity.setDataVerId(getDataVersionId());
bdbEntity.setMaxMsgSize(maxMsgSizeInB);
return bdbEntity;
@@ -93,7 +100,7 @@ public class TopicCtrlEntity extends BaseEntity {
return authCtrlStatus == EnableStatus.STATUS_ENABLE;
}
- public void setAuthCtrlStatus(boolean enableAuth) {
+ public void setEnableAuthCtrl(boolean enableAuth) {
if (enableAuth) {
this.authCtrlStatus = EnableStatus.STATUS_ENABLE;
} else {
@@ -105,6 +112,10 @@ public class TopicCtrlEntity extends BaseEntity {
return authCtrlStatus;
}
+ public void setAuthCtrlStatus(EnableStatus authCtrlStatus) {
+ this.authCtrlStatus = authCtrlStatus;
+ }
+
public int getMaxMsgSizeInB() {
return maxMsgSizeInB;
}
@@ -113,6 +124,14 @@ public class TopicCtrlEntity extends BaseEntity {
this.maxMsgSizeInB = maxMsgSizeInB;
}
+ public int getTopicId() {
+ return topicNameId;
+ }
+
+ public void setTopicId(int topicNameId) {
+ this.topicNameId = topicNameId;
+ }
+
/**
* Check whether the specified query item value matches
* Allowed query items:
@@ -131,12 +150,43 @@ public class TopicCtrlEntity extends BaseEntity {
|| (TStringUtils.isNotBlank(target.getTopicName())
&& !target.getTopicName().equals(this.topicName))
|| (target.getAuthCtrlStatus() != EnableStatus.STATUS_UNDEFINE
- && target.getAuthCtrlStatus() != this.authCtrlStatus)) {
+ && target.getAuthCtrlStatus() != this.authCtrlStatus)
+ || (target.getTopicId() != TBaseConstants.META_VALUE_UNDEFINED
+ && target.getTopicId() != this.topicNameId)) {
return false;
}
return true;
}
+ /**
+ * Serialize field to json format
+ *
+ * @param sBuilder build container
+ * @param isLongName if return field key is long name
+ * @return
+ */
+ @Override
+ public StringBuilder toWebJsonStr(StringBuilder sBuilder, boolean
isLongName) {
+ int tmpMsgSizeInMB = maxMsgSizeInB;
+ if (maxMsgSizeInB != TBaseConstants.META_VALUE_UNDEFINED) {
+ tmpMsgSizeInMB /= TBaseConstants.META_MB_UNIT_SIZE;
+ }
+ if (isLongName) {
+ sBuilder.append("{\"topicName\":\"").append(topicName).append("\"")
+ .append(",\"topicNameId\":").append(topicNameId)
+
.append(",\"enableAuthControl\":").append(authCtrlStatus.isEnable())
+ .append(",\"maxMsgSizeInMB\":").append(tmpMsgSizeInMB);
+ } else {
+ sBuilder.append("{\"topic\":\"").append(topicName).append("\"")
+ .append(",\"topicId\":").append(topicNameId)
+ .append(",\"acEn\":").append(authCtrlStatus.isEnable())
+ .append(",\"mxMsgInMB\":").append(tmpMsgSizeInMB);
+ }
+ super.toWebJsonStr(sBuilder, isLongName);
+ sBuilder.append("}");
+ return sBuilder;
+ }
+
@Override
public boolean equals(Object o) {
if (this == o) {
@@ -149,13 +199,26 @@ public class TopicCtrlEntity extends BaseEntity {
return false;
}
TopicCtrlEntity that = (TopicCtrlEntity) o;
- return maxMsgSizeInB == that.maxMsgSizeInB &&
+ return topicNameId == that.topicNameId &&
+ maxMsgSizeInB == that.maxMsgSizeInB &&
topicName.equals(that.topicName) &&
authCtrlStatus == that.authCtrlStatus;
}
@Override
public int hashCode() {
- return Objects.hash(super.hashCode(), topicName, authCtrlStatus,
maxMsgSizeInB);
+ return Objects.hash(super.hashCode(), topicName,
+ topicNameId, authCtrlStatus, maxMsgSizeInB);
+ }
+
+ @Override
+ public TopicCtrlEntity clone() {
+ try {
+ TopicCtrlEntity copy = (TopicCtrlEntity) super.clone();
+ copy.setAuthCtrlStatus(getAuthCtrlStatus());
+ return copy;
+ } catch (CloneNotSupportedException e) {
+ return null;
+ }
}
}
diff --git
a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/dao/entity/TopicPropGroup.java
b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/dao/entity/TopicPropGroup.java
index 71e652d..09bac47 100644
---
a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/dao/entity/TopicPropGroup.java
+++
b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/dao/entity/TopicPropGroup.java
@@ -19,6 +19,7 @@ package
org.apache.tubemq.server.master.metamanage.metastore.dao.entity;
import java.io.Serializable;
import java.util.Objects;
+import org.apache.commons.lang.builder.ToStringBuilder;
import org.apache.tubemq.corebase.TBaseConstants;
import org.apache.tubemq.corebase.utils.TStringUtils;
@@ -27,7 +28,7 @@ import org.apache.tubemq.corebase.utils.TStringUtils;
* Topic property group, save topic related storage and configuration
information.
*
*/
-public class TopicPropGroup implements Serializable {
+public class TopicPropGroup implements Serializable, Cloneable {
private int numTopicStores = TBaseConstants.META_VALUE_UNDEFINED;
//store num
private int numPartitions = TBaseConstants.META_VALUE_UNDEFINED;
//partition num
@@ -37,8 +38,8 @@ public class TopicPropGroup implements Serializable {
private int memCacheMsgSizeInMB = TBaseConstants.META_VALUE_UNDEFINED; //
cache block size
private int memCacheMsgCntInK = TBaseConstants.META_VALUE_UNDEFINED; //
cache max count
private int memCacheFlushIntvl = TBaseConstants.META_VALUE_UNDEFINED; //
cache max interval
- private boolean acceptPublish = true; //enable publish
- private boolean acceptSubscribe = true; //enable subscribe
+ private Boolean acceptPublish = null; //enable publish
+ private Boolean acceptSubscribe = null; //enable subscribe
private String deletePolicy = ""; // delete policy
private int dataStoreType = TBaseConstants.META_VALUE_UNDEFINED; // type
private String dataPath = ""; //data path
@@ -136,7 +137,10 @@ public class TopicPropGroup implements Serializable {
return acceptPublish;
}
- public void setAcceptPublish(boolean acceptPublish) {
+ public Boolean getAcceptPublish() {
+ return acceptPublish;
+ }
+ public void setAcceptPublish(Boolean acceptPublish) {
this.acceptPublish = acceptPublish;
}
@@ -144,10 +148,14 @@ public class TopicPropGroup implements Serializable {
return acceptSubscribe;
}
- public void setAcceptSubscribe(boolean acceptSubscribe) {
+ public void setAcceptSubscribe(Boolean acceptSubscribe) {
this.acceptSubscribe = acceptSubscribe;
}
+ public Boolean getAcceptSubscribe() {
+ return acceptSubscribe;
+ }
+
public String getDeletePolicy() {
return deletePolicy;
}
@@ -196,6 +204,10 @@ public class TopicPropGroup implements Serializable {
&& target.getMemCacheMsgCntInK() != this.memCacheMsgCntInK)
|| (target.getMemCacheFlushIntvl() !=
TBaseConstants.META_VALUE_UNDEFINED
&& target.getMemCacheFlushIntvl() != this.memCacheFlushIntvl)
+ || (target.getAcceptPublish() != null
+ && target.getAcceptPublish() != this.acceptPublish)
+ || (target.getAcceptSubscribe() != null
+ && target.getAcceptSubscribe() != this.acceptSubscribe)
|| (TStringUtils.isNotBlank(target.getDeletePolicy())
&& !target.getDeletePolicy().equals(this.deletePolicy))) {
return false;
@@ -203,6 +215,45 @@ public class TopicPropGroup implements Serializable {
return true;
}
+ /**
+ * Serialize field to json format
+ *
+ * @param sBuilder
+ * @return
+ */
+ StringBuilder toWebJsonStr(StringBuilder sBuilder, boolean isLongName) {
+ if (isLongName) {
+ sBuilder.append(",\"numTopicStores\":").append(numTopicStores)
+ .append(",\"numPartitions\":").append(numPartitions)
+ .append(",\"unflushThreshold\":").append(unflushThreshold)
+ .append(",\"unflushInterval\":").append(unflushInterval)
+ .append(",\"unflushDataHold\":").append(unflushDataHold)
+
.append(",\"memCacheMsgSizeInMB\":").append(memCacheMsgSizeInMB)
+
.append(",\"memCacheMsgCntInK\":").append(memCacheMsgCntInK)
+
.append(",\"memCacheFlushIntvl\":").append(memCacheFlushIntvl)
+ .append(",\"acceptPublish\":").append(acceptPublish)
+ .append(",\"acceptSubscribe\":").append(acceptSubscribe)
+
.append(",\"deletePolicy\":\"").append(deletePolicy).append("\"")
+ .append(",\"dataStoreType\":").append(dataStoreType)
+ .append(",\"dataPath\":\"").append(dataPath).append("\"");
+ } else {
+ sBuilder.append(",\"numStore\":").append(numTopicStores)
+ .append(",\"numPart\":").append(numPartitions)
+ .append(",\"unfDskMsgCnt\":").append(unflushThreshold)
+ .append(",\"unfDskInt\":").append(unflushInterval)
+ .append(",\"unfDskDataSz\":").append(unflushDataHold)
+ .append(",\"cacheInMB\":").append(memCacheMsgSizeInMB)
+ .append(",\"unfMemMsgCnt\":").append(memCacheMsgCntInK)
+ .append(",\"unfMemInt\":").append(memCacheFlushIntvl)
+ .append(",\"accPub\":").append(acceptPublish)
+ .append(",\"accSub\":").append(acceptSubscribe)
+ .append(",\"delPol\":\"").append(deletePolicy).append("\"")
+ .append(",\"dStType\":").append(dataStoreType)
+ .append(",\"dPath\":\"").append(dataPath).append("\"");
+ }
+ return sBuilder;
+ }
+
@Override
public boolean equals(Object o) {
if (this == o) {
@@ -220,17 +271,28 @@ public class TopicPropGroup implements Serializable {
memCacheMsgSizeInMB == that.memCacheMsgSizeInMB &&
memCacheMsgCntInK == that.memCacheMsgCntInK &&
memCacheFlushIntvl == that.memCacheFlushIntvl &&
- acceptPublish == that.acceptPublish &&
- acceptSubscribe == that.acceptSubscribe &&
dataStoreType == that.dataStoreType &&
+ Objects.equals(acceptPublish, that.acceptPublish) &&
+ Objects.equals(acceptSubscribe, that.acceptSubscribe) &&
Objects.equals(deletePolicy, that.deletePolicy) &&
Objects.equals(dataPath, that.dataPath);
}
@Override
public int hashCode() {
- return Objects.hash(numTopicStores, numPartitions, unflushThreshold,
unflushInterval,
- unflushDataHold, memCacheMsgSizeInMB, memCacheMsgCntInK,
memCacheFlushIntvl,
- acceptPublish, acceptSubscribe, deletePolicy, dataStoreType,
dataPath);
+ return Objects.hash(numTopicStores, numPartitions, unflushThreshold,
+ unflushInterval, unflushDataHold, memCacheMsgSizeInMB,
+ memCacheMsgCntInK, memCacheFlushIntvl, acceptPublish,
+ acceptSubscribe, deletePolicy, dataStoreType, dataPath);
+ }
+
+ @Override
+ public String toString() {
+ return ToStringBuilder.reflectionToString(this);
+ }
+
+ @Override
+ public TopicPropGroup clone() throws CloneNotSupportedException {
+ return (TopicPropGroup) super.clone();
}
}
diff --git
a/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/AbstractWebHandler.java
b/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/AbstractWebHandler.java
index 63de11c..e38c5b4 100644
---
a/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/AbstractWebHandler.java
+++
b/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/AbstractWebHandler.java
@@ -19,6 +19,7 @@ package org.apache.tubemq.server.master.web.handler;
import static
org.apache.tubemq.server.common.webbase.WebMethodMapper.registerWebMethod;
import org.apache.tubemq.server.master.TMaster;
+import org.apache.tubemq.server.master.metamanage.MetaDataManager;
import org.apache.tubemq.server.master.nodemanage.nodebroker.BrokerConfManager;
@@ -27,10 +28,12 @@ public abstract class AbstractWebHandler {
protected TMaster master;
protected BrokerConfManager brokerConfManager;
+ protected MetaDataManager metaDataManager;
public AbstractWebHandler(TMaster master) {
this.master = master;
this.brokerConfManager = this.master.getMasterTopicManager();
+ this.metaDataManager = this.master.getDefMetaDataManager();
}
public abstract void registerWebApiMethod();
diff --git
a/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebMasterInfoHandler.java
b/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebMasterInfoHandler.java
index 0e58fbb..fff08f9 100644
---
a/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebMasterInfoHandler.java
+++
b/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebMasterInfoHandler.java
@@ -22,21 +22,24 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
-
import javax.servlet.http.HttpServletRequest;
import org.apache.tubemq.corebase.TBaseConstants;
import org.apache.tubemq.corebase.cluster.BrokerInfo;
import org.apache.tubemq.corebase.cluster.TopicInfo;
import org.apache.tubemq.corebase.utils.SettingValidUtils;
+import org.apache.tubemq.corebase.utils.TStringUtils;
import org.apache.tubemq.corebase.utils.Tuple2;
+import org.apache.tubemq.corebase.utils.Tuple3;
+import org.apache.tubemq.server.common.TServerConstants;
import org.apache.tubemq.server.common.fielddef.WebFieldDef;
import org.apache.tubemq.server.common.utils.ProcessResult;
import org.apache.tubemq.server.common.utils.WebParameterUtils;
import org.apache.tubemq.server.master.TMaster;
import org.apache.tubemq.server.master.bdbstore.bdbentitys.BdbBrokerConfEntity;
-import
org.apache.tubemq.server.master.bdbstore.bdbentitys.BdbClusterSettingEntity;
import
org.apache.tubemq.server.master.bdbstore.bdbentitys.BdbTopicAuthControlEntity;
import org.apache.tubemq.server.master.bdbstore.bdbentitys.BdbTopicConfEntity;
+import
org.apache.tubemq.server.master.metamanage.metastore.dao.entity.ClusterSettingEntity;
+import
org.apache.tubemq.server.master.metamanage.metastore.dao.entity.TopicPropGroup;
import
org.apache.tubemq.server.master.nodemanage.nodebroker.TopicPSInfoManager;
import org.apache.tubemq.server.master.web.model.ClusterGroupVO;
import org.apache.tubemq.server.master.web.model.ClusterNodeVO;
@@ -81,9 +84,10 @@ public class WebMasterInfoHandler extends AbstractWebHandler
{
*/
public StringBuilder getGroupAddressStrInfo(HttpServletRequest req) {
StringBuilder strBuffer = new StringBuilder(512);
- ClusterGroupVO clusterGroupVO =
brokerConfManager.getGroupAddressStrInfo();
+ ClusterGroupVO clusterGroupVO =
metaDataManager.getGroupAddressStrInfo();
if (clusterGroupVO == null) {
-
strBuffer.append("{\"result\":false,\"errCode\":500,\"errMsg\":\"GetBrokerGroup
info error\",\"data\":[]}");
+ WebParameterUtils.buildFailResultWithBlankData(
+ 500, "GetBrokerGroup info error", strBuffer);
} else {
strBuffer.append("{\"result\":true,\"errCode\":0,\"errMsg\":\"Ok\",\"groupName\":\"")
.append(clusterGroupVO.getGroupName()).append("\",\"isPrimaryNodeActive\":")
@@ -146,13 +150,13 @@ public class WebMasterInfoHandler extends
AbstractWebHandler {
*/
public StringBuilder adminQueryClusterDefSetting(HttpServletRequest req) {
StringBuilder sBuilder = new StringBuilder(512);
- BdbClusterSettingEntity defClusterSetting =
- brokerConfManager.getBdbClusterSetting();
-
sBuilder.append("{\"result\":true,\"errCode\":0,\"errMsg\":\"Ok\",\"data\":[");
+ ClusterSettingEntity defClusterSetting =
+ metaDataManager.getClusterDefSetting();
+ WebParameterUtils.buildSuccessWithDataRetBegin(sBuilder);
if (defClusterSetting != null) {
- defClusterSetting.toJsonString(sBuilder);
+ defClusterSetting.toWebJsonStr(sBuilder, true);
}
- sBuilder.append("]}");
+ WebParameterUtils.buildSuccessWithDataRetEnd(sBuilder, 1);
return sBuilder;
}
@@ -163,7 +167,6 @@ public class WebMasterInfoHandler extends
AbstractWebHandler {
* @return
*/
public StringBuilder adminSetClusterDefSetting(HttpServletRequest req) {
- boolean dataChanged = false;
ProcessResult result = new ProcessResult();
StringBuilder sBuilder = new StringBuilder(512);
// valid operation authorize info
@@ -172,16 +175,16 @@ public class WebMasterInfoHandler extends
AbstractWebHandler {
WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
return sBuilder;
}
- // check modify user field
- if (!WebParameterUtils.getStringParamValue(req,
- WebFieldDef.MODIFYUSER, true, null, result)) {
+ // check and get operation info
+ if (!WebParameterUtils.getAUDBaseInfo(req, false, result)) {
WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
return sBuilder;
}
- String modifyUser = (String) result.retData1;
+ Tuple3<Long, String, Date> inOpTupleInfo =
+ (Tuple3<Long, String, Date>) result.getRetData();
// check max message size
if (!WebParameterUtils.getIntParamValue(req,
- WebFieldDef.MAXMSGSIZE, false,
+ WebFieldDef.MAXMSGSIZEINMB, false,
TBaseConstants.META_VALUE_UNDEFINED,
TBaseConstants.META_MIN_ALLOWED_MESSAGE_SIZE_MB,
TBaseConstants.META_MAX_ALLOWED_MESSAGE_SIZE_MB,
@@ -189,41 +192,158 @@ public class WebMasterInfoHandler extends
AbstractWebHandler {
WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
return sBuilder;
}
- int maxMsgSizeInMB = (int) result.retData1;
- if (maxMsgSizeInMB != TBaseConstants.META_VALUE_UNDEFINED) {
- dataChanged = true;
+ int inMaxMsgSize = (int) result.getRetData();
+ // get broker port info
+ if (!WebParameterUtils.getIntParamValue(req, WebFieldDef.BROKERPORT,
+ false, TBaseConstants.META_VALUE_UNDEFINED, 1, result)) {
+ WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
+ return sBuilder;
}
- // check and get modify date
- if (!WebParameterUtils.getDateParameter(req,
- WebFieldDef.MODIFYDATE, false, new Date(), result)) {
+ int inBrokerPort = (int) result.getRetData();
+ // get broker tls port info
+ if (!WebParameterUtils.getIntParamValue(req, WebFieldDef.BROKERTLSPORT,
+ false, TBaseConstants.META_VALUE_UNDEFINED, 1, result)) {
WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
return sBuilder;
}
- Date modifyDate = (Date) result.retData1;
- if (!dataChanged) {
- WebParameterUtils.buildSuccessResult(sBuilder, "No data is
changed!");
+ int inBrokerTlsPort = (int) result.getRetData();
+ // get broker web port info
+ if (!WebParameterUtils.getIntParamValue(req, WebFieldDef.BROKERWEBPORT,
+ false, TBaseConstants.META_VALUE_UNDEFINED, 1, result)) {
+ WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
return sBuilder;
}
- // add or modify cluster setting info
- BdbClusterSettingEntity defClusterSetting =
- brokerConfManager.getBdbClusterSetting();
- if (defClusterSetting == null) {
- defClusterSetting = new BdbClusterSettingEntity();
+ int inBrokerWebPort = (int) result.getRetData();
+ // get and valid TopicPropGroup info
+ if (!WebParameterUtils.getTopicPropInfo(req, null, result)) {
+ WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
+ return sBuilder;
}
- defClusterSetting.setModifyInfo(modifyUser, modifyDate);
- if (maxMsgSizeInMB != TBaseConstants.META_VALUE_UNDEFINED) {
- defClusterSetting.setMaxMsgSizeInB(
-
SettingValidUtils.validAndXfeMaxMsgSizeFromMBtoB(maxMsgSizeInMB));
+ TopicPropGroup defTopicProps = (TopicPropGroup) result.getRetData();
+ // get and valid qryPriorityId info
+ if (!WebParameterUtils.getQryPriorityIdParameter(req,
+ false, TBaseConstants.META_VALUE_UNDEFINED, 101, result)) {
+ WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
+ return sBuilder;
}
- try {
- brokerConfManager.confSetBdbClusterDefSetting(defClusterSetting);
- WebParameterUtils.buildSuccessResult(sBuilder);
- } catch (Exception e) {
- WebParameterUtils.buildFailResult(sBuilder, e.getMessage());
+ int inQryPriorityId = (int) result.retData1;
+ // get flowCtrlEnable info
+ if (!WebParameterUtils.getBooleanParamValue(req,
+ WebFieldDef.FLOWCTRLENABLE, false, null, result)) {
+ WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
+ return sBuilder;
+ }
+ Boolean flowCtrlEnable = (Boolean) result.retData1;
+ // get and flow control rule info
+ int flowRuleCnt = WebParameterUtils.getAndCheckFlowRules(req, null,
result);
+ if (!result.success) {
+ WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
+ return sBuilder;
}
+ String flowCtrlInfo = (String) result.retData1;
+
+ // add or modify record
+ ClusterSettingEntity newConf = null;
+ ClusterSettingEntity curConf = metaDataManager.getClusterDefSetting();
+ if (curConf == null) {
+ newConf = new ClusterSettingEntity(
+ inOpTupleInfo.getF0(), inOpTupleInfo.getF1(),
inOpTupleInfo.getF2());
+ // check and process max message size
+ if (inMaxMsgSize == TBaseConstants.META_VALUE_UNDEFINED) {
+ inMaxMsgSize = TBaseConstants.META_MIN_ALLOWED_MESSAGE_SIZE_MB;
+ }
+ newConf.setMaxMsgSizeInB(
+
SettingValidUtils.validAndXfeMaxMsgSizeFromMBtoB(inMaxMsgSize));
+ // check and process broker ports
+ if (inBrokerPort == TBaseConstants.META_VALUE_UNDEFINED) {
+ inBrokerPort = TBaseConstants.META_DEFAULT_BROKER_PORT;
+ }
+ newConf.setBrokerPort(inBrokerPort);
+ if (inBrokerTlsPort == TBaseConstants.META_VALUE_UNDEFINED) {
+ inBrokerTlsPort = TBaseConstants.META_DEFAULT_BROKER_TLS_PORT;
+ }
+ newConf.setBrokerTLSPort(inBrokerTlsPort);
+ if (inBrokerWebPort == TBaseConstants.META_VALUE_UNDEFINED) {
+ inBrokerWebPort = TBaseConstants.META_DEFAULT_BROKER_WEB_PORT;
+ }
+ newConf.setBrokerWebPort(inBrokerWebPort);
+ if (inQryPriorityId == TBaseConstants.META_VALUE_UNDEFINED) {
+ inQryPriorityId = TServerConstants.QRY_PRIORITY_DEF_VALUE;
+ }
+ newConf.setQryPriorityId(inQryPriorityId);
+ newConf.setClsDefTopicProps(defTopicProps);
+ if (flowCtrlEnable == null) {
+ flowCtrlEnable = false;
+ }
+ if (flowCtrlInfo == null) {
+ flowCtrlInfo = TServerConstants.BLANK_FLOWCTRL_RULES;
+ }
+ newConf.setGloFlowCtrlInfo(flowCtrlEnable, flowRuleCnt,
flowCtrlInfo);
+ if (!metaDataManager.confAddClusterDefSetting(newConf, sBuilder,
result)) {
+ WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
+ return sBuilder;
+ }
+ } else {
+ boolean dataChanged = false;
+ newConf = curConf.clone();
+ newConf.setModifyInfo(inOpTupleInfo.getF0(),
+ false, inOpTupleInfo.getF1(), inOpTupleInfo.getF2());
+ if (inMaxMsgSize != TBaseConstants.META_VALUE_UNDEFINED) {
+ inMaxMsgSize =
SettingValidUtils.validAndXfeMaxMsgSizeFromMBtoB(inMaxMsgSize);
+ if (newConf.getMaxMsgSizeInB() != inMaxMsgSize) {
+ dataChanged = true;
+ newConf.setMaxMsgSizeInB(inMaxMsgSize);
+ }
+ }
+ if (inBrokerPort != TBaseConstants.META_VALUE_UNDEFINED
+ && newConf.getBrokerPort() != inBrokerPort) {
+ dataChanged = true;
+ newConf.setBrokerPort(inBrokerPort);
+ }
+ if (inBrokerTlsPort != TBaseConstants.META_VALUE_UNDEFINED
+ && newConf.getBrokerTLSPort() != inBrokerTlsPort) {
+ dataChanged = true;
+ newConf.setBrokerTLSPort(inBrokerTlsPort);
+ }
+ if (inBrokerWebPort != TBaseConstants.META_VALUE_UNDEFINED
+ && newConf.getBrokerWebPort() != inBrokerWebPort) {
+ dataChanged = true;
+ newConf.setBrokerWebPort(inBrokerWebPort);
+ }
+ // check and set qry priority id
+ if (inQryPriorityId != TBaseConstants.META_VALUE_UNDEFINED
+ && newConf.getQryPriorityId() != inQryPriorityId) {
+ dataChanged = true;
+ newConf.setQryPriorityId(inQryPriorityId);
+ }
+ // check and set flowCtrl info
+ if (flowCtrlEnable != null
+ && flowCtrlEnable !=
newConf.getGloFlowCtrlStatus().isEnable()) {
+ dataChanged = true;
+ newConf.setEnableFlowCtrl(flowCtrlEnable);
+ }
+ if (TStringUtils.isNotBlank(flowCtrlInfo)
+ && !flowCtrlInfo.equals(newConf.getGloFlowCtrlRuleInfo()))
{
+ dataChanged = true;
+
newConf.setGloFlowCtrlInfo(newConf.getGloFlowCtrlStatus().isEnable(),
+ flowRuleCnt, flowCtrlInfo);
+ }
+ // check if changed
+ if (!dataChanged) {
+ WebParameterUtils.buildSuccessResult(sBuilder, "No data is
changed!");
+ return sBuilder;
+ }
+ if (!metaDataManager.confModClusterDefSetting(newConf, sBuilder,
result)) {
+ WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
+ return sBuilder;
+ }
+ }
+ curConf = metaDataManager.getClusterDefSetting();
+ WebParameterUtils.buildSuccWithData(curConf.getDataVersionId(),
sBuilder);
return sBuilder;
}
+
/**
* Query cluster topic overall view
*