This is an automated email from the ASF dual-hosted git repository.

yuanbo pushed a commit to branch TUBEMQ-421
in repository https://gitbox.apache.org/repos/asf/incubator-tubemq.git

commit 6ca161c8603d2cd98a5cb217ecb87f672682ef0d
Author: gosonzhang <[email protected]>
AuthorDate: Mon Jan 11 16:18:46 2021 +0800

    [TUBEMQ-501] Adjust max message size check logic
---
 .../tubemq/client/producer/AllowedSetting.java     |   4 +-
 .../tubemq/client/producer/ProducerManager.java    |  48 +++-
 .../client/producer/SimpleMessageProducer.java     |  25 +-
 .../org/apache/tubemq/corebase/TBaseConstants.java |  18 +-
 .../apache/tubemq/corebase/utils/MixedUtils.java   |  10 +
 .../tubemq/corebase/utils/SettingValidUtils.java   |  19 +-
 .../org/apache/tubemq/corerpc/RpcConstants.java    |   6 +-
 tubemq-core/src/main/proto/MasterService.proto     |  10 +-
 .../tubemq/server/broker/BrokerServiceServer.java  |   5 +-
 .../apache/tubemq/server/broker/TubeBroker.java    | 270 ++++++++++++---------
 .../broker/metadata/ClusterConfigHolder.java       |  82 +++++++
 .../server/broker/msgstore/MessageStore.java       |  16 +-
 .../server/broker/msgstore/disk/FileSegment.java   |   5 +-
 .../server/broker/msgstore/disk/MsgFileStore.java  |   3 +-
 .../server/broker/msgstore/mem/MsgMemStore.java    |   3 +-
 .../tubemq/server/common/fielddef/WebFieldDef.java |   4 +-
 .../server/common/paramcheck/PBParameterUtils.java |  33 +++
 .../server/common/utils/WebParameterUtils.java     | 109 +++++----
 .../org/apache/tubemq/server/master/TMaster.java   |  72 ++++++
 .../bdbentitys/BdbClusterSettingEntity.java        | 172 +++++++------
 .../master/web/handler/WebMasterInfoHandler.java   |  13 +-
 21 files changed, 650 insertions(+), 277 deletions(-)

diff --git 
a/tubemq-client/src/main/java/org/apache/tubemq/client/producer/AllowedSetting.java
 
b/tubemq-client/src/main/java/org/apache/tubemq/client/producer/AllowedSetting.java
index cabf928..7beb4e0 100644
--- 
a/tubemq-client/src/main/java/org/apache/tubemq/client/producer/AllowedSetting.java
+++ 
b/tubemq-client/src/main/java/org/apache/tubemq/client/producer/AllowedSetting.java
@@ -45,8 +45,8 @@ public class AllowedSetting {
             }
             if (allowedConfig.hasMaxMsgSize()
                     && allowedConfig.getMaxMsgSize() != maxMsgSize.get()) {
-                maxMsgSize.set(
-                        
SettingValidUtils.validAndGetMaxMsgSize(allowedConfig.getMaxMsgSize()));
+                maxMsgSize.set(SettingValidUtils.validAndGetMaxMsgSizeInB(
+                        allowedConfig.getMaxMsgSize()));
             }
         }
     }
diff --git 
a/tubemq-client/src/main/java/org/apache/tubemq/client/producer/ProducerManager.java
 
