This is an automated email from the ASF dual-hosted git repository. yuanbo pushed a commit to branch TUBEMQ-421 in repository https://gitbox.apache.org/repos/asf/incubator-tubemq.git
commit f5ae304281370ac17968fc4fe6e20d0c20f3b47d Author: gosonzhang <[email protected]> AuthorDate: Fri Jan 8 18:41:23 2021 +0800 [TUBEMQ-499] Add configure store --- .../apache/tubemq/corebase/utils/MixedUtils.java | 9 + tubemq-core/src/main/proto/MasterService.proto | 18 ++ .../tubemq/server/common/TServerConstants.java | 2 + .../server/master/bdbstore/BdbStoreService.java | 7 + .../master/bdbstore/DefaultBdbStoreService.java | 95 +++++++ .../bdbentitys/BdbClusterSettingEntity.java | 309 +++++++++++++++++++++ .../nodemanage/nodebroker/BrokerConfManager.java | 95 +++++++ 7 files changed, 535 insertions(+) diff --git a/tubemq-core/src/main/java/org/apache/tubemq/corebase/utils/MixedUtils.java b/tubemq-core/src/main/java/org/apache/tubemq/corebase/utils/MixedUtils.java index bfbedad..bcd0738 100644 --- a/tubemq-core/src/main/java/org/apache/tubemq/corebase/utils/MixedUtils.java +++ b/tubemq-core/src/main/java/org/apache/tubemq/corebase/utils/MixedUtils.java @@ -92,4 +92,13 @@ public class MixedUtils { dataBuffer.flip(); return dataBuffer.array(); } + + // get the middle data between min, max, and data + public static int mid(int data, int min, int max) { + return Math.max(min, Math.min(max, data)); + } + + public static long mid(long data, long min, long max) { + return Math.max(min, Math.min(max, data)); + } } diff --git a/tubemq-core/src/main/proto/MasterService.proto b/tubemq-core/src/main/proto/MasterService.proto index cefd2d2..27e5496 100644 --- a/tubemq-core/src/main/proto/MasterService.proto +++ b/tubemq-core/src/main/proto/MasterService.proto @@ -61,6 +61,16 @@ message MasterBrokerAuthorizedInfo { optional string authAuthorizedToken = 2; } +message ApprovedClientConfig { + required int64 configId = 1; + optional int32 maxMsgSize = 2; +} + +message ClusterDefConfig { + required int64 configId = 1; + optional int32 maxMsgSize = 2; +} + message RegisterRequestP2M { required string clientId = 1; repeated string topicList = 2; @@ -68,6 +78,7 @@ message RegisterRequestP2M { required string hostName = 4; optional MasterCertificateInfo authInfo = 5; optional string jdkVersion = 6; + optional ApprovedClientConfig appdConfig = 7; } message RegisterResponseM2P { @@ -77,6 +88,7 @@ message RegisterResponseM2P { required int64 brokerCheckSum = 4; repeated string brokerInfos = 5; optional MasterAuthorizedInfo authorizedInfo = 6; + optional ApprovedClientConfig appdConfig = 7; } message HeartRequestP2M { @@ -85,6 +97,7 @@ message HeartRequestP2M { required string hostName = 3; repeated string topicList = 4; optional MasterCertificateInfo authInfo = 5; + optional ApprovedClientConfig appdConfig = 6; } message HeartResponseM2P { @@ -97,6 +110,7 @@ message HeartResponseM2P { repeated string brokerInfos = 6; optional bool requireAuth = 7; optional MasterAuthorizedInfo authorizedInfo = 8; + optional ApprovedClientConfig appdConfig = 9; } message CloseRequestP2M{ @@ -208,6 +222,7 @@ message RegisterRequestB2M { optional int32 qryPriorityId = 12; optional int32 tlsPort = 13; optional MasterCertificateInfo authInfo = 14; + optional ClusterDefConfig clsDefConfig = 15; } message RegisterResponseM2B { @@ -230,6 +245,7 @@ message RegisterResponseM2B { optional int32 qryPriorityId = 15; optional MasterAuthorizedInfo authorizedInfo = 16; /* Deprecated */ optional MasterBrokerAuthorizedInfo brokerAuthorizedInfo = 17; + optional ClusterDefConfig clsDefConfig = 18; } message HeartRequestB2M { @@ -250,6 +266,7 @@ message HeartRequestB2M { optional int64 flowCheckId = 13; optional int32 qryPriorityId = 14; optional MasterCertificateInfo authInfo = 15; + optional ClusterDefConfig clsDefConfig = 16; } message HeartResponseM2B { @@ -275,6 +292,7 @@ message HeartResponseM2B { optional int32 qryPriorityId = 17; optional MasterAuthorizedInfo authorizedInfo = 18; /* Deprecated */ optional MasterBrokerAuthorizedInfo brokerAuthorizedInfo = 19; + optional ClusterDefConfig clsDefConfig = 20; } message CloseRequestB2M { diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/common/TServerConstants.java b/tubemq-server/src/main/java/org/apache/tubemq/server/common/TServerConstants.java index 12af51a..5793364 100644 --- a/tubemq-server/src/main/java/org/apache/tubemq/server/common/TServerConstants.java +++ b/tubemq-server/src/main/java/org/apache/tubemq/server/common/TServerConstants.java @@ -25,6 +25,8 @@ public final class TServerConstants { public static final String TOKEN_JOB_TOPICS = "topics"; public static final String TOKEN_JOB_STORE_MGR = "messageStoreManager"; public static final String TOKEN_DEFAULT_FLOW_CONTROL = "default_master_ctrl"; + public static final String TOKEN_DEFAULT_CLUSTER_SETTING = "default_cluster_config"; + public static final String TOKEN_BLANK_FILTER_CONDITION = ",,"; public static final int CFG_MODAUTHTOKEN_MAX_LENGTH = 128; diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/bdbstore/BdbStoreService.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/bdbstore/BdbStoreService.java index 7b264c7..4cb2185 100644 --- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/bdbstore/BdbStoreService.java +++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/bdbstore/BdbStoreService.java @@ -21,6 +21,7 @@ import java.net.InetSocketAddress; import java.util.concurrent.ConcurrentHashMap; import org.apache.tubemq.server.master.bdbstore.bdbentitys.BdbBlackGroupEntity; import org.apache.tubemq.server.master.bdbstore.bdbentitys.BdbBrokerConfEntity; +import org.apache.tubemq.server.master.bdbstore.bdbentitys.BdbClusterSettingEntity; import org.apache.tubemq.server.master.bdbstore.bdbentitys.BdbConsumeGroupSettingEntity; import org.apache.tubemq.server.master.bdbstore.bdbentitys.BdbConsumerGroupEntity; import org.apache.tubemq.server.master.bdbstore.bdbentitys.BdbGroupFilterCondEntity; @@ -103,4 +104,10 @@ public interface BdbStoreService { boolean isNew); ConcurrentHashMap<String, BdbConsumeGroupSettingEntity> getConsumeGroupSettingMap(); + + boolean putBdbClusterConfEntity(BdbClusterSettingEntity clusterConfEntity, boolean isNew); + + boolean delBdbClusterConfEntity(); + + ConcurrentHashMap<String, BdbClusterSettingEntity> getClusterDefSettingMap(); } diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/bdbstore/DefaultBdbStoreService.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/bdbstore/DefaultBdbStoreService.java index b201cde..1b8a1b1 100644 --- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/bdbstore/DefaultBdbStoreService.java +++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/bdbstore/DefaultBdbStoreService.java @@ -55,11 +55,13 @@ import java.util.concurrent.atomic.AtomicBoolean; import org.apache.tubemq.corebase.TokenConstants; import org.apache.tubemq.corebase.utils.TStringUtils; import org.apache.tubemq.server.Server; +import org.apache.tubemq.server.common.TServerConstants; import org.apache.tubemq.server.common.fileconfig.MasterReplicationConfig; import org.apache.tubemq.server.master.MasterConfig; import org.apache.tubemq.server.master.TMaster; import org.apache.tubemq.server.master.bdbstore.bdbentitys.BdbBlackGroupEntity; import org.apache.tubemq.server.master.bdbstore.bdbentitys.BdbBrokerConfEntity; +import org.apache.tubemq.server.master.bdbstore.bdbentitys.BdbClusterSettingEntity; import org.apache.tubemq.server.master.bdbstore.bdbentitys.BdbConsumeGroupSettingEntity; import org.apache.tubemq.server.master.bdbstore.bdbentitys.BdbConsumerGroupEntity; import org.apache.tubemq.server.master.bdbstore.bdbentitys.BdbGroupFilterCondEntity; @@ -72,6 +74,7 @@ import org.apache.tubemq.server.master.web.model.ClusterNodeVO; import org.slf4j.Logger; import org.slf4j.LoggerFactory; + /** * Bdb store service * like a local database manager, according to database table name, store instance, primary key, memory cache @@ -80,6 +83,7 @@ import org.slf4j.LoggerFactory; public class DefaultBdbStoreService implements BdbStoreService, Server { private static final Logger logger = LoggerFactory.getLogger(DefaultBdbStoreService.class); + private static final String BDB_CLUSTER_SETTING_STORE_NAME = "bdbClusterSetting"; private static final String BDB_TOPIC_CONFIG_STORE_NAME = "bdbTopicConfig"; private static final String BDB_BROKER_CONFIG_STORE_NAME = "bdbBrokerConfig"; private static final String BDB_CONSUMER_GROUP_STORE_NAME = "bdbConsumerGroup"; @@ -152,6 +156,11 @@ public class DefaultBdbStoreService implements BdbStoreService, Server { private PrimaryIndex<String/* recordKey */, BdbConsumeGroupSettingEntity> consumeGroupSettingIndex; private ConcurrentHashMap<String/* consumeGroup */, BdbConsumeGroupSettingEntity> consumeGroupSettingMap = new ConcurrentHashMap<>(); + // cluster default setting store + private EntityStore clusterDefSettingStore; + private PrimaryIndex<String/* recordKey */, BdbClusterSettingEntity> clusterDefSettingIndex; + private ConcurrentHashMap<String/* recordKey */, BdbClusterSettingEntity> clusterDefSettingMap = + new ConcurrentHashMap<>(); // service status private AtomicBoolean isStarted = new AtomicBoolean(false); // master role flag @@ -386,6 +395,14 @@ public class DefaultBdbStoreService implements BdbStoreService, Server { logger.error("[BDB Error] Close groupFlowCtrlStore error ", e); } } + if (clusterDefSettingStore != null) { + try { + clusterDefSettingStore.close(); + clusterDefSettingStore = null; + } catch (Throwable e) { + logger.error("[BDB Error] Close clusterDefSettingStore error ", e); + } + } /* evn close */ if (repEnv != null) { try { @@ -769,6 +786,39 @@ public class DefaultBdbStoreService implements BdbStoreService, Server { return true; } + /** + * Put cluster default setting bdb entity + * + * @param clusterConfEntity + * @param isNew + * @return + */ + @Override + public boolean putBdbClusterConfEntity(BdbClusterSettingEntity clusterConfEntity, boolean isNew) { + BdbClusterSettingEntity result = null; + try { + result = clusterDefSettingIndex.put(clusterConfEntity); + } catch (Throwable e) { + logger.error("[BDB Error] Put ClusterConfEntity Error ", e); + return false; + } + if (isNew) { + return result == null; + } + return result != null; + } + + @Override + public boolean delBdbClusterConfEntity() { + try { + clusterDefSettingIndex.delete(TServerConstants.TOKEN_DEFAULT_CLUSTER_SETTING); + } catch (Throwable e) { + logger.error("[BDB Error] delBdbClusterConfEntity Error ", e); + return false; + } + return true; + } + @Override public ConcurrentHashMap<String, ConcurrentHashMap<String, BdbConsumerGroupEntity>> getConsumerGroupNameAccControlMap() { @@ -797,6 +847,11 @@ public class DefaultBdbStoreService implements BdbStoreService, Server { return this.consumeGroupSettingMap; } + @Override + public ConcurrentHashMap<String, BdbClusterSettingEntity> getClusterDefSettingMap() { + return this.clusterDefSettingMap; + } + /** * Get master group status * @@ -977,6 +1032,10 @@ public class DefaultBdbStoreService implements BdbStoreService, Server { new EntityStore(repEnv, BDB_CONSUME_GROUP_SETTING_STORE_NAME, storeConfig); consumeGroupSettingIndex = consumeGroupSettingStore.getPrimaryIndex(String.class, BdbConsumeGroupSettingEntity.class); + clusterDefSettingStore = + new EntityStore(repEnv, BDB_CLUSTER_SETTING_STORE_NAME, storeConfig); + clusterDefSettingIndex = + clusterDefSettingStore.getPrimaryIndex(String.class, BdbClusterSettingEntity.class); } /** @@ -1394,6 +1453,41 @@ public class DefaultBdbStoreService implements BdbStoreService, Server { logger.info("loadConsumeGroupSettingUnits successfully..."); } + + private void loadClusterDefSettingUnits() throws Exception { + long count = 0L; + EntityCursor<BdbClusterSettingEntity> cursor = null; + logger.info("loadClusterDefSettingUnits start..."); + try { + cursor = clusterDefSettingIndex.entities(); + clusterDefSettingMap.clear(); + StringBuilder sBuilder = logger.isDebugEnabled() ? new StringBuilder(512) : null; + logger.debug("[loadClusterDefSettingUnits] Load consumer group begin:"); + for (BdbClusterSettingEntity bdbEntity : cursor) { + if (bdbEntity == null) { + logger.warn("[BDB Error] Found Null data while loading from clusterDefSettingIndex!"); + continue; + } + clusterDefSettingMap.put(bdbEntity.getRecordKey(), bdbEntity); + count++; + if (logger.isDebugEnabled()) { + logger.debug(bdbEntity.toJsonString(sBuilder).toString()); + sBuilder.delete(0, sBuilder.length()); + } + } + logger.debug("[loadClusterDefSettingUnits] Load consumer group finished!"); + logger.info("[loadClusterDefSettingUnits] total load records are {}", count); + } catch (Exception e) { + logger.error("[loadClusterDefSettingUnits error] ", e); + throw e; + } finally { + if (cursor != null) { + cursor.close(); + } + } + logger.info("loadClusterDefSettingUnits successfully..."); + } + public class Listener implements StateChangeListener { @Override public void stateChange(StateChangeEvent stateChangeEvent) throws RuntimeException { @@ -1424,6 +1518,7 @@ public class DefaultBdbStoreService implements BdbStoreService, Server { if (!isMaster) { try { clearCachedRunData(); + loadClusterDefSettingUnits(); loadBrokerConfUnits(); loadTopicConfUnits(); loadGroupFlowCtrlUnits(); diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/bdbstore/bdbentitys/BdbClusterSettingEntity.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/bdbstore/bdbentitys/BdbClusterSettingEntity.java new file mode 100644 index 0000000..ca6e1b4 --- /dev/null +++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/bdbstore/bdbentitys/BdbClusterSettingEntity.java @@ -0,0 +1,309 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tubemq.server.master.bdbstore.bdbentitys; + +import com.sleepycat.persist.model.Entity; +import com.sleepycat.persist.model.PrimaryKey; +import java.io.Serializable; +import java.util.Date; +import org.apache.commons.lang.builder.ToStringBuilder; +import org.apache.tubemq.corebase.TBaseConstants; +import org.apache.tubemq.server.common.utils.WebParameterUtils; + + +/* + * store the cluster default setting + * + */ +@Entity +public class BdbClusterSettingEntity implements Serializable { + + private static final long serialVersionUID = -3259439355290322115L; + + @PrimaryKey + private String recordKey = ""; + //broker tcp port + private int brokerPort = TBaseConstants.META_VALUE_UNDEFINED; + //broker tls port + private int brokerTLSPort = TBaseConstants.META_VALUE_UNDEFINED; + //broker web port + private int brokerWebPort = TBaseConstants.META_VALUE_UNDEFINED; + //store num + private int numTopicStores = TBaseConstants.META_VALUE_UNDEFINED; + //partition num + private int numPartitions = TBaseConstants.META_VALUE_UNDEFINED; + //flush disk threshold + private int unflushDskThreshold = TBaseConstants.META_VALUE_UNDEFINED; + //flush disk interval + private int unflushDksInterval = TBaseConstants.META_VALUE_UNDEFINED; + //flush memory cache threshold + private int unflushMemThreshold = TBaseConstants.META_VALUE_UNDEFINED; + //flush memory cache interval + private int unflushMemInterval = TBaseConstants.META_VALUE_UNDEFINED; + //flush memory cache count + private int unflushMemCnt = TBaseConstants.META_VALUE_UNDEFINED; + private boolean acceptPublish = true; //enable publish + private boolean acceptSubscribe = true; //enable subscribe + private String deleteWhen = ""; //delete policy execute time + private int qryPriorityId = TBaseConstants.META_VALUE_UNDEFINED; + private int maxMsgSize = TBaseConstants.META_VALUE_UNDEFINED; + private String attributes; //extra attribute + private String modifyUser; //modify user + private Date modifyDate; //modify date + + public BdbClusterSettingEntity() { + } + + //Constructor + public BdbClusterSettingEntity(String recordKey, int brokerPort, int brokerTLSPort, + int brokerWebPort, int numTopicStores, int numPartitions, + int unflushDskThreshold, int unflushDksInterval, + int unflushMemThreshold, int unflushMemInterval, + int unflushMemCnt, boolean acceptPublish, + boolean acceptSubscribe, String deleteWhen, + int qryPriorityId, int maxMsgSize, String attributes, + String modifyUser, Date modifyDate) { + this.recordKey = recordKey; + this.brokerPort = brokerPort; + this.brokerTLSPort = brokerTLSPort; + this.brokerWebPort = brokerWebPort; + this.numTopicStores = numTopicStores; + this.numPartitions = numPartitions; + this.unflushDskThreshold = unflushDskThreshold; + this.unflushDksInterval = unflushDksInterval; + this.unflushMemThreshold = unflushMemThreshold; + this.unflushMemInterval = unflushMemInterval; + this.unflushMemCnt = unflushMemCnt; + this.acceptPublish = acceptPublish; + this.acceptSubscribe = acceptSubscribe; + this.deleteWhen = deleteWhen; + this.qryPriorityId = qryPriorityId; + this.maxMsgSize = maxMsgSize; + this.attributes = attributes; + this.modifyUser = modifyUser; + this.modifyDate = modifyDate; + } + + public void setRecordKey(String recordKey) { + this.recordKey = recordKey; + } + + public String getRecordKey() { + return recordKey; + } + + public int getBrokerPort() { + return brokerPort; + } + + public void setBrokerPort(int brokerPort) { + this.brokerPort = brokerPort; + } + + public int getBrokerTLSPort() { + return brokerTLSPort; + } + + public void setBrokerTLSPort(int brokerTLSPort) { + this.brokerTLSPort = brokerTLSPort; + } + + public int getBrokerWebPort() { + return brokerWebPort; + } + + public void setBrokerWebPort(int brokerWebPort) { + this.brokerWebPort = brokerWebPort; + } + + public int getNumTopicStores() { + return numTopicStores; + } + + public void setNumTopicStores(int numTopicStores) { + this.numTopicStores = numTopicStores; + } + + public int getNumPartitions() { + return numPartitions; + } + + public void setNumPartitions(int numPartitions) { + this.numPartitions = numPartitions; + } + + public int getUnflushDskThreshold() { + return unflushDskThreshold; + } + + public void setUnflushDskThreshold(int unflushDskThreshold) { + this.unflushDskThreshold = unflushDskThreshold; + } + + public int getUnflushDksInterval() { + return unflushDksInterval; + } + + public void setUnflushDksInterval(int unflushDksInterval) { + this.unflushDksInterval = unflushDksInterval; + } + + public int getUnflushMemThreshold() { + return unflushMemThreshold; + } + + public void setUnflushMemThreshold(int unflushMemThreshold) { + this.unflushMemThreshold = unflushMemThreshold; + } + + public int getUnflushMemInterval() { + return unflushMemInterval; + } + + public void setUnflushMemInterval(int unflushMemInterval) { + this.unflushMemInterval = unflushMemInterval; + } + + public int getUnflushMemCnt() { + return unflushMemCnt; + } + + public void setUnflushMemCnt(int unflushMemCnt) { + this.unflushMemCnt = unflushMemCnt; + } + + public boolean isAcceptPublish() { + return acceptPublish; + } + + public void setAcceptPublish(boolean acceptPublish) { + this.acceptPublish = acceptPublish; + } + + public boolean isAcceptSubscribe() { + return acceptSubscribe; + } + + public void setAcceptSubscribe(boolean acceptSubscribe) { + this.acceptSubscribe = acceptSubscribe; + } + + public String getDeleteWhen() { + return deleteWhen; + } + + public void setDeleteWhen(String deleteWhen) { + this.deleteWhen = deleteWhen; + } + + public int getQryPriorityId() { + return qryPriorityId; + } + + public void setQryPriorityId(int qryPriorityId) { + this.qryPriorityId = qryPriorityId; + } + + public int getMaxMsgSize() { + return maxMsgSize; + } + + public void setMaxMsgSize(int maxMsgSize) { + this.maxMsgSize = maxMsgSize; + } + + public String getAttributes() { + return attributes; + } + + public void setAttributes(String attributes) { + this.attributes = attributes; + } + + public String getModifyUser() { + return modifyUser; + } + + public void setModifyUser(String modifyUser) { + this.modifyUser = modifyUser; + } + + public Date getModifyDate() { + return modifyDate; + } + + public void setModifyDate(Date modifyDate) { + this.modifyDate = modifyDate; + } + + /** + * Serialize field to json format + * + * @param sBuilder + * @return + */ + public StringBuilder toJsonString(final StringBuilder sBuilder) { + return sBuilder.append("{\"type\":\"BdbClusterSettingEntity\",") + .append("\"recordKey\":\"").append(recordKey).append("\"") + .append(",\"brokerPort\":").append(brokerPort) + .append(",\"brokerTLSPort\":").append(brokerTLSPort) + .append(",\"brokerWebPort\":").append(brokerWebPort) + .append(",\"numTopicStores\":").append(numTopicStores) + .append(",\"numPartitions\":").append(numPartitions) + .append(",\"unflushDskThreshold\":").append(unflushDskThreshold) + .append(",\"unflushDksInterval\":").append(unflushDksInterval) + .append(",\"unflushMemThreshold\":").append(unflushMemThreshold) + .append(",\"unflushMemInterval\":").append(unflushMemInterval) + .append(",\"unflushMemCnt\":").append(unflushMemCnt) + .append(",\"acceptPublish\":").append(acceptPublish) + .append(",\"acceptSubscribe\":").append(acceptSubscribe) + .append(",\"deleteWhen\":\"").append(deleteWhen).append("\"") + .append(",\"maxMsgSize\":").append(maxMsgSize) + .append(",\"qryPriorityId\":").append(qryPriorityId) + .append(",\"attributes\":\"").append(attributes).append("\"") + .append(",\"modifyUser\":\"").append(modifyUser).append("\"") + .append(",\"modifyDate\":\"") + .append(WebParameterUtils.date2yyyyMMddHHmmss(modifyDate)) + .append("\"}"); + } + + @Override + public String toString() { + return new ToStringBuilder(this) + .append("recordKey", recordKey) + .append("brokerPort", brokerPort) + .append("brokerTLSPort", brokerTLSPort) + .append("brokerWebPort", brokerWebPort) + .append("numTopicStores", numTopicStores) + .append("numPartitions", numPartitions) + .append("unflushDskThreshold", unflushDskThreshold) + .append("unflushDksInterval", unflushDksInterval) + .append("unflushMemThreshold", unflushMemThreshold) + .append("unflushMemInterval", unflushMemInterval) + .append("unflushMemCnt", unflushMemCnt) + .append("acceptPublish", acceptPublish) + .append("acceptSubscribe", acceptSubscribe) + .append("deleteWhen", deleteWhen) + .append("maxMsgSize", maxMsgSize) + .append("qryPriorityId", qryPriorityId) + .append("attributes", attributes) + .append("modifyUser", modifyUser) + .append("modifyDate", modifyDate) + .toString(); + } +} diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/nodemanage/nodebroker/BrokerConfManager.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/nodemanage/nodebroker/BrokerConfManager.java index 698f0d3..86a6bc4 100644 --- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/nodemanage/nodebroker/BrokerConfManager.java +++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/nodemanage/nodebroker/BrokerConfManager.java @@ -46,6 +46,7 @@ import org.apache.tubemq.server.master.bdbstore.DefaultBdbStoreService; import org.apache.tubemq.server.master.bdbstore.MasterGroupStatus; import org.apache.tubemq.server.master.bdbstore.bdbentitys.BdbBlackGroupEntity; import org.apache.tubemq.server.master.bdbstore.bdbentitys.BdbBrokerConfEntity; +import org.apache.tubemq.server.master.bdbstore.bdbentitys.BdbClusterSettingEntity; import org.apache.tubemq.server.master.bdbstore.bdbentitys.BdbConsumeGroupSettingEntity; import org.apache.tubemq.server.master.bdbstore.bdbentitys.BdbConsumerGroupEntity; import org.apache.tubemq.server.master.bdbstore.bdbentitys.BdbGroupFilterCondEntity; @@ -90,6 +91,7 @@ public class BrokerConfManager implements Server { ConcurrentHashMap<String /* consumerGroup */, BdbGroupFilterCondEntity>> groupFilterCondTopicMap; private ConcurrentHashMap<String /* groupName */, BdbGroupFlowCtrlEntity> consumeGroupFlowCtrlMap; private ConcurrentHashMap<String /* consumeGroup */, BdbConsumeGroupSettingEntity> consumeGroupSettingMap; + private ConcurrentHashMap<String /* recordKey */, BdbClusterSettingEntity> clusterSettingMap; private AtomicLong brokerInfoCheckSum = new AtomicLong(System.currentTimeMillis()); private long lastBrokerUpdatedTime = System.currentTimeMillis(); private long serviceStartTime = System.currentTimeMillis(); @@ -98,6 +100,8 @@ public class BrokerConfManager implements Server { public BrokerConfManager(DefaultBdbStoreService mBdbStoreManagerService) { this.mBdbStoreManagerService = mBdbStoreManagerService; this.replicationConfig = mBdbStoreManagerService.getReplicationConfig(); + this.clusterSettingMap = + this.mBdbStoreManagerService.getClusterDefSettingMap(); this.brokerConfStoreMap = this.mBdbStoreManagerService.getBrokerConfigMap(); for (BdbBrokerConfEntity entity : this.brokerConfStoreMap.values()) { updateBrokerMaps(entity); @@ -2013,6 +2017,97 @@ public class BrokerConfManager implements Server { return true; } + // ///////////////////////////////////////////////////////////////////////////////////////////////////////////// + + /** + * Add cluster default setting + * + * @param bdbEntity the cluster default setting entity will be add + * @return true if success otherwise false + * @throws Exception + */ + public boolean confAddBdbClusterDefSetting(BdbClusterSettingEntity bdbEntity) + throws Exception { + validMasterStatus(); + BdbClusterSettingEntity curEntity = + clusterSettingMap.get(bdbEntity.getRecordKey()); + if (curEntity != null) { + throw new Exception(new StringBuilder(512) + .append("Duplicate add ClusterSetting info, exist record is: ") + .append(curEntity).toString()); + } + boolean putResult = + mBdbStoreManagerService.putBdbClusterConfEntity(bdbEntity, true); + if (putResult) { + clusterSettingMap.put(bdbEntity.getRecordKey(), bdbEntity); + logger.info(new StringBuilder(512) + .append("[ClusterSetting Success] ") + .append(bdbEntity).toString()); + return true; + } + return false; + } + + /** + * update cluster default setting + * + * @param bdbEntity the cluster setting entity will be set + * @return true if success otherwise false + * @throws Exception + */ + public boolean confUpdBdbClusterSetting(BdbClusterSettingEntity bdbEntity) + throws Exception { + validMasterStatus(); + StringBuilder strBuffer = new StringBuilder(512); + BdbClusterSettingEntity curDefSettingEntity = + clusterSettingMap.get(bdbEntity.getRecordKey()); + if (curDefSettingEntity == null) { + throw new Exception(strBuffer + .append("Update ClusterSetting failure, not exist record for record: ") + .append(bdbEntity.getRecordKey()).toString()); + } + boolean putResult = + mBdbStoreManagerService.putBdbClusterConfEntity(bdbEntity, false); + if (putResult) { + clusterSettingMap.put(bdbEntity.getRecordKey(), bdbEntity); + strBuffer.append("[confUpdBdbClusterSetting Success] record from : "); + strBuffer = curDefSettingEntity.toJsonString(strBuffer); + strBuffer.append(" to : "); + strBuffer = bdbEntity.toJsonString(strBuffer); + logger.info(strBuffer.toString()); + return true; + } + return false; + } + + /** + * Delete cluster default setting + * + * @param strBuffer the error info string buffer + * @return true if success + * @throws Exception + */ + public boolean confDeleteBdbClusterSetting(final StringBuilder strBuffer) throws Exception { + validMasterStatus(); + BdbClusterSettingEntity curEntity = + this.clusterSettingMap.remove(TServerConstants.TOKEN_DEFAULT_CLUSTER_SETTING); + if (curEntity != null) { + mBdbStoreManagerService.delBdbClusterConfEntity(); + strBuffer.append( + "[confDeleteBdbClusterSetting Success], deleted cluster setting record :"); + logger.info(curEntity.toJsonString(strBuffer).toString()); + strBuffer.delete(0, strBuffer.length()); + } else { + logger.info("[confDeleteBdbClusterSetting Success], not found record"); + } + return true; + } + + public BdbClusterSettingEntity getBdbClusterSetting() { + return this.clusterSettingMap.get(TServerConstants.TOKEN_DEFAULT_CLUSTER_SETTING); + } + + private void validMasterStatus() throws Exception { if (!isSelfMaster()) { throw new StandbyException("Please send your request to the master Node.");
