This is an automated email from the ASF dual-hosted git repository. yuanbo pushed a commit to branch TUBEMQ-421 in repository https://gitbox.apache.org/repos/asf/incubator-tubemq.git
commit 6ca161c8603d2cd98a5cb217ecb87f672682ef0d Author: gosonzhang <[email protected]> AuthorDate: Mon Jan 11 16:18:46 2021 +0800 [TUBEMQ-501] Adjust max message size check logic --- .../tubemq/client/producer/AllowedSetting.java | 4 +- .../tubemq/client/producer/ProducerManager.java | 48 +++- .../client/producer/SimpleMessageProducer.java | 25 +- .../org/apache/tubemq/corebase/TBaseConstants.java | 18 +- .../apache/tubemq/corebase/utils/MixedUtils.java | 10 + .../tubemq/corebase/utils/SettingValidUtils.java | 19 +- .../org/apache/tubemq/corerpc/RpcConstants.java | 6 +- tubemq-core/src/main/proto/MasterService.proto | 10 +- .../tubemq/server/broker/BrokerServiceServer.java | 5 +- .../apache/tubemq/server/broker/TubeBroker.java | 270 ++++++++++++--------- .../broker/metadata/ClusterConfigHolder.java | 82 +++++++ .../server/broker/msgstore/MessageStore.java | 16 +- .../server/broker/msgstore/disk/FileSegment.java | 5 +- .../server/broker/msgstore/disk/MsgFileStore.java | 3 +- .../server/broker/msgstore/mem/MsgMemStore.java | 3 +- .../tubemq/server/common/fielddef/WebFieldDef.java | 4 +- .../server/common/paramcheck/PBParameterUtils.java | 33 +++ .../server/common/utils/WebParameterUtils.java | 109 +++++---- .../org/apache/tubemq/server/master/TMaster.java | 72 ++++++ .../bdbentitys/BdbClusterSettingEntity.java | 172 +++++++------ .../master/web/handler/WebMasterInfoHandler.java | 13 +- 21 files changed, 650 insertions(+), 277 deletions(-) diff --git a/tubemq-client/src/main/java/org/apache/tubemq/client/producer/AllowedSetting.java b/tubemq-client/src/main/java/org/apache/tubemq/client/producer/AllowedSetting.java index cabf928..7beb4e0 100644 --- a/tubemq-client/src/main/java/org/apache/tubemq/client/producer/AllowedSetting.java +++ b/tubemq-client/src/main/java/org/apache/tubemq/client/producer/AllowedSetting.java @@ -45,8 +45,8 @@ public class AllowedSetting { } if (allowedConfig.hasMaxMsgSize() && allowedConfig.getMaxMsgSize() != maxMsgSize.get()) { - maxMsgSize.set( - SettingValidUtils.validAndGetMaxMsgSize(allowedConfig.getMaxMsgSize())); + maxMsgSize.set(SettingValidUtils.validAndGetMaxMsgSizeInB( + allowedConfig.getMaxMsgSize())); } } } diff --git a/tubemq-client/src/main/java/org/apache/tubemq/client/producer/ProducerManager.java b/tubemq-client/src/main/java/org/apache/tubemq/client/producer/ProducerManager.java index ea1fd51..07586ec 100644 --- a/tubemq-client/src/main/java/org/apache/tubemq/client/producer/ProducerManager.java +++ b/tubemq-client/src/main/java/org/apache/tubemq/client/producer/ProducerManager.java @@ -79,6 +79,8 @@ public class ProducerManager { private final ScheduledExecutorService heartbeatService; private final AtomicLong visitToken = new AtomicLong(TBaseConstants.META_VALUE_UNDEFINED); + private final AllowedSetting allowedSetting = + new AllowedSetting(); private final AtomicReference<String> authAuthorizedTokenRef = new AtomicReference<>(""); private final ClientAuthenticateHandler authenticateHandler = @@ -311,6 +313,15 @@ public class ProducerManager { } /** + * Get allowed message size. + * + * @return max allowed message size + */ + public int getMaxMsgSize() { + return allowedSetting.getMaxMsgSize(); + } + + /** * Check if the producer manager is shutdown. * * @return producer status @@ -396,7 +407,7 @@ public class ProducerManager { updateBrokerInfoList(true, response.getBrokerInfosList(), response.getBrokerCheckSum(), sBuilder); } - processRegAuthorizedToken(response); + processRegSyncInfo(response); return; } if (remainingRetry <= 0) { @@ -436,6 +447,7 @@ public class ProducerManager { if (authInfoBuilder != null) { builder.setAuthInfo(authInfoBuilder.build()); } + builder.setAppdConfig(buildAllowedConfig4P()); return builder.build(); } @@ -455,6 +467,7 @@ public class ProducerManager { if (authInfoBuilder != null) { builder.setAuthInfo(authInfoBuilder.build()); } + builder.setAppdConfig(buildAllowedConfig4P()); return builder.build(); } @@ -544,13 +557,22 @@ public class ProducerManager { } } - private void processRegAuthorizedToken(ClientMaster.RegisterResponseM2P response) { + private void processRegSyncInfo(ClientMaster.RegisterResponseM2P response) { if (response.hasAuthorizedInfo()) { processAuthorizedToken(response.getAuthorizedInfo()); } + if (response.hasAppdConfig()) { + procAllowedConfig4P(response.getAppdConfig()); + } } - private void processHeartBeatAuthorizedToken(ClientMaster.HeartResponseM2P response) { + private void processHeartBeatSyncInfo(ClientMaster.HeartResponseM2P response) { + if (response.hasRequireAuth()) { + nextWithAuthInfo2M.set(response.getRequireAuth()); + } + if (response.hasAppdConfig()) { + procAllowedConfig4P(response.getAppdConfig()); + } if (response.hasAuthorizedInfo()) { processAuthorizedToken(response.getAuthorizedInfo()); } @@ -595,6 +617,21 @@ public class ProducerManager { return authInfoBuilder; } + // build allowed configure info + private ClientMaster.ApprovedClientConfig.Builder buildAllowedConfig4P() { + ClientMaster.ApprovedClientConfig.Builder appdConfig = + ClientMaster.ApprovedClientConfig.newBuilder(); + appdConfig.setConfigId(allowedSetting.getConfigId()); + return appdConfig; + } + + // set allowed configure info + private void procAllowedConfig4P(ClientMaster.ApprovedClientConfig allowedConfig) { + if (allowedConfig != null) { + allowedSetting.updAllowedSetting(allowedConfig); + } + } + // #lizard forgives private class ProducerHeartbeatTask implements Runnable { @Override @@ -633,10 +670,7 @@ public class ProducerManager { } return; } - if (response.hasRequireAuth()) { - nextWithAuthInfo2M.set(response.getRequireAuth()); - } - processHeartBeatAuthorizedToken(response); + processHeartBeatSyncInfo(response); if (response.getErrCode() == TErrCodeConstants.NOT_READY) { lastHeartbeatTime = System.currentTimeMillis(); return; diff --git a/tubemq-client/src/main/java/org/apache/tubemq/client/producer/SimpleMessageProducer.java b/tubemq-client/src/main/java/org/apache/tubemq/client/producer/SimpleMessageProducer.java index 7d6894d..946f03e 100644 --- a/tubemq-client/src/main/java/org/apache/tubemq/client/producer/SimpleMessageProducer.java +++ b/tubemq-client/src/main/java/org/apache/tubemq/client/producer/SimpleMessageProducer.java @@ -288,19 +288,6 @@ public class SimpleMessageProducer implements MessageProducer { || (message.getData().length == 0)) { throw new TubeClientException("Illegal parameter: null data in message package!"); } - int msgSize = TStringUtils.isBlank(message.getAttribute()) - ? message.getData().length : (message.getData().length + message.getAttribute().length()); - if (msgSize > TBaseConstants.META_MAX_MESSAGE_DATA_SIZE) { - throw new TubeClientException(new StringBuilder(512) - .append("Illegal parameter: over max message length for the total size of") - .append(" message data and attribute, allowed size is ") - .append(TBaseConstants.META_MAX_MESSAGE_DATA_SIZE) - .append(", message's real size is ").append(msgSize).toString()); - } - if (isShutDown.get()) { - throw new TubeClientException("Status error: producer has been shutdown!"); - } - if (this.publishTopicMap.get(message.getTopic()) == null) { throw new TubeClientException(new StringBuilder(512) .append("Topic ").append(message.getTopic()) @@ -311,6 +298,18 @@ public class SimpleMessageProducer implements MessageProducer { .append("Topic ").append(message.getTopic()) .append(" not publish, make sure the topic exist or acceptPublish and try later!").toString()); } + int msgSize = TStringUtils.isBlank(message.getAttribute()) + ? message.getData().length : (message.getData().length + message.getAttribute().length()); + if (msgSize > producerManager.getMaxMsgSize()) { + throw new TubeClientException(new StringBuilder(512) + .append("Illegal parameter: over max message length for the total size of") + .append(" message data and attribute, allowed size is ") + .append(producerManager.getMaxMsgSize()) + .append(", message's real size is ").append(msgSize).toString()); + } + if (isShutDown.get()) { + throw new TubeClientException("Status error: producer has been shutdown!"); + } } private ClientBroker.SendMessageRequestP2B createSendMessageRequest(Partition partition, diff --git a/tubemq-core/src/main/java/org/apache/tubemq/corebase/TBaseConstants.java b/tubemq-core/src/main/java/org/apache/tubemq/corebase/TBaseConstants.java index d91b083..5238323 100644 --- a/tubemq-core/src/main/java/org/apache/tubemq/corebase/TBaseConstants.java +++ b/tubemq-core/src/main/java/org/apache/tubemq/corebase/TBaseConstants.java @@ -29,9 +29,6 @@ public class TBaseConstants { public static final String META_DEFAULT_CHARSET_NAME = "UTF-8"; public static final int META_MAX_MSGTYPE_LENGTH = 255; - public static final int META_MAX_MESSAGE_HEADER_SIZE = 1024; - public static final int META_MAX_MESSAGE_DATA_SIZE = 1024 * 1024; - public static final int META_MAX_MESSAGE_DATA_SIZE_UPPER_LIMIT = 20 * 1024 * 1024; public static final int META_MAX_PARTITION_COUNT = 100; public static final int META_MAX_BROKER_IP_LENGTH = 32; public static final int META_MAX_USERNAME_LENGTH = 64; @@ -67,4 +64,19 @@ public class TBaseConstants { public static final long CFG_DEFAULT_AUTH_TIMESTAMP_VALID_INTERVAL = 20000; + public static final int META_MB_UNIT_SIZE = (1024 * 1024); + public static final int META_MESSAGE_SIZE_ADJUST = (512 * 1024); + public static final int META_MAX_MESSAGE_HEADER_SIZE = (10 * 1024); + + public static final int META_MIN_ALLOWED_MESSAGE_SIZE_MB = 1; + public static final int META_MAX_ALLOWED_MESSAGE_SIZE_MB = 20; + public static final int META_MAX_MESSAGE_DATA_SIZE = + META_MIN_ALLOWED_MESSAGE_SIZE_MB * META_MB_UNIT_SIZE; + public static final int META_MIN_MEM_BUFFER_SIZE = + META_MAX_MESSAGE_DATA_SIZE + META_MESSAGE_SIZE_ADJUST; + public static final int META_MAX_MESSAGE_DATA_SIZE_UPPER_LIMIT = + META_MAX_ALLOWED_MESSAGE_SIZE_MB * META_MB_UNIT_SIZE; + + + } diff --git a/tubemq-core/src/main/java/org/apache/tubemq/corebase/utils/MixedUtils.java b/tubemq-core/src/main/java/org/apache/tubemq/corebase/utils/MixedUtils.java index bfbedad..1374b72 100644 --- a/tubemq-core/src/main/java/org/apache/tubemq/corebase/utils/MixedUtils.java +++ b/tubemq-core/src/main/java/org/apache/tubemq/corebase/utils/MixedUtils.java @@ -92,4 +92,14 @@ public class MixedUtils { dataBuffer.flip(); return dataBuffer.array(); } + + // get the middle data between min, max, and data + public static int mid(int data, int min, int max) { + return Math.max(min, Math.min(max, data)); + } + + public static long mid(long data, long min, long max) { + return Math.max(min, Math.min(max, data)); + } + } diff --git a/tubemq-core/src/main/java/org/apache/tubemq/corebase/utils/SettingValidUtils.java b/tubemq-core/src/main/java/org/apache/tubemq/corebase/utils/SettingValidUtils.java index 4a206ef..e748ba4 100644 --- a/tubemq-core/src/main/java/org/apache/tubemq/corebase/utils/SettingValidUtils.java +++ b/tubemq-core/src/main/java/org/apache/tubemq/corebase/utils/SettingValidUtils.java @@ -22,18 +22,17 @@ import org.apache.tubemq.corebase.TBaseConstants; public class SettingValidUtils { - // get the middle data between min, max, and data - public static int mid(int data, int min, int max) { - return Math.max(min, Math.min(max, data)); - } - public static long mid(long data, long min, long max) { - return Math.max(min, Math.min(max, data)); + public static int validAndXfeMaxMsgSizeFromMBtoB(int inMaxMsgSizeInMB) { + return MixedUtils.mid(inMaxMsgSizeInMB, + TBaseConstants.META_MIN_ALLOWED_MESSAGE_SIZE_MB, + TBaseConstants.META_MAX_ALLOWED_MESSAGE_SIZE_MB) + * TBaseConstants.META_MB_UNIT_SIZE; } - public static int validAndGetMaxMsgSize(int inMaxMsgSize) { - return mid(inMaxMsgSize, - TBaseConstants.META_MAX_MESSAGE_DATA_SIZE, - TBaseConstants.META_MAX_MESSAGE_DATA_SIZE_UPPER_LIMIT); + public static int validAndGetMaxMsgSizeInB(int inMaxMsgSizeInB) { + return MixedUtils.mid(inMaxMsgSizeInB, + TBaseConstants.META_MAX_MESSAGE_DATA_SIZE, + TBaseConstants.META_MAX_MESSAGE_DATA_SIZE_UPPER_LIMIT); } } diff --git a/tubemq-core/src/main/java/org/apache/tubemq/corerpc/RpcConstants.java b/tubemq-core/src/main/java/org/apache/tubemq/corerpc/RpcConstants.java index e99ffc3..3a150b3 100644 --- a/tubemq-core/src/main/java/org/apache/tubemq/corerpc/RpcConstants.java +++ b/tubemq-core/src/main/java/org/apache/tubemq/corerpc/RpcConstants.java @@ -18,6 +18,9 @@ package org.apache.tubemq.corerpc; +import org.apache.tubemq.corebase.TBaseConstants; + + public final class RpcConstants { public static final String RPC_CODEC = "rpc.codec"; @@ -66,7 +69,8 @@ public final class RpcConstants { public static final int RPC_PROTOCOL_BEGIN_TOKEN = 0xFF7FF4FE; public static final int RPC_MAX_BUFFER_SIZE = 8192; public static final int MAX_FRAME_MAX_LIST_SIZE = - (int) ((1024 * 1024 * 8) / RPC_MAX_BUFFER_SIZE); + (int) ((TBaseConstants.META_MAX_MESSAGE_DATA_SIZE_UPPER_LIMIT + + TBaseConstants.META_MB_UNIT_SIZE * 8) / RPC_MAX_BUFFER_SIZE); public static final int RPC_FLAG_MSG_TYPE_REQUEST = 0x0; public static final int RPC_FLAG_MSG_TYPE_RESPONSE = 0x1; diff --git a/tubemq-core/src/main/proto/MasterService.proto b/tubemq-core/src/main/proto/MasterService.proto index 27e5496..b97f3fc 100644 --- a/tubemq-core/src/main/proto/MasterService.proto +++ b/tubemq-core/src/main/proto/MasterService.proto @@ -66,7 +66,7 @@ message ApprovedClientConfig { optional int32 maxMsgSize = 2; } -message ClusterDefConfig { +message ClusterConfig { required int64 configId = 1; optional int32 maxMsgSize = 2; } @@ -222,7 +222,7 @@ message RegisterRequestB2M { optional int32 qryPriorityId = 12; optional int32 tlsPort = 13; optional MasterCertificateInfo authInfo = 14; - optional ClusterDefConfig clsDefConfig = 15; + optional ClusterConfig clsConfig = 15; } message RegisterResponseM2B { @@ -245,7 +245,7 @@ message RegisterResponseM2B { optional int32 qryPriorityId = 15; optional MasterAuthorizedInfo authorizedInfo = 16; /* Deprecated */ optional MasterBrokerAuthorizedInfo brokerAuthorizedInfo = 17; - optional ClusterDefConfig clsDefConfig = 18; + optional ClusterConfig clsConfig = 18; } message HeartRequestB2M { @@ -266,7 +266,7 @@ message HeartRequestB2M { optional int64 flowCheckId = 13; optional int32 qryPriorityId = 14; optional MasterCertificateInfo authInfo = 15; - optional ClusterDefConfig clsDefConfig = 16; + optional ClusterConfig clsConfig = 16; } message HeartResponseM2B { @@ -292,7 +292,7 @@ message HeartResponseM2B { optional int32 qryPriorityId = 17; optional MasterAuthorizedInfo authorizedInfo = 18; /* Deprecated */ optional MasterBrokerAuthorizedInfo brokerAuthorizedInfo = 19; - optional ClusterDefConfig clsDefConfig = 20; + optional ClusterConfig clsConfig = 20; } message CloseRequestB2M { diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/BrokerServiceServer.java b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/BrokerServiceServer.java index f9b242f..60490c6 100644 --- a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/BrokerServiceServer.java +++ b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/BrokerServiceServer.java @@ -55,6 +55,7 @@ import org.apache.tubemq.corerpc.RpcConstants; import org.apache.tubemq.corerpc.service.BrokerReadService; import org.apache.tubemq.corerpc.service.BrokerWriteService; import org.apache.tubemq.server.Server; +import org.apache.tubemq.server.broker.metadata.ClusterConfigHolder; import org.apache.tubemq.server.broker.metadata.MetadataManager; import org.apache.tubemq.server.broker.msgstore.MessageStore; import org.apache.tubemq.server.broker.msgstore.MessageStoreManager; @@ -633,10 +634,10 @@ public class BrokerServiceServer implements BrokerReadService, BrokerWriteServic builder.setErrMsg("data length is zero!"); return builder.build(); } - if (dataLength > TBaseConstants.META_MAX_MESSAGE_DATA_SIZE + 1024) { + if (dataLength > ClusterConfigHolder.getMaxMsgSize()) { builder.setErrCode(TErrCodeConstants.BAD_REQUEST); builder.setErrMsg(strBuffer.append("data length over max length, allowed max length is ") - .append(TBaseConstants.META_MAX_MESSAGE_DATA_SIZE + 1024) + .append(ClusterConfigHolder.getMaxMsgSize()) .append(", data length is ").append(dataLength).toString()); return builder.build(); } diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/TubeBroker.java b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/TubeBroker.java index 556cbba..690f0d7 100644 --- a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/TubeBroker.java +++ b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/TubeBroker.java @@ -46,6 +46,7 @@ import org.apache.tubemq.corerpc.service.MasterService; import org.apache.tubemq.server.Stoppable; import org.apache.tubemq.server.broker.exception.StartupException; import org.apache.tubemq.server.broker.metadata.BrokerMetadataManager; +import org.apache.tubemq.server.broker.metadata.ClusterConfigHolder; import org.apache.tubemq.server.broker.metadata.MetadataManager; import org.apache.tubemq.server.broker.msgstore.MessageStoreManager; import org.apache.tubemq.server.broker.nodeinfo.ConsumerNodeInfo; @@ -223,68 +224,8 @@ public class TubeBroker implements Stoppable { } isKeepAlive.set(true); heartbeatErrors.set(0); - FlowCtrlRuleHandler flowCtrlRuleHandler = - metadataManager.getFlowCtrlRuleHandler(); - long flowCheckId = flowCtrlRuleHandler.getFlowCtrlId(); - int qryPriorityId = flowCtrlRuleHandler.getQryPriorityId(); - ServiceStatusHolder - .setReadWriteServiceStatus(response.getStopRead(), - response.getStopWrite(), "Master"); - if (response.hasFlowCheckId()) { - qryPriorityId = response.hasQryPriorityId() - ? response.getQryPriorityId() : qryPriorityId; - if (response.getFlowCheckId() != flowCheckId) { - flowCheckId = response.getFlowCheckId(); - try { - flowCtrlRuleHandler - .updateDefFlowCtrlInfo(qryPriorityId, - flowCheckId, response.getFlowControlInfo()); - } catch (Exception e1) { - logger.warn( - "[HeartBeat response] found parse flowCtrl rules failure", e1); - } - } - if (qryPriorityId != flowCtrlRuleHandler.getQryPriorityId()) { - flowCtrlRuleHandler.setQryPriorityId(qryPriorityId); - } - } - requireReportConf = response.getNeedReportData(); StringBuilder sBuilder = new StringBuilder(512); - if (response.getTakeConfInfo()) { - logger.info(sBuilder - .append("[HeartBeat response] received broker metadata info: brokerConfId=") - .append(response.getCurBrokerConfId()) - .append(",stopWrite=").append(response.getStopWrite()) - .append(",stopRead=").append(response.getStopRead()) - .append(",configCheckSumId=").append(response.getConfCheckSumId()) - .append(",hasFlowCtrl=").append(response.hasFlowCheckId()) - .append(",curFlowCtrlId=").append(flowCheckId) - .append(",curQryPriorityId=").append(qryPriorityId) - .append(",brokerDefaultConfInfo=") - .append(response.getBrokerDefaultConfInfo()) - .append(",brokerTopicSetConfList=") - .append(response.getBrokerTopicSetConfInfoList().toString()).toString()); - sBuilder.delete(0, sBuilder.length()); - metadataManager - .updateBrokerTopicConfigMap(response.getCurBrokerConfId(), - response.getConfCheckSumId(), response.getBrokerDefaultConfInfo(), - response.getBrokerTopicSetConfInfoList(), false, sBuilder); - } - if (response.hasBrokerAuthorizedInfo()) { - serverAuthHandler.appendVisitToken(response.getBrokerAuthorizedInfo()); - } - boolean needProcess = - metadataManager.updateBrokerRemoveTopicMap( - response.getTakeRemoveTopicInfo(), - response.getRemoveTopicConfInfoList(), sBuilder); - if (needProcess) { - new Thread() { - @Override - public void run() { - storeManager.removeTopicStore(); - } - }.start(); - } + procConfigFromHeartBeat(sBuilder, response); } catch (Throwable t) { isKeepAlive.set(false); heartbeatErrors.incrementAndGet(); @@ -355,6 +296,92 @@ public class TubeBroker implements Stoppable { .append(TubeServerVersion.BROKER_VERSION).toString(); } + private void procConfigFromHeartBeat(StringBuilder sBuilder, + HeartResponseM2B response) { + // process service status + ServiceStatusHolder + .setReadWriteServiceStatus(response.getStopRead(), + response.getStopWrite(), "Master"); + // process flow controller rules + FlowCtrlRuleHandler flowCtrlRuleHandler = + metadataManager.getFlowCtrlRuleHandler(); + long flowCheckId = flowCtrlRuleHandler.getFlowCtrlId(); + int qryPriorityId = flowCtrlRuleHandler.getQryPriorityId(); + if (response.hasFlowCheckId()) { + qryPriorityId = response.hasQryPriorityId() + ? response.getQryPriorityId() : qryPriorityId; + if (response.getFlowCheckId() != flowCheckId) { + flowCheckId = response.getFlowCheckId(); + try { + flowCtrlRuleHandler + .updateDefFlowCtrlInfo(qryPriorityId, + flowCheckId, response.getFlowControlInfo()); + } catch (Exception e1) { + logger.warn( + "[HeartBeat response] found parse flowCtrl rules failure", e1); + } + } + if (qryPriorityId != flowCtrlRuleHandler.getQryPriorityId()) { + flowCtrlRuleHandler.setQryPriorityId(qryPriorityId); + } + } + // update configure report requirement + requireReportConf = response.getNeedReportData(); + // update cluster setting + if (response.hasClsConfig()) { + long configId = response.getClsConfig().getConfigId(); + if (configId != ClusterConfigHolder.getConfigId()) { + ClusterConfigHolder.updClusterSetting(response.getClsConfig()); + logger.info(sBuilder + .append("[HeartBeat response] received cluster configure changed,") + .append(",hasClsConfig=").append(response.hasClsConfig()) + .append(",curClusterConfigId=").append(ClusterConfigHolder.getConfigId()) + .append(",curMaxMsgSize=").append(ClusterConfigHolder.getMaxMsgSize()) + .append(",minMemCacheSize=") + .append(ClusterConfigHolder.getMinMemCacheSize()) + .toString()); + sBuilder.delete(0, sBuilder.length()); + } + } + if (response.getTakeConfInfo()) { + logger.info(sBuilder + .append("[HeartBeat response] received broker metadata info: brokerConfId=") + .append(response.getCurBrokerConfId()) + .append(",stopWrite=").append(response.getStopWrite()) + .append(",stopRead=").append(response.getStopRead()) + .append(",configCheckSumId=").append(response.getConfCheckSumId()) + .append(",hasFlowCtrl=").append(response.hasFlowCheckId()) + .append(",curFlowCtrlId=").append(flowCheckId) + .append(",curQryPriorityId=").append(qryPriorityId) + .append(",brokerDefaultConfInfo=") + .append(response.getBrokerDefaultConfInfo()) + .append(",brokerTopicSetConfList=") + .append(response.getBrokerTopicSetConfInfoList().toString()).toString()); + sBuilder.delete(0, sBuilder.length()); + metadataManager + .updateBrokerTopicConfigMap(response.getCurBrokerConfId(), + response.getConfCheckSumId(), response.getBrokerDefaultConfInfo(), + response.getBrokerTopicSetConfInfoList(), false, sBuilder); + } + // update auth info + if (response.hasBrokerAuthorizedInfo()) { + serverAuthHandler.appendVisitToken(response.getBrokerAuthorizedInfo()); + } + // process topic deletion + boolean needProcess = + metadataManager.updateBrokerRemoveTopicMap( + response.getTakeRemoveTopicInfo(), + response.getRemoveTopicConfInfoList(), sBuilder); + if (needProcess) { + new Thread() { + @Override + public void run() { + storeManager.removeTopicStore(); + } + }.start(); + } + } + /*** * Register to master. Try multi times if failed. * @@ -374,53 +401,7 @@ public class TubeBroker implements Stoppable { .append("Register to master failed! The error message is ") .append(response.getErrMsg()).toString()); } - ServiceStatusHolder - .setReadWriteServiceStatus(response.getStopRead(), - response.getStopWrite(), "Master"); - FlowCtrlRuleHandler flowCtrlRuleHandler = - metadataManager.getFlowCtrlRuleHandler(); - if (response.hasFlowCheckId()) { - int qryPriorityId = response.hasQryPriorityId() - ? response.getQryPriorityId() : flowCtrlRuleHandler.getQryPriorityId(); - if (response.getFlowCheckId() != flowCtrlRuleHandler.getFlowCtrlId()) { - try { - flowCtrlRuleHandler - .updateDefFlowCtrlInfo(response.getQryPriorityId(), - response.getFlowCheckId(), response.getFlowControlInfo()); - } catch (Exception e1) { - logger.warn("[Register response] found parse flowCtrl rules failure", e1); - } - } - if (qryPriorityId != flowCtrlRuleHandler.getQryPriorityId()) { - flowCtrlRuleHandler.setQryPriorityId(qryPriorityId); - } - } - updateEnableBrokerFunInfo(response); - logger.info(sBuilder - .append("[Register response] received broker metadata info: brokerConfId=") - .append(response.getCurBrokerConfId()) - .append(",stopWrite=").append(response.getStopWrite()) - .append(",stopRead=").append(response.getStopRead()) - .append(",configCheckSumId=").append(response.getConfCheckSumId()) - .append(",hasFlowCtrl=").append(response.hasFlowCheckId()) - .append(",enableVisitTokenCheck=") - .append(serverAuthHandler.isEnableVisitTokenCheck()) - .append(",enableProduceAuthenticate=") - .append(serverAuthHandler.isEnableProduceAuthenticate()) - .append(",enableProduceAuthorize=").append(serverAuthHandler.isEnableProduceAuthorize()) - .append(",enableConsumeAuthenticate=") - .append(serverAuthHandler.isEnableConsumeAuthenticate()) - .append(",enableConsumeAuthorize=") - .append(serverAuthHandler.isEnableConsumeAuthorize()) - .append(",curFlowCtrlId=").append(flowCtrlRuleHandler.getFlowCtrlId()) - .append(",curQryPriorityId=").append(flowCtrlRuleHandler.getQryPriorityId()) - .append(",brokerDefaultConfInfo=").append(response.getBrokerDefaultConfInfo()) - .append(",brokerTopicSetConfList=") - .append(response.getBrokerTopicSetConfInfoList().toString()).toString()); - sBuilder.delete(0, sBuilder.length()); - metadataManager.updateBrokerTopicConfigMap(response.getCurBrokerConfId(), - response.getConfCheckSumId(), response.getBrokerDefaultConfInfo(), - response.getBrokerTopicSetConfInfoList(), true, sBuilder); + procConfigFromRegister(sBuilder, response); isKeepAlive.set(true); lastRegTime.set(System.currentTimeMillis()); break; @@ -435,11 +416,80 @@ public class TubeBroker implements Stoppable { } } - private void updateEnableBrokerFunInfo(final RegisterResponseM2B response) { + + private void procConfigFromRegister(StringBuilder sBuilder, + final RegisterResponseM2B response) { + // process service status + ServiceStatusHolder + .setReadWriteServiceStatus(response.getStopRead(), + response.getStopWrite(), "Master"); + // process flow controller rules + FlowCtrlRuleHandler flowCtrlRuleHandler = + metadataManager.getFlowCtrlRuleHandler(); + if (response.hasFlowCheckId()) { + int qryPriorityId = response.hasQryPriorityId() + ? response.getQryPriorityId() : flowCtrlRuleHandler.getQryPriorityId(); + if (response.getFlowCheckId() != flowCtrlRuleHandler.getFlowCtrlId()) { + try { + flowCtrlRuleHandler + .updateDefFlowCtrlInfo(response.getQryPriorityId(), + response.getFlowCheckId(), response.getFlowControlInfo()); + } catch (Exception e1) { + logger.warn("[Register response] found parse flowCtrl rules failure", e1); + } + } + if (qryPriorityId != flowCtrlRuleHandler.getQryPriorityId()) { + flowCtrlRuleHandler.setQryPriorityId(qryPriorityId); + } + } + // update auth info serverAuthHandler.configure(response.getEnableBrokerInfo()); if (response.hasBrokerAuthorizedInfo()) { serverAuthHandler.appendVisitToken(response.getBrokerAuthorizedInfo()); } + // update cluster setting + if (response.hasClsConfig()) { + long configId = response.getClsConfig().getConfigId(); + if (configId != ClusterConfigHolder.getConfigId()) { + ClusterConfigHolder.updClusterSetting(response.getClsConfig()); + } + } + sBuilder.append("[Register response] received broker metadata info: brokerConfId=") + .append(response.getCurBrokerConfId()) + .append(",stopWrite=").append(response.getStopWrite()) + .append(",stopRead=").append(response.getStopRead()) + .append(",configCheckSumId=").append(response.getConfCheckSumId()) + .append(",hasFlowCtrl=").append(response.hasFlowCheckId()) + .append(",curFlowCtrlId=").append(flowCtrlRuleHandler.getFlowCtrlId()) + .append(",curQryPriorityId=").append(flowCtrlRuleHandler.getQryPriorityId()) + .append(",hasClsConfig=").append(response.hasClsConfig()) + .append(",curClusterConfigId=").append(ClusterConfigHolder.getConfigId()) + .append(",curMaxMsgSize=").append(ClusterConfigHolder.getMaxMsgSize()) + .append(",minMemCacheSize=").append(ClusterConfigHolder.getMinMemCacheSize()) + .append(",enableVisitTokenCheck=") + .append(serverAuthHandler.isEnableVisitTokenCheck()) + .append(",enableProduceAuthenticate=") + .append(serverAuthHandler.isEnableProduceAuthenticate()) + .append(",enableProduceAuthorize=").append(serverAuthHandler.isEnableProduceAuthorize()) + .append(",enableConsumeAuthenticate=") + .append(serverAuthHandler.isEnableConsumeAuthenticate()) + .append(",enableConsumeAuthorize=") + .append(serverAuthHandler.isEnableConsumeAuthorize()) + .append(",brokerDefaultConfInfo=").append(response.getBrokerDefaultConfInfo()) + .append(",brokerTopicSetConfList=") + .append(response.getBrokerTopicSetConfInfoList().toString()).toString(); + sBuilder.delete(0, sBuilder.length()); + metadataManager.updateBrokerTopicConfigMap(response.getCurBrokerConfId(), + response.getConfCheckSumId(), response.getBrokerDefaultConfInfo(), + response.getBrokerTopicSetConfInfoList(), true, sBuilder); + } + + // build cluster configure info + private ClientMaster.ClusterConfig.Builder buildClusterConfig() { + ClientMaster.ClusterConfig.Builder defSetting = + ClientMaster.ClusterConfig.newBuilder(); + defSetting.setConfigId(ClusterConfigHolder.getConfigId()); + return defSetting; } /*** @@ -474,6 +524,7 @@ public class TubeBroker implements Stoppable { if (authInfoBuilder != null) { builder.setAuthInfo(authInfoBuilder.build()); } + builder.setClsConfig(buildClusterConfig()); logger.info(new StringBuilder(512) .append("[Register request] current broker report info: brokerConfId=") .append(metadataManager.getBrokerMetadataConfId()) @@ -517,6 +568,7 @@ public class TubeBroker implements Stoppable { if (authInfoBuilder != null) { builder.setAuthInfo(authInfoBuilder.build()); } + builder.setClsConfig(buildClusterConfig()); if (metadataManager.isBrokerMetadataChanged() || requireReportConf) { builder.setTakeConfInfo(true); builder.setBrokerDefaultConfInfo(metadataManager.getBrokerDefMetaConfInfo()); diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/metadata/ClusterConfigHolder.java b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/metadata/ClusterConfigHolder.java new file mode 100644 index 0000000..c72427d --- /dev/null +++ b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/metadata/ClusterConfigHolder.java @@ -0,0 +1,82 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tubemq.server.broker.metadata; + +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; +import org.apache.tubemq.corebase.TBaseConstants; +import org.apache.tubemq.corebase.protobuf.generated.ClientMaster; +import org.apache.tubemq.corebase.utils.MixedUtils; +import org.apache.tubemq.server.broker.utils.DataStoreUtils; + + +public class ClusterConfigHolder { + private static AtomicLong configId = + new AtomicLong(TBaseConstants.META_VALUE_UNDEFINED); + private static AtomicInteger maxMsgSize = + new AtomicInteger(TBaseConstants.META_MAX_MESSAGE_DATA_SIZE + + TBaseConstants.META_MAX_MESSAGE_HEADER_SIZE); + private static AtomicInteger minMemCacheSize = + new AtomicInteger(TBaseConstants.META_MIN_MEM_BUFFER_SIZE); + private static AtomicInteger maxMsgStoreLength = + new AtomicInteger(TBaseConstants.META_MAX_MESSAGE_DATA_SIZE + + TBaseConstants.META_MAX_MESSAGE_HEADER_SIZE + + DataStoreUtils.STORE_DATA_HEADER_LEN); + + public ClusterConfigHolder() { + + } + + // set master returned configure + public static void updClusterSetting(ClientMaster.ClusterConfig clusterConfig) { + if (clusterConfig == null) { + return; + } + if (configId.get() != clusterConfig.getConfigId()) { + configId.set(clusterConfig.getConfigId()); + if (clusterConfig.hasMaxMsgSize()) { + int tmpMaxSize = MixedUtils.mid(clusterConfig.getMaxMsgSize(), + TBaseConstants.META_MAX_MESSAGE_DATA_SIZE, + TBaseConstants.META_MAX_MESSAGE_DATA_SIZE_UPPER_LIMIT) + + TBaseConstants.META_MAX_MESSAGE_HEADER_SIZE; + if (tmpMaxSize != maxMsgSize.get()) { + maxMsgSize.set(tmpMaxSize); + minMemCacheSize.set(tmpMaxSize + + (tmpMaxSize % 4 + 1) * TBaseConstants.META_MESSAGE_SIZE_ADJUST); + maxMsgStoreLength.set(tmpMaxSize + DataStoreUtils.STORE_DATA_HEADER_LEN); + } + } + } + } + + public static long getConfigId() { + return configId.get(); + } + + public static int getMaxMsgSize() { + return maxMsgSize.get(); + } + + public static int getMinMemCacheSize() { + return minMemCacheSize.get(); + } + + public static int getMaxMsgStoreLength() { + return maxMsgStoreLength.get(); + } +} diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/msgstore/MessageStore.java b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/msgstore/MessageStore.java index e610a8b..8f650b4 100644 --- a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/msgstore/MessageStore.java +++ b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/msgstore/MessageStore.java @@ -37,6 +37,7 @@ import org.apache.tubemq.corebase.TErrCodeConstants; import org.apache.tubemq.corebase.protobuf.generated.ClientBroker; import org.apache.tubemq.corebase.utils.ThreadUtils; import org.apache.tubemq.server.broker.BrokerConfig; +import org.apache.tubemq.server.broker.metadata.ClusterConfigHolder; import org.apache.tubemq.server.broker.metadata.TopicMetadata; import org.apache.tubemq.server.broker.msgstore.disk.GetMessageResult; import org.apache.tubemq.server.broker.msgstore.disk.MsgFileStatisInfo; @@ -129,7 +130,7 @@ public class MessageStore implements Closeable { this.unflushThreshold.set(topicMetadata.getUnflushThreshold()); this.unflushDataHold.set(topicMetadata.getUnflushDataHold()); this.writeCacheMaxCnt = topicMetadata.getMemCacheMsgCnt(); - this.writeCacheMaxSize = topicMetadata.getMemCacheMsgSize(); + this.writeCacheMaxSize = validAndGetMemCacheSize(topicMetadata.getMemCacheMsgSize()); this.writeCacheFlushIntvl = topicMetadata.getMemCacheFlushIntvl(); int tmpIndexReadCnt = tubeConfig.getIndexTransCount() * partitionNum; memMaxIndexReadCnt.set(tmpIndexReadCnt <= 6000 @@ -421,7 +422,7 @@ public class MessageStore implements Closeable { writeCacheMutex.readLock().lock(); try { writeCacheMaxCnt = topicMetadata.getMemCacheMsgCnt(); - writeCacheMaxSize = topicMetadata.getMemCacheMsgSize(); + writeCacheMaxSize = validAndGetMemCacheSize(topicMetadata.getMemCacheMsgSize()); writeCacheFlushIntvl = topicMetadata.getMemCacheFlushIntvl(); } finally { writeCacheMutex.readLock().unlock(); @@ -603,6 +604,17 @@ public class MessageStore implements Closeable { } } + private int validAndGetMemCacheSize(int memCacheSize) { + if (memCacheSize <= ClusterConfigHolder.getMinMemCacheSize()) { + logger.info(new StringBuilder(512) + .append("[Data Store] writeCacheMaxSize changed, from ") + .append(memCacheSize).append(" to ") + .append(ClusterConfigHolder.getMinMemCacheSize()).toString()); + memCacheSize = ClusterConfigHolder.getMinMemCacheSize(); + } + return memCacheSize; + } + /*** * Append message and trigger flush operation. * diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/msgstore/disk/FileSegment.java b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/msgstore/disk/FileSegment.java index 949d149..6fda352 100644 --- a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/msgstore/disk/FileSegment.java +++ b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/msgstore/disk/FileSegment.java @@ -25,6 +25,7 @@ import java.nio.channels.FileChannel; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import org.apache.tubemq.corebase.utils.CheckSum; +import org.apache.tubemq.server.broker.metadata.ClusterConfigHolder; import org.apache.tubemq.server.broker.utils.DataStoreUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -365,7 +366,7 @@ public class FileSegment implements Segment { itemNext = validBytes + DataStoreUtils.STORE_DATA_HEADER_LEN + itemMsglen; if ((itemMsgToken != DataStoreUtils.STORE_DATA_TOKER_BEGIN_VALUE) || (itemMsglen <= 0) - || (itemMsglen > DataStoreUtils.MAX_MSG_DATA_STORE_SIZE) + || (itemMsglen > ClusterConfigHolder.getMaxMsgSize()) || (itemNext > totalBytes)) { next = -1; break; @@ -437,7 +438,7 @@ public class FileSegment implements Segment { if ((itemMsgPartId < 0) || (itemMsgOffset < 0) || (itemMsglen <= 0) - || (itemMsglen > DataStoreUtils.STORE_MAX_MESSAGE_STORE_LEN) + || (itemMsglen > ClusterConfigHolder.getMaxMsgStoreLength()) || (itemNext > totalBytes)) { next = -1; break; diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/msgstore/disk/MsgFileStore.java b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/msgstore/disk/MsgFileStore.java index 400b666..fe23dd3 100644 --- a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/msgstore/disk/MsgFileStore.java +++ b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/msgstore/disk/MsgFileStore.java @@ -35,6 +35,7 @@ import org.apache.tubemq.corebase.TErrCodeConstants; import org.apache.tubemq.corebase.protobuf.generated.ClientBroker; import org.apache.tubemq.corebase.utils.ServiceStatusHolder; import org.apache.tubemq.server.broker.BrokerConfig; +import org.apache.tubemq.server.broker.metadata.ClusterConfigHolder; import org.apache.tubemq.server.broker.msgstore.MessageStore; import org.apache.tubemq.server.broker.stats.CountItem; import org.apache.tubemq.server.broker.utils.DataStoreUtils; @@ -273,7 +274,7 @@ public class MsgFileStore implements Closeable { // skip when mismatch condition if (curIndexDataOffset < 0 || curIndexDataSize <= 0 - || curIndexDataSize > DataStoreUtils.STORE_MAX_MESSAGE_STORE_LEN + || curIndexDataSize > ClusterConfigHolder.getMaxMsgStoreLength() || curIndexDataOffset < curDataMinOffset) { readedOffset = curIndexOffset + DataStoreUtils.STORE_INDEX_HEAD_LEN; continue; diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/msgstore/mem/MsgMemStore.java b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/msgstore/mem/MsgMemStore.java index 6da3d27..27eef32 100644 --- a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/msgstore/mem/MsgMemStore.java +++ b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/msgstore/mem/MsgMemStore.java @@ -29,6 +29,7 @@ import java.util.concurrent.locks.ReentrantLock; import org.apache.tubemq.corebase.TBaseConstants; import org.apache.tubemq.corebase.TErrCodeConstants; import org.apache.tubemq.server.broker.BrokerConfig; +import org.apache.tubemq.server.broker.metadata.ClusterConfigHolder; import org.apache.tubemq.server.broker.msgstore.disk.MsgFileStore; import org.apache.tubemq.server.broker.utils.DataStoreUtils; import org.apache.tubemq.server.common.utils.AppendResult; @@ -224,7 +225,7 @@ public class MsgMemStore implements Closeable { if ((cDataOffset < 0) || (cDataSize <= 0) || (cDataOffset >= currDataOffset) - || (cDataSize > TBaseConstants.META_MAX_MESSAGE_DATA_SIZE + 1024) + || (cDataSize > ClusterConfigHolder.getMaxMsgSize()) || (cDataOffset + cDataSize > currDataOffset)) { readedSize += DataStoreUtils.STORE_INDEX_HEAD_LEN; continue; diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/common/fielddef/WebFieldDef.java b/tubemq-server/src/main/java/org/apache/tubemq/server/common/fielddef/WebFieldDef.java index 05688a7..2ed75c3 100644 --- a/tubemq-server/src/main/java/org/apache/tubemq/server/common/fielddef/WebFieldDef.java +++ b/tubemq-server/src/main/java/org/apache/tubemq/server/common/fielddef/WebFieldDef.java @@ -88,8 +88,8 @@ public enum WebFieldDef { ADMINAUTHTOKEN(23, "confModAuthToken", "authToken", WebFieldType.STRING, "Admin api operation authorization code", TServerConstants.CFG_MODAUTHTOKEN_MAX_LENGTH), - MAXMSGSIZE(24, "maxMsgSize", "maxMsgSize", WebFieldType.INT, - "Max allowed message size", RegexDef.TMP_NUMBER), + MAXMSGSIZE(24, "maxMsgSizeInMB", "maxMsgSizeInMB", WebFieldType.INT, + "Max allowed message size, unit MB", RegexDef.TMP_NUMBER), CREATEDATE(25, "createDate", "cDate", WebFieldType.STRING, "Record creation date", TBaseConstants.META_MAX_DATEVALUE_LENGTH), MODIFYDATE(26, "modifyDate", "mDate", WebFieldType.STRING, diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/common/paramcheck/PBParameterUtils.java b/tubemq-server/src/main/java/org/apache/tubemq/server/common/paramcheck/PBParameterUtils.java index aa87940..ed2e122 100644 --- a/tubemq-server/src/main/java/org/apache/tubemq/server/common/paramcheck/PBParameterUtils.java +++ b/tubemq-server/src/main/java/org/apache/tubemq/server/common/paramcheck/PBParameterUtils.java @@ -32,6 +32,8 @@ import org.apache.tubemq.corebase.cluster.ConsumerInfo; import org.apache.tubemq.corebase.utils.TStringUtils; import org.apache.tubemq.server.broker.metadata.MetadataManager; import org.apache.tubemq.server.broker.metadata.TopicMetadata; +import org.apache.tubemq.server.common.fielddef.WebFieldDef; +import org.apache.tubemq.server.common.utils.ProcessResult; import org.apache.tubemq.server.master.MasterConfig; import org.apache.tubemq.server.master.bdbstore.bdbentitys.BdbConsumeGroupSettingEntity; import org.apache.tubemq.server.master.nodemanage.nodebroker.BrokerConfManager; @@ -576,4 +578,35 @@ public class PBParameterUtils { retResult.setCheckData(tmpValue); return retResult; } + + /** + * Check the string parameter + * + * @param fieldDef the field to be checked + * @param paramValue the field value to be checked + * @param strBuffer the string pool construct the result + * @param result the checked result + * @return result success or failure + */ + public static boolean getStringParameter(WebFieldDef fieldDef, + String paramValue, + StringBuilder strBuffer, + ProcessResult result) { + if (TStringUtils.isBlank(paramValue)) { + result.setFailResult(strBuffer.append("Request miss necessary ") + .append(fieldDef.name).append(" data!").toString()); + strBuffer.delete(0, strBuffer.length()); + return result.success; + } + String tmpValue = paramValue.trim(); + if (tmpValue.length() > fieldDef.valMaxLen) { + result.setFailResult(strBuffer.append(fieldDef.name) + .append("'s length over max value, allowed max length is ") + .append(fieldDef.valMaxLen).toString()); + strBuffer.delete(0, strBuffer.length()); + return result.success; + } + result.setSuccResult(tmpValue); + return result.success; + } } diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/common/utils/WebParameterUtils.java b/tubemq-server/src/main/java/org/apache/tubemq/server/common/utils/WebParameterUtils.java index 385fe22..d4b1e20 100644 --- a/tubemq-server/src/main/java/org/apache/tubemq/server/common/utils/WebParameterUtils.java +++ b/tubemq-server/src/main/java/org/apache/tubemq/server/common/utils/WebParameterUtils.java @@ -297,37 +297,11 @@ public class WebParameterUtils { WebFieldDef fieldDef, boolean required, ProcessResult result) { - if (!getStringParamValue(req, fieldDef, - required, null, result)) { - return result.success; - } - Set<Integer> tgtValueSet = new HashSet<Integer>(); - if (fieldDef.isCompFieldType()) { - Set<String> valItemSet = (Set<String>) result.retData1; - if (valItemSet.isEmpty()) { - result.setSuccResult(tgtValueSet); - return result.success; - } - for (String itemVal : valItemSet) { - if (!checkIntValueNorms(fieldDef, - itemVal, false, -1, result)) { - return result.success; - } - tgtValueSet.add((Integer) result.retData1); - } - } else { - String paramValue = (String) result.retData1; - if (paramValue == null) { - result.setSuccResult(tgtValueSet); - return result.success; - } - if (!checkIntValueNorms(fieldDef, - paramValue, false, -1, result)) { - tgtValueSet.add((Integer) result.retData1); - } - } - result.setSuccResult(tgtValueSet); - return result.success; + return getIntParamValue(req, fieldDef, required, + false, TBaseConstants.META_VALUE_UNDEFINED, + false, TBaseConstants.META_VALUE_UNDEFINED, + false, TBaseConstants.META_VALUE_UNDEFINED, + result); } /** @@ -342,11 +316,48 @@ public class WebParameterUtils { * @return process result */ public static boolean getIntParamValue(HttpServletRequest req, - WebFieldDef fieldDef, - boolean required, - int defValue, - int minValue, - ProcessResult result) { + WebFieldDef fieldDef, + boolean required, + int defValue, + int minValue, + ProcessResult result) { + return getIntParamValue(req, fieldDef, required, true, defValue, + true, minValue, false, TBaseConstants.META_VALUE_UNDEFINED, result); + } + + /** + * Parse the parameter value from an object value to a integer value + * + * @param req Http Servlet Request + * @param fieldDef the parameter field definition + * @param required a boolean value represent whether the parameter is must required + * @param defValue a default value returned if the field not exist + * @param minValue min value required + * @param minValue max value allowed + * @param result process result of parameter value + * @return process result + */ + public static boolean getIntParamValue(HttpServletRequest req, + WebFieldDef fieldDef, + boolean required, + int defValue, + int minValue, + int maxValue, + ProcessResult result) { + return getIntParamValue(req, fieldDef, required, true, defValue, + true, minValue, true, maxValue, result); + } + + private static boolean getIntParamValue(HttpServletRequest req, + WebFieldDef fieldDef, + boolean required, + boolean hasDefVal, + int defValue, + boolean hasMinVal, + int minValue, + boolean hasMaxVal, + int maxValue, + ProcessResult result) { if (!getStringParamValue(req, fieldDef, required, null, result)) { return result.success; } @@ -354,13 +365,15 @@ public class WebParameterUtils { Set<Integer> tgtValueSet = new HashSet<Integer>(); Set<String> valItemSet = (Set<String>) result.retData1; if (valItemSet.isEmpty()) { - tgtValueSet.add(defValue); + if (hasDefVal) { + tgtValueSet.add(defValue); + } result.setSuccResult(tgtValueSet); return result.success; } for (String itemVal : valItemSet) { - if (!checkIntValueNorms(fieldDef, - itemVal, true, minValue, result)) { + if (!checkIntValueNorms(fieldDef, itemVal, + hasMinVal, minValue, hasMaxVal, maxValue, result)) { return result.success; } tgtValueSet.add((Integer) result.retData1); @@ -369,11 +382,13 @@ public class WebParameterUtils { } else { String paramValue = (String) result.retData1; if (paramValue == null) { - result.setSuccResult(defValue); + if (hasDefVal) { + result.setSuccResult(defValue); + } return result.success; } - checkIntValueNorms(fieldDef, - paramValue, true, minValue, result); + checkIntValueNorms(fieldDef, paramValue, + hasMinVal, minValue, hasMinVal, maxValue, result); } return result.success; } @@ -691,6 +706,8 @@ public class WebParameterUtils { * @param paramValue the parameter value * @param hasMinVal whether there is a minimum * param minValue the parameter min value + * @param hasMaxVal whether there is a maximum + * param maxValue the parameter max value * @param result process result * @return check result for string value of parameter */ @@ -698,6 +715,8 @@ public class WebParameterUtils { String paramValue, boolean hasMinVal, int minValue, + boolean hasMaxVal, + int maxValue, ProcessResult result) { try { int paramIntVal = Integer.parseInt(paramValue); @@ -707,6 +726,12 @@ public class WebParameterUtils { .append(" value must >= ").append(minValue).toString()); return false; } + if (hasMaxVal && paramIntVal > maxValue) { + result.setFailResult(new StringBuilder(512) + .append("Parameter ").append(fieldDef.name) + .append(" value must <= ").append(maxValue).toString()); + return false; + } result.setSuccResult(paramIntVal); } catch (Throwable e) { result.setFailResult(new StringBuilder(512) diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/TMaster.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/TMaster.java index 4299bc7..e7327cb 100644 --- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/TMaster.java +++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/TMaster.java @@ -48,6 +48,7 @@ import org.apache.tubemq.corebase.cluster.ProducerInfo; import org.apache.tubemq.corebase.cluster.SubscribeInfo; import org.apache.tubemq.corebase.cluster.TopicInfo; import org.apache.tubemq.corebase.config.TLSConfig; +import org.apache.tubemq.corebase.protobuf.generated.ClientMaster; import org.apache.tubemq.corebase.protobuf.generated.ClientMaster.CloseRequestB2M; import org.apache.tubemq.corebase.protobuf.generated.ClientMaster.CloseRequestC2M; import org.apache.tubemq.corebase.protobuf.generated.ClientMaster.CloseRequestP2M; @@ -101,6 +102,7 @@ import org.apache.tubemq.server.master.balance.DefaultLoadBalancer; import org.apache.tubemq.server.master.balance.LoadBalancer; import org.apache.tubemq.server.master.bdbstore.DefaultBdbStoreService; import org.apache.tubemq.server.master.bdbstore.bdbentitys.BdbBrokerConfEntity; +import org.apache.tubemq.server.master.bdbstore.bdbentitys.BdbClusterSettingEntity; import org.apache.tubemq.server.master.bdbstore.bdbentitys.BdbGroupFlowCtrlEntity; import org.apache.tubemq.server.master.nodemanage.nodebroker.BrokerConfManager; import org.apache.tubemq.server.master.nodemanage.nodebroker.BrokerInfoHolder; @@ -348,6 +350,11 @@ public class TMaster extends HasThread implements MasterService, Stoppable { builder.setBrokerCheckSum(this.defaultBrokerConfManager.getBrokerInfoCheckSum()); builder.addAllBrokerInfos(this.defaultBrokerConfManager.getBrokersMap(overtls).values()); builder.setAuthorizedInfo(genAuthorizedInfo(certResult.authorizedToken, false).build()); + ClientMaster.ApprovedClientConfig.Builder clientConfigBuilder = + buildApprovedClientConfig(request.getAppdConfig()); + if (clientConfigBuilder != null) { + builder.setAppdConfig(clientConfigBuilder); + } logger.info(strBuffer.append("[Producer Register] ").append(producerId) .append(", isOverTLS=").append(overtls) .append(", clientJDKVer=").append(clientJdkVer).toString()); @@ -436,6 +443,11 @@ public class TMaster extends HasThread implements MasterService, Stoppable { if (defaultBrokerConfManager.getBrokerInfoCheckSum() != inBrokerCheckSum) { builder.addAllBrokerInfos(defaultBrokerConfManager.getBrokersMap(overtls).values()); } + ClientMaster.ApprovedClientConfig.Builder clientConfigBuilder = + buildApprovedClientConfig(request.getAppdConfig()); + if (clientConfigBuilder != null) { + builder.setAppdConfig(clientConfigBuilder); + } if (logger.isDebugEnabled()) { logger.debug(strBuffer.append("[Push Producer's available topic count:]") .append(producerId).append(TokenConstants.LOG_SEG_SEP) @@ -1071,6 +1083,11 @@ public class TMaster extends HasThread implements MasterService, Stoppable { builder.setBrokerDefaultConfInfo(brokerStatusInfo.getLastPushBrokerDefaultConfInfo()); builder.addAllBrokerTopicSetConfInfo(brokerStatusInfo.getLastPushBrokerTopicSetConfInfo()); builder.setSsdStoreId(TBaseConstants.META_VALUE_UNDEFINED); + ClientMaster.ClusterConfig.Builder clusterConfigBuilder = + buildClusterConfig(request.getClsConfig()); + if (clusterConfigBuilder != null) { + builder.setClsConfig(clusterConfigBuilder); + } if (request.hasFlowCheckId()) { BdbGroupFlowCtrlEntity bdbGroupFlowCtrlEntity = defaultBrokerConfManager.getBdbDefFlowCtrl(); @@ -1259,6 +1276,11 @@ public class TMaster extends HasThread implements MasterService, Stoppable { } } brokerHolder.setBrokerHeartBeatReqStatus(brokerInfo.getBrokerId(), builder); + ClientMaster.ClusterConfig.Builder clusterConfigBuilder = + buildClusterConfig(request.getClsConfig()); + if (clusterConfigBuilder != null) { + builder.setClsConfig(clusterConfigBuilder); + } builder.setTakeRemoveTopicInfo(true); builder.addAllRemoveTopicConfInfo(defaultBrokerConfManager .getBrokerRemovedTopicStrConfigInfo(bdbBrokerConfEntity)); @@ -2295,6 +2317,56 @@ public class TMaster extends HasThread implements MasterService, Stoppable { } /** + * build approved client configure + * + * @param inClientConfig client reported Configure info + * @return ApprovedClientConfig + */ + private ClientMaster.ApprovedClientConfig.Builder buildApprovedClientConfig( + ClientMaster.ApprovedClientConfig inClientConfig) { + ClientMaster.ApprovedClientConfig.Builder outClientConfig = null; + if (inClientConfig != null) { + outClientConfig = ClientMaster.ApprovedClientConfig.newBuilder(); + BdbClusterSettingEntity settingEntity = + this.defaultBrokerConfManager.getBdbClusterSetting(); + if (settingEntity == null) { + outClientConfig.setConfigId(TBaseConstants.META_VALUE_UNDEFINED); + } else { + outClientConfig.setConfigId(settingEntity.getConfigId()); + if (settingEntity.getConfigId() != inClientConfig.getConfigId()) { + outClientConfig.setMaxMsgSize(settingEntity.getMaxMsgSizeInB()); + } + } + } + return outClientConfig; + } + + + /** + * build cluster configure info + * + * @param inClusterConfig broker reported Configure info + * @return ClusterConfig + */ + private ClientMaster.ClusterConfig.Builder buildClusterConfig( + ClientMaster.ClusterConfig inClusterConfig) { + ClientMaster.ClusterConfig.Builder outClsConfig = null; + if (inClusterConfig != null) { + outClsConfig = ClientMaster.ClusterConfig.newBuilder(); + BdbClusterSettingEntity settingEntity = + this.defaultBrokerConfManager.getBdbClusterSetting(); + if (settingEntity == null) { + outClsConfig.setConfigId(TBaseConstants.META_VALUE_UNDEFINED); + } else { + outClsConfig.setConfigId(settingEntity.getConfigId()); + if (settingEntity.getConfigId() != inClusterConfig.getConfigId()) { + outClsConfig.setMaxMsgSize(settingEntity.getMaxMsgSizeInB()); + } + } + } + return outClsConfig; + } + /** * Start balance chore * * @param master diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/bdbstore/bdbentitys/BdbClusterSettingEntity.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/bdbstore/bdbentitys/BdbClusterSettingEntity.java index 588fd87..7b9b570 100644 --- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/bdbstore/bdbentitys/BdbClusterSettingEntity.java +++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/bdbstore/bdbentitys/BdbClusterSettingEntity.java @@ -37,6 +37,7 @@ public class BdbClusterSettingEntity implements Serializable { @PrimaryKey private String recordKey = ""; + private long configId = TBaseConstants.META_VALUE_UNDEFINED; //broker tcp port private int brokerPort = TBaseConstants.META_VALUE_UNDEFINED; //broker tls port @@ -48,20 +49,22 @@ public class BdbClusterSettingEntity implements Serializable { //partition num private int numPartitions = TBaseConstants.META_VALUE_UNDEFINED; //flush disk threshold - private int unflushDskThreshold = TBaseConstants.META_VALUE_UNDEFINED; + private int unflushThreshold = TBaseConstants.META_VALUE_UNDEFINED; //flush disk interval - private int unflushDksInterval = TBaseConstants.META_VALUE_UNDEFINED; - //flush memory cache threshold - private int unflushMemThreshold = TBaseConstants.META_VALUE_UNDEFINED; - //flush memory cache interval - private int unflushMemInterval = TBaseConstants.META_VALUE_UNDEFINED; + private int unflushInterval = TBaseConstants.META_VALUE_UNDEFINED; + //flush disk data count + private int unflushDataHold = TBaseConstants.META_VALUE_UNDEFINED; //flush memory cache count - private int unflushMemCnt = TBaseConstants.META_VALUE_UNDEFINED; + private int memCacheMsgCntInK = TBaseConstants.META_VALUE_UNDEFINED; + //flush memory cache interval + private int memCacheFlushIntvl = TBaseConstants.META_VALUE_UNDEFINED; + //flush memory cache size + private int memCacheMsgSizeInMB = TBaseConstants.META_VALUE_UNDEFINED; private boolean acceptPublish = true; //enable publish private boolean acceptSubscribe = true; //enable subscribe - private String deleteWhen = ""; //delete policy execute time + private String deletePolicy = ""; //delete policy execute time private int qryPriorityId = TBaseConstants.META_VALUE_UNDEFINED; - private int maxMsgSize = TBaseConstants.META_VALUE_UNDEFINED; + private int maxMsgSizeInB = TBaseConstants.META_VALUE_UNDEFINED; private String attributes = ""; //extra attribute private String modifyUser; //modify user private Date modifyDate; //modify date @@ -70,30 +73,34 @@ public class BdbClusterSettingEntity implements Serializable { } //Constructor - public BdbClusterSettingEntity(String recordKey, int brokerPort, int brokerTLSPort, - int brokerWebPort, int numTopicStores, int numPartitions, - int unflushDskThreshold, int unflushDksInterval, - int unflushMemThreshold, int unflushMemInterval, - int unflushMemCnt, boolean acceptPublish, - boolean acceptSubscribe, String deleteWhen, - int qryPriorityId, int maxMsgSize, String attributes, + public BdbClusterSettingEntity(String recordKey, long configId, int brokerPort, + int brokerTLSPort, int brokerWebPort, + int numTopicStores, int numPartitions, + int unflushThreshold, int unflushInterval, + int unflushDataHold, int memCacheMsgCntInK, + int memCacheFlushIntvl, int memCacheMsgSizeInMB, + boolean acceptPublish, boolean acceptSubscribe, + String deletePolicy, int qryPriorityId, + int maxMsgSizeInB, String attributes, String modifyUser, Date modifyDate) { this.recordKey = recordKey; + this.configId = configId; this.brokerPort = brokerPort; this.brokerTLSPort = brokerTLSPort; this.brokerWebPort = brokerWebPort; this.numTopicStores = numTopicStores; this.numPartitions = numPartitions; - this.unflushDskThreshold = unflushDskThreshold; - this.unflushDksInterval = unflushDksInterval; - this.unflushMemThreshold = unflushMemThreshold; - this.unflushMemInterval = unflushMemInterval; - this.unflushMemCnt = unflushMemCnt; + this.unflushThreshold = unflushThreshold; + this.unflushInterval = unflushInterval; + this.unflushDataHold = unflushDataHold; + this.memCacheMsgCntInK = memCacheMsgCntInK; + this.memCacheFlushIntvl = memCacheFlushIntvl; + this.memCacheMsgSizeInMB = memCacheMsgSizeInMB; this.acceptPublish = acceptPublish; this.acceptSubscribe = acceptSubscribe; - this.deleteWhen = deleteWhen; + this.deletePolicy = deletePolicy; this.qryPriorityId = qryPriorityId; - this.maxMsgSize = maxMsgSize; + this.maxMsgSizeInB = maxMsgSizeInB; this.attributes = attributes; this.modifyUser = modifyUser; this.modifyDate = modifyDate; @@ -107,6 +114,10 @@ public class BdbClusterSettingEntity implements Serializable { return recordKey; } + public long getConfigId() { + return configId; + } + public int getBrokerPort() { return brokerPort; } @@ -147,44 +158,52 @@ public class BdbClusterSettingEntity implements Serializable { this.numPartitions = numPartitions; } - public int getUnflushDskThreshold() { - return unflushDskThreshold; + public int getUnflushThreshold() { + return unflushThreshold; + } + + public void setUnflushThreshold(int unflushThreshold) { + this.unflushThreshold = unflushThreshold; + } + + public int getUnflushInterval() { + return unflushInterval; } - public void setUnflushDskThreshold(int unflushDskThreshold) { - this.unflushDskThreshold = unflushDskThreshold; + public void setUnflushInterval(int unflushInterval) { + this.unflushInterval = unflushInterval; } - public int getUnflushDksInterval() { - return unflushDksInterval; + public int getUnflushDataHold() { + return unflushDataHold; } - public void setUnflushDksInterval(int unflushDksInterval) { - this.unflushDksInterval = unflushDksInterval; + public void setUnflushDataHold(int unflushDataHold) { + this.unflushDataHold = unflushDataHold; } - public int getUnflushMemThreshold() { - return unflushMemThreshold; + public int getMemCacheMsgCntInK() { + return memCacheMsgCntInK; } - public void setUnflushMemThreshold(int unflushMemThreshold) { - this.unflushMemThreshold = unflushMemThreshold; + public void setMemCacheMsgCntInK(int memCacheMsgCntInK) { + this.memCacheMsgCntInK = memCacheMsgCntInK; } - public int getUnflushMemInterval() { - return unflushMemInterval; + public int getMemCacheFlushIntvl() { + return memCacheFlushIntvl; } - public void setUnflushMemInterval(int unflushMemInterval) { - this.unflushMemInterval = unflushMemInterval; + public void setMemCacheFlushIntvl(int memCacheFlushIntvl) { + this.memCacheFlushIntvl = memCacheFlushIntvl; } - public int getUnflushMemCnt() { - return unflushMemCnt; + public int getMemCacheMsgSizeInMB() { + return memCacheMsgSizeInMB; } - public void setUnflushMemCnt(int unflushMemCnt) { - this.unflushMemCnt = unflushMemCnt; + public void setMemCacheMsgSizeInMB(int memCacheMsgSizeInMB) { + this.memCacheMsgSizeInMB = memCacheMsgSizeInMB; } public boolean isAcceptPublish() { @@ -203,12 +222,12 @@ public class BdbClusterSettingEntity implements Serializable { this.acceptSubscribe = acceptSubscribe; } - public String getDeleteWhen() { - return deleteWhen; + public String getDeletePolicy() { + return deletePolicy; } - public void setDeleteWhen(String deleteWhen) { - this.deleteWhen = deleteWhen; + public void setDeletePolicy(String deletePolicy) { + this.deletePolicy = deletePolicy; } public int getQryPriorityId() { @@ -219,12 +238,12 @@ public class BdbClusterSettingEntity implements Serializable { this.qryPriorityId = qryPriorityId; } - public int getMaxMsgSize() { - return maxMsgSize; + public int getMaxMsgSizeInB() { + return maxMsgSizeInB; } - public void setMaxMsgSize(int maxMsgSize) { - this.maxMsgSize = maxMsgSize; + public void setMaxMsgSizeInB(int maxMsgSizeInB) { + this.maxMsgSizeInB = maxMsgSizeInB; } public String getAttributes() { @@ -236,6 +255,7 @@ public class BdbClusterSettingEntity implements Serializable { } public void setModifyInfo(String modifyUser, Date modifyDate) { + this.configId = System.currentTimeMillis(); this.modifyUser = modifyUser; this.modifyDate = modifyDate; } @@ -255,23 +275,30 @@ public class BdbClusterSettingEntity implements Serializable { * @return */ public StringBuilder toJsonString(final StringBuilder sBuilder) { - return sBuilder.append("{\"type\":\"BdbClusterSettingEntity\",") + sBuilder.append("{\"type\":\"BdbClusterSettingEntity\",") .append("\"recordKey\":\"").append(recordKey).append("\"") + .append(",\"configId\":").append(configId) .append(",\"brokerPort\":").append(brokerPort) .append(",\"brokerTLSPort\":").append(brokerTLSPort) .append(",\"brokerWebPort\":").append(brokerWebPort) .append(",\"numTopicStores\":").append(numTopicStores) .append(",\"numPartitions\":").append(numPartitions) - .append(",\"unflushDskThreshold\":").append(unflushDskThreshold) - .append(",\"unflushDksInterval\":").append(unflushDksInterval) - .append(",\"unflushMemThreshold\":").append(unflushMemThreshold) - .append(",\"unflushMemInterval\":").append(unflushMemInterval) - .append(",\"unflushMemCnt\":").append(unflushMemCnt) + .append(",\"unflushThreshold\":").append(unflushThreshold) + .append(",\"unflushInterval\":").append(unflushInterval) + .append(",\"unflushDataHold\":").append(unflushDataHold) + .append(",\"memCacheMsgCntInK\":").append(memCacheMsgCntInK) + .append(",\"memCacheFlushIntvl\":").append(memCacheFlushIntvl) + .append(",\"memCacheMsgSizeInMB\":").append(memCacheMsgSizeInMB) .append(",\"acceptPublish\":").append(acceptPublish) .append(",\"acceptSubscribe\":").append(acceptSubscribe) - .append(",\"deleteWhen\":\"").append(deleteWhen).append("\"") - .append(",\"maxMsgSize\":").append(maxMsgSize) - .append(",\"qryPriorityId\":").append(qryPriorityId) + .append(",\"deletePolicy\":\"").append(deletePolicy).append("\"") + .append(",\"maxMsgSizeInMB\":"); + if (maxMsgSizeInB == TBaseConstants.META_VALUE_UNDEFINED) { + sBuilder.append(maxMsgSizeInB); + } else { + sBuilder.append(maxMsgSizeInB / TBaseConstants.META_MB_UNIT_SIZE); + } + return sBuilder.append(",\"qryPriorityId\":").append(qryPriorityId) .append(",\"attributes\":\"").append(attributes).append("\"") .append(",\"modifyUser\":\"").append(modifyUser).append("\"") .append(",\"modifyDate\":\"") @@ -281,23 +308,30 @@ public class BdbClusterSettingEntity implements Serializable { @Override public String toString() { - return new ToStringBuilder(this) + ToStringBuilder sBuilder = new ToStringBuilder(this) .append("recordKey", recordKey) + .append("configId", configId) .append("brokerPort", brokerPort) .append("brokerTLSPort", brokerTLSPort) .append("brokerWebPort", brokerWebPort) .append("numTopicStores", numTopicStores) .append("numPartitions", numPartitions) - .append("unflushDskThreshold", unflushDskThreshold) - .append("unflushDksInterval", unflushDksInterval) - .append("unflushMemThreshold", unflushMemThreshold) - .append("unflushMemInterval", unflushMemInterval) - .append("unflushMemCnt", unflushMemCnt) + .append("unflushThreshold", unflushThreshold) + .append("unflushInterval", unflushInterval) + .append("unflushDataHold", unflushDataHold) + .append("memCacheMsgCntInK", memCacheMsgCntInK) + .append("memCacheFlushIntvl", memCacheFlushIntvl) + .append("memCacheMsgSizeInMB", memCacheMsgSizeInMB) .append("acceptPublish", acceptPublish) .append("acceptSubscribe", acceptSubscribe) - .append("deleteWhen", deleteWhen) - .append("maxMsgSize", maxMsgSize) - .append("qryPriorityId", qryPriorityId) + .append("deletePolicy", deletePolicy); + if (maxMsgSizeInB == TBaseConstants.META_VALUE_UNDEFINED) { + sBuilder.append("maxMsgSizeInMB", maxMsgSizeInB); + } else { + sBuilder.append("maxMsgSizeInMB", + maxMsgSizeInB / TBaseConstants.META_MB_UNIT_SIZE); + } + return sBuilder.append("qryPriorityId", qryPriorityId) .append("attributes", attributes) .append("modifyUser", modifyUser) .append("modifyDate", modifyDate) diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebMasterInfoHandler.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebMasterInfoHandler.java index 9afe1aa..f2fece8 100644 --- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebMasterInfoHandler.java +++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebMasterInfoHandler.java @@ -169,13 +169,14 @@ public class WebMasterInfoHandler extends AbstractWebHandler { if (!WebParameterUtils.getIntParamValue(req, WebFieldDef.MAXMSGSIZE, false, TBaseConstants.META_VALUE_UNDEFINED, - TBaseConstants.META_MAX_MESSAGE_DATA_SIZE, + TBaseConstants.META_MIN_ALLOWED_MESSAGE_SIZE_MB, + TBaseConstants.META_MAX_ALLOWED_MESSAGE_SIZE_MB, result)) { WebParameterUtils.buildFailResult(sBuilder, result.errInfo); return sBuilder; } - int maxMsgSize = (int) result.retData1; - if (maxMsgSize != TBaseConstants.META_VALUE_UNDEFINED) { + int maxMsgSizeInMB = (int) result.retData1; + if (maxMsgSizeInMB != TBaseConstants.META_VALUE_UNDEFINED) { dataChanged = true; } // check and get modify date @@ -196,9 +197,9 @@ public class WebMasterInfoHandler extends AbstractWebHandler { defClusterSetting = new BdbClusterSettingEntity(); } defClusterSetting.setModifyInfo(modifyUser, modifyDate); - if (maxMsgSize != TBaseConstants.META_VALUE_UNDEFINED) { - defClusterSetting.setMaxMsgSize( - SettingValidUtils.validAndGetMaxMsgSize(maxMsgSize)); + if (maxMsgSizeInMB != TBaseConstants.META_VALUE_UNDEFINED) { + defClusterSetting.setMaxMsgSizeInB( + SettingValidUtils.validAndXfeMaxMsgSizeFromMBtoB(maxMsgSizeInMB)); } try { brokerConfManager.confSetBdbClusterDefSetting(defClusterSetting);
