This is an automated email from the ASF dual-hosted git repository.

yuanbo pushed a commit to branch TUBEMQ-469
in repository https://gitbox.apache.org/repos/asf/incubator-tubemq.git

commit f6ce2be685859d777e34982dafa2eab83b808f61
Author: gosonzhang <[email protected]>
AuthorDate: Fri Jan 15 17:38:25 2021 +0800

    [TUBEMQ-512] Add package length control based on Topic
---
 .../tubemq/server/broker/BrokerServiceServer.java  | 12 +++---
 .../broker/metadata/ClusterConfigHolder.java       | 25 ++++++++-----
 .../server/broker/metadata/TopicMetadata.java      | 43 +++++++++++++++++++++-
 .../server/broker/msgstore/MessageStore.java       | 14 ++++---
 .../tubemq/server/common/TServerConstants.java     |  1 +
 .../server/common/paramcheck/PBParameterUtils.java |  2 +-
 .../bdbstore/bdbentitys/BdbTopicConfEntity.java    | 18 +++++++++
 .../nodemanage/nodebroker/BrokerConfManager.java   |  6 +++
 .../nodebroker/BrokerSyncStatusInfo.java           | 15 ++++++++
 .../web/handler/WebBrokerTopicConfHandler.java     | 43 ++++++++++++++++++++--
 10 files changed, 153 insertions(+), 26 deletions(-)

diff --git 
a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/BrokerServiceServer.java
 
b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/BrokerServiceServer.java
index c9cfba4..a95fa09 100644
--- 
a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/BrokerServiceServer.java
+++ 
b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/BrokerServiceServer.java
@@ -55,8 +55,8 @@ import org.apache.tubemq.corerpc.RpcConstants;
 import org.apache.tubemq.corerpc.service.BrokerReadService;
 import org.apache.tubemq.corerpc.service.BrokerWriteService;
 import org.apache.tubemq.server.Server;
-import org.apache.tubemq.server.broker.metadata.ClusterConfigHolder;
 import org.apache.tubemq.server.broker.metadata.MetadataManager;
+import org.apache.tubemq.server.broker.metadata.TopicMetadata;
 import org.apache.tubemq.server.broker.msgstore.MessageStore;
 import org.apache.tubemq.server.broker.msgstore.MessageStoreManager;
 import org.apache.tubemq.server.broker.msgstore.disk.GetMessageResult;
@@ -621,7 +621,8 @@ public class BrokerServiceServer implements 
BrokerReadService, BrokerWriteServic
             builder.setErrMsg(result.errInfo);
             return builder.build();
         }
-        final String topicName = (String) result.retData1;
+        final TopicMetadata topicMetadata = (TopicMetadata) result.retData1;
+        final String topicName = topicMetadata.getTopic();
         String msgType = null;
         int msgTypeCode = -1;
         if (TStringUtils.isNotBlank(request.getMsgType())) {
@@ -635,10 +636,10 @@ public class BrokerServiceServer implements 
BrokerReadService, BrokerWriteServic
             builder.setErrMsg("data length is zero!");
             return builder.build();
         }