b/tubemq-client/src/main/java/org/apache/tubemq/client/producer/ProducerManager.java
index ea1fd51..07586ec 100644
--- 
a/tubemq-client/src/main/java/org/apache/tubemq/client/producer/ProducerManager.java
+++ 
b/tubemq-client/src/main/java/org/apache/tubemq/client/producer/ProducerManager.java
@@ -79,6 +79,8 @@ public class ProducerManager {
     private final ScheduledExecutorService heartbeatService;
     private final AtomicLong visitToken =
             new AtomicLong(TBaseConstants.META_VALUE_UNDEFINED);
+    private final AllowedSetting allowedSetting =
+            new AllowedSetting();
     private final AtomicReference<String> authAuthorizedTokenRef =
             new AtomicReference<>("");
     private final ClientAuthenticateHandler authenticateHandler =
@@ -311,6 +313,15 @@ public class ProducerManager {
     }
 
     /**
+     * Get allowed message size.
+     *
+     * @return max allowed message size
+     */
+    public int getMaxMsgSize() {
+        return allowedSetting.getMaxMsgSize();
+    }
+
+    /**
      * Check if the producer manager is shutdown.
      *
      * @return producer status
@@ -396,7 +407,7 @@ public class ProducerManager {
                         updateBrokerInfoList(true, 
response.getBrokerInfosList(),
                                 response.getBrokerCheckSum(), sBuilder);
                     }
-                    processRegAuthorizedToken(response);
+                    processRegSyncInfo(response);
                     return;
                 }
                 if (remainingRetry <= 0) {
@@ -436,6 +447,7 @@ public class ProducerManager {
         if (authInfoBuilder != null) {
             builder.setAuthInfo(authInfoBuilder.build());
         }
+        builder.setAppdConfig(buildAllowedConfig4P());
         return builder.build();
     }
 
@@ -455,6 +467,7 @@ public class ProducerManager {
         if (authInfoBuilder != null) {
             builder.setAuthInfo(authInfoBuilder.build());
         }
+        builder.setAppdConfig(buildAllowedConfig4P());
         return builder.build();
     }
 
@@ -544,13 +557,22 @@ public class ProducerManager {
         }
     }
 
-    private void processRegAuthorizedToken(ClientMaster.RegisterResponseM2P 
response) {
+    private void processRegSyncInfo(ClientMaster.RegisterResponseM2P response) 
{
         if (response.hasAuthorizedInfo()) {
             processAuthorizedToken(response.getAuthorizedInfo());
         }
+        if (response.hasAppdConfig()) {
+            procAllowedConfig4P(response.getAppdConfig());
+        }
     }
 
-    private void processHeartBeatAuthorizedToken(ClientMaster.HeartResponseM2P 
response) {
+    private void processHeartBeatSyncInfo(ClientMaster.HeartResponseM2P 
response) {
+        if (response.hasRequireAuth()) {
+            nextWithAuthInfo2M.set(response.getRequireAuth());
+        }
+        if (response.hasAppdConfig()) {
+            procAllowedConfig4P(response.getAppdConfig());
+        }
         if (response.hasAuthorizedInfo()) {
             processAuthorizedToken(response.getAuthorizedInfo());
         }
@@ -595,6 +617,21 @@ public class ProducerManager {
         return authInfoBuilder;
     }
 
+    // build allowed configure info
+    private ClientMaster.ApprovedClientConfig.Builder buildAllowedConfig4P() {
+        ClientMaster.ApprovedClientConfig.Builder appdConfig =
+                ClientMaster.ApprovedClientConfig.newBuilder();
+        appdConfig.setConfigId(allowedSetting.getConfigId());
+        return appdConfig;
+    }
+
+    // set allowed configure info
+    private void procAllowedConfig4P(ClientMaster.ApprovedClientConfig 
allowedConfig) {
+        if (allowedConfig != null) {
+            allowedSetting.updAllowedSetting(allowedConfig);
+        }
+    }
+
     // #lizard forgives
     private class ProducerHeartbeatTask implements Runnable {
         @Override
@@ -633,10 +670,7 @@ public class ProducerManager {
                     }
                     return;
                 }
-                if (response.hasRequireAuth()) {
-                    nextWithAuthInfo2M.set(response.getRequireAuth());
-                }
-                processHeartBeatAuthorizedToken(response);
+                processHeartBeatSyncInfo(response);
                 if (response.getErrCode() == TErrCodeConstants.NOT_READY) {
                     lastHeartbeatTime = System.currentTimeMillis();
                     return;
diff --git 
a/tubemq-client/src/main/java/org/apache/tubemq/client/producer/SimpleMessageProducer.java
 
b/tubemq-client/src/main/java/org/apache/tubemq/client/producer/SimpleMessageProducer.java
index 7d6894d..946f03e 100644
--- 
a/tubemq-client/src/main/java/org/apache/tubemq/client/producer/SimpleMessageProducer.java
+++ 
b/tubemq-client/src/main/java/org/apache/tubemq/client/producer/SimpleMessageProducer.java
@@ -288,19 +288,6 @@ public class SimpleMessageProducer implements 
MessageProducer {
                 || (message.getData().length == 0)) {
             throw new TubeClientException("Illegal parameter: null data in 
message package!");
         }
-        int msgSize = TStringUtils.isBlank(message.getAttribute())
-                ? message.getData().length : (message.getData().length + 
message.getAttribute().length());
-        if (msgSize > TBaseConstants.META_MAX_MESSAGE_DATA_SIZE) {
-            throw new TubeClientException(new StringBuilder(512)
-                    .append("Illegal parameter: over max message length for 
the total size of")
-                    .append(" message data and attribute, allowed size is ")
-                    .append(TBaseConstants.META_MAX_MESSAGE_DATA_SIZE)
-                    .append(", message's real size is 
").append(msgSize).toString());
-        }
-        if (isShutDown.get()) {
-            throw new TubeClientException("Status error: producer has been 
shutdown!");
-        }
-
         if (this.publishTopicMap.get(message.getTopic()) == null) {
             throw new TubeClientException(new StringBuilder(512)
                     .append("Topic ").append(message.getTopic())
@@ -311,6 +298,18 @@ public class SimpleMessageProducer implements 
MessageProducer {
                     .append("Topic ").append(message.getTopic())
                     .append(" not publish, make sure the topic exist or 
acceptPublish and try later!").toString());
         }
+        int msgSize = TStringUtils.isBlank(message.getAttribute())
+                ? message.getData().length : (message.getData().length + 
message.getAttribute().length());
+        if (msgSize > producerManager.getMaxMsgSize()) {
+            throw new TubeClientException(new StringBuilder(512)
+                    .append("Illegal parameter: over max message length for 
the total size of")
+                    .append(" message data and attribute, allowed size is ")
+                    .append(producerManager.getMaxMsgSize())
+                    .append(", message's real size is 
").append(msgSize).toString());
+        }
+        if (isShutDown.get()) {
+            throw new TubeClientException("Status error: producer has been 
shutdown!");
+        }
     }
 
     private ClientBroker.SendMessageRequestP2B 
createSendMessageRequest(Partition partition,
diff --git 
a/tubemq-core/src/main/java/org/apache/tubemq/corebase/TBaseConstants.java 
b/tubemq-core/src/main/java/org/apache/tubemq/corebase/TBaseConstants.java
index d91b083..5238323 100644
--- a/tubemq-core/src/main/java/org/apache/tubemq/corebase/TBaseConstants.java
+++ b/tubemq-core/src/main/java/org/apache/tubemq/corebase/TBaseConstants.java
@@ -29,9 +29,6 @@ public class TBaseConstants {
 
     public static final String META_DEFAULT_CHARSET_NAME = "UTF-8";
     public static final int META_MAX_MSGTYPE_LENGTH = 255;
-    public static final int META_MAX_MESSAGE_HEADER_SIZE = 1024;
-    public static final int META_MAX_MESSAGE_DATA_SIZE = 1024 * 1024;
-    public static final int META_MAX_MESSAGE_DATA_SIZE_UPPER_LIMIT = 20 * 1024 
* 1024;
     public static final int META_MAX_PARTITION_COUNT = 100;
     public static final int META_MAX_BROKER_IP_LENGTH = 32;
     public static final int META_MAX_USERNAME_LENGTH = 64;
@@ -67,4 +64,19 @@ public class TBaseConstants {
 
     public static final long CFG_DEFAULT_AUTH_TIMESTAMP_VALID_INTERVAL = 20000;
 
+    public static final int META_MB_UNIT_SIZE = (1024 * 1024);
+    public static final int META_MESSAGE_SIZE_ADJUST = (512 * 1024);
+    public static final int META_MAX_MESSAGE_HEADER_SIZE = (10 * 1024);
+
+    public static final int META_MIN_ALLOWED_MESSAGE_SIZE_MB = 1;
+    public static final int META_MAX_ALLOWED_MESSAGE_SIZE_MB = 20;
+    public static final int META_MAX_MESSAGE_DATA_SIZE =
+            META_MIN_ALLOWED_MESSAGE_SIZE_MB * META_MB_UNIT_SIZE;
+    public static final int META_MIN_MEM_BUFFER_SIZE =
+            META_MAX_MESSAGE_DATA_SIZE + META_MESSAGE_SIZE_ADJUST;
+    public static final int META_MAX_MESSAGE_DATA_SIZE_UPPER_LIMIT =
+            META_MAX_ALLOWED_MESSAGE_SIZE_MB * META_MB_UNIT_SIZE;
+
+
+
 }
diff --git 
a/tubemq-core/src/main/java/org/apache/tubemq/corebase/utils/MixedUtils.java 
b/tubemq-core/src/main/java/org/apache/tubemq/corebase/utils/MixedUtils.java
index bfbedad..1374b72 100644
--- a/tubemq-core/src/main/java/org/apache/tubemq/corebase/utils/MixedUtils.java
+++ b/tubemq-core/src/main/java/org/apache/tubemq/corebase/utils/MixedUtils.java
@@ -92,4 +92,14 @@ public class MixedUtils {
         dataBuffer.flip();
         return dataBuffer.array();
     }
+
+    // get the middle data between min, max, and data
+    public static int mid(int data, int min, int max) {
+        return Math.max(min, Math.min(max, data));
+    }
+
+    public static long mid(long data, long min, long max) {
+        return Math.max(min, Math.min(max, data));
+    }
+
 }
diff --git 
a/tubemq-core/src/main/java/org/apache/tubemq/corebase/utils/SettingValidUtils.java
 
b/tubemq-core/src/main/java/org/apache/tubemq/corebase/utils/SettingValidUtils.java
index 4a206ef..e748ba4 100644
--- 
a/tubemq-core/src/main/java/org/apache/tubemq/corebase/utils/SettingValidUtils.java
+++ 
b/tubemq-core/src/main/java/org/apache/tubemq/corebase/utils/SettingValidUtils.java
@@ -22,18 +22,17 @@ import org.apache.tubemq.corebase.TBaseConstants;
 
 public class SettingValidUtils {
 
-    // get the middle data between min, max, and data
-    public static int mid(int data, int min, int max) {
-        return Math.max(min, Math.min(max, data));
-    }
 
-    public static long mid(long data, long min, long max) {
-        return Math.max(min, Math.min(max, data));
+    public static int validAndXfeMaxMsgSizeFromMBtoB(int inMaxMsgSizeInMB) {
+        return MixedUtils.mid(inMaxMsgSizeInMB,
+                TBaseConstants.META_MIN_ALLOWED_MESSAGE_SIZE_MB,
+                TBaseConstants.META_MAX_ALLOWED_MESSAGE_SIZE_MB)
+                * TBaseConstants.META_MB_UNIT_SIZE;
     }
 
-    public static int validAndGetMaxMsgSize(int inMaxMsgSize) {
-        return mid(inMaxMsgSize,
-            TBaseConstants.META_MAX_MESSAGE_DATA_SIZE,
-            TBaseConstants.META_MAX_MESSAGE_DATA_SIZE_UPPER_LIMIT);
+    public static int validAndGetMaxMsgSizeInB(int inMaxMsgSizeInB) {
+        return MixedUtils.mid(inMaxMsgSizeInB,
+                TBaseConstants.META_MAX_MESSAGE_DATA_SIZE,
+                TBaseConstants.META_MAX_MESSAGE_DATA_SIZE_UPPER_LIMIT);
     }
 }
diff --git 
a/tubemq-core/src/main/java/org/apache/tubemq/corerpc/RpcConstants.java 
b/tubemq-core/src/main/java/org/apache/tubemq/corerpc/RpcConstants.java
index e99ffc3..3a150b3 100644
--- a/tubemq-core/src/main/java/org/apache/tubemq/corerpc/RpcConstants.java
+++ b/tubemq-core/src/main/java/org/apache/tubemq/corerpc/RpcConstants.java
@@ -18,6 +18,9 @@
 package org.apache.tubemq.corerpc;
 
 
+import org.apache.tubemq.corebase.TBaseConstants;
+
+
 public final class RpcConstants {
 
     public static final String RPC_CODEC = "rpc.codec";
@@ -66,7 +69,8 @@ public final class RpcConstants {
     public static final int RPC_PROTOCOL_BEGIN_TOKEN = 0xFF7FF4FE;
     public static final int RPC_MAX_BUFFER_SIZE = 8192;
     public static final int MAX_FRAME_MAX_LIST_SIZE =
-            (int) ((1024 * 1024 * 8) / RPC_MAX_BUFFER_SIZE);
+            (int) ((TBaseConstants.META_MAX_MESSAGE_DATA_SIZE_UPPER_LIMIT
+                    + TBaseConstants.META_MB_UNIT_SIZE * 8) / 
RPC_MAX_BUFFER_SIZE);
 
     public static final int RPC_FLAG_MSG_TYPE_REQUEST = 0x0;
     public static final int RPC_FLAG_MSG_TYPE_RESPONSE = 0x1;
diff --git a/tubemq-core/src/main/proto/MasterService.proto 
b/tubemq-core/src/main/proto/MasterService.proto
index 27e5496..b97f3fc 100644
--- a/tubemq-core/src/main/proto/MasterService.proto
+++ b/tubemq-core/src/main/proto/MasterService.proto
@@ -66,7 +66,7 @@ message ApprovedClientConfig {
     optional int32 maxMsgSize = 2;
 }
 
-message ClusterDefConfig {
+message ClusterConfig {
     required int64 configId = 1;
     optional int32 maxMsgSize = 2;
 }
@@ -222,7 +222,7 @@ message RegisterRequestB2M {
     optional int32 qryPriorityId = 12;
     optional int32 tlsPort = 13;
     optional MasterCertificateInfo authInfo = 14;
-    optional ClusterDefConfig clsDefConfig = 15;
+    optional ClusterConfig clsConfig = 15;
 }
 
 message RegisterResponseM2B {
@@ -245,7 +245,7 @@ message RegisterResponseM2B {
     optional int32 qryPriorityId = 15;
     optional MasterAuthorizedInfo authorizedInfo = 16; /* Deprecated  */
     optional MasterBrokerAuthorizedInfo brokerAuthorizedInfo = 17;
-    optional ClusterDefConfig clsDefConfig = 18;
+    optional ClusterConfig clsConfig = 18;
 }
 
 message HeartRequestB2M {
@@ -266,7 +266,7 @@ message HeartRequestB2M {
     optional int64 flowCheckId = 13;
     optional int32 qryPriorityId = 14;
     optional MasterCertificateInfo authInfo = 15;
-    optional ClusterDefConfig clsDefConfig = 16;
+    optional ClusterConfig clsConfig = 16;
 }
 
 message HeartResponseM2B {
@@ -292,7 +292,7 @@ message HeartResponseM2B {
     optional int32 qryPriorityId = 17;
     optional MasterAuthorizedInfo authorizedInfo = 18;   /* Deprecated  */
     optional MasterBrokerAuthorizedInfo brokerAuthorizedInfo = 19;
-    optional ClusterDefConfig clsDefConfig = 20;
+    optional ClusterConfig clsConfig = 20;
 }
 
 message CloseRequestB2M {
diff --git 
a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/BrokerServiceServer.java
 
b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/BrokerServiceServer.java
index f9b242f..60490c6 100644
--- 
a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/BrokerServiceServer.java
+++ 
b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/BrokerServiceServer.java
@@ -55,6 +55,7 @@ import org.apache.tubemq.corerpc.RpcConstants;
 import org.apache.tubemq.corerpc.service.BrokerReadService;
 import org.apache.tubemq.corerpc.service.BrokerWriteService;
 import org.apache.tubemq.server.Server;
+import org.apache.tubemq.server.broker.metadata.ClusterConfigHolder;
 import org.apache.tubemq.server.broker.metadata.MetadataManager;
 import org.apache.tubemq.server.broker.msgstore.MessageStore;
 import org.apache.tubemq.server.broker.msgstore.MessageStoreManager;
@@ -633,10 +634,10 @@ public class BrokerServiceServer implements 
BrokerReadService, BrokerWriteServic
             builder.setErrMsg("data length is zero!");
             return builder.build();
         }
-        if (dataLength > TBaseConstants.META_MAX_MESSAGE_DATA_SIZE + 1024) {
+        if (dataLength > ClusterConfigHolder.getMaxMsgSize()) {
             builder.setErrCode(TErrCodeConstants.BAD_REQUEST);
             builder.setErrMsg(strBuffer.append("data length over max length, 
allowed max length is ")
-                    .append(TBaseConstants.META_MAX_MESSAGE_DATA_SIZE + 1024)
+                    .append(ClusterConfigHolder.getMaxMsgSize())
                     .append(", data length is 
").append(dataLength).toString());
             return builder.build();
         }
diff --git 
a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/TubeBroker.java 
b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/TubeBroker.java
index 556cbba..690f0d7 100644
--- 
a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/TubeBroker.java
+++ 
b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/TubeBroker.java
@@ -46,6 +46,7 @@ import org.apache.tubemq.corerpc.service.MasterService;
 import org.apache.tubemq.server.Stoppable;
 import org.apache.tubemq.server.broker.exception.StartupException;
 import org.apache.tubemq.server.broker.metadata.BrokerMetadataManager;
+import org.apache.tubemq.server.broker.metadata.ClusterConfigHolder;
 import org.apache.tubemq.server.broker.metadata.MetadataManager;
 import org.apache.tubemq.server.broker.msgstore.MessageStoreManager;
 import org.apache.tubemq.server.broker.nodeinfo.ConsumerNodeInfo;
@@ -223,68 +224,8 @@ public class TubeBroker implements Stoppable {
                             }
                             isKeepAlive.set(true);
                             heartbeatErrors.set(0);
-                            FlowCtrlRuleHandler flowCtrlRuleHandler =
-                                metadataManager.getFlowCtrlRuleHandler();
-                            long flowCheckId = 
flowCtrlRuleHandler.getFlowCtrlId();
-                            int qryPriorityId = 
flowCtrlRuleHandler.getQryPriorityId();
-                            ServiceStatusHolder
-                                
.setReadWriteServiceStatus(response.getStopRead(),
-                                    response.getStopWrite(), "Master");
-                            if (response.hasFlowCheckId()) {
-                                qryPriorityId = response.hasQryPriorityId()
-                                    ? response.getQryPriorityId() : 
qryPriorityId;
-                                if (response.getFlowCheckId() != flowCheckId) {
-                                    flowCheckId = response.getFlowCheckId();
-                                    try {
-                                        flowCtrlRuleHandler
-                                            
.updateDefFlowCtrlInfo(qryPriorityId,
-                                                flowCheckId, 
response.getFlowControlInfo());
-                                    } catch (Exception e1) {
-                                        logger.warn(
-                                            "[HeartBeat response] found parse 
flowCtrl rules failure", e1);
-                                    }
-                                }
-                                if (qryPriorityId != 
flowCtrlRuleHandler.getQryPriorityId()) {
-                                    
flowCtrlRuleHandler.setQryPriorityId(qryPriorityId);
-                                }
-                            }
-                            requireReportConf = response.getNeedReportData();
                             StringBuilder sBuilder = new StringBuilder(512);
-                            if (response.getTakeConfInfo()) {
-                                logger.info(sBuilder
-                                    .append("[HeartBeat response] received 
broker metadata info: brokerConfId=")
-                                    .append(response.getCurBrokerConfId())
-                                    
.append(",stopWrite=").append(response.getStopWrite())
-                                    
.append(",stopRead=").append(response.getStopRead())
-                                    
.append(",configCheckSumId=").append(response.getConfCheckSumId())
-                                    
.append(",hasFlowCtrl=").append(response.hasFlowCheckId())
-                                    
.append(",curFlowCtrlId=").append(flowCheckId)
-                                    
.append(",curQryPriorityId=").append(qryPriorityId)
-                                    .append(",brokerDefaultConfInfo=")
-                                    
.append(response.getBrokerDefaultConfInfo())
-                                    .append(",brokerTopicSetConfList=")
-                                    
.append(response.getBrokerTopicSetConfInfoList().toString()).toString());
-                                sBuilder.delete(0, sBuilder.length());
-                                metadataManager
-                                    
.updateBrokerTopicConfigMap(response.getCurBrokerConfId(),
-                                        response.getConfCheckSumId(), 
response.getBrokerDefaultConfInfo(),
-                                        
response.getBrokerTopicSetConfInfoList(), false, sBuilder);
-                            }
-                            if (response.hasBrokerAuthorizedInfo()) {
-                                
serverAuthHandler.appendVisitToken(response.getBrokerAuthorizedInfo());
-                            }
-                            boolean needProcess =
-                                metadataManager.updateBrokerRemoveTopicMap(
-                                    response.getTakeRemoveTopicInfo(),
-                                    response.getRemoveTopicConfInfoList(), 
sBuilder);
-                            if (needProcess) {
-                                new Thread() {
-                                    @Override
-                                    public void run() {
-                                        storeManager.removeTopicStore();
-                                    }
-                                }.start();
-                            }
+                            procConfigFromHeartBeat(sBuilder, response);
                         } catch (Throwable t) {
                             isKeepAlive.set(false);
                             heartbeatErrors.incrementAndGet();
@@ -355,6 +296,92 @@ public class TubeBroker implements Stoppable {
                 .append(TubeServerVersion.BROKER_VERSION).toString();
     }
 
+    private void procConfigFromHeartBeat(StringBuilder sBuilder,
+                                         HeartResponseM2B response) {
+        // process service status
+        ServiceStatusHolder
+                .setReadWriteServiceStatus(response.getStopRead(),
+                        response.getStopWrite(), "Master");
+        // process flow controller rules
+        FlowCtrlRuleHandler flowCtrlRuleHandler =
+                metadataManager.getFlowCtrlRuleHandler();
+        long flowCheckId = flowCtrlRuleHandler.getFlowCtrlId();
+        int qryPriorityId = flowCtrlRuleHandler.getQryPriorityId();
+        if (response.hasFlowCheckId()) {
+            qryPriorityId = response.hasQryPriorityId()
+                    ? response.getQryPriorityId() : qryPriorityId;
+            if (response.getFlowCheckId() != flowCheckId) {
+                flowCheckId = response.getFlowCheckId();
+                try {
+                    flowCtrlRuleHandler
+                            .updateDefFlowCtrlInfo(qryPriorityId,
+                                    flowCheckId, 
response.getFlowControlInfo());
+                } catch (Exception e1) {
+                    logger.warn(
+                            "[HeartBeat response] found parse flowCtrl rules 
failure", e1);
+                }
+            }
+            if (qryPriorityId != flowCtrlRuleHandler.getQryPriorityId()) {
+                flowCtrlRuleHandler.setQryPriorityId(qryPriorityId);
+            }
+        }
+        // update configure report requirement
+        requireReportConf = response.getNeedReportData();
+        // update cluster setting
+        if (response.hasClsConfig()) {
+            long configId = response.getClsConfig().getConfigId();
+            if (configId != ClusterConfigHolder.getConfigId()) {
+                ClusterConfigHolder.updClusterSetting(response.getClsConfig());
+                logger.info(sBuilder
+                        .append("[HeartBeat response] received cluster 
configure changed,")
+                        
.append(",hasClsConfig=").append(response.hasClsConfig())
+                        
.append(",curClusterConfigId=").append(ClusterConfigHolder.getConfigId())
+                        
.append(",curMaxMsgSize=").append(ClusterConfigHolder.getMaxMsgSize())
+                        .append(",minMemCacheSize=")
+                        .append(ClusterConfigHolder.getMinMemCacheSize())
+                        .toString());
+                sBuilder.delete(0, sBuilder.length());
+            }
+        }
+        if (response.getTakeConfInfo()) {
+            logger.info(sBuilder
+                    .append("[HeartBeat response] received broker metadata 
info: brokerConfId=")
+                    .append(response.getCurBrokerConfId())
+                    .append(",stopWrite=").append(response.getStopWrite())
+                    .append(",stopRead=").append(response.getStopRead())
+                    
.append(",configCheckSumId=").append(response.getConfCheckSumId())
+                    .append(",hasFlowCtrl=").append(response.hasFlowCheckId())
+                    .append(",curFlowCtrlId=").append(flowCheckId)
+                    .append(",curQryPriorityId=").append(qryPriorityId)
+                    .append(",brokerDefaultConfInfo=")
+                    .append(response.getBrokerDefaultConfInfo())
+                    .append(",brokerTopicSetConfList=")
+                    
.append(response.getBrokerTopicSetConfInfoList().toString()).toString());
+            sBuilder.delete(0, sBuilder.length());
+            metadataManager
+                    .updateBrokerTopicConfigMap(response.getCurBrokerConfId(),
+                            response.getConfCheckSumId(), 
response.getBrokerDefaultConfInfo(),
+                            response.getBrokerTopicSetConfInfoList(), false, 
sBuilder);
+        }
+        // update auth info
+        if (response.hasBrokerAuthorizedInfo()) {
+            
serverAuthHandler.appendVisitToken(response.getBrokerAuthorizedInfo());
+        }
+        // process topic deletion
+        boolean needProcess =
+                metadataManager.updateBrokerRemoveTopicMap(
+                        response.getTakeRemoveTopicInfo(),
+                        response.getRemoveTopicConfInfoList(), sBuilder);
+        if (needProcess) {
+            new Thread() {
+                @Override
+                public void run() {
+                    storeManager.removeTopicStore();
+                }
+            }.start();
+        }
+    }
+
     /***
      * Register to master. Try multi times if failed.
      *
@@ -374,53 +401,7 @@ public class TubeBroker implements Stoppable {
                             .append("Register to master failed! The error 
message is ")
                             .append(response.getErrMsg()).toString());
                 }
-                ServiceStatusHolder
-                        .setReadWriteServiceStatus(response.getStopRead(),
-                                response.getStopWrite(), "Master");
-                FlowCtrlRuleHandler flowCtrlRuleHandler =
-                        metadataManager.getFlowCtrlRuleHandler();
-                if (response.hasFlowCheckId()) {
-                    int qryPriorityId = response.hasQryPriorityId()
-                            ? response.getQryPriorityId() : 
flowCtrlRuleHandler.getQryPriorityId();
-                    if (response.getFlowCheckId() != 
flowCtrlRuleHandler.getFlowCtrlId()) {
-                        try {
-                            flowCtrlRuleHandler
-                                
.updateDefFlowCtrlInfo(response.getQryPriorityId(),
-                                    response.getFlowCheckId(), 
response.getFlowControlInfo());
-                        } catch (Exception e1) {
-                            logger.warn("[Register response] found parse 
flowCtrl rules failure", e1);
-                        }
-                    }
-                    if (qryPriorityId != 
flowCtrlRuleHandler.getQryPriorityId()) {
-                        flowCtrlRuleHandler.setQryPriorityId(qryPriorityId);
-                    }
-                }
-                updateEnableBrokerFunInfo(response);
-                logger.info(sBuilder
-                    .append("[Register response] received broker metadata 
info: brokerConfId=")
-                    .append(response.getCurBrokerConfId())
-                    .append(",stopWrite=").append(response.getStopWrite())
-                    .append(",stopRead=").append(response.getStopRead())
-                    
.append(",configCheckSumId=").append(response.getConfCheckSumId())
-                    .append(",hasFlowCtrl=").append(response.hasFlowCheckId())
-                    .append(",enableVisitTokenCheck=")
-                    .append(serverAuthHandler.isEnableVisitTokenCheck())
-                    .append(",enableProduceAuthenticate=")
-                    .append(serverAuthHandler.isEnableProduceAuthenticate())
-                    
.append(",enableProduceAuthorize=").append(serverAuthHandler.isEnableProduceAuthorize())
-                    .append(",enableConsumeAuthenticate=")
-                    .append(serverAuthHandler.isEnableConsumeAuthenticate())
-                    .append(",enableConsumeAuthorize=")
-                    .append(serverAuthHandler.isEnableConsumeAuthorize())
-                    
.append(",curFlowCtrlId=").append(flowCtrlRuleHandler.getFlowCtrlId())
-                    
.append(",curQryPriorityId=").append(flowCtrlRuleHandler.getQryPriorityId())
-                    
.append(",brokerDefaultConfInfo=").append(response.getBrokerDefaultConfInfo())
-                    .append(",brokerTopicSetConfList=")
-                    
.append(response.getBrokerTopicSetConfInfoList().toString()).toString());
-                sBuilder.delete(0, sBuilder.length());
-                
metadataManager.updateBrokerTopicConfigMap(response.getCurBrokerConfId(),
-                    response.getConfCheckSumId(), 
response.getBrokerDefaultConfInfo(),
-                    response.getBrokerTopicSetConfInfoList(), true, sBuilder);
+                procConfigFromRegister(sBuilder, response);
                 isKeepAlive.set(true);
                 lastRegTime.set(System.currentTimeMillis());
                 break;
@@ -435,11 +416,80 @@ public class TubeBroker implements Stoppable {
         }
     }
 
-    private void updateEnableBrokerFunInfo(final RegisterResponseM2B response) 
{
+
+    private void procConfigFromRegister(StringBuilder sBuilder,
+                                        final RegisterResponseM2B response) {
+        // process service status
+        ServiceStatusHolder
+                .setReadWriteServiceStatus(response.getStopRead(),
+                        response.getStopWrite(), "Master");
+        // process flow controller rules
+        FlowCtrlRuleHandler flowCtrlRuleHandler =
+                metadataManager.getFlowCtrlRuleHandler();
+        if (response.hasFlowCheckId()) {
+            int qryPriorityId = response.hasQryPriorityId()
+                    ? response.getQryPriorityId() : 
flowCtrlRuleHandler.getQryPriorityId();
+            if (response.getFlowCheckId() != 
flowCtrlRuleHandler.getFlowCtrlId()) {
+                try {
+                    flowCtrlRuleHandler
+                            .updateDefFlowCtrlInfo(response.getQryPriorityId(),
+                                    response.getFlowCheckId(), 
response.getFlowControlInfo());
+                } catch (Exception e1) {
+                    logger.warn("[Register response] found parse flowCtrl 
rules failure", e1);
+                }
+            }
+            if (qryPriorityId != flowCtrlRuleHandler.getQryPriorityId()) {
+                flowCtrlRuleHandler.setQryPriorityId(qryPriorityId);
+            }
+        }
+        // update auth info
         serverAuthHandler.configure(response.getEnableBrokerInfo());
         if (response.hasBrokerAuthorizedInfo()) {
             
serverAuthHandler.appendVisitToken(response.getBrokerAuthorizedInfo());
         }
+        // update cluster setting
+        if (response.hasClsConfig()) {
+            long configId = response.getClsConfig().getConfigId();
+            if (configId != ClusterConfigHolder.getConfigId()) {
+                ClusterConfigHolder.updClusterSetting(response.getClsConfig());
+            }
+        }
+        sBuilder.append("[Register response] received broker metadata info: 
brokerConfId=")
+                .append(response.getCurBrokerConfId())
+                .append(",stopWrite=").append(response.getStopWrite())
+                .append(",stopRead=").append(response.getStopRead())
+                
.append(",configCheckSumId=").append(response.getConfCheckSumId())
+                .append(",hasFlowCtrl=").append(response.hasFlowCheckId())
+                
.append(",curFlowCtrlId=").append(flowCtrlRuleHandler.getFlowCtrlId())
+                
.append(",curQryPriorityId=").append(flowCtrlRuleHandler.getQryPriorityId())
+                .append(",hasClsConfig=").append(response.hasClsConfig())
+                
.append(",curClusterConfigId=").append(ClusterConfigHolder.getConfigId())
+                
.append(",curMaxMsgSize=").append(ClusterConfigHolder.getMaxMsgSize())
+                
.append(",minMemCacheSize=").append(ClusterConfigHolder.getMinMemCacheSize())
+                .append(",enableVisitTokenCheck=")
+                .append(serverAuthHandler.isEnableVisitTokenCheck())
+                .append(",enableProduceAuthenticate=")
+                .append(serverAuthHandler.isEnableProduceAuthenticate())
+                
.append(",enableProduceAuthorize=").append(serverAuthHandler.isEnableProduceAuthorize())
+                .append(",enableConsumeAuthenticate=")
+                .append(serverAuthHandler.isEnableConsumeAuthenticate())
+                .append(",enableConsumeAuthorize=")
+                .append(serverAuthHandler.isEnableConsumeAuthorize())
+                
.append(",brokerDefaultConfInfo=").append(response.getBrokerDefaultConfInfo())
+                .append(",brokerTopicSetConfList=")
+                
.append(response.getBrokerTopicSetConfInfoList().toString()).toString();
+        sBuilder.delete(0, sBuilder.length());
+        
metadataManager.updateBrokerTopicConfigMap(response.getCurBrokerConfId(),
+                response.getConfCheckSumId(), 
response.getBrokerDefaultConfInfo(),
+                response.getBrokerTopicSetConfInfoList(), true, sBuilder);
+    }
+
+    // build cluster configure info
+    private ClientMaster.ClusterConfig.Builder buildClusterConfig() {
+        ClientMaster.ClusterConfig.Builder defSetting =
+                ClientMaster.ClusterConfig.newBuilder();
+        defSetting.setConfigId(ClusterConfigHolder.getConfigId());
+        return defSetting;
     }
 
     /***
@@ -474,6 +524,7 @@ public class TubeBroker implements Stoppable {
         if (authInfoBuilder != null) {
             builder.setAuthInfo(authInfoBuilder.build());
         }
+        builder.setClsConfig(buildClusterConfig());
         logger.info(new StringBuilder(512)
             .append("[Register request] current broker report info: 
brokerConfId=")
             .append(metadataManager.getBrokerMetadataConfId())
@@ -517,6 +568,7 @@ public class TubeBroker implements Stoppable {
         if (authInfoBuilder != null) {
             builder.setAuthInfo(authInfoBuilder.build());
         }
+        builder.setClsConfig(buildClusterConfig());
         if (metadataManager.isBrokerMetadataChanged() || requireReportConf) {
             builder.setTakeConfInfo(true);
             
builder.setBrokerDefaultConfInfo(metadataManager.getBrokerDefMetaConfInfo());
diff --git 
a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/metadata/ClusterConfigHolder.java
 
b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/metadata/ClusterConfigHolder.java
new file mode 100644
index 0000000..c72427d
--- /dev/null
+++ 
b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/metadata/ClusterConfigHolder.java
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tubemq.server.broker.metadata;
+
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.tubemq.corebase.TBaseConstants;
+import org.apache.tubemq.corebase.protobuf.generated.ClientMaster;
+import org.apache.tubemq.corebase.utils.MixedUtils;
+import org.apache.tubemq.server.broker.utils.DataStoreUtils;
+
+
+public class ClusterConfigHolder {
+    private static AtomicLong configId =
+            new AtomicLong(TBaseConstants.META_VALUE_UNDEFINED);
+    private static AtomicInteger maxMsgSize =
+            new AtomicInteger(TBaseConstants.META_MAX_MESSAGE_DATA_SIZE
+                    + TBaseConstants.META_MAX_MESSAGE_HEADER_SIZE);
+    private static AtomicInteger minMemCacheSize =
+            new AtomicInteger(TBaseConstants.META_MIN_MEM_BUFFER_SIZE);
+    private static AtomicInteger maxMsgStoreLength =
+            new AtomicInteger(TBaseConstants.META_MAX_MESSAGE_DATA_SIZE
+                    + TBaseConstants.META_MAX_MESSAGE_HEADER_SIZE
+                    + DataStoreUtils.STORE_DATA_HEADER_LEN);
+
+    public ClusterConfigHolder() {
+
+    }
+
+    // set master returned configure
+    public static void updClusterSetting(ClientMaster.ClusterConfig 
clusterConfig) {
+        if (clusterConfig == null) {
+            return;
+        }
+        if (configId.get() != clusterConfig.getConfigId()) {
+            configId.set(clusterConfig.getConfigId());
+            if (clusterConfig.hasMaxMsgSize()) {
+                int tmpMaxSize = MixedUtils.mid(clusterConfig.getMaxMsgSize(),
+                        TBaseConstants.META_MAX_MESSAGE_DATA_SIZE,
+                        TBaseConstants.META_MAX_MESSAGE_DATA_SIZE_UPPER_LIMIT)
+                        + TBaseConstants.META_MAX_MESSAGE_HEADER_SIZE;
+                if (tmpMaxSize != maxMsgSize.get()) {
+                    maxMsgSize.set(tmpMaxSize);
+                    minMemCacheSize.set(tmpMaxSize +
+                            (tmpMaxSize % 4 + 1) * 
TBaseConstants.META_MESSAGE_SIZE_ADJUST);
+                    maxMsgStoreLength.set(tmpMaxSize + 
DataStoreUtils.STORE_DATA_HEADER_LEN);
+                }
+            }
+        }
+    }
+
+    public static long getConfigId() {
+        return configId.get();
+    }
+
+    public static int getMaxMsgSize() {
+        return maxMsgSize.get();
+    }
+
+    public static int getMinMemCacheSize() {
+        return minMemCacheSize.get();
+    }
+
+    public static int getMaxMsgStoreLength() {
+        return maxMsgStoreLength.get();
+    }
+}
diff --git 
a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/msgstore/MessageStore.java
 
b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/msgstore/MessageStore.java
index e610a8b..8f650b4 100644
--- 
a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/msgstore/MessageStore.java
+++ 
b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/msgstore/MessageStore.java
@@ -37,6 +37,7 @@ import org.apache.tubemq.corebase.TErrCodeConstants;
 import org.apache.tubemq.corebase.protobuf.generated.ClientBroker;
 import org.apache.tubemq.corebase.utils.ThreadUtils;
 import org.apache.tubemq.server.broker.BrokerConfig;
+import org.apache.tubemq.server.broker.metadata.ClusterConfigHolder;
 import org.apache.tubemq.server.broker.metadata.TopicMetadata;
 import org.apache.tubemq.server.broker.msgstore.disk.GetMessageResult;
 import org.apache.tubemq.server.broker.msgstore.disk.MsgFileStatisInfo;
@@ -129,7 +130,7 @@ public class MessageStore implements Closeable {
         this.unflushThreshold.set(topicMetadata.getUnflushThreshold());
         this.unflushDataHold.set(topicMetadata.getUnflushDataHold());
         this.writeCacheMaxCnt = topicMetadata.getMemCacheMsgCnt();
-        this.writeCacheMaxSize = topicMetadata.getMemCacheMsgSize();
+        this.writeCacheMaxSize = 
validAndGetMemCacheSize(topicMetadata.getMemCacheMsgSize());
         this.writeCacheFlushIntvl = topicMetadata.getMemCacheFlushIntvl();
         int tmpIndexReadCnt = tubeConfig.getIndexTransCount() * partitionNum;
         memMaxIndexReadCnt.set(tmpIndexReadCnt <= 6000
@@ -421,7 +422,7 @@ public class MessageStore implements Closeable {
         writeCacheMutex.readLock().lock();
         try {
             writeCacheMaxCnt = topicMetadata.getMemCacheMsgCnt();
-            writeCacheMaxSize = topicMetadata.getMemCacheMsgSize();
+            writeCacheMaxSize = 
validAndGetMemCacheSize(topicMetadata.getMemCacheMsgSize());
             writeCacheFlushIntvl = topicMetadata.getMemCacheFlushIntvl();
         } finally {
             writeCacheMutex.readLock().unlock();
@@ -603,6 +604,17 @@ public class MessageStore implements Closeable {
         }
     }
 
+    private int validAndGetMemCacheSize(int memCacheSize) {
+        if (memCacheSize <= ClusterConfigHolder.getMinMemCacheSize()) {
+            logger.info(new StringBuilder(512)
+                    .append("[Data Store] writeCacheMaxSize changed, from ")
+                    .append(memCacheSize).append(" to ")
+                    
.append(ClusterConfigHolder.getMinMemCacheSize()).toString());
+            memCacheSize = ClusterConfigHolder.getMinMemCacheSize();
+        }
+        return memCacheSize;
+    }
+
     /***
      * Append message and trigger flush operation.
      *
diff --git 
a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/msgstore/disk/FileSegment.java
 
b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/msgstore/disk/FileSegment.java
index 949d149..6fda352 100644
--- 
a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/msgstore/disk/FileSegment.java
+++ 
b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/msgstore/disk/FileSegment.java
@@ -25,6 +25,7 @@ import java.nio.channels.FileChannel;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 import org.apache.tubemq.corebase.utils.CheckSum;
+import org.apache.tubemq.server.broker.metadata.ClusterConfigHolder;
 import org.apache.tubemq.server.broker.utils.DataStoreUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -365,7 +366,7 @@ public class FileSegment implements Segment {
                 itemNext = validBytes + DataStoreUtils.STORE_DATA_HEADER_LEN + 
itemMsglen;
                 if ((itemMsgToken != 
DataStoreUtils.STORE_DATA_TOKER_BEGIN_VALUE)
                         || (itemMsglen <= 0)
-                        || (itemMsglen > 
DataStoreUtils.MAX_MSG_DATA_STORE_SIZE)
+                        || (itemMsglen > ClusterConfigHolder.getMaxMsgSize())
                         || (itemNext > totalBytes)) {
                     next = -1;
                     break;
@@ -437,7 +438,7 @@ public class FileSegment implements Segment {
                 if ((itemMsgPartId < 0)
                         || (itemMsgOffset < 0)
                         || (itemMsglen <= 0)
-                        || (itemMsglen > 
DataStoreUtils.STORE_MAX_MESSAGE_STORE_LEN)
+                        || (itemMsglen > 
ClusterConfigHolder.getMaxMsgStoreLength())
                         || (itemNext > totalBytes)) {
                     next = -1;
                     break;
diff --git 
a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/msgstore/disk/MsgFileStore.java
 
b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/msgstore/disk/MsgFileStore.java
index 400b666..fe23dd3 100644
--- 
a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/msgstore/disk/MsgFileStore.java
+++ 
b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/msgstore/disk/MsgFileStore.java
@@ -35,6 +35,7 @@ import org.apache.tubemq.corebase.TErrCodeConstants;
 import org.apache.tubemq.corebase.protobuf.generated.ClientBroker;
 import org.apache.tubemq.corebase.utils.ServiceStatusHolder;
 import org.apache.tubemq.server.broker.BrokerConfig;
+import org.apache.tubemq.server.broker.metadata.ClusterConfigHolder;
 import org.apache.tubemq.server.broker.msgstore.MessageStore;
 import org.apache.tubemq.server.broker.stats.CountItem;
 import org.apache.tubemq.server.broker.utils.DataStoreUtils;
@@ -273,7 +274,7 @@ public class MsgFileStore implements Closeable {
             // skip when mismatch condition
             if (curIndexDataOffset < 0
                     || curIndexDataSize <= 0
-                    || curIndexDataSize > 
DataStoreUtils.STORE_MAX_MESSAGE_STORE_LEN
+                    || curIndexDataSize > 
ClusterConfigHolder.getMaxMsgStoreLength()
                     || curIndexDataOffset < curDataMinOffset) {
                 readedOffset = curIndexOffset + 
DataStoreUtils.STORE_INDEX_HEAD_LEN;
                 continue;
diff --git 
a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/msgstore/mem/MsgMemStore.java
 
b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/msgstore/mem/MsgMemStore.java
index 6da3d27..27eef32 100644
--- 
a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/msgstore/mem/MsgMemStore.java
+++ 
b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/msgstore/mem/MsgMemStore.java
@@ -29,6 +29,7 @@ import java.util.concurrent.locks.ReentrantLock;
 import org.apache.tubemq.corebase.TBaseConstants;
 import org.apache.tubemq.corebase.TErrCodeConstants;
 import org.apache.tubemq.server.broker.BrokerConfig;
+import org.apache.tubemq.server.broker.metadata.ClusterConfigHolder;
 import org.apache.tubemq.server.broker.msgstore.disk.MsgFileStore;
 import org.apache.tubemq.server.broker.utils.DataStoreUtils;
 import org.apache.tubemq.server.common.utils.AppendResult;
@@ -224,7 +225,7 @@ public class MsgMemStore implements Closeable {
             if ((cDataOffset < 0)
                     || (cDataSize <= 0)
                     || (cDataOffset >= currDataOffset)
-                    || (cDataSize > TBaseConstants.META_MAX_MESSAGE_DATA_SIZE 
+ 1024)
+                    || (cDataSize > ClusterConfigHolder.getMaxMsgSize())
                     || (cDataOffset + cDataSize > currDataOffset)) {
                 readedSize += DataStoreUtils.STORE_INDEX_HEAD_LEN;
                 continue;
diff --git 
a/tubemq-server/src/main/java/org/apache/tubemq/server/common/fielddef/WebFieldDef.java
 
b/tubemq-server/src/main/java/org/apache/tubemq/server/common/fielddef/WebFieldDef.java
index 05688a7..2ed75c3 100644
--- 
a/tubemq-server/src/main/java/org/apache/tubemq/server/common/fielddef/WebFieldDef.java
+++ 
b/tubemq-server/src/main/java/org/apache/tubemq/server/common/fielddef/WebFieldDef.java
@@ -88,8 +88,8 @@ public enum WebFieldDef {
     ADMINAUTHTOKEN(23, "confModAuthToken", "authToken", WebFieldType.STRING,
             "Admin api operation authorization code",
             TServerConstants.CFG_MODAUTHTOKEN_MAX_LENGTH),
-    MAXMSGSIZE(24, "maxMsgSize", "maxMsgSize", WebFieldType.INT,
-            "Max allowed message size", RegexDef.TMP_NUMBER),
+    MAXMSGSIZE(24, "maxMsgSizeInMB", "maxMsgSizeInMB", WebFieldType.INT,
+            "Max allowed message size, unit MB", RegexDef.TMP_NUMBER),
     CREATEDATE(25, "createDate", "cDate", WebFieldType.STRING,
             "Record creation date", TBaseConstants.META_MAX_DATEVALUE_LENGTH),
     MODIFYDATE(26, "modifyDate", "mDate", WebFieldType.STRING,
diff --git 
a/tubemq-server/src/main/java/org/apache/tubemq/server/common/paramcheck/PBParameterUtils.java
 
b/tubemq-server/src/main/java/org/apache/tubemq/server/common/paramcheck/PBParameterUtils.java
index aa87940..ed2e122 100644
--- 
a/tubemq-server/src/main/java/org/apache/tubemq/server/common/paramcheck/PBParameterUtils.java
+++ 
b/tubemq-server/src/main/java/org/apache/tubemq/server/common/paramcheck/PBParameterUtils.java
@@ -32,6 +32,8 @@ import org.apache.tubemq.corebase.cluster.ConsumerInfo;
 import org.apache.tubemq.corebase.utils.TStringUtils;
 import org.apache.tubemq.server.broker.metadata.MetadataManager;
 import org.apache.tubemq.server.broker.metadata.TopicMetadata;
+import org.apache.tubemq.server.common.fielddef.WebFieldDef;
+import org.apache.tubemq.server.common.utils.ProcessResult;
 import org.apache.tubemq.server.master.MasterConfig;
 import 
org.apache.tubemq.server.master.bdbstore.bdbentitys.BdbConsumeGroupSettingEntity;
 import org.apache.tubemq.server.master.nodemanage.nodebroker.BrokerConfManager;
@@ -576,4 +578,35 @@ public class PBParameterUtils {
         retResult.setCheckData(tmpValue);
         return retResult;
     }
+
+    /**
+     * Check the string parameter
+     *
+     * @param fieldDef  the field to be checked
+     * @param paramValue the field value to be checked
+     * @param strBuffer the string pool construct the result
+     * @param result    the checked result
+     * @return result success or failure
+     */
+    public static boolean getStringParameter(WebFieldDef fieldDef,
+                                             String paramValue,
+                                             StringBuilder strBuffer,
+                                             ProcessResult result) {
+        if (TStringUtils.isBlank(paramValue)) {
+            result.setFailResult(strBuffer.append("Request miss necessary ")
+                    .append(fieldDef.name).append(" data!").toString());
+            strBuffer.delete(0, strBuffer.length());
+            return result.success;
+        }
+        String tmpValue = paramValue.trim();
+        if (tmpValue.length() > fieldDef.valMaxLen) {
+            result.setFailResult(strBuffer.append(fieldDef.name)
+                    .append("'s length over max value, allowed max length is ")
+                    .append(fieldDef.valMaxLen).toString());
+            strBuffer.delete(0, strBuffer.length());
+            return result.success;
+        }
+        result.setSuccResult(tmpValue);
+        return result.success;
+    }
 }
diff --git 
a/tubemq-server/src/main/java/org/apache/tubemq/server/common/utils/WebParameterUtils.java
 
b/tubemq-server/src/main/java/org/apache/tubemq/server/common/utils/WebParameterUtils.java
index 385fe22..d4b1e20 100644
--- 
a/tubemq-server/src/main/java/org/apache/tubemq/server/common/utils/WebParameterUtils.java
+++ 
b/tubemq-server/src/main/java/org/apache/tubemq/server/common/utils/WebParameterUtils.java
@@ -297,37 +297,11 @@ public class WebParameterUtils {
                                            WebFieldDef fieldDef,
                                            boolean required,
                                            ProcessResult result) {
-        if (!getStringParamValue(req, fieldDef,
-                required, null, result)) {
-            return result.success;
-        }
-        Set<Integer> tgtValueSet = new HashSet<Integer>();
-        if (fieldDef.isCompFieldType()) {
-            Set<String> valItemSet = (Set<String>) result.retData1;
-            if (valItemSet.isEmpty()) {
-                result.setSuccResult(tgtValueSet);
-                return result.success;
-            }
-            for (String itemVal : valItemSet) {
-                if (!checkIntValueNorms(fieldDef,
-                        itemVal, false, -1, result)) {
-                    return result.success;
-                }
-                tgtValueSet.add((Integer) result.retData1);
-            }
-        } else {
-            String paramValue = (String) result.retData1;
-            if (paramValue == null) {
-                result.setSuccResult(tgtValueSet);
-                return result.success;
-            }
-            if (!checkIntValueNorms(fieldDef,
-                    paramValue, false, -1, result)) {
-                tgtValueSet.add((Integer) result.retData1);
-            }
-        }
-        result.setSuccResult(tgtValueSet);
-        return result.success;
+        return getIntParamValue(req, fieldDef, required,
+                false, TBaseConstants.META_VALUE_UNDEFINED,
+                false, TBaseConstants.META_VALUE_UNDEFINED,
+                false, TBaseConstants.META_VALUE_UNDEFINED,
+                result);
     }
 
     /**
@@ -342,11 +316,48 @@ public class WebParameterUtils {
      * @return process result
      */
     public static boolean getIntParamValue(HttpServletRequest req,
-                                                 WebFieldDef fieldDef,
-                                                 boolean required,
-                                                 int defValue,
-                                                 int minValue,
-                                                 ProcessResult result) {
+                                           WebFieldDef fieldDef,
+                                           boolean required,
+                                           int defValue,
+                                           int minValue,
+                                           ProcessResult result) {
+        return getIntParamValue(req, fieldDef, required, true, defValue,
+                true, minValue, false, TBaseConstants.META_VALUE_UNDEFINED, 
result);
+    }
+
+    /**
+     * Parse the parameter value from an object value to a integer value
+     *
+     * @param req        Http Servlet Request
+     * @param fieldDef   the parameter field definition
+     * @param required   a boolean value represent whether the parameter is 
must required
+     * @param defValue   a default value returned if the field not exist
+     * @param minValue   min value required
+     * @param minValue   max value allowed
+     * @param result     process result of parameter value
+     * @return process result
+     */
+    public static boolean getIntParamValue(HttpServletRequest req,
+                                           WebFieldDef fieldDef,
+                                           boolean required,
+                                           int defValue,
+                                           int minValue,
+                                           int maxValue,
+                                           ProcessResult result) {
+        return getIntParamValue(req, fieldDef, required, true, defValue,
+                true, minValue, true, maxValue, result);
+    }
+
+    private static boolean getIntParamValue(HttpServletRequest req,
+                                            WebFieldDef fieldDef,
+                                            boolean required,
+                                            boolean hasDefVal,
+                                            int defValue,
+                                            boolean hasMinVal,
+                                            int minValue,
+                                            boolean hasMaxVal,
+                                            int maxValue,
+                                            ProcessResult result) {
         if (!getStringParamValue(req, fieldDef, required, null, result)) {
             return result.success;
         }
@@ -354,13 +365,15 @@ public class WebParameterUtils {
             Set<Integer> tgtValueSet = new HashSet<Integer>();
             Set<String> valItemSet = (Set<String>) result.retData1;
             if (valItemSet.isEmpty()) {
-                tgtValueSet.add(defValue);
+                if (hasDefVal) {
+                    tgtValueSet.add(defValue);
+                }
                 result.setSuccResult(tgtValueSet);
                 return result.success;
             }
             for (String itemVal : valItemSet) {
-                if (!checkIntValueNorms(fieldDef,
-                        itemVal, true, minValue, result)) {
+                if (!checkIntValueNorms(fieldDef, itemVal,
+                        hasMinVal, minValue, hasMaxVal, maxValue, result)) {
                     return result.success;
                 }
                 tgtValueSet.add((Integer) result.retData1);
@@ -369,11 +382,13 @@ public class WebParameterUtils {
         } else {
             String paramValue = (String) result.retData1;
             if (paramValue == null) {
-                result.setSuccResult(defValue);
+                if (hasDefVal) {
+                    result.setSuccResult(defValue);
+                }
                 return result.success;
             }
-            checkIntValueNorms(fieldDef,
-                    paramValue, true, minValue, result);
+            checkIntValueNorms(fieldDef, paramValue,
+                    hasMinVal, minValue, hasMinVal, maxValue, result);
         }
         return result.success;
     }
@@ -691,6 +706,8 @@ public class WebParameterUtils {
      * @param paramValue   the parameter value
      * @param hasMinVal    whether there is a minimum
      * param minValue      the parameter min value
+     * @param hasMaxVal    whether there is a maximum
+     * param maxValue      the parameter max value
      * @param result   process result
      * @return check result for string value of parameter
      */
@@ -698,6 +715,8 @@ public class WebParameterUtils {
                                               String paramValue,
                                               boolean hasMinVal,
                                               int minValue,
+                                              boolean hasMaxVal,
+                                              int maxValue,
                                               ProcessResult result) {
         try {
             int paramIntVal = Integer.parseInt(paramValue);
@@ -707,6 +726,12 @@ public class WebParameterUtils {
                         .append(" value must >= 
").append(minValue).toString());
                 return false;
             }
+            if (hasMaxVal && paramIntVal > maxValue) {
+                result.setFailResult(new StringBuilder(512)
+                        .append("Parameter ").append(fieldDef.name)
+                        .append(" value must <= 
").append(maxValue).toString());
+                return false;
+            }
             result.setSuccResult(paramIntVal);
         } catch (Throwable e) {
             result.setFailResult(new StringBuilder(512)
diff --git 
a/tubemq-server/src/main/java/org/apache/tubemq/server/master/TMaster.java 
b/tubemq-server/src/main/java/org/apache/tubemq/server/master/TMaster.java
index 4299bc7..e7327cb 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/TMaster.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/TMaster.java
@@ -48,6 +48,7 @@ import org.apache.tubemq.corebase.cluster.ProducerInfo;
 import org.apache.tubemq.corebase.cluster.SubscribeInfo;
 import org.apache.tubemq.corebase.cluster.TopicInfo;
 import org.apache.tubemq.corebase.config.TLSConfig;
+import org.apache.tubemq.corebase.protobuf.generated.ClientMaster;
 import 
org.apache.tubemq.corebase.protobuf.generated.ClientMaster.CloseRequestB2M;
 import 
org.apache.tubemq.corebase.protobuf.generated.ClientMaster.CloseRequestC2M;
 import 
org.apache.tubemq.corebase.protobuf.generated.ClientMaster.CloseRequestP2M;
@@ -101,6 +102,7 @@ import 
org.apache.tubemq.server.master.balance.DefaultLoadBalancer;
 import org.apache.tubemq.server.master.balance.LoadBalancer;
 import org.apache.tubemq.server.master.bdbstore.DefaultBdbStoreService;
 import org.apache.tubemq.server.master.bdbstore.bdbentitys.BdbBrokerConfEntity;
+import 
org.apache.tubemq.server.master.bdbstore.bdbentitys.BdbClusterSettingEntity;
 import 
org.apache.tubemq.server.master.bdbstore.bdbentitys.BdbGroupFlowCtrlEntity;
 import org.apache.tubemq.server.master.nodemanage.nodebroker.BrokerConfManager;
 import org.apache.tubemq.server.master.nodemanage.nodebroker.BrokerInfoHolder;
@@ -348,6 +350,11 @@ public class TMaster extends HasThread implements 
MasterService, Stoppable {
         
builder.setBrokerCheckSum(this.defaultBrokerConfManager.getBrokerInfoCheckSum());
         
builder.addAllBrokerInfos(this.defaultBrokerConfManager.getBrokersMap(overtls).values());
         
builder.setAuthorizedInfo(genAuthorizedInfo(certResult.authorizedToken, 
false).build());
+        ClientMaster.ApprovedClientConfig.Builder clientConfigBuilder =
+                buildApprovedClientConfig(request.getAppdConfig());
+        if (clientConfigBuilder != null) {
+            builder.setAppdConfig(clientConfigBuilder);
+        }
         logger.info(strBuffer.append("[Producer Register] ").append(producerId)
             .append(", isOverTLS=").append(overtls)
             .append(", clientJDKVer=").append(clientJdkVer).toString());
@@ -436,6 +443,11 @@ public class TMaster extends HasThread implements 
MasterService, Stoppable {
         if (defaultBrokerConfManager.getBrokerInfoCheckSum() != 
inBrokerCheckSum) {
             
builder.addAllBrokerInfos(defaultBrokerConfManager.getBrokersMap(overtls).values());
         }
+        ClientMaster.ApprovedClientConfig.Builder clientConfigBuilder =
+                buildApprovedClientConfig(request.getAppdConfig());
+        if (clientConfigBuilder != null) {
+            builder.setAppdConfig(clientConfigBuilder);
+        }
         if (logger.isDebugEnabled()) {
             logger.debug(strBuffer.append("[Push Producer's available topic 
count:]")
                     .append(producerId).append(TokenConstants.LOG_SEG_SEP)
@@ -1071,6 +1083,11 @@ public class TMaster extends HasThread implements 
MasterService, Stoppable {
         
builder.setBrokerDefaultConfInfo(brokerStatusInfo.getLastPushBrokerDefaultConfInfo());
         
builder.addAllBrokerTopicSetConfInfo(brokerStatusInfo.getLastPushBrokerTopicSetConfInfo());
         builder.setSsdStoreId(TBaseConstants.META_VALUE_UNDEFINED);
+        ClientMaster.ClusterConfig.Builder clusterConfigBuilder =
+                buildClusterConfig(request.getClsConfig());
+        if (clusterConfigBuilder != null) {
+            builder.setClsConfig(clusterConfigBuilder);
+        }
         if (request.hasFlowCheckId()) {
             BdbGroupFlowCtrlEntity bdbGroupFlowCtrlEntity =
                     defaultBrokerConfManager.getBdbDefFlowCtrl();
@@ -1259,6 +1276,11 @@ public class TMaster extends HasThread implements 
MasterService, Stoppable {
             }
         }
         brokerHolder.setBrokerHeartBeatReqStatus(brokerInfo.getBrokerId(), 
builder);
+        ClientMaster.ClusterConfig.Builder clusterConfigBuilder =
+                buildClusterConfig(request.getClsConfig());
+        if (clusterConfigBuilder != null) {
+            builder.setClsConfig(clusterConfigBuilder);
+        }
         builder.setTakeRemoveTopicInfo(true);
         builder.addAllRemoveTopicConfInfo(defaultBrokerConfManager
                 .getBrokerRemovedTopicStrConfigInfo(bdbBrokerConfEntity));
@@ -2295,6 +2317,56 @@ public class TMaster extends HasThread implements 
MasterService, Stoppable {
     }
 
     /**
+     * build approved client configure
+     *
+     * @param inClientConfig client reported Configure info
+     * @return ApprovedClientConfig
+     */
+    private ClientMaster.ApprovedClientConfig.Builder 
buildApprovedClientConfig(
+            ClientMaster.ApprovedClientConfig inClientConfig) {
+        ClientMaster.ApprovedClientConfig.Builder outClientConfig = null;
+        if (inClientConfig != null) {
+            outClientConfig = ClientMaster.ApprovedClientConfig.newBuilder();
+            BdbClusterSettingEntity settingEntity =
+                    this.defaultBrokerConfManager.getBdbClusterSetting();
+            if (settingEntity == null) {
+                
outClientConfig.setConfigId(TBaseConstants.META_VALUE_UNDEFINED);
+            } else {
+                outClientConfig.setConfigId(settingEntity.getConfigId());
+                if (settingEntity.getConfigId() != 
inClientConfig.getConfigId()) {
+                    
outClientConfig.setMaxMsgSize(settingEntity.getMaxMsgSizeInB());
+                }
+            }
+        }
+        return outClientConfig;
+    }
+
+
+    /**
+     * build cluster configure info
+     *
+     * @param inClusterConfig broker reported Configure info
+     * @return ClusterConfig
+     */
+    private ClientMaster.ClusterConfig.Builder buildClusterConfig(
+            ClientMaster.ClusterConfig inClusterConfig) {
+        ClientMaster.ClusterConfig.Builder outClsConfig = null;
+        if (inClusterConfig != null) {
+            outClsConfig = ClientMaster.ClusterConfig.newBuilder();
+            BdbClusterSettingEntity settingEntity =
+                    this.defaultBrokerConfManager.getBdbClusterSetting();
+            if (settingEntity == null) {
+                outClsConfig.setConfigId(TBaseConstants.META_VALUE_UNDEFINED);
+            } else {
+                outClsConfig.setConfigId(settingEntity.getConfigId());
+                if (settingEntity.getConfigId() != 
inClusterConfig.getConfigId()) {
+                    
outClsConfig.setMaxMsgSize(settingEntity.getMaxMsgSizeInB());
+                }
+            }
+        }
+        return outClsConfig;
+    }
+    /**
      * Start balance chore
      *
      * @param master
diff --git 
a/tubemq-server/src/main/java/org/apache/tubemq/server/master/bdbstore/bdbentitys/BdbClusterSettingEntity.java
 
b/tubemq-server/src/main/java/org/apache/tubemq/server/master/bdbstore/bdbentitys/BdbClusterSettingEntity.java
index 588fd87..7b9b570 100644
--- 
a/tubemq-server/src/main/java/org/apache/tubemq/server/master/bdbstore/bdbentitys/BdbClusterSettingEntity.java
+++ 
b/tubemq-server/src/main/java/org/apache/tubemq/server/master/bdbstore/bdbentitys/BdbClusterSettingEntity.java
@@ -37,6 +37,7 @@ public class BdbClusterSettingEntity implements Serializable {
 
     @PrimaryKey
     private String recordKey = "";
+    private long configId = TBaseConstants.META_VALUE_UNDEFINED;
     //broker tcp port
     private int brokerPort = TBaseConstants.META_VALUE_UNDEFINED;
     //broker tls port
@@ -48,20 +49,22 @@ public class BdbClusterSettingEntity implements 
Serializable {
     //partition num
     private int numPartitions = TBaseConstants.META_VALUE_UNDEFINED;
     //flush disk threshold
-    private int unflushDskThreshold = TBaseConstants.META_VALUE_UNDEFINED;
+    private int unflushThreshold = TBaseConstants.META_VALUE_UNDEFINED;
     //flush disk interval
-    private int unflushDksInterval = TBaseConstants.META_VALUE_UNDEFINED;
-    //flush memory cache threshold
-    private int unflushMemThreshold = TBaseConstants.META_VALUE_UNDEFINED;
-    //flush memory cache interval
-    private int unflushMemInterval = TBaseConstants.META_VALUE_UNDEFINED;
+    private int unflushInterval = TBaseConstants.META_VALUE_UNDEFINED;
+    //flush disk data count
+    private int unflushDataHold = TBaseConstants.META_VALUE_UNDEFINED;
     //flush memory cache count
-    private int unflushMemCnt = TBaseConstants.META_VALUE_UNDEFINED;
+    private int memCacheMsgCntInK = TBaseConstants.META_VALUE_UNDEFINED;
+    //flush memory cache interval
+    private int memCacheFlushIntvl = TBaseConstants.META_VALUE_UNDEFINED;
+    //flush memory cache size
+    private int memCacheMsgSizeInMB = TBaseConstants.META_VALUE_UNDEFINED;
     private boolean acceptPublish = true;   //enable publish
     private boolean acceptSubscribe = true; //enable subscribe
-    private String deleteWhen = "";              //delete policy execute time
+    private String deletePolicy = "";              //delete policy execute time
     private int qryPriorityId = TBaseConstants.META_VALUE_UNDEFINED;
-    private int maxMsgSize = TBaseConstants.META_VALUE_UNDEFINED;
+    private int maxMsgSizeInB = TBaseConstants.META_VALUE_UNDEFINED;
     private String attributes = "";             //extra attribute
     private String modifyUser;               //modify user
     private Date modifyDate;                 //modify date
@@ -70,30 +73,34 @@ public class BdbClusterSettingEntity implements 
Serializable {
     }
 
     //Constructor
-    public BdbClusterSettingEntity(String recordKey, int brokerPort, int 
brokerTLSPort,
-                                   int brokerWebPort, int numTopicStores, int 
numPartitions,
-                                   int unflushDskThreshold, int 
unflushDksInterval,
-                                   int unflushMemThreshold, int 
unflushMemInterval,
-                                   int unflushMemCnt, boolean acceptPublish,
-                                   boolean acceptSubscribe, String deleteWhen,
-                                   int qryPriorityId, int maxMsgSize, String 
attributes,
+    public BdbClusterSettingEntity(String recordKey, long configId, int 
brokerPort,
+                                   int brokerTLSPort, int brokerWebPort,
+                                   int numTopicStores, int numPartitions,
+                                   int unflushThreshold, int unflushInterval,
+                                   int unflushDataHold, int memCacheMsgCntInK,
+                                   int memCacheFlushIntvl, int 
memCacheMsgSizeInMB,
+                                   boolean acceptPublish, boolean 
acceptSubscribe,
+                                   String deletePolicy, int qryPriorityId,
+                                   int maxMsgSizeInB, String attributes,
                                    String modifyUser, Date modifyDate) {
         this.recordKey = recordKey;
+        this.configId = configId;
         this.brokerPort = brokerPort;
         this.brokerTLSPort = brokerTLSPort;
         this.brokerWebPort = brokerWebPort;
         this.numTopicStores = numTopicStores;
         this.numPartitions = numPartitions;
-        this.unflushDskThreshold = unflushDskThreshold;
-        this.unflushDksInterval = unflushDksInterval;
-        this.unflushMemThreshold = unflushMemThreshold;
-        this.unflushMemInterval = unflushMemInterval;
-        this.unflushMemCnt = unflushMemCnt;
+        this.unflushThreshold = unflushThreshold;
+        this.unflushInterval = unflushInterval;
+        this.unflushDataHold = unflushDataHold;
+        this.memCacheMsgCntInK = memCacheMsgCntInK;
+        this.memCacheFlushIntvl = memCacheFlushIntvl;
+        this.memCacheMsgSizeInMB = memCacheMsgSizeInMB;
         this.acceptPublish = acceptPublish;
         this.acceptSubscribe = acceptSubscribe;
-        this.deleteWhen = deleteWhen;
+        this.deletePolicy = deletePolicy;
         this.qryPriorityId = qryPriorityId;
-        this.maxMsgSize = maxMsgSize;
+        this.maxMsgSizeInB = maxMsgSizeInB;
         this.attributes = attributes;
         this.modifyUser = modifyUser;
         this.modifyDate = modifyDate;
@@ -107,6 +114,10 @@ public class BdbClusterSettingEntity implements 
Serializable {
         return recordKey;
     }
 
+    public long getConfigId() {
+        return configId;
+    }
+
     public int getBrokerPort() {
         return brokerPort;
     }
@@ -147,44 +158,52 @@ public class BdbClusterSettingEntity implements 
Serializable {
         this.numPartitions = numPartitions;
     }
 
-    public int getUnflushDskThreshold() {
-        return unflushDskThreshold;
+    public int getUnflushThreshold() {
+        return unflushThreshold;
+    }
+
+    public void setUnflushThreshold(int unflushThreshold) {
+        this.unflushThreshold = unflushThreshold;
+    }
+
+    public int getUnflushInterval() {
+        return unflushInterval;
     }
 
-    public void setUnflushDskThreshold(int unflushDskThreshold) {
-        this.unflushDskThreshold = unflushDskThreshold;
+    public void setUnflushInterval(int unflushInterval) {
+        this.unflushInterval = unflushInterval;
     }
 
-    public int getUnflushDksInterval() {
-        return unflushDksInterval;
+    public int getUnflushDataHold() {
+        return unflushDataHold;
     }
 
-    public void setUnflushDksInterval(int unflushDksInterval) {
-        this.unflushDksInterval = unflushDksInterval;
+    public void setUnflushDataHold(int unflushDataHold) {
+        this.unflushDataHold = unflushDataHold;
     }
 
-    public int getUnflushMemThreshold() {
-        return unflushMemThreshold;
+    public int getMemCacheMsgCntInK() {
+        return memCacheMsgCntInK;
     }
 
-    public void setUnflushMemThreshold(int unflushMemThreshold) {
-        this.unflushMemThreshold = unflushMemThreshold;
+    public void setMemCacheMsgCntInK(int memCacheMsgCntInK) {
+        this.memCacheMsgCntInK = memCacheMsgCntInK;
     }
 
-    public int getUnflushMemInterval() {
-        return unflushMemInterval;
+    public int getMemCacheFlushIntvl() {
+        return memCacheFlushIntvl;
     }
 
-    public void setUnflushMemInterval(int unflushMemInterval) {
-        this.unflushMemInterval = unflushMemInterval;
+    public void setMemCacheFlushIntvl(int memCacheFlushIntvl) {
+        this.memCacheFlushIntvl = memCacheFlushIntvl;
     }
 
-    public int getUnflushMemCnt() {
-        return unflushMemCnt;
+    public int getMemCacheMsgSizeInMB() {
+        return memCacheMsgSizeInMB;
     }
 
-    public void setUnflushMemCnt(int unflushMemCnt) {
-        this.unflushMemCnt = unflushMemCnt;
+    public void setMemCacheMsgSizeInMB(int memCacheMsgSizeInMB) {
+        this.memCacheMsgSizeInMB = memCacheMsgSizeInMB;
     }
 
     public boolean isAcceptPublish() {
@@ -203,12 +222,12 @@ public class BdbClusterSettingEntity implements 
Serializable {
         this.acceptSubscribe = acceptSubscribe;
     }
 
-    public String getDeleteWhen() {
-        return deleteWhen;
+    public String getDeletePolicy() {
+        return deletePolicy;
     }
 
-    public void setDeleteWhen(String deleteWhen) {
-        this.deleteWhen = deleteWhen;
+    public void setDeletePolicy(String deletePolicy) {
+        this.deletePolicy = deletePolicy;
     }
 
     public int getQryPriorityId() {
@@ -219,12 +238,12 @@ public class BdbClusterSettingEntity implements 
Serializable {
         this.qryPriorityId = qryPriorityId;
     }
 
-    public int getMaxMsgSize() {
-        return maxMsgSize;
+    public int getMaxMsgSizeInB() {
+        return maxMsgSizeInB;
     }
 
-    public void setMaxMsgSize(int maxMsgSize) {
-        this.maxMsgSize = maxMsgSize;
+    public void setMaxMsgSizeInB(int maxMsgSizeInB) {
+        this.maxMsgSizeInB = maxMsgSizeInB;
     }
 
     public String getAttributes() {
@@ -236,6 +255,7 @@ public class BdbClusterSettingEntity implements 
Serializable {
     }
 
     public void setModifyInfo(String modifyUser, Date modifyDate) {
+        this.configId = System.currentTimeMillis();
         this.modifyUser = modifyUser;
         this.modifyDate = modifyDate;
     }
@@ -255,23 +275,30 @@ public class BdbClusterSettingEntity implements 
Serializable {
      * @return
      */
     public StringBuilder toJsonString(final StringBuilder sBuilder) {
-        return sBuilder.append("{\"type\":\"BdbClusterSettingEntity\",")
+        sBuilder.append("{\"type\":\"BdbClusterSettingEntity\",")
                 .append("\"recordKey\":\"").append(recordKey).append("\"")
+                .append(",\"configId\":").append(configId)
                 .append(",\"brokerPort\":").append(brokerPort)
                 .append(",\"brokerTLSPort\":").append(brokerTLSPort)
                 .append(",\"brokerWebPort\":").append(brokerWebPort)
                 .append(",\"numTopicStores\":").append(numTopicStores)
                 .append(",\"numPartitions\":").append(numPartitions)
-                
.append(",\"unflushDskThreshold\":").append(unflushDskThreshold)
-                .append(",\"unflushDksInterval\":").append(unflushDksInterval)
-                
.append(",\"unflushMemThreshold\":").append(unflushMemThreshold)
-                .append(",\"unflushMemInterval\":").append(unflushMemInterval)
-                .append(",\"unflushMemCnt\":").append(unflushMemCnt)
+                .append(",\"unflushThreshold\":").append(unflushThreshold)
+                .append(",\"unflushInterval\":").append(unflushInterval)
+                .append(",\"unflushDataHold\":").append(unflushDataHold)
+                .append(",\"memCacheMsgCntInK\":").append(memCacheMsgCntInK)
+                .append(",\"memCacheFlushIntvl\":").append(memCacheFlushIntvl)
+                
.append(",\"memCacheMsgSizeInMB\":").append(memCacheMsgSizeInMB)
                 .append(",\"acceptPublish\":").append(acceptPublish)
                 .append(",\"acceptSubscribe\":").append(acceptSubscribe)
-                .append(",\"deleteWhen\":\"").append(deleteWhen).append("\"")
-                .append(",\"maxMsgSize\":").append(maxMsgSize)
-                .append(",\"qryPriorityId\":").append(qryPriorityId)
+                
.append(",\"deletePolicy\":\"").append(deletePolicy).append("\"")
+                .append(",\"maxMsgSizeInMB\":");
+        if (maxMsgSizeInB == TBaseConstants.META_VALUE_UNDEFINED) {
+            sBuilder.append(maxMsgSizeInB);
+        } else {
+            sBuilder.append(maxMsgSizeInB / TBaseConstants.META_MB_UNIT_SIZE);
+        }
+        return sBuilder.append(",\"qryPriorityId\":").append(qryPriorityId)
                 .append(",\"attributes\":\"").append(attributes).append("\"")
                 .append(",\"modifyUser\":\"").append(modifyUser).append("\"")
                 .append(",\"modifyDate\":\"")
@@ -281,23 +308,30 @@ public class BdbClusterSettingEntity implements 
Serializable {
 
     @Override
     public String toString() {
-        return new ToStringBuilder(this)
+        ToStringBuilder sBuilder = new ToStringBuilder(this)
                 .append("recordKey", recordKey)
+                .append("configId", configId)
                 .append("brokerPort", brokerPort)
                 .append("brokerTLSPort", brokerTLSPort)
                 .append("brokerWebPort", brokerWebPort)
                 .append("numTopicStores", numTopicStores)
                 .append("numPartitions", numPartitions)
-                .append("unflushDskThreshold", unflushDskThreshold)
-                .append("unflushDksInterval", unflushDksInterval)
-                .append("unflushMemThreshold", unflushMemThreshold)
-                .append("unflushMemInterval", unflushMemInterval)
-                .append("unflushMemCnt", unflushMemCnt)
+                .append("unflushThreshold", unflushThreshold)
+                .append("unflushInterval", unflushInterval)
+                .append("unflushDataHold", unflushDataHold)
+                .append("memCacheMsgCntInK", memCacheMsgCntInK)
+                .append("memCacheFlushIntvl", memCacheFlushIntvl)
+                .append("memCacheMsgSizeInMB", memCacheMsgSizeInMB)
                 .append("acceptPublish", acceptPublish)
                 .append("acceptSubscribe", acceptSubscribe)
-                .append("deleteWhen", deleteWhen)
-                .append("maxMsgSize", maxMsgSize)
-                .append("qryPriorityId", qryPriorityId)
+                .append("deletePolicy", deletePolicy);
+        if (maxMsgSizeInB == TBaseConstants.META_VALUE_UNDEFINED) {
+            sBuilder.append("maxMsgSizeInMB", maxMsgSizeInB);
+        } else {
+            sBuilder.append("maxMsgSizeInMB",
+                    maxMsgSizeInB / TBaseConstants.META_MB_UNIT_SIZE);
+        }
+        return sBuilder.append("qryPriorityId", qryPriorityId)
                 .append("attributes", attributes)
                 .append("modifyUser", modifyUser)
                 .append("modifyDate", modifyDate)
diff --git 
a/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebMasterInfoHandler.java
 
b/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebMasterInfoHandler.java
index 9afe1aa..f2fece8 100644
--- 
a/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebMasterInfoHandler.java
+++ 
b/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebMasterInfoHandler.java
@@ -169,13 +169,14 @@ public class WebMasterInfoHandler extends 
AbstractWebHandler {
         if (!WebParameterUtils.getIntParamValue(req,
                 WebFieldDef.MAXMSGSIZE, false,
                 TBaseConstants.META_VALUE_UNDEFINED,
-                TBaseConstants.META_MAX_MESSAGE_DATA_SIZE,
+                TBaseConstants.META_MIN_ALLOWED_MESSAGE_SIZE_MB,
+                TBaseConstants.META_MAX_ALLOWED_MESSAGE_SIZE_MB,
                 result)) {
             WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
             return sBuilder;
         }
-        int maxMsgSize = (int) result.retData1;
-        if (maxMsgSize != TBaseConstants.META_VALUE_UNDEFINED) {
+        int maxMsgSizeInMB = (int) result.retData1;
+        if (maxMsgSizeInMB != TBaseConstants.META_VALUE_UNDEFINED) {
             dataChanged = true;
         }
         // check and get modify date
@@ -196,9 +197,9 @@ public class WebMasterInfoHandler extends 
AbstractWebHandler {
             defClusterSetting = new BdbClusterSettingEntity();
         }
         defClusterSetting.setModifyInfo(modifyUser, modifyDate);
-        if (maxMsgSize != TBaseConstants.META_VALUE_UNDEFINED) {
-            defClusterSetting.setMaxMsgSize(
-                    SettingValidUtils.validAndGetMaxMsgSize(maxMsgSize));
+        if (maxMsgSizeInMB != TBaseConstants.META_VALUE_UNDEFINED) {
+            defClusterSetting.setMaxMsgSizeInB(
+                    
SettingValidUtils.validAndXfeMaxMsgSizeFromMBtoB(maxMsgSizeInMB));
         }
         try {
             brokerConfManager.confSetBdbClusterDefSetting(defClusterSetting);

Reply via email to