This is an automated email from the ASF dual-hosted git repository.
gosonzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-tubemq.git
The following commit(s) were added to refs/heads/master by this push:
new c13700c [TUBEMQ-501] Adjust max message size check logic
c13700c is described below
commit c13700c4ffca54e55de62894b971a27f68a563a1
Author: gosonzhang <[email protected]>
AuthorDate: Mon Jan 11 16:18:46 2021 +0800
[TUBEMQ-501] Adjust max message size check logic
---
.../tubemq/client/producer/AllowedSetting.java | 4 +-
.../tubemq/client/producer/ProducerManager.java | 48 +++-
.../client/producer/SimpleMessageProducer.java | 25 +-
.../org/apache/tubemq/corebase/TBaseConstants.java | 18 +-
.../apache/tubemq/corebase/utils/MixedUtils.java | 10 +
.../tubemq/corebase/utils/SettingValidUtils.java | 19 +-
.../org/apache/tubemq/corerpc/RpcConstants.java | 6 +-
tubemq-core/src/main/proto/MasterService.proto | 10 +-
.../tubemq/server/broker/BrokerServiceServer.java | 5 +-
.../apache/tubemq/server/broker/TubeBroker.java | 270 ++++++++++++---------
.../broker/metadata/ClusterConfigHolder.java | 82 +++++++
.../server/broker/msgstore/MessageStore.java | 16 +-
.../server/broker/msgstore/disk/FileSegment.java | 5 +-
.../server/broker/msgstore/disk/MsgFileStore.java | 3 +-
.../server/broker/msgstore/mem/MsgMemStore.java | 3 +-
.../tubemq/server/common/fielddef/WebFieldDef.java | 4 +-
.../server/common/paramcheck/PBParameterUtils.java | 33 +++
.../server/common/utils/WebParameterUtils.java | 109 +++++----
.../org/apache/tubemq/server/master/TMaster.java | 72 ++++++
.../bdbentitys/BdbClusterSettingEntity.java | 172 +++++++------
.../master/web/handler/WebMasterInfoHandler.java | 13 +-
21 files changed, 650 insertions(+), 277 deletions(-)
diff --git
a/tubemq-client/src/main/java/org/apache/tubemq/client/producer/AllowedSetting.java
b/tubemq-client/src/main/java/org/apache/tubemq/client/producer/AllowedSetting.java
index cabf928..7beb4e0 100644
---
a/tubemq-client/src/main/java/org/apache/tubemq/client/producer/AllowedSetting.java
+++
b/tubemq-client/src/main/java/org/apache/tubemq/client/producer/AllowedSetting.java
@@ -45,8 +45,8 @@ public class AllowedSetting {
}
if (allowedConfig.hasMaxMsgSize()
&& allowedConfig.getMaxMsgSize() != maxMsgSize.get()) {
- maxMsgSize.set(
-
SettingValidUtils.validAndGetMaxMsgSize(allowedConfig.getMaxMsgSize()));
+ maxMsgSize.set(SettingValidUtils.validAndGetMaxMsgSizeInB(
+ allowedConfig.getMaxMsgSize()));
}
}
}
diff --git
a/tubemq-client/src/main/java/org/apache/tubemq/client/producer/ProducerManager.java
b/tubemq-client/src/main/java/org/apache/tubemq/client/producer/ProducerManager.java
index ea1fd51..07586ec 100644
---
a/tubemq-client/src/main/java/org/apache/tubemq/client/producer/ProducerManager.java
+++
b/tubemq-client/src/main/java/org/apache/tubemq/client/producer/ProducerManager.java
@@ -79,6 +79,8 @@ public class ProducerManager {
private final ScheduledExecutorService heartbeatService;
private final AtomicLong visitToken =
new AtomicLong(TBaseConstants.META_VALUE_UNDEFINED);
+ private final AllowedSetting allowedSetting =
+ new AllowedSetting();
private final AtomicReference<String> authAuthorizedTokenRef =
new AtomicReference<>("");
private final ClientAuthenticateHandler authenticateHandler =
@@ -311,6 +313,15 @@ public class ProducerManager {
}
/**
+ * Get allowed message size.
+ *
+ * @return max allowed message size
+ */
+ public int getMaxMsgSize() {
+ return allowedSetting.getMaxMsgSize();
+ }
+
+ /**
* Check if the producer manager is shutdown.
*
* @return producer status
@@ -396,7 +407,7 @@ public class ProducerManager {
updateBrokerInfoList(true,
response.getBrokerInfosList(),
response.getBrokerCheckSum(), sBuilder);
}
- processRegAuthorizedToken(response);
+ processRegSyncInfo(response);
return;
}
if (remainingRetry <= 0) {
@@ -436,6 +447,7 @@ public class ProducerManager {
if (authInfoBuilder != null) {
builder.setAuthInfo(authInfoBuilder.build());
}
+ builder.setAppdConfig(buildAllowedConfig4P());
return builder.build();
}
@@ -455,6 +467,7 @@ public class ProducerManager {
if (authInfoBuilder != null) {
builder.setAuthInfo(authInfoBuilder.build());
}
+ builder.setAppdConfig(buildAllowedConfig4P());
return builder.build();
}
@@ -544,13 +557,22 @@ public class ProducerManager {
}
}
- private void processRegAuthorizedToken(ClientMaster.RegisterResponseM2P
response) {
+ private void processRegSyncInfo(ClientMaster.RegisterResponseM2P response)
{
if (response.hasAuthorizedInfo()) {
processAuthorizedToken(response.getAuthorizedInfo());
}
+ if (response.hasAppdConfig()) {
+ procAllowedConfig4P(response.getAppdConfig());
+ }
}
- private void processHeartBeatAuthorizedToken(ClientMaster.HeartResponseM2P
response) {
+ private void processHeartBeatSyncInfo(ClientMaster.HeartResponseM2P
response) {
+ if (response.hasRequireAuth()) {
+ nextWithAuthInfo2M.set(response.getRequireAuth());
+ }
+ if (response.hasAppdConfig()) {
+ procAllowedConfig4P(response.getAppdConfig());
+ }
if (response.hasAuthorizedInfo()) {
processAuthorizedToken(response.getAuthorizedInfo());
}
@@ -595,6 +617,21 @@ public class ProducerManager {
return authInfoBuilder;
}
+ // build allowed configure info
+ private ClientMaster.ApprovedClientConfig.Builder buildAllowedConfig4P() {
+ ClientMaster.ApprovedClientConfig.Builder appdConfig =
+ ClientMaster.ApprovedClientConfig.newBuilder();
+ appdConfig.setConfigId(allowedSetting.getConfigId());
+ return appdConfig;
+ }
+
+ // set allowed configure info
+ private void procAllowedConfig4P(ClientMaster.ApprovedClientConfig
allowedConfig) {
+ if (allowedConfig != null) {
+ allowedSetting.updAllowedSetting(allowedConfig);
+ }
+ }
+
// #lizard forgives
private class ProducerHeartbeatTask implements Runnable {
@Override
@@ -633,10 +670,7 @@ public class ProducerManager {
}
return;
}
- if (response.hasRequireAuth()) {
- nextWithAuthInfo2M.set(response.getRequireAuth());
- }
- processHeartBeatAuthorizedToken(response);
+ processHeartBeatSyncInfo(response);
if (response.getErrCode() == TErrCodeConstants.NOT_READY) {
lastHeartbeatTime = System.currentTimeMillis();
return;
diff --git
a/tubemq-client/src/main/java/org/apache/tubemq/client/producer/SimpleMessageProducer.java
b/tubemq-client/src/main/java/org/apache/tubemq/client/producer/SimpleMessageProducer.java
index 7d6894d..946f03e 100644
---
a/tubemq-client/src/main/java/org/apache/tubemq/client/producer/SimpleMessageProducer.java
+++
b/tubemq-client/src/main/java/org/apache/tubemq/client/producer/SimpleMessageProducer.java
@@ -288,19 +288,6 @@ public class SimpleMessageProducer implements
MessageProducer {
|| (message.getData().length == 0)) {
throw new TubeClientException("Illegal parameter: null data in
message package!");
}
- int msgSize = TStringUtils.isBlank(message.getAttribute())
- ? message.getData().length : (message.getData().length +
message.getAttribute().length());
- if (msgSize > TBaseConstants.META_MAX_MESSAGE_DATA_SIZE) {
- throw new TubeClientException(new StringBuilder(512)
- .append("Illegal parameter: over max message length for
the total size of")
- .append(" message data and attribute, allowed size is ")
- .append(TBaseConstants.META_MAX_MESSAGE_DATA_SIZE)
- .append(", message's real size is
").append(msgSize).toString());
- }
- if (isShutDown.get()) {
- throw new TubeClientException("Status error: producer has been
shutdown!");
- }
-
if (this.publishTopicMap.get(message.getTopic()) == null) {
throw new TubeClientException(new StringBuilder(512)
.append("Topic ").append(message.getTopic())
@@ -311,6 +298,18 @@ public class SimpleMessageProducer implements
MessageProducer {
.append("Topic ").append(message.getTopic())
.append(" not publish, make sure the topic exist or
acceptPublish and try later!").toString());
}
+ int msgSize = TStringUtils.isBlank(message.getAttribute())
+ ? message.getData().length : (message.getData().length +
message.getAttribute().length());
+ if (msgSize > producerManager.getMaxMsgSize()) {
+ throw new TubeClientException(new StringBuilder(512)
+ .append("Illegal parameter: over max message length for
the total size of")
+ .append(" message data and attribute, allowed size is ")
+ .append(producerManager.getMaxMsgSize())
+ .append(", message's real size is
").append(msgSize).toString());
+ }
+ if (isShutDown.get()) {
+ throw new TubeClientException("Status error: producer has been
shutdown!");
+ }
}
private ClientBroker.SendMessageRequestP2B
createSendMessageRequest(Partition partition,
diff --git
a/tubemq-core/src/main/java/org/apache/tubemq/corebase/TBaseConstants.java
b/tubemq-core/src/main/java/org/apache/tubemq/corebase/TBaseConstants.java
index d91b083..5238323 100644
--- a/tubemq-core/src/main/java/org/apache/tubemq/corebase/TBaseConstants.java
+++ b/tubemq-core/src/main/java/org/apache/tubemq/corebase/TBaseConstants.java
@@ -29,9 +29,6 @@ public class TBaseConstants {
public static final String META_DEFAULT_CHARSET_NAME = "UTF-8";
public static final int META_MAX_MSGTYPE_LENGTH = 255;
- public static final int META_MAX_MESSAGE_HEADER_SIZE = 1024;
- public static final int META_MAX_MESSAGE_DATA_SIZE = 1024 * 1024;
- public static final int META_MAX_MESSAGE_DATA_SIZE_UPPER_LIMIT = 20 * 1024
* 1024;
public static final int META_MAX_PARTITION_COUNT = 100;
public static final int META_MAX_BROKER_IP_LENGTH = 32;
public static final int META_MAX_USERNAME_LENGTH = 64;
@@ -67,4 +64,19 @@ public class TBaseConstants {
public static final long CFG_DEFAULT_AUTH_TIMESTAMP_VALID_INTERVAL = 20000;
+ public static final int META_MB_UNIT_SIZE = (1024 * 1024);
+ public static final int META_MESSAGE_SIZE_ADJUST = (512 * 1024);
+ public static final int META_MAX_MESSAGE_HEADER_SIZE = (10 * 1024);
+
+ public static final int META_MIN_ALLOWED_MESSAGE_SIZE_MB = 1;
+ public static final int META_MAX_ALLOWED_MESSAGE_SIZE_MB = 20;
+ public static final int META_MAX_MESSAGE_DATA_SIZE =
+ META_MIN_ALLOWED_MESSAGE_SIZE_MB * META_MB_UNIT_SIZE;
+ public static final int META_MIN_MEM_BUFFER_SIZE =
+ META_MAX_MESSAGE_DATA_SIZE + META_MESSAGE_SIZE_ADJUST;
+ public static final int META_MAX_MESSAGE_DATA_SIZE_UPPER_LIMIT =
+ META_MAX_ALLOWED_MESSAGE_SIZE_MB * META_MB_UNIT_SIZE;
+
+
+
}
diff --git
a/tubemq-core/src/main/java/org/apache/tubemq/corebase/utils/MixedUtils.java
b/tubemq-core/src/main/java/org/apache/tubemq/corebase/utils/MixedUtils.java
index bfbedad..1374b72 100644
--- a/tubemq-core/src/main/java/org/apache/tubemq/corebase/utils/MixedUtils.java
+++ b/tubemq-core/src/main/java/org/apache/tubemq/corebase/utils/MixedUtils.java
@@ -92,4 +92,14 @@ public class MixedUtils {
dataBuffer.flip();
return dataBuffer.array();
}
+
+ // get the middle data between min, max, and data
+ public static int mid(int data, int min, int max) {
+ return Math.max(min, Math.min(max, data));
+ }
+
+ public static long mid(long data, long min, long max) {
+ return Math.max(min, Math.min(max, data));
+ }
+
}
diff --git
a/tubemq-core/src/main/java/org/apache/tubemq/corebase/utils/SettingValidUtils.java
b/tubemq-core/src/main/java/org/apache/tubemq/corebase/utils/SettingValidUtils.java
index 4a206ef..e748ba4 100644
---
a/tubemq-core/src/main/java/org/apache/tubemq/corebase/utils/SettingValidUtils.java
+++
b/tubemq-core/src/main/java/org/apache/tubemq/corebase/utils/SettingValidUtils.java
@@ -22,18 +22,17 @@ import org.apache.tubemq.corebase.TBaseConstants;
public class SettingValidUtils {
- // get the middle data between min, max, and data
- public static int mid(int data, int min, int max) {
- return Math.max(min, Math.min(max, data));
- }
- public static long mid(long data, long min, long max) {
- return Math.max(min, Math.min(max, data));
+ public static int validAndXfeMaxMsgSizeFromMBtoB(int inMaxMsgSizeInMB) {
+ return MixedUtils.mid(inMaxMsgSizeInMB,
+ TBaseConstants.META_MIN_ALLOWED_MESSAGE_SIZE_MB,
+ TBaseConstants.META_MAX_ALLOWED_MESSAGE_SIZE_MB)
+ * TBaseConstants.META_MB_UNIT_SIZE;
}
- public static int validAndGetMaxMsgSize(int inMaxMsgSize) {
- return mid(inMaxMsgSize,
- TBaseConstants.META_MAX_MESSAGE_DATA_SIZE,
- TBaseConstants.META_MAX_MESSAGE_DATA_SIZE_UPPER_LIMIT);
+ public static int validAndGetMaxMsgSizeInB(int inMaxMsgSizeInB) {
+ return MixedUtils.mid(inMaxMsgSizeInB,
+ TBaseConstants.META_MAX_MESSAGE_DATA_SIZE,
+ TBaseConstants.META_MAX_MESSAGE_DATA_SIZE_UPPER_LIMIT);
}
}
diff --git
a/tubemq-core/src/main/java/org/apache/tubemq/corerpc/RpcConstants.java
b/tubemq-core/src/main/java/org/apache/tubemq/corerpc/RpcConstants.java
index e99ffc3..3a150b3 100644
--- a/tubemq-core/src/main/java/org/apache/tubemq/corerpc/RpcConstants.java
+++ b/tubemq-core/src/main/java/org/apache/tubemq/corerpc/RpcConstants.java
@@ -18,6 +18,9 @@
package org.apache.tubemq.corerpc;
+import org.apache.tubemq.corebase.TBaseConstants;
+
+
public final class RpcConstants {
public static final String RPC_CODEC = "rpc.codec";
@@ -66,7 +69,8 @@ public final class RpcConstants {
public static final int RPC_PROTOCOL_BEGIN_TOKEN = 0xFF7FF4FE;
public static final int RPC_MAX_BUFFER_SIZE = 8192;
public static final int MAX_FRAME_MAX_LIST_SIZE =
- (int) ((1024 * 1024 * 8) / RPC_MAX_BUFFER_SIZE);
+ (int) ((TBaseConstants.META_MAX_MESSAGE_DATA_SIZE_UPPER_LIMIT
+ + TBaseConstants.META_MB_UNIT_SIZE * 8) /
RPC_MAX_BUFFER_SIZE);
public static final int RPC_FLAG_MSG_TYPE_REQUEST = 0x0;
public static final int RPC_FLAG_MSG_TYPE_RESPONSE = 0x1;
diff --git a/tubemq-core/src/main/proto/MasterService.proto
b/tubemq-core/src/main/proto/MasterService.proto
index 27e5496..b97f3fc 100644
--- a/tubemq-core/src/main/proto/MasterService.proto
+++ b/tubemq-core/src/main/proto/MasterService.proto
@@ -66,7 +66,7 @@ message ApprovedClientConfig {
optional int32 maxMsgSize = 2;
}
-message ClusterDefConfig {
+message ClusterConfig {
required int64 configId = 1;
optional int32 maxMsgSize = 2;
}
@@ -222,7 +222,7 @@ message RegisterRequestB2M {
optional int32 qryPriorityId = 12;
optional int32 tlsPort = 13;
optional MasterCertificateInfo authInfo = 14;
- optional ClusterDefConfig clsDefConfig = 15;
+ optional ClusterConfig clsConfig = 15;
}
message RegisterResponseM2B {
@@ -245,7 +245,7 @@ message RegisterResponseM2B {
optional int32 qryPriorityId = 15;
optional MasterAuthorizedInfo authorizedInfo = 16; /* Deprecated */
optional MasterBrokerAuthorizedInfo brokerAuthorizedInfo = 17;
- optional ClusterDefConfig clsDefConfig = 18;
+ optional ClusterConfig clsConfig = 18;
}
message HeartRequestB2M {
@@ -266,7 +266,7 @@ message HeartRequestB2M {
optional int64 flowCheckId = 13;
optional int32 qryPriorityId = 14;
optional MasterCertificateInfo authInfo = 15;
- optional ClusterDefConfig clsDefConfig = 16;
+ optional ClusterConfig clsConfig = 16;
}
message HeartResponseM2B {
@@ -292,7 +292,7 @@ message HeartResponseM2B {
optional int32 qryPriorityId = 17;
optional MasterAuthorizedInfo authorizedInfo = 18; /* Deprecated */
optional MasterBrokerAuthorizedInfo brokerAuthorizedInfo = 19;
- optional ClusterDefConfig clsDefConfig = 20;
+ optional ClusterConfig clsConfig = 20;
}
message CloseRequestB2M {
diff --git
a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/BrokerServiceServer.java
b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/BrokerServiceServer.java
index f9b242f..60490c6 100644
---
a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/BrokerServiceServer.java
+++
b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/BrokerServiceServer.java
@@ -55,6 +55,7 @@ import org.apache.tubemq.corerpc.RpcConstants;
import org.apache.tubemq.corerpc.service.BrokerReadService;
import org.apache.tubemq.corerpc.service.BrokerWriteService;
import org.apache.tubemq.server.Server;
+import org.apache.tubemq.server.broker.metadata.ClusterConfigHolder;
import org.apache.tubemq.server.broker.metadata.MetadataManager;
import org.apache.tubemq.server.broker.msgstore.MessageStore;
import org.apache.tubemq.server.broker.msgstore.MessageStoreManager;
@@ -633,10 +634,10 @@ public class BrokerServiceServer implements
BrokerReadService, BrokerWriteServic
builder.setErrMsg("data length is zero!");
return builder.build();
}
- if (dataLength > TBaseConstants.META_MAX_MESSAGE_DATA_SIZE + 1024) {
+ if (dataLength > ClusterConfigHolder.getMaxMsgSize()) {
builder.setErrCode(TErrCodeConstants.BAD_REQUEST);
builder.setErrMsg(strBuffer.append("data length over max length,
allowed max length is ")
- .append(TBaseConstants.META_MAX_MESSAGE_DATA_SIZE + 1024)
+ .append(ClusterConfigHolder.getMaxMsgSize())
.append(", data length is
").append(dataLength).toString());
return builder.build();
}
diff --git
a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/TubeBroker.java
b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/TubeBroker.java
index 556cbba..690f0d7 100644
---
a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/TubeBroker.java
+++
b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/TubeBroker.java
@@ -46,6 +46,7 @@ import org.apache.tubemq.corerpc.service.MasterService;
import org.apache.tubemq.server.Stoppable;
import org.apache.tubemq.server.broker.exception.StartupException;
import org.apache.tubemq.server.broker.metadata.BrokerMetadataManager;
+import org.apache.tubemq.server.broker.metadata.ClusterConfigHolder;
import org.apache.tubemq.server.broker.metadata.MetadataManager;
import org.apache.tubemq.server.broker.msgstore.MessageStoreManager;
import org.apache.tubemq.server.broker.nodeinfo.ConsumerNodeInfo;
@@ -223,68 +224,8 @@ public class TubeBroker implements Stoppable {
}
isKeepAlive.set(true);
heartbeatErrors.set(0);
- FlowCtrlRuleHandler flowCtrlRuleHandler =
- metadataManager.getFlowCtrlRuleHandler();
- long flowCheckId =
flowCtrlRuleHandler.getFlowCtrlId();
- int qryPriorityId =
flowCtrlRuleHandler.getQryPriorityId();
- ServiceStatusHolder
-
.setReadWriteServiceStatus(response.getStopRead(),
- response.getStopWrite(), "Master");
- if (response.hasFlowCheckId()) {
- qryPriorityId = response.hasQryPriorityId()
- ? response.getQryPriorityId() :
qryPriorityId;
- if (response.getFlowCheckId() != flowCheckId) {
- flowCheckId = response.getFlowCheckId();
- try {
- flowCtrlRuleHandler
-
.updateDefFlowCtrlInfo(qryPriorityId,
- flowCheckId,
response.getFlowControlInfo());
- } catch (Exception e1) {
- logger.warn(
- "[HeartBeat response] found parse
flowCtrl rules failure", e1);
- }
- }
- if (qryPriorityId !=
flowCtrlRuleHandler.getQryPriorityId()) {
-
flowCtrlRuleHandler.setQryPriorityId(qryPriorityId);
- }
- }
- requireReportConf = response.getNeedReportData();
StringBuilder sBuilder = new StringBuilder(512);
- if (response.getTakeConfInfo()) {
- logger.info(sBuilder
- .append("[HeartBeat response] received
broker metadata info: brokerConfId=")
- .append(response.getCurBrokerConfId())
-
.append(",stopWrite=").append(response.getStopWrite())
-
.append(",stopRead=").append(response.getStopRead())
-
.append(",configCheckSumId=").append(response.getConfCheckSumId())
-
.append(",hasFlowCtrl=").append(response.hasFlowCheckId())
-
.append(",curFlowCtrlId=").append(flowCheckId)
-
.append(",curQryPriorityId=").append(qryPriorityId)
- .append(",brokerDefaultConfInfo=")
-
.append(response.getBrokerDefaultConfInfo())
- .append(",brokerTopicSetConfList=")
-
.append(response.getBrokerTopicSetConfInfoList().toString()).toString());
- sBuilder.delete(0, sBuilder.length());
- metadataManager
-
.updateBrokerTopicConfigMap(response.getCurBrokerConfId(),
- response.getConfCheckSumId(),
response.getBrokerDefaultConfInfo(),
-
response.getBrokerTopicSetConfInfoList(), false, sBuilder);
- }
- if (response.hasBrokerAuthorizedInfo()) {
-
serverAuthHandler.appendVisitToken(response.getBrokerAuthorizedInfo());
- }
- boolean needProcess =
- metadataManager.updateBrokerRemoveTopicMap(
- response.getTakeRemoveTopicInfo(),
- response.getRemoveTopicConfInfoList(),
sBuilder);
- if (needProcess) {
- new Thread() {
- @Override
- public void run() {
- storeManager.removeTopicStore();
- }
- }.start();
- }
+ procConfigFromHeartBeat(sBuilder, response);
} catch (Throwable t) {
isKeepAlive.set(false);
heartbeatErrors.incrementAndGet();
@@ -355,6 +296,92 @@ public class TubeBroker implements Stoppable {
.append(TubeServerVersion.BROKER_VERSION).toString();
}
+ private void procConfigFromHeartBeat(StringBuilder sBuilder,
+ HeartResponseM2B response) {
+ // process service status
+ ServiceStatusHolder
+ .setReadWriteServiceStatus(response.getStopRead(),
+ response.getStopWrite(), "Master");
+ // process flow controller rules
+ FlowCtrlRuleHandler flowCtrlRuleHandler =
+ metadataManager.getFlowCtrlRuleHandler();
+ long flowCheckId = flowCtrlRuleHandler.getFlowCtrlId();
+ int qryPriorityId = flowCtrlRuleHandler.getQryPriorityId();
+ if (response.hasFlowCheckId()) {
+ qryPriorityId = response.hasQryPriorityId()
+ ? response.getQryPriorityId() : qryPriorityId;
+ if (response.getFlowCheckId() != flowCheckId) {
+ flowCheckId = response.getFlowCheckId();
+ try {
+ flowCtrlRuleHandler
+ .updateDefFlowCtrlInfo(qryPriorityId,
+ flowCheckId,
response.getFlowControlInfo());
+ } catch (Exception e1) {
+ logger.warn(
+ "[HeartBeat response] found parse flowCtrl rules
failure", e1);
+ }
+ }
+ if (qryPriorityId != flowCtrlRuleHandler.getQryPriorityId()) {
+ flowCtrlRuleHandler.setQryPriorityId(qryPriorityId);
+ }
+ }
+ // update configure report requirement
+ requireReportConf = response.getNeedReportData();
+ // update cluster setting
+ if (response.hasClsConfig()) {
+ long configId = response.getClsConfig().getConfigId();
+ if (configId != ClusterConfigHolder.getConfigId()) {
+ ClusterConfigHolder.updClusterSetting(response.getClsConfig());
+ logger.info(sBuilder
+ .append("[HeartBeat response] received cluster
configure changed,")
+
.append(",hasClsConfig=").append(response.hasClsConfig())
+
.append(",curClusterConfigId=").append(ClusterConfigHolder.getConfigId())
+
.append(",curMaxMsgSize=").append(ClusterConfigHolder.getMaxMsgSize())
+ .append(",minMemCacheSize=")
+ .append(ClusterConfigHolder.getMinMemCacheSize())
+ .toString());
+ sBuilder.delete(0, sBuilder.length());
+ }
+ }
+ if (response.getTakeConfInfo()) {
+ logger.info(sBuilder
+ .append("[HeartBeat response] received broker metadata
info: brokerConfId=")
+ .append(response.getCurBrokerConfId())
+ .append(",stopWrite=").append(response.getStopWrite())
+ .append(",stopRead=").append(response.getStopRead())
+
.append(",configCheckSumId=").append(response.getConfCheckSumId())
+ .append(",hasFlowCtrl=").append(response.hasFlowCheckId())
+ .append(",curFlowCtrlId=").append(flowCheckId)
+ .append(",curQryPriorityId=").append(qryPriorityId)
+ .append(",brokerDefaultConfInfo=")
+ .append(response.getBrokerDefaultConfInfo())
+ .append(",brokerTopicSetConfList=")
+
.append(response.getBrokerTopicSetConfInfoList().toString()).toString());
+ sBuilder.delete(0, sBuilder.length());
+ metadataManager
+ .updateBrokerTopicConfigMap(response.getCurBrokerConfId(),
+ response.getConfCheckSumId(),
response.getBrokerDefaultConfInfo(),
+ response.getBrokerTopicSetConfInfoList(), false,
sBuilder);
+ }
+ // update auth info
+ if (response.hasBrokerAuthorizedInfo()) {
+
serverAuthHandler.appendVisitToken(response.getBrokerAuthorizedInfo());
+ }
+ // process topic deletion
+ boolean needProcess =
+ metadataManager.updateBrokerRemoveTopicMap(
+ response.getTakeRemoveTopicInfo(),
+ response.getRemoveTopicConfInfoList(), sBuilder);
+ if (needProcess) {
+ new Thread() {
+ @Override
+ public void run() {
+ storeManager.removeTopicStore();
+ }
+ }.start();
+ }
+ }
+
/***
* Register to master. Try multi times if failed.
*
@@ -374,53 +401,7 @@ public class TubeBroker implements Stoppable {
.append("Register to master failed! The error
message is ")
.append(response.getErrMsg()).toString());
}
- ServiceStatusHolder
- .setReadWriteServiceStatus(response.getStopRead(),
- response.getStopWrite(), "Master");
- FlowCtrlRuleHandler flowCtrlRuleHandler =
- metadataManager.getFlowCtrlRuleHandler();
- if (response.hasFlowCheckId()) {
- int qryPriorityId = response.hasQryPriorityId()
- ? response.getQryPriorityId() :
flowCtrlRuleHandler.getQryPriorityId();
- if (response.getFlowCheckId() !=
flowCtrlRuleHandler.getFlowCtrlId()) {
- try {
- flowCtrlRuleHandler
-
.updateDefFlowCtrlInfo(response.getQryPriorityId(),
- response.getFlowCheckId(),
response.getFlowControlInfo());
- } catch (Exception e1) {
- logger.warn("[Register response] found parse
flowCtrl rules failure", e1);
- }
- }
- if (qryPriorityId !=
flowCtrlRuleHandler.getQryPriorityId()) {
- flowCtrlRuleHandler.setQryPriorityId(qryPriorityId);
- }
- }
- updateEnableBrokerFunInfo(response);
- logger.info(sBuilder
- .append("[Register response] received broker metadata
info: brokerConfId=")
- .append(response.getCurBrokerConfId())
- .append(",stopWrite=").append(response.getStopWrite())
- .append(",stopRead=").append(response.getStopRead())
-
.append(",configCheckSumId=").append(response.getConfCheckSumId())
- .append(",hasFlowCtrl=").append(response.hasFlowCheckId())
- .append(",enableVisitTokenCheck=")
- .append(serverAuthHandler.isEnableVisitTokenCheck())
- .append(",enableProduceAuthenticate=")
- .append(serverAuthHandler.isEnableProduceAuthenticate())
-
.append(",enableProduceAuthorize=").append(serverAuthHandler.isEnableProduceAuthorize())
- .append(",enableConsumeAuthenticate=")
- .append(serverAuthHandler.isEnableConsumeAuthenticate())
- .append(",enableConsumeAuthorize=")
- .append(serverAuthHandler.isEnableConsumeAuthorize())
-
.append(",curFlowCtrlId=").append(flowCtrlRuleHandler.getFlowCtrlId())
-
.append(",curQryPriorityId=").append(flowCtrlRuleHandler.getQryPriorityId())
-
.append(",brokerDefaultConfInfo=").append(response.getBrokerDefaultConfInfo())
- .append(",brokerTopicSetConfList=")
-
.append(response.getBrokerTopicSetConfInfoList().toString()).toString());
- sBuilder.delete(0, sBuilder.length());
-
metadataManager.updateBrokerTopicConfigMap(response.getCurBrokerConfId(),
- response.getConfCheckSumId(),
response.getBrokerDefaultConfInfo(),
- response.getBrokerTopicSetConfInfoList(), true, sBuilder);
+ procConfigFromRegister(sBuilder, response);
isKeepAlive.set(true);
lastRegTime.set(System.currentTimeMillis());
break;
@@ -435,11 +416,80 @@ public class TubeBroker implements Stoppable {
}
}
- private void updateEnableBrokerFunInfo(final RegisterResponseM2B response)
{
+
+ private void procConfigFromRegister(StringBuilder sBuilder,
+ final RegisterResponseM2B response) {
+ // process service status
+ ServiceStatusHolder
+ .setReadWriteServiceStatus(response.getStopRead(),
+ response.getStopWrite(), "Master");
+ // process flow controller rules
+ FlowCtrlRuleHandler flowCtrlRuleHandler =
+ metadataManager.getFlowCtrlRuleHandler();
+ if (response.hasFlowCheckId()) {
+ int qryPriorityId = response.hasQryPriorityId()
+ ? response.getQryPriorityId() :
flowCtrlRuleHandler.getQryPriorityId();
+ if (response.getFlowCheckId() !=
flowCtrlRuleHandler.getFlowCtrlId()) {
+ try {
+ flowCtrlRuleHandler
+ .updateDefFlowCtrlInfo(response.getQryPriorityId(),
+ response.getFlowCheckId(),
response.getFlowControlInfo());
+ } catch (Exception e1) {
+ logger.warn("[Register response] found parse flowCtrl
rules failure", e1);
+ }
+ }
+ if (qryPriorityId != flowCtrlRuleHandler.getQryPriorityId()) {
+ flowCtrlRuleHandler.setQryPriorityId(qryPriorityId);
+ }
+ }
+ // update auth info
serverAuthHandler.configure(response.getEnableBrokerInfo());
if (response.hasBrokerAuthorizedInfo()) {
serverAuthHandler.appendVisitToken(response.getBrokerAuthorizedInfo());
}
+ // update cluster setting
+ if (response.hasClsConfig()) {
+ long configId = response.getClsConfig().getConfigId();
+ if (configId != ClusterConfigHolder.getConfigId()) {
+ ClusterConfigHolder.updClusterSetting(response.getClsConfig());
+ }
+ }
+ sBuilder.append("[Register response] received broker metadata info:
brokerConfId=")
+ .append(response.getCurBrokerConfId())
+ .append(",stopWrite=").append(response.getStopWrite())
+ .append(",stopRead=").append(response.getStopRead())
+
.append(",configCheckSumId=").append(response.getConfCheckSumId())
+ .append(",hasFlowCtrl=").append(response.hasFlowCheckId())
+
.append(",curFlowCtrlId=").append(flowCtrlRuleHandler.getFlowCtrlId())
+
.append(",curQryPriorityId=").append(flowCtrlRuleHandler.getQryPriorityId())
+ .append(",hasClsConfig=").append(response.hasClsConfig())
+
.append(",curClusterConfigId=").append(ClusterConfigHolder.getConfigId())
+
.append(",curMaxMsgSize=").append(ClusterConfigHolder.getMaxMsgSize())
+
.append(",minMemCacheSize=").append(ClusterConfigHolder.getMinMemCacheSize())
+ .append(",enableVisitTokenCheck=")
+ .append(serverAuthHandler.isEnableVisitTokenCheck())
+ .append(",enableProduceAuthenticate=")
+ .append(serverAuthHandler.isEnableProduceAuthenticate())
+
.append(",enableProduceAuthorize=").append(serverAuthHandler.isEnableProduceAuthorize())
+ .append(",enableConsumeAuthenticate=")
+ .append(serverAuthHandler.isEnableConsumeAuthenticate())
+ .append(",enableConsumeAuthorize=")
+ .append(serverAuthHandler.isEnableConsumeAuthorize())
+
.append(",brokerDefaultConfInfo=").append(response.getBrokerDefaultConfInfo())
+ .append(",brokerTopicSetConfList=")
+
.append(response.getBrokerTopicSetConfInfoList().toString()).toString();
+ sBuilder.delete(0, sBuilder.length());
+
metadataManager.updateBrokerTopicConfigMap(response.getCurBrokerConfId(),
+ response.getConfCheckSumId(),
response.getBrokerDefaultConfInfo(),
+ response.getBrokerTopicSetConfInfoList(), true, sBuilder);
+ }
+
+ // build cluster configure info
+ private ClientMaster.ClusterConfig.Builder buildClusterConfig() {
+ ClientMaster.ClusterConfig.Builder defSetting =
+ ClientMaster.ClusterConfig.newBuilder();
+ defSetting.setConfigId(ClusterConfigHolder.getConfigId());
+ return defSetting;
}
/***
@@ -474,6 +524,7 @@ public class TubeBroker implements Stoppable {
if (authInfoBuilder != null) {
builder.setAuthInfo(authInfoBuilder.build());
}
+ builder.setClsConfig(buildClusterConfig());
logger.info(new StringBuilder(512)
.append("[Register request] current broker report info:
brokerConfId=")
.append(metadataManager.getBrokerMetadataConfId())
@@ -517,6 +568,7 @@ public class TubeBroker implements Stoppable {
if (authInfoBuilder != null) {
builder.setAuthInfo(authInfoBuilder.build());
}
+ builder.setClsConfig(buildClusterConfig());
if (metadataManager.isBrokerMetadataChanged() || requireReportConf) {
builder.setTakeConfInfo(true);
builder.setBrokerDefaultConfInfo(metadataManager.getBrokerDefMetaConfInfo());
diff --git
a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/metadata/ClusterConfigHolder.java
b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/metadata/ClusterConfigHolder.java
new file mode 100644
index 0000000..c72427d
--- /dev/null
+++
b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/metadata/ClusterConfigHolder.java
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tubemq.server.broker.metadata;
+
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.tubemq.corebase.TBaseConstants;
+import org.apache.tubemq.corebase.protobuf.generated.ClientMaster;
+import org.apache.tubemq.corebase.utils.MixedUtils;
+import org.apache.tubemq.server.broker.utils.DataStoreUtils;
+
+
+public class ClusterConfigHolder {
+ private static AtomicLong configId =
+ new AtomicLong(TBaseConstants.META_VALUE_UNDEFINED);
+ private static AtomicInteger maxMsgSize =
+ new AtomicInteger(TBaseConstants.META_MAX_MESSAGE_DATA_SIZE
+ + TBaseConstants.META_MAX_MESSAGE_HEADER_SIZE);
+ private static AtomicInteger minMemCacheSize =
+ new AtomicInteger(TBaseConstants.META_MIN_MEM_BUFFER_SIZE);
+ private static AtomicInteger maxMsgStoreLength =
+ new AtomicInteger(TBaseConstants.META_MAX_MESSAGE_DATA_SIZE
+ + TBaseConstants.META_MAX_MESSAGE_HEADER_SIZE
+ + DataStoreUtils.STORE_DATA_HEADER_LEN);
+
+ public ClusterConfigHolder() {
+
+ }
+
+ // set master returned configure
+ public static void updClusterSetting(ClientMaster.ClusterConfig
clusterConfig) {
+ if (clusterConfig == null) {
+ return;
+ }
+ if (configId.get() != clusterConfig.getConfigId()) {
+ configId.set(clusterConfig.getConfigId());
+ if (clusterConfig.hasMaxMsgSize()) {
+ int tmpMaxSize = MixedUtils.mid(clusterConfig.getMaxMsgSize(),
+ TBaseConstants.META_MAX_MESSAGE_DATA_SIZE,
+ TBaseConstants.META_MAX_MESSAGE_DATA_SIZE_UPPER_LIMIT)
+ + TBaseConstants.META_MAX_MESSAGE_HEADER_SIZE;
+ if (tmpMaxSize != maxMsgSize.get()) {
+ maxMsgSize.set(tmpMaxSize);
+ minMemCacheSize.set(tmpMaxSize +
+ (tmpMaxSize % 4 + 1) *
TBaseConstants.META_MESSAGE_SIZE_ADJUST);
+ maxMsgStoreLength.set(tmpMaxSize +
DataStoreUtils.STORE_DATA_HEADER_LEN);
+ }
+ }
+ }
+ }
+
+ public static long getConfigId() {
+ return configId.get();
+ }
+
+ public static int getMaxMsgSize() {
+ return maxMsgSize.get();
+ }
+
+ public static int getMinMemCacheSize() {
+ return minMemCacheSize.get();
+ }
+
+ public static int getMaxMsgStoreLength() {
+ return maxMsgStoreLength.get();
+ }
+}
diff --git
a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/msgstore/MessageStore.java
b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/msgstore/MessageStore.java
index e610a8b..8f650b4 100644
---
a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/msgstore/MessageStore.java
+++
b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/msgstore/MessageStore.java
@@ -37,6 +37,7 @@ import org.apache.tubemq.corebase.TErrCodeConstants;
import org.apache.tubemq.corebase.protobuf.generated.ClientBroker;
import org.apache.tubemq.corebase.utils.ThreadUtils;
import org.apache.tubemq.server.broker.BrokerConfig;
+import org.apache.tubemq.server.broker.metadata.ClusterConfigHolder;
import org.apache.tubemq.server.broker.metadata.TopicMetadata;
import org.apache.tubemq.server.broker.msgstore.disk.GetMessageResult;
import org.apache.tubemq.server.broker.msgstore.disk.MsgFileStatisInfo;
@@ -129,7 +130,7 @@ public class MessageStore implements Closeable {
this.unflushThreshold.set(topicMetadata.getUnflushThreshold());
this.unflushDataHold.set(topicMetadata.getUnflushDataHold());
this.writeCacheMaxCnt = topicMetadata.getMemCacheMsgCnt();
- this.writeCacheMaxSize = topicMetadata.getMemCacheMsgSize();
+ this.writeCacheMaxSize =
validAndGetMemCacheSize(topicMetadata.getMemCacheMsgSize());
this.writeCacheFlushIntvl = topicMetadata.getMemCacheFlushIntvl();
int tmpIndexReadCnt = tubeConfig.getIndexTransCount() * partitionNum;
memMaxIndexReadCnt.set(tmpIndexReadCnt <= 6000
@@ -421,7 +422,7 @@ public class MessageStore implements Closeable {
writeCacheMutex.readLock().lock();
try {
writeCacheMaxCnt = topicMetadata.getMemCacheMsgCnt();
- writeCacheMaxSize = topicMetadata.getMemCacheMsgSize();
+ writeCacheMaxSize =
validAndGetMemCacheSize(topicMetadata.getMemCacheMsgSize());
writeCacheFlushIntvl = topicMetadata.getMemCacheFlushIntvl();
} finally {
writeCacheMutex.readLock().unlock();
@@ -603,6 +604,17 @@ public class MessageStore implements Closeable {
}
}
+ private int validAndGetMemCacheSize(int memCacheSize) {
+ if (memCacheSize <= ClusterConfigHolder.getMinMemCacheSize()) {
+ logger.info(new StringBuilder(512)
+ .append("[Data Store] writeCacheMaxSize changed, from ")
+ .append(memCacheSize).append(" to ")
+
.append(ClusterConfigHolder.getMinMemCacheSize()).toString());
+ memCacheSize = ClusterConfigHolder.getMinMemCacheSize();
+ }
+ return memCacheSize;
+ }
+
/***
* Append message and trigger flush operation.
*
diff --git
a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/msgstore/disk/FileSegment.java
b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/msgstore/disk/FileSegment.java
index 949d149..6fda352 100644
---
a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/msgstore/disk/FileSegment.java
+++
b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/msgstore/disk/FileSegment.java
@@ -25,6 +25,7 @@ import java.nio.channels.FileChannel;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.tubemq.corebase.utils.CheckSum;
+import org.apache.tubemq.server.broker.metadata.ClusterConfigHolder;
import org.apache.tubemq.server.broker.utils.DataStoreUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -365,7 +366,7 @@ public class FileSegment implements Segment {
itemNext = validBytes + DataStoreUtils.STORE_DATA_HEADER_LEN +
itemMsglen;
if ((itemMsgToken !=
DataStoreUtils.STORE_DATA_TOKER_BEGIN_VALUE)
|| (itemMsglen <= 0)
- || (itemMsglen >
DataStoreUtils.MAX_MSG_DATA_STORE_SIZE)
+ || (itemMsglen > ClusterConfigHolder.getMaxMsgSize())
|| (itemNext > totalBytes)) {
next = -1;
break;
@@ -437,7 +438,7 @@ public class FileSegment implements Segment {
if ((itemMsgPartId < 0)
|| (itemMsgOffset < 0)
|| (itemMsglen <= 0)
- || (itemMsglen >
DataStoreUtils.STORE_MAX_MESSAGE_STORE_LEN)
+ || (itemMsglen >
ClusterConfigHolder.getMaxMsgStoreLength())
|| (itemNext > totalBytes)) {
next = -1;
break;
diff --git
a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/msgstore/disk/MsgFileStore.java
b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/msgstore/disk/MsgFileStore.java
index 400b666..fe23dd3 100644
---
a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/msgstore/disk/MsgFileStore.java
+++
b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/msgstore/disk/MsgFileStore.java
@@ -35,6 +35,7 @@ import org.apache.tubemq.corebase.TErrCodeConstants;
import org.apache.tubemq.corebase.protobuf.generated.ClientBroker;
import org.apache.tubemq.corebase.utils.ServiceStatusHolder;
import org.apache.tubemq.server.broker.BrokerConfig;
+import org.apache.tubemq.server.broker.metadata.ClusterConfigHolder;
import org.apache.tubemq.server.broker.msgstore.MessageStore;
import org.apache.tubemq.server.broker.stats.CountItem;
import org.apache.tubemq.server.broker.utils.DataStoreUtils;
@@ -273,7 +274,7 @@ public class MsgFileStore implements Closeable {
// skip when mismatch condition
if (curIndexDataOffset < 0
|| curIndexDataSize <= 0
- || curIndexDataSize >
DataStoreUtils.STORE_MAX_MESSAGE_STORE_LEN
+ || curIndexDataSize >
ClusterConfigHolder.getMaxMsgStoreLength()
|| curIndexDataOffset < curDataMinOffset) {
readedOffset = curIndexOffset +
DataStoreUtils.STORE_INDEX_HEAD_LEN;
continue;
diff --git
a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/msgstore/mem/MsgMemStore.java
b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/msgstore/mem/MsgMemStore.java
index 6da3d27..27eef32 100644
---
a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/msgstore/mem/MsgMemStore.java
+++
b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/msgstore/mem/MsgMemStore.java
@@ -29,6 +29,7 @@ import java.util.concurrent.locks.ReentrantLock;
import org.apache.tubemq.corebase.TBaseConstants;
import org.apache.tubemq.corebase.TErrCodeConstants;
import org.apache.tubemq.server.broker.BrokerConfig;
+import org.apache.tubemq.server.broker.metadata.ClusterConfigHolder;
import org.apache.tubemq.server.broker.msgstore.disk.MsgFileStore;
import org.apache.tubemq.server.broker.utils.DataStoreUtils;
import org.apache.tubemq.server.common.utils.AppendResult;
@@ -224,7 +225,7 @@ public class MsgMemStore implements Closeable {
if ((cDataOffset < 0)
|| (cDataSize <= 0)
|| (cDataOffset >= currDataOffset)
- || (cDataSize > TBaseConstants.META_MAX_MESSAGE_DATA_SIZE
+ 1024)
+ || (cDataSize > ClusterConfigHolder.getMaxMsgSize())
|| (cDataOffset + cDataSize > currDataOffset)) {
readedSize += DataStoreUtils.STORE_INDEX_HEAD_LEN;
continue;
diff --git
a/tubemq-server/src/main/java/org/apache/tubemq/server/common/fielddef/WebFieldDef.java
b/tubemq-server/src/main/java/org/apache/tubemq/server/common/fielddef/WebFieldDef.java
index 05688a7..2ed75c3 100644
---
a/tubemq-server/src/main/java/org/apache/tubemq/server/common/fielddef/WebFieldDef.java
+++
b/tubemq-server/src/main/java/org/apache/tubemq/server/common/fielddef/WebFieldDef.java
@@ -88,8 +88,8 @@ public enum WebFieldDef {
ADMINAUTHTOKEN(23, "confModAuthToken", "authToken", WebFieldType.STRING,
"Admin api operation authorization code",
TServerConstants.CFG_MODAUTHTOKEN_MAX_LENGTH),
- MAXMSGSIZE(24, "maxMsgSize", "maxMsgSize", WebFieldType.INT,
- "Max allowed message size", RegexDef.TMP_NUMBER),
+ MAXMSGSIZE(24, "maxMsgSizeInMB", "maxMsgSizeInMB", WebFieldType.INT,
+ "Max allowed message size, unit MB", RegexDef.TMP_NUMBER),
CREATEDATE(25, "createDate", "cDate", WebFieldType.STRING,
"Record creation date", TBaseConstants.META_MAX_DATEVALUE_LENGTH),
MODIFYDATE(26, "modifyDate", "mDate", WebFieldType.STRING,
diff --git
a/tubemq-server/src/main/java/org/apache/tubemq/server/common/paramcheck/PBParameterUtils.java
b/tubemq-server/src/main/java/org/apache/tubemq/server/common/paramcheck/PBParameterUtils.java
index aa87940..ed2e122 100644
---
a/tubemq-server/src/main/java/org/apache/tubemq/server/common/paramcheck/PBParameterUtils.java
+++
b/tubemq-server/src/main/java/org/apache/tubemq/server/common/paramcheck/PBParameterUtils.java
@@ -32,6 +32,8 @@ import org.apache.tubemq.corebase.cluster.ConsumerInfo;
import org.apache.tubemq.corebase.utils.TStringUtils;
import org.apache.tubemq.server.broker.metadata.MetadataManager;
import org.apache.tubemq.server.broker.metadata.TopicMetadata;
+import org.apache.tubemq.server.common.fielddef.WebFieldDef;
+import org.apache.tubemq.server.common.utils.ProcessResult;
import org.apache.tubemq.server.master.MasterConfig;
import
org.apache.tubemq.server.master.bdbstore.bdbentitys.BdbConsumeGroupSettingEntity;
import org.apache.tubemq.server.master.nodemanage.nodebroker.BrokerConfManager;
@@ -576,4 +578,35 @@ public class PBParameterUtils {
retResult.setCheckData(tmpValue);
return retResult;
}
+
+ /**
+ * Check the string parameter
+ *
+ * @param fieldDef the field to be checked
+ * @param paramValue the field value to be checked
+ * @param strBuffer the string pool construct the result
+ * @param result the checked result
+ * @return result success or failure
+ */
+ public static boolean getStringParameter(WebFieldDef fieldDef,
+ String paramValue,
+ StringBuilder strBuffer,
+ ProcessResult result) {
+ if (TStringUtils.isBlank(paramValue)) {
+ result.setFailResult(strBuffer.append("Request miss necessary ")
+ .append(fieldDef.name).append(" data!").toString());
+ strBuffer.delete(0, strBuffer.length());
+ return result.success;
+ }
+ String tmpValue = paramValue.trim();
+ if (tmpValue.length() > fieldDef.valMaxLen) {
+ result.setFailResult(strBuffer.append(fieldDef.name)
+ .append("'s length over max value, allowed max length is ")
+ .append(fieldDef.valMaxLen).toString());
+ strBuffer.delete(0, strBuffer.length());
+ return result.success;
+ }
+ result.setSuccResult(tmpValue);
+ return result.success;
+ }
}
diff --git
a/tubemq-server/src/main/java/org/apache/tubemq/server/common/utils/WebParameterUtils.java
b/tubemq-server/src/main/java/org/apache/tubemq/server/common/utils/WebParameterUtils.java
index 385fe22..d4b1e20 100644
---
a/tubemq-server/src/main/java/org/apache/tubemq/server/common/utils/WebParameterUtils.java
+++
b/tubemq-server/src/main/java/org/apache/tubemq/server/common/utils/WebParameterUtils.java
@@ -297,37 +297,11 @@ public class WebParameterUtils {
WebFieldDef fieldDef,
boolean required,
ProcessResult result) {
- if (!getStringParamValue(req, fieldDef,
- required, null, result)) {
- return result.success;
- }
- Set<Integer> tgtValueSet = new HashSet<Integer>();
- if (fieldDef.isCompFieldType()) {
- Set<String> valItemSet = (Set<String>) result.retData1;
- if (valItemSet.isEmpty()) {
- result.setSuccResult(tgtValueSet);
- return result.success;
- }
- for (String itemVal : valItemSet) {
- if (!checkIntValueNorms(fieldDef,
- itemVal, false, -1, result)) {
- return result.success;
- }
- tgtValueSet.add((Integer) result.retData1);
- }
- } else {
- String paramValue = (String) result.retData1;
- if (paramValue == null) {
- result.setSuccResult(tgtValueSet);
- return result.success;
- }
- if (!checkIntValueNorms(fieldDef,
- paramValue, false, -1, result)) {
- tgtValueSet.add((Integer) result.retData1);
- }
- }
- result.setSuccResult(tgtValueSet);
- return result.success;
+ return getIntParamValue(req, fieldDef, required,
+ false, TBaseConstants.META_VALUE_UNDEFINED,
+ false, TBaseConstants.META_VALUE_UNDEFINED,
+ false, TBaseConstants.META_VALUE_UNDEFINED,
+ result);
}
/**
@@ -342,11 +316,48 @@ public class WebParameterUtils {
* @return process result
*/
public static boolean getIntParamValue(HttpServletRequest req,
- WebFieldDef fieldDef,
- boolean required,
- int defValue,
- int minValue,
- ProcessResult result) {
+ WebFieldDef fieldDef,
+ boolean required,
+ int defValue,
+ int minValue,
+ ProcessResult result) {
+ return getIntParamValue(req, fieldDef, required, true, defValue,
+ true, minValue, false, TBaseConstants.META_VALUE_UNDEFINED,
result);
+ }
+
+ /**
+ * Parse the parameter value from an object value to a integer value
+ *
+ * @param req Http Servlet Request
+ * @param fieldDef the parameter field definition
+ * @param required a boolean value represent whether the parameter is
must required
+ * @param defValue a default value returned if the field not exist
+ * @param minValue min value required
+ * @param minValue max value allowed
+ * @param result process result of parameter value
+ * @return process result
+ */
+ public static boolean getIntParamValue(HttpServletRequest req,
+ WebFieldDef fieldDef,
+ boolean required,
+ int defValue,
+ int minValue,
+ int maxValue,
+ ProcessResult result) {
+ return getIntParamValue(req, fieldDef, required, true, defValue,
+ true, minValue, true, maxValue, result);
+ }
+
+ private static boolean getIntParamValue(HttpServletRequest req,
+ WebFieldDef fieldDef,
+ boolean required,
+ boolean hasDefVal,
+ int defValue,
+ boolean hasMinVal,
+ int minValue,
+ boolean hasMaxVal,
+ int maxValue,
+ ProcessResult result) {
if (!getStringParamValue(req, fieldDef, required, null, result)) {
return result.success;
}
@@ -354,13 +365,15 @@ public class WebParameterUtils {
Set<Integer> tgtValueSet = new HashSet<Integer>();
Set<String> valItemSet = (Set<String>) result.retData1;
if (valItemSet.isEmpty()) {
- tgtValueSet.add(defValue);
+ if (hasDefVal) {
+ tgtValueSet.add(defValue);
+ }
result.setSuccResult(tgtValueSet);
return result.success;
}
for (String itemVal : valItemSet) {
- if (!checkIntValueNorms(fieldDef,
- itemVal, true, minValue, result)) {
+ if (!checkIntValueNorms(fieldDef, itemVal,
+ hasMinVal, minValue, hasMaxVal, maxValue, result)) {
return result.success;
}
tgtValueSet.add((Integer) result.retData1);
@@ -369,11 +382,13 @@ public class WebParameterUtils {
} else {
String paramValue = (String) result.retData1;
if (paramValue == null) {
- result.setSuccResult(defValue);
+ if (hasDefVal) {
+ result.setSuccResult(defValue);
+ }
return result.success;
}
- checkIntValueNorms(fieldDef,
- paramValue, true, minValue, result);
+ checkIntValueNorms(fieldDef, paramValue,
+ hasMinVal, minValue, hasMinVal, maxValue, result);
}
return result.success;
}
@@ -691,6 +706,8 @@ public class WebParameterUtils {
* @param paramValue the parameter value
* @param hasMinVal whether there is a minimum
* param minValue the parameter min value
+ * @param hasMaxVal whether there is a maximum
+ * param maxValue the parameter max value
* @param result process result
* @return check result for string value of parameter
*/
@@ -698,6 +715,8 @@ public class WebParameterUtils {
String paramValue,
boolean hasMinVal,
int minValue,
+ boolean hasMaxVal,
+ int maxValue,
ProcessResult result) {
try {
int paramIntVal = Integer.parseInt(paramValue);
@@ -707,6 +726,12 @@ public class WebParameterUtils {
.append(" value must >=
").append(minValue).toString());
return false;
}
+ if (hasMaxVal && paramIntVal > maxValue) {
+ result.setFailResult(new StringBuilder(512)
+ .append("Parameter ").append(fieldDef.name)
+ .append(" value must <=
").append(maxValue).toString());
+ return false;
+ }
result.setSuccResult(paramIntVal);
} catch (Throwable e) {
result.setFailResult(new StringBuilder(512)
diff --git
a/tubemq-server/src/main/java/org/apache/tubemq/server/master/TMaster.java
b/tubemq-server/src/main/java/org/apache/tubemq/server/master/TMaster.java
index 4299bc7..e7327cb 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/TMaster.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/TMaster.java
@@ -48,6 +48,7 @@ import org.apache.tubemq.corebase.cluster.ProducerInfo;
import org.apache.tubemq.corebase.cluster.SubscribeInfo;
import org.apache.tubemq.corebase.cluster.TopicInfo;
import org.apache.tubemq.corebase.config.TLSConfig;
+import org.apache.tubemq.corebase.protobuf.generated.ClientMaster;
import
org.apache.tubemq.corebase.protobuf.generated.ClientMaster.CloseRequestB2M;
import
org.apache.tubemq.corebase.protobuf.generated.ClientMaster.CloseRequestC2M;
import
org.apache.tubemq.corebase.protobuf.generated.ClientMaster.CloseRequestP2M;
@@ -101,6 +102,7 @@ import
org.apache.tubemq.server.master.balance.DefaultLoadBalancer;
import org.apache.tubemq.server.master.balance.LoadBalancer;
import org.apache.tubemq.server.master.bdbstore.DefaultBdbStoreService;
import org.apache.tubemq.server.master.bdbstore.bdbentitys.BdbBrokerConfEntity;
+import
org.apache.tubemq.server.master.bdbstore.bdbentitys.BdbClusterSettingEntity;
import
org.apache.tubemq.server.master.bdbstore.bdbentitys.BdbGroupFlowCtrlEntity;
import org.apache.tubemq.server.master.nodemanage.nodebroker.BrokerConfManager;
import org.apache.tubemq.server.master.nodemanage.nodebroker.BrokerInfoHolder;
@@ -348,6 +350,11 @@ public class TMaster extends HasThread implements
MasterService, Stoppable {
builder.setBrokerCheckSum(this.defaultBrokerConfManager.getBrokerInfoCheckSum());
builder.addAllBrokerInfos(this.defaultBrokerConfManager.getBrokersMap(overtls).values());
builder.setAuthorizedInfo(genAuthorizedInfo(certResult.authorizedToken,
false).build());
+ ClientMaster.ApprovedClientConfig.Builder clientConfigBuilder =
+ buildApprovedClientConfig(request.getAppdConfig());
+ if (clientConfigBuilder != null) {
+ builder.setAppdConfig(clientConfigBuilder);
+ }
logger.info(strBuffer.append("[Producer Register] ").append(producerId)
.append(", isOverTLS=").append(overtls)
.append(", clientJDKVer=").append(clientJdkVer).toString());
@@ -436,6 +443,11 @@ public class TMaster extends HasThread implements
MasterService, Stoppable {
if (defaultBrokerConfManager.getBrokerInfoCheckSum() !=
inBrokerCheckSum) {
builder.addAllBrokerInfos(defaultBrokerConfManager.getBrokersMap(overtls).values());
}
+ ClientMaster.ApprovedClientConfig.Builder clientConfigBuilder =
+ buildApprovedClientConfig(request.getAppdConfig());
+ if (clientConfigBuilder != null) {
+ builder.setAppdConfig(clientConfigBuilder);
+ }
if (logger.isDebugEnabled()) {
logger.debug(strBuffer.append("[Push Producer's available topic
count:]")
.append(producerId).append(TokenConstants.LOG_SEG_SEP)
@@ -1071,6 +1083,11 @@ public class TMaster extends HasThread implements
MasterService, Stoppable {
builder.setBrokerDefaultConfInfo(brokerStatusInfo.getLastPushBrokerDefaultConfInfo());
builder.addAllBrokerTopicSetConfInfo(brokerStatusInfo.getLastPushBrokerTopicSetConfInfo());
builder.setSsdStoreId(TBaseConstants.META_VALUE_UNDEFINED);
+ ClientMaster.ClusterConfig.Builder clusterConfigBuilder =
+ buildClusterConfig(request.getClsConfig());
+ if (clusterConfigBuilder != null) {
+ builder.setClsConfig(clusterConfigBuilder);
+ }
if (request.hasFlowCheckId()) {
BdbGroupFlowCtrlEntity bdbGroupFlowCtrlEntity =
defaultBrokerConfManager.getBdbDefFlowCtrl();
@@ -1259,6 +1276,11 @@ public class TMaster extends HasThread implements
MasterService, Stoppable {
}
}
brokerHolder.setBrokerHeartBeatReqStatus(brokerInfo.getBrokerId(),
builder);
+ ClientMaster.ClusterConfig.Builder clusterConfigBuilder =
+ buildClusterConfig(request.getClsConfig());
+ if (clusterConfigBuilder != null) {
+ builder.setClsConfig(clusterConfigBuilder);
+ }
builder.setTakeRemoveTopicInfo(true);
builder.addAllRemoveTopicConfInfo(defaultBrokerConfManager
.getBrokerRemovedTopicStrConfigInfo(bdbBrokerConfEntity));
@@ -2295,6 +2317,56 @@ public class TMaster extends HasThread implements
MasterService, Stoppable {
}
/**
+ * build approved client configure
+ *
+ * @param inClientConfig client reported Configure info
+ * @return ApprovedClientConfig
+ */
+ private ClientMaster.ApprovedClientConfig.Builder
buildApprovedClientConfig(
+ ClientMaster.ApprovedClientConfig inClientConfig) {
+ ClientMaster.ApprovedClientConfig.Builder outClientConfig = null;
+ if (inClientConfig != null) {
+ outClientConfig = ClientMaster.ApprovedClientConfig.newBuilder();
+ BdbClusterSettingEntity settingEntity =
+ this.defaultBrokerConfManager.getBdbClusterSetting();
+ if (settingEntity == null) {
+
outClientConfig.setConfigId(TBaseConstants.META_VALUE_UNDEFINED);
+ } else {
+ outClientConfig.setConfigId(settingEntity.getConfigId());
+ if (settingEntity.getConfigId() !=
inClientConfig.getConfigId()) {
+
outClientConfig.setMaxMsgSize(settingEntity.getMaxMsgSizeInB());
+ }
+ }
+ }
+ return outClientConfig;
+ }
+
+
+ /**
+ * build cluster configure info
+ *
+ * @param inClusterConfig broker reported Configure info
+ * @return ClusterConfig
+ */
+ private ClientMaster.ClusterConfig.Builder buildClusterConfig(
+ ClientMaster.ClusterConfig inClusterConfig) {
+ ClientMaster.ClusterConfig.Builder outClsConfig = null;
+ if (inClusterConfig != null) {
+ outClsConfig = ClientMaster.ClusterConfig.newBuilder();
+ BdbClusterSettingEntity settingEntity =
+ this.defaultBrokerConfManager.getBdbClusterSetting();
+ if (settingEntity == null) {
+ outClsConfig.setConfigId(TBaseConstants.META_VALUE_UNDEFINED);
+ } else {
+ outClsConfig.setConfigId(settingEntity.getConfigId());
+ if (settingEntity.getConfigId() !=
inClusterConfig.getConfigId()) {
+
outClsConfig.setMaxMsgSize(settingEntity.getMaxMsgSizeInB());
+ }
+ }
+ }
+ return outClsConfig;
+ }
+ /**
* Start balance chore
*
* @param master
diff --git
a/tubemq-server/src/main/java/org/apache/tubemq/server/master/bdbstore/bdbentitys/BdbClusterSettingEntity.java
b/tubemq-server/src/main/java/org/apache/tubemq/server/master/bdbstore/bdbentitys/BdbClusterSettingEntity.java
index 588fd87..7b9b570 100644
---
a/tubemq-server/src/main/java/org/apache/tubemq/server/master/bdbstore/bdbentitys/BdbClusterSettingEntity.java
+++
b/tubemq-server/src/main/java/org/apache/tubemq/server/master/bdbstore/bdbentitys/BdbClusterSettingEntity.java
@@ -37,6 +37,7 @@ public class BdbClusterSettingEntity implements Serializable {
@PrimaryKey
private String recordKey = "";
+ private long configId = TBaseConstants.META_VALUE_UNDEFINED;
//broker tcp port
private int brokerPort = TBaseConstants.META_VALUE_UNDEFINED;
//broker tls port
@@ -48,20 +49,22 @@ public class BdbClusterSettingEntity implements
Serializable {
//partition num
private int numPartitions = TBaseConstants.META_VALUE_UNDEFINED;
//flush disk threshold
- private int unflushDskThreshold = TBaseConstants.META_VALUE_UNDEFINED;
+ private int unflushThreshold = TBaseConstants.META_VALUE_UNDEFINED;
//flush disk interval
- private int unflushDksInterval = TBaseConstants.META_VALUE_UNDEFINED;
- //flush memory cache threshold
- private int unflushMemThreshold = TBaseConstants.META_VALUE_UNDEFINED;
- //flush memory cache interval
- private int unflushMemInterval = TBaseConstants.META_VALUE_UNDEFINED;
+ private int unflushInterval = TBaseConstants.META_VALUE_UNDEFINED;
+ //flush disk data count
+ private int unflushDataHold = TBaseConstants.META_VALUE_UNDEFINED;
//flush memory cache count
- private int unflushMemCnt = TBaseConstants.META_VALUE_UNDEFINED;
+ private int memCacheMsgCntInK = TBaseConstants.META_VALUE_UNDEFINED;
+ //flush memory cache interval
+ private int memCacheFlushIntvl = TBaseConstants.META_VALUE_UNDEFINED;
+ //flush memory cache size
+ private int memCacheMsgSizeInMB = TBaseConstants.META_VALUE_UNDEFINED;
private boolean acceptPublish = true; //enable publish
private boolean acceptSubscribe = true; //enable subscribe
- private String deleteWhen = ""; //delete policy execute time
+ private String deletePolicy = ""; //delete policy execute time
private int qryPriorityId = TBaseConstants.META_VALUE_UNDEFINED;
- private int maxMsgSize = TBaseConstants.META_VALUE_UNDEFINED;
+ private int maxMsgSizeInB = TBaseConstants.META_VALUE_UNDEFINED;
private String attributes = ""; //extra attribute
private String modifyUser; //modify user
private Date modifyDate; //modify date
@@ -70,30 +73,34 @@ public class BdbClusterSettingEntity implements
Serializable {
}
//Constructor
- public BdbClusterSettingEntity(String recordKey, int brokerPort, int
brokerTLSPort,
- int brokerWebPort, int numTopicStores, int
numPartitions,
- int unflushDskThreshold, int
unflushDksInterval,
- int unflushMemThreshold, int
unflushMemInterval,
- int unflushMemCnt, boolean acceptPublish,
- boolean acceptSubscribe, String deleteWhen,
- int qryPriorityId, int maxMsgSize, String
attributes,
+ public BdbClusterSettingEntity(String recordKey, long configId, int
brokerPort,
+ int brokerTLSPort, int brokerWebPort,
+ int numTopicStores, int numPartitions,
+ int unflushThreshold, int unflushInterval,
+ int unflushDataHold, int memCacheMsgCntInK,
+ int memCacheFlushIntvl, int
memCacheMsgSizeInMB,
+ boolean acceptPublish, boolean
acceptSubscribe,
+ String deletePolicy, int qryPriorityId,
+ int maxMsgSizeInB, String attributes,
String modifyUser, Date modifyDate) {
this.recordKey = recordKey;
+ this.configId = configId;
this.brokerPort = brokerPort;
this.brokerTLSPort = brokerTLSPort;
this.brokerWebPort = brokerWebPort;
this.numTopicStores = numTopicStores;
this.numPartitions = numPartitions;
- this.unflushDskThreshold = unflushDskThreshold;
- this.unflushDksInterval = unflushDksInterval;
- this.unflushMemThreshold = unflushMemThreshold;
- this.unflushMemInterval = unflushMemInterval;
- this.unflushMemCnt = unflushMemCnt;
+ this.unflushThreshold = unflushThreshold;
+ this.unflushInterval = unflushInterval;
+ this.unflushDataHold = unflushDataHold;
+ this.memCacheMsgCntInK = memCacheMsgCntInK;
+ this.memCacheFlushIntvl = memCacheFlushIntvl;
+ this.memCacheMsgSizeInMB = memCacheMsgSizeInMB;
this.acceptPublish = acceptPublish;
this.acceptSubscribe = acceptSubscribe;
- this.deleteWhen = deleteWhen;
+ this.deletePolicy = deletePolicy;
this.qryPriorityId = qryPriorityId;
- this.maxMsgSize = maxMsgSize;
+ this.maxMsgSizeInB = maxMsgSizeInB;
this.attributes = attributes;
this.modifyUser = modifyUser;
this.modifyDate = modifyDate;
@@ -107,6 +114,10 @@ public class BdbClusterSettingEntity implements
Serializable {
return recordKey;
}
+ public long getConfigId() {
+ return configId;
+ }
+
public int getBrokerPort() {
return brokerPort;
}
@@ -147,44 +158,52 @@ public class BdbClusterSettingEntity implements
Serializable {
this.numPartitions = numPartitions;
}
- public int getUnflushDskThreshold() {
- return unflushDskThreshold;
+ public int getUnflushThreshold() {
+ return unflushThreshold;
+ }
+
+ public void setUnflushThreshold(int unflushThreshold) {
+ this.unflushThreshold = unflushThreshold;
+ }
+
+ public int getUnflushInterval() {
+ return unflushInterval;
}
- public void setUnflushDskThreshold(int unflushDskThreshold) {
- this.unflushDskThreshold = unflushDskThreshold;
+ public void setUnflushInterval(int unflushInterval) {
+ this.unflushInterval = unflushInterval;
}
- public int getUnflushDksInterval() {
- return unflushDksInterval;
+ public int getUnflushDataHold() {
+ return unflushDataHold;
}
- public void setUnflushDksInterval(int unflushDksInterval) {
- this.unflushDksInterval = unflushDksInterval;
+ public void setUnflushDataHold(int unflushDataHold) {
+ this.unflushDataHold = unflushDataHold;
}
- public int getUnflushMemThreshold() {
- return unflushMemThreshold;
+ public int getMemCacheMsgCntInK() {
+ return memCacheMsgCntInK;
}
- public void setUnflushMemThreshold(int unflushMemThreshold) {
- this.unflushMemThreshold = unflushMemThreshold;
+ public void setMemCacheMsgCntInK(int memCacheMsgCntInK) {
+ this.memCacheMsgCntInK = memCacheMsgCntInK;
}
- public int getUnflushMemInterval() {
- return unflushMemInterval;
+ public int getMemCacheFlushIntvl() {
+ return memCacheFlushIntvl;
}
- public void setUnflushMemInterval(int unflushMemInterval) {
- this.unflushMemInterval = unflushMemInterval;
+ public void setMemCacheFlushIntvl(int memCacheFlushIntvl) {
+ this.memCacheFlushIntvl = memCacheFlushIntvl;
}
- public int getUnflushMemCnt() {
- return unflushMemCnt;
+ public int getMemCacheMsgSizeInMB() {
+ return memCacheMsgSizeInMB;
}
- public void setUnflushMemCnt(int unflushMemCnt) {
- this.unflushMemCnt = unflushMemCnt;
+ public void setMemCacheMsgSizeInMB(int memCacheMsgSizeInMB) {
+ this.memCacheMsgSizeInMB = memCacheMsgSizeInMB;
}
public boolean isAcceptPublish() {
@@ -203,12 +222,12 @@ public class BdbClusterSettingEntity implements
Serializable {
this.acceptSubscribe = acceptSubscribe;
}
- public String getDeleteWhen() {
- return deleteWhen;
+ public String getDeletePolicy() {
+ return deletePolicy;
}
- public void setDeleteWhen(String deleteWhen) {
- this.deleteWhen = deleteWhen;
+ public void setDeletePolicy(String deletePolicy) {
+ this.deletePolicy = deletePolicy;
}
public int getQryPriorityId() {
@@ -219,12 +238,12 @@ public class BdbClusterSettingEntity implements
Serializable {
this.qryPriorityId = qryPriorityId;
}
- public int getMaxMsgSize() {
- return maxMsgSize;
+ public int getMaxMsgSizeInB() {
+ return maxMsgSizeInB;
}
- public void setMaxMsgSize(int maxMsgSize) {
- this.maxMsgSize = maxMsgSize;
+ public void setMaxMsgSizeInB(int maxMsgSizeInB) {
+ this.maxMsgSizeInB = maxMsgSizeInB;
}
public String getAttributes() {
@@ -236,6 +255,7 @@ public class BdbClusterSettingEntity implements
Serializable {
}
public void setModifyInfo(String modifyUser, Date modifyDate) {
+ this.configId = System.currentTimeMillis();
this.modifyUser = modifyUser;
this.modifyDate = modifyDate;
}
@@ -255,23 +275,30 @@ public class BdbClusterSettingEntity implements
Serializable {
* @return
*/
public StringBuilder toJsonString(final StringBuilder sBuilder) {
- return sBuilder.append("{\"type\":\"BdbClusterSettingEntity\",")
+ sBuilder.append("{\"type\":\"BdbClusterSettingEntity\",")
.append("\"recordKey\":\"").append(recordKey).append("\"")
+ .append(",\"configId\":").append(configId)
.append(",\"brokerPort\":").append(brokerPort)
.append(",\"brokerTLSPort\":").append(brokerTLSPort)
.append(",\"brokerWebPort\":").append(brokerWebPort)
.append(",\"numTopicStores\":").append(numTopicStores)
.append(",\"numPartitions\":").append(numPartitions)
-
.append(",\"unflushDskThreshold\":").append(unflushDskThreshold)
- .append(",\"unflushDksInterval\":").append(unflushDksInterval)
-
.append(",\"unflushMemThreshold\":").append(unflushMemThreshold)
- .append(",\"unflushMemInterval\":").append(unflushMemInterval)
- .append(",\"unflushMemCnt\":").append(unflushMemCnt)
+ .append(",\"unflushThreshold\":").append(unflushThreshold)
+ .append(",\"unflushInterval\":").append(unflushInterval)
+ .append(",\"unflushDataHold\":").append(unflushDataHold)
+ .append(",\"memCacheMsgCntInK\":").append(memCacheMsgCntInK)
+ .append(",\"memCacheFlushIntvl\":").append(memCacheFlushIntvl)
+
.append(",\"memCacheMsgSizeInMB\":").append(memCacheMsgSizeInMB)
.append(",\"acceptPublish\":").append(acceptPublish)
.append(",\"acceptSubscribe\":").append(acceptSubscribe)
- .append(",\"deleteWhen\":\"").append(deleteWhen).append("\"")
- .append(",\"maxMsgSize\":").append(maxMsgSize)
- .append(",\"qryPriorityId\":").append(qryPriorityId)
+
.append(",\"deletePolicy\":\"").append(deletePolicy).append("\"")
+ .append(",\"maxMsgSizeInMB\":");
+ if (maxMsgSizeInB == TBaseConstants.META_VALUE_UNDEFINED) {
+ sBuilder.append(maxMsgSizeInB);
+ } else {
+ sBuilder.append(maxMsgSizeInB / TBaseConstants.META_MB_UNIT_SIZE);
+ }
+ return sBuilder.append(",\"qryPriorityId\":").append(qryPriorityId)
.append(",\"attributes\":\"").append(attributes).append("\"")
.append(",\"modifyUser\":\"").append(modifyUser).append("\"")
.append(",\"modifyDate\":\"")
@@ -281,23 +308,30 @@ public class BdbClusterSettingEntity implements
Serializable {
@Override
public String toString() {
- return new ToStringBuilder(this)
+ ToStringBuilder sBuilder = new ToStringBuilder(this)
.append("recordKey", recordKey)
+ .append("configId", configId)
.append("brokerPort", brokerPort)
.append("brokerTLSPort", brokerTLSPort)
.append("brokerWebPort", brokerWebPort)
.append("numTopicStores", numTopicStores)
.append("numPartitions", numPartitions)
- .append("unflushDskThreshold", unflushDskThreshold)
- .append("unflushDksInterval", unflushDksInterval)
- .append("unflushMemThreshold", unflushMemThreshold)
- .append("unflushMemInterval", unflushMemInterval)
- .append("unflushMemCnt", unflushMemCnt)
+ .append("unflushThreshold", unflushThreshold)
+ .append("unflushInterval", unflushInterval)
+ .append("unflushDataHold", unflushDataHold)
+ .append("memCacheMsgCntInK", memCacheMsgCntInK)
+ .append("memCacheFlushIntvl", memCacheFlushIntvl)
+ .append("memCacheMsgSizeInMB", memCacheMsgSizeInMB)
.append("acceptPublish", acceptPublish)
.append("acceptSubscribe", acceptSubscribe)
- .append("deleteWhen", deleteWhen)
- .append("maxMsgSize", maxMsgSize)
- .append("qryPriorityId", qryPriorityId)
+ .append("deletePolicy", deletePolicy);
+ if (maxMsgSizeInB == TBaseConstants.META_VALUE_UNDEFINED) {
+ sBuilder.append("maxMsgSizeInMB", maxMsgSizeInB);
+ } else {
+ sBuilder.append("maxMsgSizeInMB",
+ maxMsgSizeInB / TBaseConstants.META_MB_UNIT_SIZE);
+ }
+ return sBuilder.append("qryPriorityId", qryPriorityId)
.append("attributes", attributes)
.append("modifyUser", modifyUser)
.append("modifyDate", modifyDate)
diff --git
a/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebMasterInfoHandler.java
b/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebMasterInfoHandler.java
index 9afe1aa..f2fece8 100644
---
a/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebMasterInfoHandler.java
+++
b/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebMasterInfoHandler.java
@@ -169,13 +169,14 @@ public class WebMasterInfoHandler extends
AbstractWebHandler {
if (!WebParameterUtils.getIntParamValue(req,
WebFieldDef.MAXMSGSIZE, false,
TBaseConstants.META_VALUE_UNDEFINED,
- TBaseConstants.META_MAX_MESSAGE_DATA_SIZE,
+ TBaseConstants.META_MIN_ALLOWED_MESSAGE_SIZE_MB,
+ TBaseConstants.META_MAX_ALLOWED_MESSAGE_SIZE_MB,
result)) {
WebParameterUtils.buildFailResult(sBuilder, result.errInfo);
return sBuilder;
}
- int maxMsgSize = (int) result.retData1;
- if (maxMsgSize != TBaseConstants.META_VALUE_UNDEFINED) {
+ int maxMsgSizeInMB = (int) result.retData1;
+ if (maxMsgSizeInMB != TBaseConstants.META_VALUE_UNDEFINED) {
dataChanged = true;
}
// check and get modify date
@@ -196,9 +197,9 @@ public class WebMasterInfoHandler extends
AbstractWebHandler {
defClusterSetting = new BdbClusterSettingEntity();
}
defClusterSetting.setModifyInfo(modifyUser, modifyDate);
- if (maxMsgSize != TBaseConstants.META_VALUE_UNDEFINED) {
- defClusterSetting.setMaxMsgSize(
- SettingValidUtils.validAndGetMaxMsgSize(maxMsgSize));
+ if (maxMsgSizeInMB != TBaseConstants.META_VALUE_UNDEFINED) {
+ defClusterSetting.setMaxMsgSizeInB(
+
SettingValidUtils.validAndXfeMaxMsgSizeFromMBtoB(maxMsgSizeInMB));
}
try {
brokerConfManager.confSetBdbClusterDefSetting(defClusterSetting);