-        if (dataLength > ClusterConfigHolder.getMaxMsgSize()) {
+        if (dataLength > topicMetadata.getMaxMsgSize()) {
             builder.setErrCode(TErrCodeConstants.BAD_REQUEST);
             builder.setErrMsg(strBuffer.append("data length over max length, 
allowed max length is ")
-                    .append(ClusterConfigHolder.getMaxMsgSize())
+                    .append(topicMetadata.getMaxMsgSize())
                     .append(", data length is 
").append(dataLength).toString());
             return builder.build();
         }
@@ -1137,7 +1138,8 @@ public class BrokerServiceServer implements 
BrokerReadService, BrokerWriteServic
             builder.setErrMsg(result.errInfo);
             return builder.build();
         }
-        final String topicName = (String) result.retData1;
+        final TopicMetadata topicMetadata = (TopicMetadata) result.retData1;
+        final String topicName = topicMetadata.getTopic();
         String partStr = getPartStr(groupName, topicName, partitionId);
         ConsumerNodeInfo consumerNodeInfo = consumerRegisterMap.get(partStr);
         if (consumerNodeInfo == null) {
diff --git 
a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/metadata/ClusterConfigHolder.java
 
b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/metadata/ClusterConfigHolder.java
index 1b370d5..7d89dba 100644
--- 
a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/metadata/ClusterConfigHolder.java
+++ 
b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/metadata/ClusterConfigHolder.java
@@ -22,7 +22,7 @@ import java.util.concurrent.atomic.AtomicLong;
 import org.apache.tubemq.corebase.TBaseConstants;
 import org.apache.tubemq.corebase.protobuf.generated.ClientMaster;
 import org.apache.tubemq.corebase.utils.MixedUtils;
-
+import org.apache.tubemq.corebase.utils.Tuple2;
 
 
 public class ClusterConfigHolder {
@@ -46,14 +46,11 @@ public class ClusterConfigHolder {
         if (configId.get() != clusterConfig.getConfigId()) {
             configId.set(clusterConfig.getConfigId());
             if (clusterConfig.hasMaxMsgSize()) {
-                int tmpMaxSize = MixedUtils.mid(clusterConfig.getMaxMsgSize(),
-                        TBaseConstants.META_MAX_MESSAGE_DATA_SIZE,
-                        TBaseConstants.META_MAX_MESSAGE_DATA_SIZE_UPPER_LIMIT)
-                        + TBaseConstants.META_MAX_MESSAGE_HEADER_SIZE;
-                if (tmpMaxSize != maxMsgSize.get()) {
-                    maxMsgSize.set(tmpMaxSize);
-                    minMemCacheSize.set(tmpMaxSize +
-                            (tmpMaxSize % 4 + 1) * 
TBaseConstants.META_MESSAGE_SIZE_ADJUST);
+                Tuple2<Integer, Integer> calcResult =
+                        calcMaxMsgSize(clusterConfig.getMaxMsgSize());
+                if (calcResult.getF0() != maxMsgSize.get()) {
+                    maxMsgSize.set(calcResult.getF0());
+                    minMemCacheSize.set(calcResult.getF1());
                 }
             }
         }
@@ -71,4 +68,14 @@ public class ClusterConfigHolder {
         return minMemCacheSize.get();
     }
 
+    public static Tuple2<Integer, Integer> calcMaxMsgSize(int maxMsgSize) {
+        int tmpMaxSize = MixedUtils.mid(maxMsgSize,
+                TBaseConstants.META_MAX_MESSAGE_DATA_SIZE,
+                TBaseConstants.META_MAX_MESSAGE_DATA_SIZE_UPPER_LIMIT)
+                + TBaseConstants.META_MAX_MESSAGE_HEADER_SIZE;
+        int tmpMinMemCacheSize = tmpMaxSize +
+                (tmpMaxSize % 4 + 1) * TBaseConstants.META_MESSAGE_SIZE_ADJUST;
+        return new Tuple2<>(tmpMaxSize, tmpMinMemCacheSize);
+    }
+
 }
diff --git 
a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/metadata/TopicMetadata.java
 
b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/metadata/TopicMetadata.java
index 800254b..6e47d63 100644
--- 
a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/metadata/TopicMetadata.java
+++ 
b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/metadata/TopicMetadata.java
@@ -25,6 +25,7 @@ import java.util.Set;
 import org.apache.tubemq.corebase.TBaseConstants;
 import org.apache.tubemq.corebase.TokenConstants;
 import org.apache.tubemq.corebase.utils.TStringUtils;
+import org.apache.tubemq.corebase.utils.Tuple2;
 import org.apache.tubemq.server.common.TStatusConstants;
 
 
@@ -62,6 +63,10 @@ public class TopicMetadata {
     private int memCacheMsgCnt = 5 * 1024;
     // the max interval(milliseconds) that topic's memory cache will flush to 
disk.
     private int memCacheFlushIntvl = 20000;
+    // the allowed max message size
+    private int maxMsgSize = TBaseConstants.META_VALUE_UNDEFINED;
+    // the allowed min memory cache size
+    private int minMemCacheSize = TBaseConstants.META_VALUE_UNDEFINED;
 
     /***
      * Build TopicMetadata from brokerDefMetadata(default config) and 
topicMetaConfInfo(custom config).
@@ -141,6 +146,17 @@ public class TopicMetadata {
         } else {
             this.memCacheFlushIntvl = Integer.parseInt(topicConfInfoArr[13]);
         }
+        this.maxMsgSize = ClusterConfigHolder.getMaxMsgSize();
+        this.minMemCacheSize = ClusterConfigHolder.getMinMemCacheSize();
+        if (topicConfInfoArr.length > 14) {
+            if (TStringUtils.isNotBlank(topicConfInfoArr[14])) {
+                int maxMsgSize = Integer.parseInt(topicConfInfoArr[14]);
+                Tuple2<Integer, Integer> calcResult =
+                        ClusterConfigHolder.calcMaxMsgSize(maxMsgSize);
+                this.maxMsgSize = calcResult.getF0();
+                this.minMemCacheSize = calcResult.getF1();
+            }
+        }
     }
 
     private TopicMetadata(String topic, int unflushThreshold,
@@ -149,7 +165,8 @@ public class TopicMetadata {
                           int numPartitions, boolean acceptPublish,
                           boolean acceptSubscribe, int statusId,
                           int numTopicStores, int memCacheMsgSize,
-                          int memCacheMsgCnt, int memCacheFlushIntvl) {
+                          int memCacheMsgCnt, int memCacheFlushIntvl,
+                          int maxMsgSize, int minMemCacheSize) {
         this.topic = topic;
         this.unflushThreshold = unflushThreshold;
         this.unflushInterval = unflushInterval;
@@ -165,6 +182,8 @@ public class TopicMetadata {
         this.memCacheMsgSize = memCacheMsgSize;
         this.memCacheMsgCnt = memCacheMsgCnt;
         this.memCacheFlushIntvl = memCacheFlushIntvl;
+        this.maxMsgSize = maxMsgSize;
+        this.minMemCacheSize = minMemCacheSize;
     }
 
     @Override
@@ -175,7 +194,8 @@ public class TopicMetadata {
                 this.numPartitions, this.acceptPublish,
                 this.acceptSubscribe, this.statusId,
                 this.numTopicStores, this.memCacheMsgSize,
-                this.memCacheMsgCnt, this.memCacheFlushIntvl);
+                this.memCacheMsgCnt, this.memCacheFlushIntvl,
+                this.maxMsgSize, this.minMemCacheSize);
     }
 
     public boolean isAcceptPublish() {
@@ -304,6 +324,14 @@ public class TopicMetadata {
         return memCacheFlushIntvl;
     }
 
+    public int getMaxMsgSize() {
+        return maxMsgSize;
+    }
+
+    public int getMinMemCacheSize() {
+        return minMemCacheSize;
+    }
+
     @Override
     public int hashCode() {
         final int prime = 31;
@@ -323,6 +351,8 @@ public class TopicMetadata {
         result = prime * result + this.memCacheMsgSize;
         result = prime * result + this.memCacheMsgCnt;
         result = prime * result + this.memCacheFlushIntvl;
+        result = prime * result + this.maxMsgSize;
+        result = prime * result + this.minMemCacheSize;
         return result;
     }
 
@@ -400,6 +430,12 @@ public class TopicMetadata {
         if (this.memCacheFlushIntvl != other.memCacheFlushIntvl) {
             return false;
         }
+        if (this.maxMsgSize != other.maxMsgSize) {
+            return false;
+        }
+        if (this.minMemCacheSize != other.minMemCacheSize) {
+            return false;
+        }
 
         return true;
     }
@@ -418,6 +454,7 @@ public class TopicMetadata {
                 && this.memCacheMsgSize == other.memCacheMsgSize
                 && this.memCacheMsgCnt == other.memCacheMsgCnt
                 && this.memCacheFlushIntvl == other.memCacheFlushIntvl
+                && this.maxMsgSize == other.maxMsgSize
                 && this.deletePolicy.equals(other.deletePolicy));
     }
 
@@ -438,6 +475,8 @@ public class TopicMetadata {
                 .append(", memCacheMsgSizeInMs=").append(this.memCacheMsgSize 
/ 1024 / 512)
                 .append(", memCacheMsgCntInK=").append(this.memCacheMsgCnt / 
512)
                 .append(", 
memCacheFlushIntvl=").append(this.memCacheFlushIntvl)
+                .append(", maxMsgSize=").append(this.maxMsgSize)
+                .append(", minMemCacheSize=").append(this.minMemCacheSize)
                 .append("]").toString();
     }
 }
diff --git 
a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/msgstore/MessageStore.java
 
b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/msgstore/MessageStore.java
index 1987c32..a025f7c 100644
--- 
a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/msgstore/MessageStore.java
+++ 
b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/msgstore/MessageStore.java
@@ -131,7 +131,7 @@ public class MessageStore implements Closeable {
         this.unflushThreshold.set(topicMetadata.getUnflushThreshold());
         this.unflushDataHold.set(topicMetadata.getUnflushDataHold());
         this.writeCacheMaxCnt = topicMetadata.getMemCacheMsgCnt();
-        this.writeCacheMaxSize = 
validAndGetMemCacheSize(topicMetadata.getMemCacheMsgSize());
+        this.writeCacheMaxSize = validAndGetMemCacheSize(topicMetadata);
         this.writeCacheFlushIntvl = topicMetadata.getMemCacheFlushIntvl();
         int tmpIndexReadCnt = tubeConfig.getIndexTransCount() * partitionNum;
         memMaxIndexReadCnt.set(MixedUtils.mid(tmpIndexReadCnt, 6000, 10000));
@@ -419,7 +419,7 @@ public class MessageStore implements Closeable {
         writeCacheMutex.readLock().lock();
         try {
             writeCacheMaxCnt = topicMetadata.getMemCacheMsgCnt();
-            writeCacheMaxSize = 
validAndGetMemCacheSize(topicMetadata.getMemCacheMsgSize());
+            writeCacheMaxSize = validAndGetMemCacheSize(topicMetadata);
             writeCacheFlushIntvl = topicMetadata.getMemCacheFlushIntvl();
         } finally {
             writeCacheMutex.readLock().unlock();
@@ -601,13 +601,23 @@ public class MessageStore implements Closeable {
         }
     }
 
-    private int validAndGetMemCacheSize(int memCacheSize) {
-        if (memCacheSize <= ClusterConfigHolder.getMinMemCacheSize()) {
+    /**
+     * Get the write cache size to use for the topic.
+     *
+     * @param topicMetadata  the topic metadata holding the configured cache size
+     *                       and the topic-level minimum cache size
+     * @return the configured cache size, or the topic's minimum cache size
+     *         when the configured value does not exceed that minimum
+     */
+    private int validAndGetMemCacheSize(TopicMetadata topicMetadata) {
+        int memCacheSize = topicMetadata.getMemCacheMsgSize();
+        if (memCacheSize <= topicMetadata.getMinMemCacheSize()) {
             logger.info(new StringBuilder(512)
-                    .append("[Data Store] writeCacheMaxSize changed, from ")
+                    .append("[Data Store] ").append(getTopic())
+                    .append(" writeCacheMaxSize changed, from ")
                     .append(memCacheSize).append(" to ")
-                    .append(ClusterConfigHolder.getMinMemCacheSize()).toString());
-            memCacheSize = ClusterConfigHolder.getMinMemCacheSize();
+                    .append(topicMetadata.getMinMemCacheSize()).toString());
+            memCacheSize = topicMetadata.getMinMemCacheSize();
         }
         return memCacheSize;
     }
diff --git 
a/tubemq-server/src/main/java/org/apache/tubemq/server/common/TServerConstants.java
 
b/tubemq-server/src/main/java/org/apache/tubemq/server/common/TServerConstants.java
index 5793364..4e2547a 100644
--- 
a/tubemq-server/src/main/java/org/apache/tubemq/server/common/TServerConstants.java
+++ 
b/tubemq-server/src/main/java/org/apache/tubemq/server/common/TServerConstants.java
@@ -26,6 +26,7 @@ public final class TServerConstants {
     public static final String TOKEN_JOB_STORE_MGR = "messageStoreManager";
     public static final String TOKEN_DEFAULT_FLOW_CONTROL = 
"default_master_ctrl";
     public static final String TOKEN_DEFAULT_CLUSTER_SETTING = 
"default_cluster_config";
+    public static final String TOKEN_MAX_MSG_SIZE = "maxMsgSize";
 
     public static final String TOKEN_BLANK_FILTER_CONDITION = ",,";
 
diff --git 
a/tubemq-server/src/main/java/org/apache/tubemq/server/common/paramcheck/PBParameterUtils.java
 
b/tubemq-server/src/main/java/org/apache/tubemq/server/common/paramcheck/PBParameterUtils.java
index 1c2d5ca..c27ad5b 100644
--- 
a/tubemq-server/src/main/java/org/apache/tubemq/server/common/paramcheck/PBParameterUtils.java
+++ 
b/tubemq-server/src/main/java/org/apache/tubemq/server/common/paramcheck/PBParameterUtils.java
@@ -608,7 +608,7 @@ public class PBParameterUtils {
             strBuffer.delete(0, strBuffer.length());
             return result.success;
         }
-        result.setSuccResult(tmpValue);
+        result.setSuccResult(topicMetadata);
         return result.success;
     }
 }
diff --git 
a/tubemq-server/src/main/java/org/apache/tubemq/server/master/bdbstore/bdbentitys/BdbTopicConfEntity.java
 
b/tubemq-server/src/main/java/org/apache/tubemq/server/master/bdbstore/bdbentitys/BdbTopicConfEntity.java
index 3763ef9..de9f6ef 100644
--- 
a/tubemq-server/src/main/java/org/apache/tubemq/server/master/bdbstore/bdbentitys/BdbTopicConfEntity.java
+++ 
b/tubemq-server/src/main/java/org/apache/tubemq/server/master/bdbstore/bdbentitys/BdbTopicConfEntity.java
@@ -345,6 +345,23 @@ public class BdbTopicConfEntity implements Serializable {
                         String.valueOf(memCacheFlushIntvl));
     }
 
+    public int getMaxMsgSize() {
+        String atrVal =
+                TStringUtils.getAttrValFrmAttributes(this.attributes,
+                        TServerConstants.TOKEN_MAX_MSG_SIZE);
+        if (atrVal != null) {
+            return Integer.parseInt(atrVal);
+        }
+        return TBaseConstants.META_VALUE_UNDEFINED;
+    }
+
+    public void setMaxMsgSize(int maxMsgSize) {
+        this.attributes =
+                TStringUtils.setAttrValToAttributes(this.attributes,
+                        TServerConstants.TOKEN_MAX_MSG_SIZE,
+                        String.valueOf(maxMsgSize));
+    }
+
     public void appendAttributes(String attrKey, String attrVal) {
         this.attributes =
                 TStringUtils.setAttrValToAttributes(this.attributes, attrKey, 
attrVal);
@@ -375,6 +392,7 @@ public class BdbTopicConfEntity implements Serializable {
                 
.append(",\"memCacheMsgCntInK\":").append(getMemCacheMsgCntInK())
                 
.append(",\"memCacheMsgSizeInMB\":").append(getMemCacheMsgSizeInMB())
                 
.append(",\"memCacheFlushIntvl\":").append(getMemCacheFlushIntvl())
+                .append(",\"maxMsgSize\":").append(getMaxMsgSize())
                 .append(",\"dataPath\":\"").append(dataPath)
                 .append("\",\"createUser\":\"").append(createUser)
                 .append("\",\"createDate\":\"")
diff --git 
a/tubemq-server/src/main/java/org/apache/tubemq/server/master/nodemanage/nodebroker/BrokerConfManager.java
 
b/tubemq-server/src/main/java/org/apache/tubemq/server/master/nodemanage/nodebroker/BrokerConfManager.java
index 3eae7dc..56f1914 100644
--- 
a/tubemq-server/src/main/java/org/apache/tubemq/server/master/nodemanage/nodebroker/BrokerConfManager.java
+++ 
b/tubemq-server/src/main/java/org/apache/tubemq/server/master/nodemanage/nodebroker/BrokerConfManager.java
@@ -1205,6 +1205,12 @@ public class BrokerConfManager implements Server {
                 } else {
                     
sbuffer.append(TokenConstants.ATTR_SEP).append(topicEntity.getMemCacheFlushIntvl());
                 }
+                int maxMsgSize = topicEntity.getMaxMsgSize();
+                if (maxMsgSize == TBaseConstants.META_VALUE_UNDEFINED) {
+                    sbuffer.append(TokenConstants.ATTR_SEP).append(" ");
+                } else {
+                    sbuffer.append(TokenConstants.ATTR_SEP).append(maxMsgSize);
+                }
                 brokerTopicStrConfSet.add(sbuffer.toString());
                 sbuffer.delete(0, sbuffer.length());
             }
diff --git 
a/tubemq-server/src/main/java/org/apache/tubemq/server/master/nodemanage/nodebroker/BrokerSyncStatusInfo.java
 
b/tubemq-server/src/main/java/org/apache/tubemq/server/master/nodemanage/nodebroker/BrokerSyncStatusInfo.java
index 176c39a..83e2794 100644
--- 
a/tubemq-server/src/main/java/org/apache/tubemq/server/master/nodemanage/nodebroker/BrokerSyncStatusInfo.java
+++ 
b/tubemq-server/src/main/java/org/apache/tubemq/server/master/nodemanage/nodebroker/BrokerSyncStatusInfo.java
@@ -26,6 +26,8 @@ import org.apache.commons.codec.binary.StringUtils;
 import org.apache.tubemq.corebase.TokenConstants;
 import org.apache.tubemq.corebase.utils.CheckSum;
 import org.apache.tubemq.corebase.utils.TStringUtils;
+import org.apache.tubemq.corebase.utils.Tuple2;
+import org.apache.tubemq.server.broker.metadata.ClusterConfigHolder;
 import org.apache.tubemq.server.common.TServerConstants;
 import org.apache.tubemq.server.common.TStatusConstants;
 import org.apache.tubemq.server.master.bdbstore.bdbentitys.BdbBrokerConfEntity;
@@ -730,6 +732,8 @@ public class BrokerSyncStatusInfo {
             int tmpmemCacheMsgSizeInMB = memCacheMsgSizeInMB;
             int tmpmemCacheMsgCntInK = memCacheMsgCntInK;
             int tmpmemCacheFlushIntvl = memCacheFlushIntvl;
+            int tmpMaxMsgSize = ClusterConfigHolder.getMaxMsgSize();
+            int tmpMinMemCacheSize = ClusterConfigHolder.getMinMemCacheSize();
             if (!TStringUtils.isBlank(topicConfInfoArr[11])) {
                 tmpmemCacheMsgSizeInMB = 
Integer.parseInt(topicConfInfoArr[11]);
             }
@@ -739,9 +743,20 @@ public class BrokerSyncStatusInfo {
             if (!TStringUtils.isBlank(topicConfInfoArr[13])) {
                 tmpmemCacheFlushIntvl = Integer.parseInt(topicConfInfoArr[13]);
             }
+            // Field 14 (maxMsgSize) is optional and may be blank; parsing a blank
+            // value would throw NumberFormatException, so only parse a real value.
+            if (topicConfInfoArr.length > 14 && TStringUtils.isNotBlank(topicConfInfoArr[14])) {
+                tmpMaxMsgSize = Integer.parseInt(topicConfInfoArr[14]);
+                Tuple2<Integer, Integer> calcResult =
+                        ClusterConfigHolder.calcMaxMsgSize(tmpMaxMsgSize);
+                tmpMaxMsgSize = calcResult.getF0();
+                tmpMinMemCacheSize = calcResult.getF1();
+            }
             
strBuffer.append(",\"memCacheMsgSizeInMB\":").append(tmpmemCacheMsgSizeInMB);
             
strBuffer.append(",\"memCacheMsgCntInK\":").append(tmpmemCacheMsgCntInK);
             
strBuffer.append(",\"memCacheFlushIntvl\":").append(tmpmemCacheFlushIntvl);
+            strBuffer.append(",\"maxMsgSize\":").append(tmpMaxMsgSize);
+            
strBuffer.append(",\"minMemCacheSize\":").append(tmpMinMemCacheSize);
             strBuffer.append(",\"topicStatusId\":").append(topicStatusId);
             strBuffer.append("}");
         }
diff --git 
a/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebBrokerTopicConfHandler.java
 
b/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebBrokerTopicConfHandler.java
index 8bef5ab..9a4cf04 100644
--- 
a/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebBrokerTopicConfHandler.java
+++ 
b/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebBrokerTopicConfHandler.java
@@ -25,11 +25,13 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
+
 import javax.servlet.http.HttpServletRequest;
 import org.apache.tubemq.corebase.TBaseConstants;
 import org.apache.tubemq.corebase.TokenConstants;
 import org.apache.tubemq.corebase.cluster.BrokerInfo;
 import org.apache.tubemq.corebase.cluster.TopicInfo;
+import org.apache.tubemq.corebase.utils.SettingValidUtils;
 import org.apache.tubemq.corebase.utils.TStringUtils;
 import org.apache.tubemq.server.common.TServerConstants;
 import org.apache.tubemq.server.common.TStatusConstants;
@@ -212,6 +214,11 @@ public class WebBrokerTopicConfHandler extends 
AbstractWebHandler {
                         
WebParameterUtils.validIntDataParameter("memCacheFlushIntvl",
                                 req.getParameter("memCacheFlushIntvl"),
                                 false, defmemCacheFlushIntvl, 4000);
+                int maxMsgSizeInMB =
+                        
WebParameterUtils.validIntDataParameter("maxMsgSizeInMB",
+                                req.getParameter("maxMsgSizeInMB"),
+                                false, 
TBaseConstants.META_MIN_ALLOWED_MESSAGE_SIZE_MB,
+                                
TBaseConstants.META_MIN_ALLOWED_MESSAGE_SIZE_MB);
                 String attributes = 
strBuffer.append(TokenConstants.TOKEN_STORE_NUM)
                         .append(TokenConstants.EQ).append(numTopicStores)
                         
.append(TokenConstants.SEGMENT_SEP).append(TokenConstants.TOKEN_DATA_UNFLUSHHOLD)
@@ -221,7 +228,11 @@ public class WebBrokerTopicConfHandler extends 
AbstractWebHandler {
                         
.append(TokenConstants.SEGMENT_SEP).append(TokenConstants.TOKEN_MCACHE_MSG_SIZE)
                         .append(TokenConstants.EQ).append(memCacheMsgSizeInMB)
                         
.append(TokenConstants.SEGMENT_SEP).append(TokenConstants.TOKEN_MCACHE_FLUSH_INTVL)
-                        
.append(TokenConstants.EQ).append(memCacheFlushIntvl).toString();
+                        .append(TokenConstants.EQ).append(memCacheFlushIntvl)
+                        
.append(TokenConstants.SEGMENT_SEP).append(TServerConstants.TOKEN_MAX_MSG_SIZE)
+                        .append(TokenConstants.EQ)
+                        
.append(SettingValidUtils.validAndXfeMaxMsgSizeFromMBtoB(maxMsgSizeInMB))
+                        .toString();
                 strBuffer.delete(0, strBuffer.length());
                 for (String itemTopicName : batchAddTopicNames) {
                     batchAddBdbTopicEntities.add(new 
BdbTopicConfEntity(oldEntity.getBrokerId(),
@@ -359,6 +370,11 @@ public class WebBrokerTopicConfHandler extends 
AbstractWebHandler {
                             
WebParameterUtils.validIntDataParameter("memCacheFlushIntvl",
                                     jsonObject.get("memCacheFlushIntvl"),
                                     false, 
brokerConfEntity.getDftMemCacheFlushIntvl(), 4000);
+                    int maxMsgSizeInMB =
+                            
WebParameterUtils.validIntDataParameter("maxMsgSizeInMB",
+                                    jsonObject.get("maxMsgSizeInMB"),
+                                    false, 
TBaseConstants.META_MIN_ALLOWED_MESSAGE_SIZE_MB,
+                                    
TBaseConstants.META_MIN_ALLOWED_MESSAGE_SIZE_MB);
                     String itemCreateUser =
                             
WebParameterUtils.validStringParameter("createUser",
                                     jsonObject.get("createUser"),
@@ -381,7 +397,11 @@ public class WebBrokerTopicConfHandler extends 
AbstractWebHandler {
                                     
.append(TokenConstants.SEGMENT_SEP).append(TokenConstants.TOKEN_MCACHE_MSG_SIZE)
                                     
.append(TokenConstants.EQ).append(memCacheMsgSizeInMB)
                                     
.append(TokenConstants.SEGMENT_SEP).append(TokenConstants.TOKEN_MCACHE_FLUSH_INTVL)
-                                    
.append(TokenConstants.EQ).append(memCacheFlushIntvl).toString();
+                                    
.append(TokenConstants.EQ).append(memCacheFlushIntvl)
+                                    
.append(TokenConstants.SEGMENT_SEP).append(TServerConstants.TOKEN_MAX_MSG_SIZE)
+                                    .append(TokenConstants.EQ)
+                                    
.append(SettingValidUtils.validAndXfeMaxMsgSizeFromMBtoB(maxMsgSizeInMB))
+                                    .toString();
                     strBuffer.delete(0, strBuffer.length());
                     batchAddItemKeys.add(inputKey);
                     batchAddBdbTopicEntities.add(new 
BdbTopicConfEntity(brokerConfEntity.getBrokerId(),
@@ -595,6 +615,8 @@ public class WebBrokerTopicConfHandler extends 
AbstractWebHandler {
                             
.append(",\"memCacheMsgSizeInMB\":").append(entity.getMemCacheMsgSizeInMB())
                             
.append(",\"memCacheFlushIntvl\":").append(entity.getMemCacheFlushIntvl())
                             
.append(",\"memCacheMsgCntInK\":").append(entity.getMemCacheMsgCntInK())
+                            .append(",\"maxMsgSizeInMB\":")
+                            .append(entity.getMaxMsgSize() / 
TBaseConstants.META_MB_UNIT_SIZE)
                             
.append(",\"createUser\":\"").append(entity.getCreateUser())
                             
.append("\",\"createDate\":\"").append(formatter.format(entity.getCreateDate()))
                             
.append("\",\"modifyUser\":\"").append(entity.getModifyUser())
@@ -1334,13 +1356,19 @@ public class WebBrokerTopicConfHandler extends 
AbstractWebHandler {
             int memCacheMsgSizeInMB =
                     
WebParameterUtils.validIntDataParameter("memCacheMsgSizeInMB",
                             req.getParameter("memCacheMsgSizeInMB"), false, 
TBaseConstants.META_VALUE_UNDEFINED, 2);
-            memCacheMsgSizeInMB = memCacheMsgSizeInMB >= 2048 ? 2048 : 
memCacheMsgSizeInMB;
+            memCacheMsgSizeInMB = Math.min(memCacheMsgSizeInMB, 2048);
             int memCacheFlushIntvl =
                     
WebParameterUtils.validIntDataParameter("memCacheFlushIntvl",
                             req.getParameter("memCacheFlushIntvl"), false, 
TBaseConstants.META_VALUE_UNDEFINED, 4000);
             int unFlushDataHold =
                     WebParameterUtils.validIntDataParameter("unflushDataHold",
                             req.getParameter("unflushDataHold"), false, 
TBaseConstants.META_VALUE_UNDEFINED, 0);
+            int maxMsgSizeInMB =
+                    WebParameterUtils.validIntDataParameter("maxMsgSizeInMB",
+                            req.getParameter("maxMsgSizeInMB"),
+                            false, TBaseConstants.META_VALUE_UNDEFINED,
+                            TBaseConstants.META_MIN_ALLOWED_MESSAGE_SIZE_MB);
+
             List<BdbTopicConfEntity> batchModBdbTopicEntities = new 
ArrayList<>();
             for (BdbBrokerConfEntity tgtEntity : batchBrokerEntitySet) {
                 if (tgtEntity == null) {
@@ -1418,6 +1446,15 @@ public class WebBrokerTopicConfHandler extends 
AbstractWebHandler {
                         
newEntity.appendAttributes(TokenConstants.TOKEN_MCACHE_MSG_SIZE,
                                 String.valueOf(memCacheMsgSizeInMB));
                     }
+                    if (maxMsgSizeInMB > 0) {
+                        int maxMsgSizeInB =
+                                
SettingValidUtils.validAndXfeMaxMsgSizeFromMBtoB(maxMsgSizeInMB);
+                        if (maxMsgSizeInB != oldEntity.getMaxMsgSize()) {
+                            foundChange = true;
+                            
newEntity.appendAttributes(TServerConstants.TOKEN_MAX_MSG_SIZE,
+                                    String.valueOf(maxMsgSizeInB));
+                        }
+                    }
                     if (memCacheFlushIntvl >= 0 && memCacheFlushIntvl != 
oldEntity.getMemCacheFlushIntvl()) {
                         foundChange = true;
                         
newEntity.appendAttributes(TokenConstants.TOKEN_MCACHE_FLUSH_INTVL,

Reply via email to