This is an automated email from the ASF dual-hosted git repository.
gosonzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-tubemq.git
The following commit(s) were added to refs/heads/master by this push:
new cfd3bf7 [TUBEMQ-512] Add package length control based on Topic
cfd3bf7 is described below
commit cfd3bf7816abfbe472b0c366410b6283d3f63673
Author: gosonzhang <[email protected]>
AuthorDate: Fri Jan 15 17:38:25 2021 +0800
[TUBEMQ-512] Add package length control based on Topic
---
.../tubemq/server/broker/BrokerServiceServer.java | 12 +++---
.../broker/metadata/ClusterConfigHolder.java | 25 ++++++++-----
.../server/broker/metadata/TopicMetadata.java | 43 +++++++++++++++++++++-
.../server/broker/msgstore/MessageStore.java | 14 ++++---
.../tubemq/server/common/TServerConstants.java | 1 +
.../server/common/paramcheck/PBParameterUtils.java | 2 +-
.../bdbstore/bdbentitys/BdbTopicConfEntity.java | 18 +++++++++
.../nodemanage/nodebroker/BrokerConfManager.java | 6 +++
.../nodebroker/BrokerSyncStatusInfo.java | 15 ++++++++
.../web/handler/WebBrokerTopicConfHandler.java | 43 ++++++++++++++++++++--
10 files changed, 153 insertions(+), 26 deletions(-)
diff --git
a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/BrokerServiceServer.java
b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/BrokerServiceServer.java
index c9cfba4..a95fa09 100644
---
a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/BrokerServiceServer.java
+++
b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/BrokerServiceServer.java
@@ -55,8 +55,8 @@ import org.apache.tubemq.corerpc.RpcConstants;
import org.apache.tubemq.corerpc.service.BrokerReadService;
import org.apache.tubemq.corerpc.service.BrokerWriteService;
import org.apache.tubemq.server.Server;
-import org.apache.tubemq.server.broker.metadata.ClusterConfigHolder;
import org.apache.tubemq.server.broker.metadata.MetadataManager;
+import org.apache.tubemq.server.broker.metadata.TopicMetadata;
import org.apache.tubemq.server.broker.msgstore.MessageStore;
import org.apache.tubemq.server.broker.msgstore.MessageStoreManager;
import org.apache.tubemq.server.broker.msgstore.disk.GetMessageResult;
@@ -621,7 +621,8 @@ public class BrokerServiceServer implements
BrokerReadService, BrokerWriteServic
builder.setErrMsg(result.errInfo);
return builder.build();
}
- final String topicName = (String) result.retData1;
+ final TopicMetadata topicMetadata = (TopicMetadata) result.retData1;
+ final String topicName = topicMetadata.getTopic();
String msgType = null;
int msgTypeCode = -1;
if (TStringUtils.isNotBlank(request.getMsgType())) {
@@ -635,10 +636,10 @@ public class BrokerServiceServer implements
BrokerReadService, BrokerWriteServic
builder.setErrMsg("data length is zero!");
return builder.build();
}
- if (dataLength > ClusterConfigHolder.getMaxMsgSize()) {
+ if (dataLength > topicMetadata.getMaxMsgSize()) {
builder.setErrCode(TErrCodeConstants.BAD_REQUEST);
builder.setErrMsg(strBuffer.append("data length over max length,
allowed max length is ")
- .append(ClusterConfigHolder.getMaxMsgSize())
+ .append(topicMetadata.getMaxMsgSize())
.append(", data length is
").append(dataLength).toString());
return builder.build();
}
@@ -1137,7 +1138,8 @@ public class BrokerServiceServer implements
BrokerReadService, BrokerWriteServic
builder.setErrMsg(result.errInfo);
return builder.build();
}
- final String topicName = (String) result.retData1;
+ final TopicMetadata topicMetadata = (TopicMetadata) result.retData1;
+ final String topicName = topicMetadata.getTopic();
String partStr = getPartStr(groupName, topicName, partitionId);
ConsumerNodeInfo consumerNodeInfo = consumerRegisterMap.get(partStr);
if (consumerNodeInfo == null) {
diff --git
a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/metadata/ClusterConfigHolder.java
b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/metadata/ClusterConfigHolder.java
index 1b370d5..7d89dba 100644
---
a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/metadata/ClusterConfigHolder.java
+++
b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/metadata/ClusterConfigHolder.java
@@ -22,7 +22,7 @@ import java.util.concurrent.atomic.AtomicLong;
import org.apache.tubemq.corebase.TBaseConstants;
import org.apache.tubemq.corebase.protobuf.generated.ClientMaster;
import org.apache.tubemq.corebase.utils.MixedUtils;
-
+import org.apache.tubemq.corebase.utils.Tuple2;
public class ClusterConfigHolder {
@@ -46,14 +46,11 @@ public class ClusterConfigHolder {
if (configId.get() != clusterConfig.getConfigId()) {
configId.set(clusterConfig.getConfigId());
if (clusterConfig.hasMaxMsgSize()) {
- int tmpMaxSize = MixedUtils.mid(clusterConfig.getMaxMsgSize(),
- TBaseConstants.META_MAX_MESSAGE_DATA_SIZE,
- TBaseConstants.META_MAX_MESSAGE_DATA_SIZE_UPPER_LIMIT)
- + TBaseConstants.META_MAX_MESSAGE_HEADER_SIZE;
- if (tmpMaxSize != maxMsgSize.get()) {
- maxMsgSize.set(tmpMaxSize);
- minMemCacheSize.set(tmpMaxSize +
- (tmpMaxSize % 4 + 1) *
TBaseConstants.META_MESSAGE_SIZE_ADJUST);
+ Tuple2<Integer, Integer> calcResult =
+ calcMaxMsgSize(clusterConfig.getMaxMsgSize());
+ if (calcResult.getF0() != maxMsgSize.get()) {
+ maxMsgSize.set(calcResult.getF0());
+ minMemCacheSize.set(calcResult.getF1());
}
}
}
@@ -71,4 +68,14 @@ public class ClusterConfigHolder {
return minMemCacheSize.get();
}
+ public static Tuple2<Integer, Integer> calcMaxMsgSize(int maxMsgSize) {
+ int tmpMaxSize = MixedUtils.mid(maxMsgSize,
+ TBaseConstants.META_MAX_MESSAGE_DATA_SIZE,
+ TBaseConstants.META_MAX_MESSAGE_DATA_SIZE_UPPER_LIMIT)
+ + TBaseConstants.META_MAX_MESSAGE_HEADER_SIZE;
+ int tmpMinMemCacheSize = tmpMaxSize +
+ (tmpMaxSize % 4 + 1) * TBaseConstants.META_MESSAGE_SIZE_ADJUST;
+ return new Tuple2<>(tmpMaxSize, tmpMinMemCacheSize);
+ }
+
}
diff --git
a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/metadata/TopicMetadata.java
b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/metadata/TopicMetadata.java
index 800254b..6e47d63 100644
---
a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/metadata/TopicMetadata.java
+++
b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/metadata/TopicMetadata.java
@@ -25,6 +25,7 @@ import java.util.Set;
import org.apache.tubemq.corebase.TBaseConstants;
import org.apache.tubemq.corebase.TokenConstants;
import org.apache.tubemq.corebase.utils.TStringUtils;
+import org.apache.tubemq.corebase.utils.Tuple2;
import org.apache.tubemq.server.common.TStatusConstants;
@@ -62,6 +63,10 @@ public class TopicMetadata {
private int memCacheMsgCnt = 5 * 1024;
// the max interval(milliseconds) that topic's memory cache will flush to
disk.
private int memCacheFlushIntvl = 20000;
+ // the allowed max message size
+ private int maxMsgSize = TBaseConstants.META_VALUE_UNDEFINED;
+ // the allowed min memory cache size
+ private int minMemCacheSize = TBaseConstants.META_VALUE_UNDEFINED;
/***
* Build TopicMetadata from brokerDefMetadata(default config) and
topicMetaConfInfo(custom config).
@@ -141,6 +146,17 @@ public class TopicMetadata {
} else {
this.memCacheFlushIntvl = Integer.parseInt(topicConfInfoArr[13]);
}
+ this.maxMsgSize = ClusterConfigHolder.getMaxMsgSize();
+ this.minMemCacheSize = ClusterConfigHolder.getMinMemCacheSize();
+ if (topicConfInfoArr.length > 14) {
+ if (TStringUtils.isNotBlank(topicConfInfoArr[14])) {
+ int maxMsgSize = Integer.parseInt(topicConfInfoArr[14]);
+ Tuple2<Integer, Integer> calcResult =
+ ClusterConfigHolder.calcMaxMsgSize(maxMsgSize);
+ this.maxMsgSize = calcResult.getF0();
+ this.minMemCacheSize = calcResult.getF1();
+ }
+ }
}
private TopicMetadata(String topic, int unflushThreshold,
@@ -149,7 +165,8 @@ public class TopicMetadata {
int numPartitions, boolean acceptPublish,
boolean acceptSubscribe, int statusId,
int numTopicStores, int memCacheMsgSize,
- int memCacheMsgCnt, int memCacheFlushIntvl) {
+ int memCacheMsgCnt, int memCacheFlushIntvl,
+ int maxMsgSize, int minMemCacheSize) {
this.topic = topic;
this.unflushThreshold = unflushThreshold;
this.unflushInterval = unflushInterval;
@@ -165,6 +182,8 @@ public class TopicMetadata {
this.memCacheMsgSize = memCacheMsgSize;
this.memCacheMsgCnt = memCacheMsgCnt;
this.memCacheFlushIntvl = memCacheFlushIntvl;
+ this.maxMsgSize = maxMsgSize;
+ this.minMemCacheSize = minMemCacheSize;
}
@Override
@@ -175,7 +194,8 @@ public class TopicMetadata {
this.numPartitions, this.acceptPublish,
this.acceptSubscribe, this.statusId,
this.numTopicStores, this.memCacheMsgSize,
- this.memCacheMsgCnt, this.memCacheFlushIntvl);
+ this.memCacheMsgCnt, this.memCacheFlushIntvl,
+ this.maxMsgSize, this.minMemCacheSize);
}
public boolean isAcceptPublish() {
@@ -304,6 +324,14 @@ public class TopicMetadata {
return memCacheFlushIntvl;
}
+ public int getMaxMsgSize() {
+ return maxMsgSize;
+ }
+
+ public int getMinMemCacheSize() {
+ return minMemCacheSize;
+ }
+
@Override
public int hashCode() {
final int prime = 31;
@@ -323,6 +351,8 @@ public class TopicMetadata {
result = prime * result + this.memCacheMsgSize;
result = prime * result + this.memCacheMsgCnt;
result = prime * result + this.memCacheFlushIntvl;
+ result = prime * result + this.maxMsgSize;
+ result = prime * result + this.minMemCacheSize;
return result;
}
@@ -400,6 +430,12 @@ public class TopicMetadata {
if (this.memCacheFlushIntvl != other.memCacheFlushIntvl) {
return false;
}
+ if (this.maxMsgSize != other.maxMsgSize) {
+ return false;
+ }
+ if (this.minMemCacheSize != other.minMemCacheSize) {
+ return false;
+ }
return true;
}
@@ -418,6 +454,7 @@ public class TopicMetadata {
&& this.memCacheMsgSize == other.memCacheMsgSize
&& this.memCacheMsgCnt == other.memCacheMsgCnt
&& this.memCacheFlushIntvl == other.memCacheFlushIntvl
+ && this.maxMsgSize == other.maxMsgSize
&& this.deletePolicy.equals(other.deletePolicy));
}
@@ -438,6 +475,8 @@ public class TopicMetadata {
.append(", memCacheMsgSizeInMs=").append(this.memCacheMsgSize
/ 1024 / 512)
.append(", memCacheMsgCntInK=").append(this.memCacheMsgCnt /
512)
.append(",
memCacheFlushIntvl=").append(this.memCacheFlushIntvl)
+ .append(", maxMsgSize=").append(this.maxMsgSize)
+ .append(", minMemCacheSize=").append(this.minMemCacheSize)
.append("]").toString();
}
}
diff --git
a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/msgstore/MessageStore.java
b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/msgstore/MessageStore.java
index 1987c32..a025f7c 100644
---
a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/msgstore/MessageStore.java
+++
b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/msgstore/MessageStore.java
@@ -131,7 +131,7 @@ public class MessageStore implements Closeable {
this.unflushThreshold.set(topicMetadata.getUnflushThreshold());
this.unflushDataHold.set(topicMetadata.getUnflushDataHold());
this.writeCacheMaxCnt = topicMetadata.getMemCacheMsgCnt();
- this.writeCacheMaxSize =
validAndGetMemCacheSize(topicMetadata.getMemCacheMsgSize());
+ this.writeCacheMaxSize = validAndGetMemCacheSize(topicMetadata);
this.writeCacheFlushIntvl = topicMetadata.getMemCacheFlushIntvl();
int tmpIndexReadCnt = tubeConfig.getIndexTransCount() * partitionNum;
memMaxIndexReadCnt.set(MixedUtils.mid(tmpIndexReadCnt, 6000, 10000));
@@ -419,7 +419,7 @@ public class MessageStore implements Closeable {
writeCacheMutex.readLock().lock();
try {
writeCacheMaxCnt = topicMetadata.getMemCacheMsgCnt();
- writeCacheMaxSize =
validAndGetMemCacheSize(topicMetadata.getMemCacheMsgSize());
+ writeCacheMaxSize = validAndGetMemCacheSize(topicMetadata);
writeCacheFlushIntvl = topicMetadata.getMemCacheFlushIntvl();
} finally {
writeCacheMutex.readLock().unlock();
@@ -601,13 +601,15 @@ public class MessageStore implements Closeable {
}
}
- private int validAndGetMemCacheSize(int memCacheSize) {
- if (memCacheSize <= ClusterConfigHolder.getMinMemCacheSize()) {
+ private int validAndGetMemCacheSize(TopicMetadata topicMetadata) {
+ int memCacheSize = topicMetadata.getMemCacheMsgSize();
+ if (memCacheSize <= topicMetadata.getMinMemCacheSize()) {
logger.info(new StringBuilder(512)
- .append("[Data Store] writeCacheMaxSize changed, from ")
+ .append("[Data Store] ").append(getTopic())
+ .append(" writeCacheMaxSize changed, from ")
.append(memCacheSize).append(" to ")
.append(ClusterConfigHolder.getMinMemCacheSize()).toString());
- memCacheSize = ClusterConfigHolder.getMinMemCacheSize();
+ memCacheSize = topicMetadata.getMinMemCacheSize();
}
return memCacheSize;
}
diff --git
a/tubemq-server/src/main/java/org/apache/tubemq/server/common/TServerConstants.java
b/tubemq-server/src/main/java/org/apache/tubemq/server/common/TServerConstants.java
index 5793364..4e2547a 100644
---
a/tubemq-server/src/main/java/org/apache/tubemq/server/common/TServerConstants.java
+++
b/tubemq-server/src/main/java/org/apache/tubemq/server/common/TServerConstants.java
@@ -26,6 +26,7 @@ public final class TServerConstants {
public static final String TOKEN_JOB_STORE_MGR = "messageStoreManager";
public static final String TOKEN_DEFAULT_FLOW_CONTROL =
"default_master_ctrl";
public static final String TOKEN_DEFAULT_CLUSTER_SETTING =
"default_cluster_config";
+ public static final String TOKEN_MAX_MSG_SIZE = "maxMsgSize";
public static final String TOKEN_BLANK_FILTER_CONDITION = ",,";
diff --git
a/tubemq-server/src/main/java/org/apache/tubemq/server/common/paramcheck/PBParameterUtils.java
b/tubemq-server/src/main/java/org/apache/tubemq/server/common/paramcheck/PBParameterUtils.java
index 1c2d5ca..c27ad5b 100644
---
a/tubemq-server/src/main/java/org/apache/tubemq/server/common/paramcheck/PBParameterUtils.java
+++
b/tubemq-server/src/main/java/org/apache/tubemq/server/common/paramcheck/PBParameterUtils.java
@@ -608,7 +608,7 @@ public class PBParameterUtils {
strBuffer.delete(0, strBuffer.length());
return result.success;
}
- result.setSuccResult(tmpValue);
+ result.setSuccResult(topicMetadata);
return result.success;
}
}
diff --git
a/tubemq-server/src/main/java/org/apache/tubemq/server/master/bdbstore/bdbentitys/BdbTopicConfEntity.java
b/tubemq-server/src/main/java/org/apache/tubemq/server/master/bdbstore/bdbentitys/BdbTopicConfEntity.java
index 3763ef9..de9f6ef 100644
---
a/tubemq-server/src/main/java/org/apache/tubemq/server/master/bdbstore/bdbentitys/BdbTopicConfEntity.java
+++
b/tubemq-server/src/main/java/org/apache/tubemq/server/master/bdbstore/bdbentitys/BdbTopicConfEntity.java
@@ -345,6 +345,23 @@ public class BdbTopicConfEntity implements Serializable {
String.valueOf(memCacheFlushIntvl));
}
+ public int getMaxMsgSize() {
+ String atrVal =
+ TStringUtils.getAttrValFrmAttributes(this.attributes,
+ TServerConstants.TOKEN_MAX_MSG_SIZE);
+ if (atrVal != null) {
+ return Integer.parseInt(atrVal);
+ }
+ return TBaseConstants.META_VALUE_UNDEFINED;
+ }
+
+ public void setMaxMsgSize(int maxMsgSize) {
+ this.attributes =
+ TStringUtils.setAttrValToAttributes(this.attributes,
+ TServerConstants.TOKEN_MAX_MSG_SIZE,
+ String.valueOf(maxMsgSize));
+ }
+
public void appendAttributes(String attrKey, String attrVal) {
this.attributes =
TStringUtils.setAttrValToAttributes(this.attributes, attrKey,
attrVal);
@@ -375,6 +392,7 @@ public class BdbTopicConfEntity implements Serializable {
.append(",\"memCacheMsgCntInK\":").append(getMemCacheMsgCntInK())
.append(",\"memCacheMsgSizeInMB\":").append(getMemCacheMsgSizeInMB())
.append(",\"memCacheFlushIntvl\":").append(getMemCacheFlushIntvl())
+ .append(",\"maxMsgSize\":").append(getMaxMsgSize())
.append(",\"dataPath\":\"").append(dataPath)
.append("\",\"createUser\":\"").append(createUser)
.append("\",\"createDate\":\"")
diff --git
a/tubemq-server/src/main/java/org/apache/tubemq/server/master/nodemanage/nodebroker/BrokerConfManager.java
b/tubemq-server/src/main/java/org/apache/tubemq/server/master/nodemanage/nodebroker/BrokerConfManager.java
index 3eae7dc..56f1914 100644
---
a/tubemq-server/src/main/java/org/apache/tubemq/server/master/nodemanage/nodebroker/BrokerConfManager.java
+++
b/tubemq-server/src/main/java/org/apache/tubemq/server/master/nodemanage/nodebroker/BrokerConfManager.java
@@ -1205,6 +1205,12 @@ public class BrokerConfManager implements Server {
} else {
sbuffer.append(TokenConstants.ATTR_SEP).append(topicEntity.getMemCacheFlushIntvl());
}
+ int maxMsgSize = topicEntity.getMaxMsgSize();
+ if (maxMsgSize == TBaseConstants.META_VALUE_UNDEFINED) {
+ sbuffer.append(TokenConstants.ATTR_SEP).append(" ");
+ } else {
+ sbuffer.append(TokenConstants.ATTR_SEP).append(maxMsgSize);
+ }
brokerTopicStrConfSet.add(sbuffer.toString());
sbuffer.delete(0, sbuffer.length());
}
diff --git
a/tubemq-server/src/main/java/org/apache/tubemq/server/master/nodemanage/nodebroker/BrokerSyncStatusInfo.java
b/tubemq-server/src/main/java/org/apache/tubemq/server/master/nodemanage/nodebroker/BrokerSyncStatusInfo.java
index 176c39a..83e2794 100644
---
a/tubemq-server/src/main/java/org/apache/tubemq/server/master/nodemanage/nodebroker/BrokerSyncStatusInfo.java
+++
b/tubemq-server/src/main/java/org/apache/tubemq/server/master/nodemanage/nodebroker/BrokerSyncStatusInfo.java
@@ -26,6 +26,8 @@ import org.apache.commons.codec.binary.StringUtils;
import org.apache.tubemq.corebase.TokenConstants;
import org.apache.tubemq.corebase.utils.CheckSum;
import org.apache.tubemq.corebase.utils.TStringUtils;
+import org.apache.tubemq.corebase.utils.Tuple2;
+import org.apache.tubemq.server.broker.metadata.ClusterConfigHolder;
import org.apache.tubemq.server.common.TServerConstants;
import org.apache.tubemq.server.common.TStatusConstants;
import org.apache.tubemq.server.master.bdbstore.bdbentitys.BdbBrokerConfEntity;
@@ -730,6 +732,8 @@ public class BrokerSyncStatusInfo {
int tmpmemCacheMsgSizeInMB = memCacheMsgSizeInMB;
int tmpmemCacheMsgCntInK = memCacheMsgCntInK;
int tmpmemCacheFlushIntvl = memCacheFlushIntvl;
+ int tmpMaxMsgSize = ClusterConfigHolder.getMaxMsgSize();
+ int tmpMinMemCacheSize = ClusterConfigHolder.getMinMemCacheSize();
if (!TStringUtils.isBlank(topicConfInfoArr[11])) {
tmpmemCacheMsgSizeInMB =
Integer.parseInt(topicConfInfoArr[11]);
}
@@ -739,9 +743,20 @@ public class BrokerSyncStatusInfo {
if (!TStringUtils.isBlank(topicConfInfoArr[13])) {
tmpmemCacheFlushIntvl = Integer.parseInt(topicConfInfoArr[13]);
}
+ if (topicConfInfoArr.length > 14) {
+ if (!TStringUtils.isNotBlank(topicConfInfoArr[14])) {
+ tmpMaxMsgSize = Integer.parseInt(topicConfInfoArr[14]);
+ Tuple2<Integer, Integer> calcResult =
+ ClusterConfigHolder.calcMaxMsgSize(tmpMaxMsgSize);
+ tmpMaxMsgSize = calcResult.getF0();
+ tmpMinMemCacheSize = calcResult.getF1();
+ }
+ }
strBuffer.append(",\"memCacheMsgSizeInMB\":").append(tmpmemCacheMsgSizeInMB);
strBuffer.append(",\"memCacheMsgCntInK\":").append(tmpmemCacheMsgCntInK);
strBuffer.append(",\"memCacheFlushIntvl\":").append(tmpmemCacheFlushIntvl);
+ strBuffer.append(",\"maxMsgSize\":").append(tmpMaxMsgSize);
+
strBuffer.append(",\"minMemCacheSize\":").append(tmpMinMemCacheSize);
strBuffer.append(",\"topicStatusId\":").append(topicStatusId);
strBuffer.append("}");
}
diff --git
a/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebBrokerTopicConfHandler.java
b/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebBrokerTopicConfHandler.java
index 8bef5ab..9a4cf04 100644
---
a/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebBrokerTopicConfHandler.java
+++
b/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebBrokerTopicConfHandler.java
@@ -25,11 +25,13 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
+
import javax.servlet.http.HttpServletRequest;
import org.apache.tubemq.corebase.TBaseConstants;
import org.apache.tubemq.corebase.TokenConstants;
import org.apache.tubemq.corebase.cluster.BrokerInfo;
import org.apache.tubemq.corebase.cluster.TopicInfo;
+import org.apache.tubemq.corebase.utils.SettingValidUtils;
import org.apache.tubemq.corebase.utils.TStringUtils;
import org.apache.tubemq.server.common.TServerConstants;
import org.apache.tubemq.server.common.TStatusConstants;
@@ -212,6 +214,11 @@ public class WebBrokerTopicConfHandler extends
AbstractWebHandler {
WebParameterUtils.validIntDataParameter("memCacheFlushIntvl",
req.getParameter("memCacheFlushIntvl"),
false, defmemCacheFlushIntvl, 4000);
+ int maxMsgSizeInMB =
+
WebParameterUtils.validIntDataParameter("maxMsgSizeInMB",
+ req.getParameter("maxMsgSizeInMB"),
+ false,
TBaseConstants.META_MIN_ALLOWED_MESSAGE_SIZE_MB,
+
TBaseConstants.META_MIN_ALLOWED_MESSAGE_SIZE_MB);
String attributes =
strBuffer.append(TokenConstants.TOKEN_STORE_NUM)
.append(TokenConstants.EQ).append(numTopicStores)
.append(TokenConstants.SEGMENT_SEP).append(TokenConstants.TOKEN_DATA_UNFLUSHHOLD)
@@ -221,7 +228,11 @@ public class WebBrokerTopicConfHandler extends
AbstractWebHandler {
.append(TokenConstants.SEGMENT_SEP).append(TokenConstants.TOKEN_MCACHE_MSG_SIZE)
.append(TokenConstants.EQ).append(memCacheMsgSizeInMB)
.append(TokenConstants.SEGMENT_SEP).append(TokenConstants.TOKEN_MCACHE_FLUSH_INTVL)
-
.append(TokenConstants.EQ).append(memCacheFlushIntvl).toString();
+ .append(TokenConstants.EQ).append(memCacheFlushIntvl)
+
.append(TokenConstants.SEGMENT_SEP).append(TServerConstants.TOKEN_MAX_MSG_SIZE)
+ .append(TokenConstants.EQ)
+
.append(SettingValidUtils.validAndXfeMaxMsgSizeFromMBtoB(maxMsgSizeInMB))
+ .toString();
strBuffer.delete(0, strBuffer.length());
for (String itemTopicName : batchAddTopicNames) {
batchAddBdbTopicEntities.add(new
BdbTopicConfEntity(oldEntity.getBrokerId(),
@@ -359,6 +370,11 @@ public class WebBrokerTopicConfHandler extends
AbstractWebHandler {
WebParameterUtils.validIntDataParameter("memCacheFlushIntvl",
jsonObject.get("memCacheFlushIntvl"),
false,
brokerConfEntity.getDftMemCacheFlushIntvl(), 4000);
+ int maxMsgSizeInMB =
+
WebParameterUtils.validIntDataParameter("maxMsgSizeInMB",
+ jsonObject.get("maxMsgSizeInMB"),
+ false,
TBaseConstants.META_MIN_ALLOWED_MESSAGE_SIZE_MB,
+
TBaseConstants.META_MIN_ALLOWED_MESSAGE_SIZE_MB);
String itemCreateUser =
WebParameterUtils.validStringParameter("createUser",
jsonObject.get("createUser"),
@@ -381,7 +397,11 @@ public class WebBrokerTopicConfHandler extends
AbstractWebHandler {
.append(TokenConstants.SEGMENT_SEP).append(TokenConstants.TOKEN_MCACHE_MSG_SIZE)
.append(TokenConstants.EQ).append(memCacheMsgSizeInMB)
.append(TokenConstants.SEGMENT_SEP).append(TokenConstants.TOKEN_MCACHE_FLUSH_INTVL)
-
.append(TokenConstants.EQ).append(memCacheFlushIntvl).toString();
+
.append(TokenConstants.EQ).append(memCacheFlushIntvl)
+
.append(TokenConstants.SEGMENT_SEP).append(TServerConstants.TOKEN_MAX_MSG_SIZE)
+ .append(TokenConstants.EQ)
+
.append(SettingValidUtils.validAndXfeMaxMsgSizeFromMBtoB(maxMsgSizeInMB))
+ .toString();
strBuffer.delete(0, strBuffer.length());
batchAddItemKeys.add(inputKey);
batchAddBdbTopicEntities.add(new
BdbTopicConfEntity(brokerConfEntity.getBrokerId(),
@@ -595,6 +615,8 @@ public class WebBrokerTopicConfHandler extends
AbstractWebHandler {
.append(",\"memCacheMsgSizeInMB\":").append(entity.getMemCacheMsgSizeInMB())
.append(",\"memCacheFlushIntvl\":").append(entity.getMemCacheFlushIntvl())
.append(",\"memCacheMsgCntInK\":").append(entity.getMemCacheMsgCntInK())
+ .append(",\"maxMsgSizeInMB\":")
+ .append(entity.getMaxMsgSize() /
TBaseConstants.META_MB_UNIT_SIZE)
.append(",\"createUser\":\"").append(entity.getCreateUser())
.append("\",\"createDate\":\"").append(formatter.format(entity.getCreateDate()))
.append("\",\"modifyUser\":\"").append(entity.getModifyUser())
@@ -1334,13 +1356,19 @@ public class WebBrokerTopicConfHandler extends
AbstractWebHandler {
int memCacheMsgSizeInMB =
WebParameterUtils.validIntDataParameter("memCacheMsgSizeInMB",
req.getParameter("memCacheMsgSizeInMB"), false,
TBaseConstants.META_VALUE_UNDEFINED, 2);
- memCacheMsgSizeInMB = memCacheMsgSizeInMB >= 2048 ? 2048 :
memCacheMsgSizeInMB;
+ memCacheMsgSizeInMB = Math.min(memCacheMsgSizeInMB, 2048);
int memCacheFlushIntvl =
WebParameterUtils.validIntDataParameter("memCacheFlushIntvl",
req.getParameter("memCacheFlushIntvl"), false,
TBaseConstants.META_VALUE_UNDEFINED, 4000);
int unFlushDataHold =
WebParameterUtils.validIntDataParameter("unflushDataHold",
req.getParameter("unflushDataHold"), false,
TBaseConstants.META_VALUE_UNDEFINED, 0);
+ int maxMsgSizeInMB =
+ WebParameterUtils.validIntDataParameter("maxMsgSizeInMB",
+ req.getParameter("maxMsgSizeInMB"),
+ false, TBaseConstants.META_VALUE_UNDEFINED,
+ TBaseConstants.META_MIN_ALLOWED_MESSAGE_SIZE_MB);
+
List<BdbTopicConfEntity> batchModBdbTopicEntities = new
ArrayList<>();
for (BdbBrokerConfEntity tgtEntity : batchBrokerEntitySet) {
if (tgtEntity == null) {
@@ -1418,6 +1446,15 @@ public class WebBrokerTopicConfHandler extends
AbstractWebHandler {
newEntity.appendAttributes(TokenConstants.TOKEN_MCACHE_MSG_SIZE,
String.valueOf(memCacheMsgSizeInMB));
}
+ if (maxMsgSizeInMB > 0) {
+ int maxMsgSizeInB =
+
SettingValidUtils.validAndXfeMaxMsgSizeFromMBtoB(maxMsgSizeInMB);
+ if (maxMsgSizeInB != oldEntity.getMaxMsgSize()) {
+ foundChange = true;
+
newEntity.appendAttributes(TServerConstants.TOKEN_MAX_MSG_SIZE,
+ String.valueOf(maxMsgSizeInB));
+ }
+ }
if (memCacheFlushIntvl >= 0 && memCacheFlushIntvl !=
oldEntity.getMemCacheFlushIntvl()) {
foundChange = true;
newEntity.appendAttributes(TokenConstants.TOKEN_MCACHE_FLUSH_INTVL,