This is an automated email from the ASF dual-hosted git repository.
yuanbo pushed a commit to branch TUBEMQ-570
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
The following commit(s) were added to refs/heads/TUBEMQ-570 by this push:
new f24ea23 [INLONG-626] Fix broker and topic confiugre implement bugs
(#479)
f24ea23 is described below
commit f24ea23b23ddb028e8b4dafe7fc550c7229b3cba
Author: gosonzhang <[email protected]>
AuthorDate: Tue Jun 8 14:03:10 2021 +0800
[INLONG-626] Fix broker and topic confiugre implement bugs (#479)
* [INLONG-626] Fix broker and topic confiugre implement bugs
* [INLONG-626] Fix broker and topic confiugre implement bugs
* [INLONG-626] Fix broker and topic confiugre implement bugs
* [INLONG-626] Fix broker and topic confiugre implement bugs
* [INLONG-626] Fix broker and topic confiugre implement bugs
Co-authored-by: gosonzhang <[email protected]>
---
.../apache/tubemq/corebase/cluster/TopicInfo.java | 16 +++
.../server/broker/msgstore/MessageStore.java | 8 +-
.../server/common/heartbeat/HeartbeatManager.java | 2 +-
.../tubemq/server/common/statusdef/StepStatus.java | 30 ++++--
.../tubemq/server/common/utils/SerialIdUtils.java | 39 +++++++
.../server/common/utils/WebParameterUtils.java | 2 +-
.../server/master/metamanage/MetaDataManager.java | 35 ++++---
.../metastore/dao/entity/BaseEntity.java | 11 +-
.../metastore/dao/entity/BrokerConfEntity.java | 5 +-
.../metastore/dao/entity/ClusterSettingEntity.java | 5 +-
.../metastore/dao/entity/TopicDeployEntity.java | 5 +-
.../metastore/dao/entity/TopicPropGroup.java | 12 ---
.../nodemanage/nodebroker/BrokerAbnHolder.java | 6 +-
.../nodemanage/nodebroker/BrokerPSInfoHolder.java | 45 +++-----
.../nodemanage/nodebroker/BrokerRunManager.java | 6 +-
.../nodemanage/nodebroker/BrokerRunStatusInfo.java | 53 +++++-----
.../nodemanage/nodebroker/BrokerSyncData.java | 37 +++----
.../nodemanage/nodebroker/BrokerTopicInfoView.java | 113 ++++++++++++++++-----
.../nodemanage/nodebroker/DefBrokerRunManager.java | 30 +++---
.../master/web/handler/WebTopicDeployHandler.java | 13 +++
.../server/common/WebParameterUtilsTest.java | 31 +++++-
21 files changed, 320 insertions(+), 184 deletions(-)
diff --git
a/tubemq-core/src/main/java/org/apache/tubemq/corebase/cluster/TopicInfo.java
b/tubemq-core/src/main/java/org/apache/tubemq/corebase/cluster/TopicInfo.java
index 2e4e075..43400b7 100644
---
a/tubemq-core/src/main/java/org/apache/tubemq/corebase/cluster/TopicInfo.java
+++
b/tubemq-core/src/main/java/org/apache/tubemq/corebase/cluster/TopicInfo.java
@@ -19,6 +19,7 @@ package org.apache.tubemq.corebase.cluster;
import java.io.Serializable;
import org.apache.tubemq.corebase.TokenConstants;
+import org.apache.tubemq.corebase.utils.Tuple2;
public class TopicInfo implements Serializable {
@@ -74,6 +75,21 @@ public class TopicInfo implements Serializable {
this.acceptSubscribe = acceptSubscribe;
}
+ // return result <isChanged, isScaleOut>
+ public Tuple2<Boolean, Boolean> updAndJudgeTopicInfo(TopicInfo
newTopicInfo) {
+ boolean isChanged = false;
+ if (this.acceptPublish != newTopicInfo.acceptPublish) {
+ isChanged = true;
+ this.acceptPublish = newTopicInfo.acceptPublish;
+ }
+ if (this.acceptSubscribe != newTopicInfo.acceptSubscribe) {
+ isChanged = true;
+ this.acceptSubscribe = newTopicInfo.acceptSubscribe;
+ }
+ return new Tuple2<>(isChanged, (this.partitionNum !=
newTopicInfo.partitionNum
+ || this.topicStoreNum != newTopicInfo.topicStoreNum));
+ }
+
public int getTopicStoreNum() {
return topicStoreNum;
}
diff --git
a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/msgstore/MessageStore.java
b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/msgstore/MessageStore.java
index a025f7c..8a88e6f 100644
---
a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/msgstore/MessageStore.java
+++
b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/msgstore/MessageStore.java
@@ -38,7 +38,6 @@ import
org.apache.tubemq.corebase.protobuf.generated.ClientBroker;
import org.apache.tubemq.corebase.utils.MixedUtils;
import org.apache.tubemq.corebase.utils.ThreadUtils;
import org.apache.tubemq.server.broker.BrokerConfig;
-import org.apache.tubemq.server.broker.metadata.ClusterConfigHolder;
import org.apache.tubemq.server.broker.metadata.TopicMetadata;
import org.apache.tubemq.server.broker.msgstore.disk.GetMessageResult;
import org.apache.tubemq.server.broker.msgstore.disk.MsgFileStatisInfo;
@@ -603,12 +602,7 @@ public class MessageStore implements Closeable {
private int validAndGetMemCacheSize(TopicMetadata topicMetadata) {
int memCacheSize = topicMetadata.getMemCacheMsgSize();
- if (memCacheSize <= topicMetadata.getMinMemCacheSize()) {
- logger.info(new StringBuilder(512)
- .append("[Data Store] ").append(getTopic())
- .append(" writeCacheMaxSize changed, from ")
- .append(memCacheSize).append(" to ")
-
.append(ClusterConfigHolder.getMinMemCacheSize()).toString());
+ if (memCacheSize < topicMetadata.getMinMemCacheSize()) {
memCacheSize = topicMetadata.getMinMemCacheSize();
}
return memCacheSize;
diff --git
a/tubemq-server/src/main/java/org/apache/tubemq/server/common/heartbeat/HeartbeatManager.java
b/tubemq-server/src/main/java/org/apache/tubemq/server/common/heartbeat/HeartbeatManager.java
index dc83412..93f4c7b 100644
---
a/tubemq-server/src/main/java/org/apache/tubemq/server/common/heartbeat/HeartbeatManager.java
+++
b/tubemq-server/src/main/java/org/apache/tubemq/server/common/heartbeat/HeartbeatManager.java
@@ -263,7 +263,7 @@ public class HeartbeatManager {
sBuffer.delete(0, sBuffer.length());
return result.isSuccess();
}
- if (createId.equals(timeoutInfo.getSecondKey())) {
+ if (!createId.equals(timeoutInfo.getSecondKey())) {
result.setFailResult(TErrCodeConstants.HB_NO_NODE,
sBuffer.append("Invalid node block id:").append(nodeId)
.append(", you have to append node
first!").toString());
diff --git
a/tubemq-server/src/main/java/org/apache/tubemq/server/common/statusdef/StepStatus.java
b/tubemq-server/src/main/java/org/apache/tubemq/server/common/statusdef/StepStatus.java
index cfa4038..5903fc6 100644
---
a/tubemq-server/src/main/java/org/apache/tubemq/server/common/statusdef/StepStatus.java
+++
b/tubemq-server/src/main/java/org/apache/tubemq/server/common/statusdef/StepStatus.java
@@ -20,23 +20,27 @@ package org.apache.tubemq.server.common.statusdef;
public enum StepStatus {
- STEP_STATUS_UNDEFINED(-2, "idle", 0),
- STEP_STATUS_LOAD_DATA(1, "load_data", 0),
- STEP_STATUS_WAIT_ONLINE(2, "wait_online", 0),
- STEP_STATUS_WAIT_SYNC(3, "wait_sync", 0),
- STEP_STATUS_WAIT_SUBSCRIBE(4, "wait_sub", 60000),
- STEP_STATUS_WAIT_PUBLISH(5, "wait_pub", 30000);
+ STEP_STATUS_UNDEFINED(-2, "idle", 0, 0),
+ STEP_STATUS_LOAD_DATA(1, "load_data", 0, 0),
+ STEP_STATUS_WAIT_ONLINE(2, "wait_online", 0, 0),
+ STEP_STATUS_WAIT_SYNC(3, "wait_sync", 0, 0),
+ STEP_STATUS_WAIT_SUBSCRIBE(4, "wait_sub", 55000, 40000),
+ STEP_STATUS_WAIT_PUBLISH(5, "wait_pub", 25000, 10000);
private int code;
private String description;
- private long delayDurInMs;
+ private long normalDelayDurIdnMs;
+ private long shortDelayDurIdnMs;
- StepStatus(int code, String description, long delayDurInMs) {
+
+ StepStatus(int code, String description,
+ long normalDelayDurIdnMs, long shortDelayDurIdnMs) {
this.code = code;
this.description = description;
- this.delayDurInMs = delayDurInMs;
+ this.normalDelayDurIdnMs = normalDelayDurIdnMs;
+ this.shortDelayDurIdnMs = shortDelayDurIdnMs;
}
public int getCode() {
@@ -47,8 +51,12 @@ public enum StepStatus {
return description;
}
- public long getDelayDurInMs() {
- return delayDurInMs;
+ public long getNormalDelayDurInMs() {
+ return normalDelayDurIdnMs;
+ }
+
+ public long getShortDelayDurIdnMs() {
+ return shortDelayDurIdnMs;
}
public static StepStatus valueOf(int code) {
diff --git
a/tubemq-server/src/main/java/org/apache/tubemq/server/common/utils/SerialIdUtils.java
b/tubemq-server/src/main/java/org/apache/tubemq/server/common/utils/SerialIdUtils.java
new file mode 100644
index 0000000..7f771f3
--- /dev/null
+++
b/tubemq-server/src/main/java/org/apache/tubemq/server/common/utils/SerialIdUtils.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tubemq.server.common.utils;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+
+
+public class SerialIdUtils {
+
+ public static void updTimeStampSerialIdValue(final AtomicLong serialId) {
+ long curSerialId = serialId.get();
+ long newSerialId = System.currentTimeMillis();
+ do {
+ if (newSerialId > curSerialId) {
+ if (serialId.compareAndSet(curSerialId, newSerialId)) {
+ break;
+ }
+ }
+ curSerialId = serialId.get();
+ newSerialId = curSerialId + 10;
+ } while (true);
+ }
+}
diff --git
a/tubemq-server/src/main/java/org/apache/tubemq/server/common/utils/WebParameterUtils.java
b/tubemq-server/src/main/java/org/apache/tubemq/server/common/utils/WebParameterUtils.java
index 52b7dbc..81d7a2a 100644
---
a/tubemq-server/src/main/java/org/apache/tubemq/server/common/utils/WebParameterUtils.java
+++
b/tubemq-server/src/main/java/org/apache/tubemq/server/common/utils/WebParameterUtils.java
@@ -388,7 +388,7 @@ public class WebParameterUtils {
}
newConf.setUnflushInterval((int) result.retData1);
// get unflushDataHold parameter value
- if (!WebParameterUtils.getIntParamValue(paramCntr,
WebFieldDef.UNFLUSHINTERVAL, false,
+ if (!WebParameterUtils.getIntParamValue(paramCntr,
WebFieldDef.UNFLUSHDATAHOLD, false,
(defVal == null ? TBaseConstants.META_VALUE_UNDEFINED :
defVal.getUnflushDataHold()),
TServerConstants.TOPIC_DSK_UNFLUSHDATAHOLD_MIN, sBuffer,
result)) {
return result.isSuccess();
diff --git
a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/MetaDataManager.java
b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/MetaDataManager.java
index e6ee213..8fc6683 100644
---
a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/MetaDataManager.java
+++
b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/MetaDataManager.java
@@ -839,7 +839,7 @@ public class MetaDataManager implements Server {
if (topicEntity != null
&& topicEntity.getTopicStatus() ==
TopicStatus.STATUS_TOPIC_SOFT_REMOVE) {
confDelTopicConfInfo(topicEntity.getModifyUser(),
- topicEntity.getRecordKey(), strBuffer, result);
+ topicEntity.getRecordKey(), topicEntity.getBrokerId(),
strBuffer, result);
}
}
result.setSuccResult(null);
@@ -874,7 +874,8 @@ public class MetaDataManager implements Server {
if (topicEntity == null) {
continue;
}
- confDelTopicConfInfo(operator, topicEntity.getRecordKey(),
strBuffer, result);
+ confDelTopicConfInfo(operator, topicEntity.getRecordKey(),
+ topicEntity.getBrokerId(), strBuffer, result);
}
result.setSuccResult(null);
return result.isSuccess();
@@ -949,7 +950,6 @@ public class MetaDataManager implements Server {
}
TopicDeployEntity deployConf =
new TopicDeployEntity(opEntity, brokerId, topicName);
- deployConf.setTopicProps(brokerConf.getTopicProps());
deployConf.updModifyInfo(opEntity.getDataVerId(),
TBaseConstants.META_VALUE_UNDEFINED,
brokerConf.getBrokerPort(),
brokerConf.getBrokerIp(), deployStatus, newProps);
@@ -982,7 +982,9 @@ public class MetaDataManager implements Server {
metaStoreService.getTopicConfByeRecKey(deployEntity.getRecordKey());
if (isAddOp) {
if (curEntity == null) {
- metaStoreService.addTopicConf(deployEntity, sBuffer, result);
+ if (metaStoreService.addTopicConf(deployEntity, sBuffer,
result)) {
+ triggerBrokerConfDataSync(deployEntity.getBrokerId(),
sBuffer, result);
+ }
} else {
if (curEntity.isValidTopicStatus()) {
result.setFailResult(DataOpErrCode.DERR_EXISTED.getCode(),
@@ -1071,7 +1073,9 @@ public class MetaDataManager implements Server {
.append(curEntity.getTopicName()).toString());
sBuffer.delete(0, sBuffer.length());
} else {
- metaStoreService.updTopicConf(newEntity, sBuffer, result);
+ if (metaStoreService.updTopicConf(newEntity, sBuffer, result))
{
+ triggerBrokerConfDataSync(deployEntity.getBrokerId(),
sBuffer, result);
+ }
}
return new TopicProcessResult(deployEntity.getBrokerId(),
deployEntity.getTopicName(), result);
@@ -1152,7 +1156,9 @@ public class MetaDataManager implements Server {
if (newEntity.updModifyInfo(opEntity.getDataVerId(),
curEntity.getTopicId(), brokerConf.getBrokerPort(),
brokerConf.getBrokerIp(), topicStatus, null)) {
- metaStoreService.updTopicConf(newEntity, sBuffer, result);
+ if (metaStoreService.updTopicConf(newEntity, sBuffer, result)) {
+ triggerBrokerConfDataSync(newEntity.getBrokerId(), sBuffer,
result);
+ }
} else {
result.setFailResult(DataOpErrCode.DERR_UNCHANGED.getCode(),
sBuffer.append("Data not changed for brokerId=")
@@ -1196,15 +1202,15 @@ public class MetaDataManager implements Server {
return metaStoreService.getTopicDepInfoByTopicBrokerId(topicNameSet,
brokerIdSet);
}
-
-
-
private boolean confDelTopicConfInfo(String operator,
String recordKey,
- StringBuilder strBuffer,
+ int brokerId,
+ StringBuilder sBuffer,
ProcessResult result) {
- return metaStoreService.delTopicConf(operator,
- recordKey, strBuffer, result);
+ if (metaStoreService.delTopicConf(operator, recordKey, sBuffer,
result)) {
+ return triggerBrokerConfDataSync(brokerId, sBuffer, result);
+ }
+ return false;
}
public Map<String, String> getBrokerTopicStrConfigInfo(
@@ -1227,8 +1233,7 @@ public class MetaDataManager implements Server {
return topicConfStrMap;
}
TopicPropGroup defTopicProps = brokerEntity.getTopicProps();
- ClusterSettingEntity clusterDefConf =
- metaStoreService.getClusterConfig();
+ ClusterSettingEntity clusterDefConf = getClusterDefSetting(false);
int defMsgSizeInB = clusterDefConf.getMaxMsgSizeInB();
for (TopicDeployEntity topicEntity : topicEntityMap.values()) {
/*
@@ -1464,7 +1469,7 @@ public class MetaDataManager implements Server {
public boolean addIfAbsentTopicCtrlConf(String topicName, String operator,
StringBuilder sBuffer,
ProcessResult result) {
int maxMsgSizeInMB = TBaseConstants.META_MIN_ALLOWED_MESSAGE_SIZE_MB;
- ClusterSettingEntity defSetting = metaStoreService.getClusterConfig();
+ ClusterSettingEntity defSetting = getClusterDefSetting(false);
if (defSetting != null) {
maxMsgSizeInMB = defSetting.getMaxMsgSizeInMB();
}
diff --git
a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/dao/entity/BaseEntity.java
b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/dao/entity/BaseEntity.java
index 2601083..42592b1 100644
---
a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/dao/entity/BaseEntity.java
+++
b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/dao/entity/BaseEntity.java
@@ -26,6 +26,7 @@ import java.util.concurrent.atomic.AtomicLong;
import org.apache.tubemq.corebase.TBaseConstants;
import org.apache.tubemq.corebase.utils.TStringUtils;
import org.apache.tubemq.server.common.TServerConstants;
+import org.apache.tubemq.server.common.utils.SerialIdUtils;
import org.apache.tubemq.server.common.utils.WebParameterUtils;
@@ -176,15 +177,7 @@ public class BaseEntity implements Serializable, Cloneable
{
}
protected void updSerialId() {
- long curSerialId;
- long newSerialId;
- do {
- curSerialId = this.serialId.get();
- newSerialId = System.currentTimeMillis();
- if (newSerialId == curSerialId) {
- newSerialId++;
- }
- } while (!this.serialId.compareAndSet(curSerialId, newSerialId));
+ SerialIdUtils.updTimeStampSerialIdValue(this.serialId);
}
public String getModifyUser() {
diff --git
a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/dao/entity/BrokerConfEntity.java
b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/dao/entity/BrokerConfEntity.java
index 7f49e68..90fb38d 100644
---
a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/dao/entity/BrokerConfEntity.java
+++
b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/dao/entity/BrokerConfEntity.java
@@ -285,8 +285,9 @@ public class BrokerConfEntity extends BaseEntity implements
Cloneable {
// check and set topicProps info
if (topicProps != null
&& !topicProps.isDataEquals(this.topicProps)) {
- changed = true;
- this.topicProps = topicProps;
+ if (this.topicProps.updModifyInfo(topicProps)) {
+ changed = true;
+ }
}
if (changed) {
updSerialId();
diff --git
a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/dao/entity/ClusterSettingEntity.java
b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/dao/entity/ClusterSettingEntity.java
index 39a889d..f09f419 100644
---
a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/dao/entity/ClusterSettingEntity.java
+++
b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/dao/entity/ClusterSettingEntity.java
@@ -194,8 +194,9 @@ public class ClusterSettingEntity extends BaseEntity
implements Cloneable {
// check and set clsDefTopicProps info
if (defTopicProps != null
&& !defTopicProps.isDataEquals(clsDefTopicProps)) {
- changed = true;
- clsDefTopicProps = defTopicProps;
+ if (clsDefTopicProps.updModifyInfo(defTopicProps)) {
+ changed = true;
+ }
}
if (changed) {
updSerialId();
diff --git
a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/dao/entity/TopicDeployEntity.java
b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/dao/entity/TopicDeployEntity.java
index c110c0a..3096c1b 100644
---
a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/dao/entity/TopicDeployEntity.java
+++
b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/dao/entity/TopicDeployEntity.java
@@ -233,8 +233,9 @@ public class TopicDeployEntity extends BaseEntity
implements Cloneable {
// check and set topicProps info
if (topicProps != null
&& !topicProps.isDataEquals(this.topicProps)) {
- changed = true;
- this.topicProps = topicProps;
+ if (this.topicProps.updModifyInfo(topicProps)) {
+ changed = true;
+ }
}
if (changed) {
updSerialId();
diff --git
a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/dao/entity/TopicPropGroup.java
b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/dao/entity/TopicPropGroup.java
index 180ac8f..77042d6 100644
---
a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/dao/entity/TopicPropGroup.java
+++
b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/dao/entity/TopicPropGroup.java
@@ -294,18 +294,6 @@ public class TopicPropGroup implements Serializable,
Cloneable {
}
/**
- * check TopicPropGroup's partition or storeblock changed
- * @return if changed
- */
- public boolean isPartOrStoreChanged(TopicPropGroup other) {
- if (this.numPartitions != other.numPartitions
- || this.numTopicStores != other.numTopicStores) {
- return true;
- }
- return false;
- }
-
- /**
* check if subclass fields is equals
*
* @param other check object
diff --git
a/tubemq-server/src/main/java/org/apache/tubemq/server/master/nodemanage/nodebroker/BrokerAbnHolder.java
b/tubemq-server/src/main/java/org/apache/tubemq/server/master/nodemanage/nodebroker/BrokerAbnHolder.java
index 51f886d..e1e7590 100644
---
a/tubemq-server/src/main/java/org/apache/tubemq/server/master/nodemanage/nodebroker/BrokerAbnHolder.java
+++
b/tubemq-server/src/main/java/org/apache/tubemq/server/master/nodemanage/nodebroker/BrokerAbnHolder.java
@@ -46,7 +46,8 @@ public class BrokerAbnHolder {
new ConcurrentHashMap<>();
private final int maxAutoForbiddenCnt;
private final MetaDataManager metaDataManager;
- private AtomicInteger brokerForbiddenCount = new AtomicInteger(0);
+ private final AtomicInteger brokerForbiddenCount =
+ new AtomicInteger(0);
public BrokerAbnHolder(final int maxAutoForbiddenCnt,
@@ -154,8 +155,7 @@ public class BrokerAbnHolder {
/**
* Remove broker info and decrease total broker count and forbidden broker
count
*
- * @param brokerId
- * @return the deleted broker info
+ * @param brokerId the deleted broker id
*/
public void removeBroker(Integer brokerId) {
brokerAbnormalMap.remove(brokerId);
diff --git
a/tubemq-server/src/main/java/org/apache/tubemq/server/master/nodemanage/nodebroker/BrokerPSInfoHolder.java
b/tubemq-server/src/main/java/org/apache/tubemq/server/master/nodemanage/nodebroker/BrokerPSInfoHolder.java
index dcdbdb7..a225430 100644
---
a/tubemq-server/src/main/java/org/apache/tubemq/server/master/nodemanage/nodebroker/BrokerPSInfoHolder.java
+++
b/tubemq-server/src/main/java/org/apache/tubemq/server/master/nodemanage/nodebroker/BrokerPSInfoHolder.java
@@ -29,12 +29,14 @@ import
org.apache.tubemq.server.common.statusdef.ManageStatus;
public class BrokerPSInfoHolder {
// broker manage status
- private ConcurrentHashSet<Integer/* brokerId */> enablePubBrokerIdSet =
new ConcurrentHashSet<>();
- private ConcurrentHashSet<Integer/* brokerId */> enableSubBrokerIdSet =
new ConcurrentHashSet<>();
+ private final ConcurrentHashSet<Integer/* brokerId */>
enablePubBrokerIdSet =
+ new ConcurrentHashSet<>();
+ private final ConcurrentHashSet<Integer/* brokerId */>
enableSubBrokerIdSet =
+ new ConcurrentHashSet<>();
// broker subscribe topic view info
- private BrokerTopicInfoView subTopicInfoView = new BrokerTopicInfoView();
+ private final BrokerTopicInfoView subTopicInfoView = new
BrokerTopicInfoView();
// broker publish topic view info
- private BrokerTopicInfoView pubTopicInfoView = new BrokerTopicInfoView();
+ private final BrokerTopicInfoView pubTopicInfoView = new
BrokerTopicInfoView();
public BrokerPSInfoHolder() {
@@ -45,17 +47,14 @@ public class BrokerPSInfoHolder {
* remove broker all configure info
*
* @param brokerId broker id index
- * @param isTimeout if broker is timeout
*/
- public void rmvBrokerAllPushedInfo(int brokerId, boolean isTimeout) {
+ public void rmvBrokerAllPushedInfo(int brokerId) {
// remove broker status Info
enablePubBrokerIdSet.remove(brokerId);
enableSubBrokerIdSet.remove(brokerId);
- if (!isTimeout) {
- // remove broker topic info
- subTopicInfoView.rmvBrokerTopicInfo(brokerId);
- pubTopicInfoView.rmvBrokerTopicInfo(brokerId);
- }
+ // remove broker topic info
+ subTopicInfoView.rmvBrokerTopicInfo(brokerId);
+ pubTopicInfoView.rmvBrokerTopicInfo(brokerId);
}
/**
@@ -66,12 +65,12 @@ public class BrokerPSInfoHolder {
*/
public void updBrokerMangeStatus(int brokerId, ManageStatus mngStatus) {
Tuple2<Boolean, Boolean> pubSubStatus = mngStatus.getPubSubStatus();
- if (pubSubStatus.getF0() == Boolean.TRUE) {
+ if (pubSubStatus.getF0()) {
enablePubBrokerIdSet.add(brokerId);
} else {
enablePubBrokerIdSet.remove(brokerId);
}
- if (pubSubStatus.getF1() == Boolean.TRUE) {
+ if (pubSubStatus.getF1()) {
enableSubBrokerIdSet.add(brokerId);
} else {
enableSubBrokerIdSet.remove(brokerId);
@@ -90,13 +89,15 @@ public class BrokerPSInfoHolder {
* @param topicInfoMap broker's topic configure info,
* if topicInfoMap is null, reserve current configure;
* if topicInfoMap is empty, clear current configure.
+ * @return if fast sync data
*/
- public void updBrokerSubTopicConfInfo(int brokerId,
+ public boolean updBrokerSubTopicConfInfo(int brokerId,
Map<String, TopicInfo> topicInfoMap)
{
if (topicInfoMap == null) {
- return;
+ return true;
}
subTopicInfoView.updBrokerTopicConfInfo(brokerId, topicInfoMap);
+ return pubTopicInfoView.fastUpdBrokerTopicConfInfo(brokerId,
topicInfoMap);
}
/**
@@ -116,20 +117,6 @@ public class BrokerPSInfoHolder {
}
/**
- * update broker manage status and topicInfo configures
- *
- * @param brokerId broker id index
- * @param mngStatus broker's manage status
- * @param topicInfoMap broker's topic configure info
- */
- public void updateBrokerPushedInfo(int brokerId, ManageStatus mngStatus,
- Map<String, TopicInfo> topicInfoMap) {
- updBrokerMangeStatus(brokerId, mngStatus);
- updBrokerSubTopicConfInfo(brokerId, topicInfoMap);
- updBrokerPubTopicConfInfo(brokerId, topicInfoMap);
- }
-
- /**
* Get the maximum number of broker distributions of topic
*
* @param topicSet need query topic set
diff --git
a/tubemq-server/src/main/java/org/apache/tubemq/server/master/nodemanage/nodebroker/BrokerRunManager.java
b/tubemq-server/src/main/java/org/apache/tubemq/server/master/nodemanage/nodebroker/BrokerRunManager.java
index 973435b..af24f78 100644
---
a/tubemq-server/src/main/java/org/apache/tubemq/server/master/nodemanage/nodebroker/BrokerRunManager.java
+++
b/tubemq-server/src/main/java/org/apache/tubemq/server/master/nodemanage/nodebroker/BrokerRunManager.java
@@ -77,9 +77,9 @@ public interface BrokerRunManager {
Map<Integer, BrokerInfo> getBrokerInfoMap(List<Integer> brokerIds);
- void updBrokerCsmConfInfo(int brokerId,
- ManageStatus mngStatus,
- Map<String, TopicInfo> topicInfoMap);
+ boolean updBrokerCsmConfInfo(int brokerId,
+ ManageStatus mngStatus,
+ Map<String, TopicInfo> topicInfoMap);
void updBrokerPrdConfInfo(int brokerId,
ManageStatus mngStatus,
diff --git
a/tubemq-server/src/main/java/org/apache/tubemq/server/master/nodemanage/nodebroker/BrokerRunStatusInfo.java
b/tubemq-server/src/main/java/org/apache/tubemq/server/master/nodemanage/nodebroker/BrokerRunStatusInfo.java
index 80ffd80..e928819 100644
---
a/tubemq-server/src/main/java/org/apache/tubemq/server/master/nodemanage/nodebroker/BrokerRunStatusInfo.java
+++
b/tubemq-server/src/main/java/org/apache/tubemq/server/master/nodemanage/nodebroker/BrokerRunStatusInfo.java
@@ -50,14 +50,15 @@ public class BrokerRunStatusInfo {
private BrokerInfo brokerInfo;
private String createId;
// config change flag
- private AtomicBoolean isConfChanged = new AtomicBoolean(false);
- private AtomicLong confChangeNo =
+ private final AtomicBoolean isConfChanged =
+ new AtomicBoolean(false);
+ private final AtomicLong confChangeNo =
new AtomicLong(TBaseConstants.META_VALUE_UNDEFINED);
// data sync status
private StepStatus curStepStatus;
private volatile long nextStepOpTimeInMills = 0;
// current loaded change No.
- private AtomicLong confLoadedNo =
+ private final AtomicLong confLoadedNo =
new AtomicLong(TBaseConstants.META_VALUE_UNDEFINED);
// broker sync data info
BrokerSyncData brokerSyncData = new BrokerSyncData();
@@ -94,7 +95,7 @@ public class BrokerRunStatusInfo {
this.brokerSyncData.updBrokerSyncData(true,
confChangeNo.get(), mngStatus, brokerConfInfo,
topicConfInfoMap);
this.curStepStatus = StepStatus.STEP_STATUS_WAIT_ONLINE;
- this.nextStepOpTimeInMills = curTime + curStepStatus.getDelayDurInMs();
+ this.nextStepOpTimeInMills = curTime +
curStepStatus.getNormalDelayDurInMs();
}
public void notifyDataChanged() {
@@ -164,7 +165,7 @@ public class BrokerRunStatusInfo {
List<String> repTopicConfs,
StringBuilder sBuffer) {
boolean isSynchronized =
- brokerSyncData.bookBrokerReportInfo(repConfigId,
+ brokerSyncData.bookBrokerReportInfo(brokerInfo, repConfigId,
repCheckSumId, isTackData, repBrokerConfInfo,
repTopicConfs);
this.isOnline = isOnline;
goNextStatus(isRegister, isSynchronized, sBuffer);
@@ -186,7 +187,6 @@ public class BrokerRunStatusInfo {
.append(",\"isDoneDataLoad\":").append(isDoneDataLoad)
.append(",\"isDoneDataSub\":").append(isDoneDataSub)
.append(",\"isDoneDataPub\":").append(isDoneDataPub)
- .append(",\"isDoneDataPub\":").append(isDoneDataPub)
.append(",\"isOverTLS\":").append(isOverTLS)
.append(",\"lastBrokerSyncTime\":").append(lastBrokerSyncTime)
.append(",\"maxConfLoadedTimeInMs\":").append(maxConfLoadedTimeInMs)
@@ -214,7 +214,12 @@ public class BrokerRunStatusInfo {
break;
case STEP_STATUS_WAIT_SUBSCRIBE: {
- execSyncDataToSub();
+ if (execSyncDataToSub()) {
+ execSyncDataToPub();
+ curStepStatus = StepStatus.STEP_STATUS_WAIT_PUBLISH;
+ nextStepOpTimeInMills =
+ System.currentTimeMillis() +
curStepStatus.getShortDelayDurIdnMs();
+ }
}
break;
@@ -251,7 +256,7 @@ public class BrokerRunStatusInfo {
resetStatusInfo();
curStepStatus = StepStatus.STEP_STATUS_LOAD_DATA;
nextStepOpTimeInMills =
- System.currentTimeMillis() +
curStepStatus.getDelayDurInMs();
+ System.currentTimeMillis() +
curStepStatus.getNormalDelayDurInMs();
}
}
break;
@@ -265,7 +270,7 @@ public class BrokerRunStatusInfo {
curStepStatus = StepStatus.STEP_STATUS_WAIT_SYNC;
}
nextStepOpTimeInMills =
- System.currentTimeMillis() +
curStepStatus.getDelayDurInMs();
+ System.currentTimeMillis() +
curStepStatus.getNormalDelayDurInMs();
}
}
}
@@ -279,7 +284,7 @@ public class BrokerRunStatusInfo {
curStepStatus = StepStatus.STEP_STATUS_WAIT_SYNC;
}
nextStepOpTimeInMills =
- System.currentTimeMillis() +
curStepStatus.getDelayDurInMs();
+ System.currentTimeMillis() +
curStepStatus.getNormalDelayDurInMs();
}
}
break;
@@ -287,11 +292,9 @@ public class BrokerRunStatusInfo {
case STEP_STATUS_WAIT_SYNC: {
if (isSynchronized) {
curStepStatus = StepStatus.STEP_STATUS_WAIT_SUBSCRIBE;
- } else {
- curStepStatus = StepStatus.STEP_STATUS_WAIT_SYNC;
+ nextStepOpTimeInMills =
+ System.currentTimeMillis() +
curStepStatus.getNormalDelayDurInMs();
}
- nextStepOpTimeInMills =
- System.currentTimeMillis() +
curStepStatus.getDelayDurInMs();
}
break;
@@ -299,7 +302,7 @@ public class BrokerRunStatusInfo {
if (isDoneDataSub && System.currentTimeMillis() >
nextStepOpTimeInMills) {
curStepStatus = StepStatus.STEP_STATUS_WAIT_PUBLISH;
nextStepOpTimeInMills =
- System.currentTimeMillis() +
curStepStatus.getDelayDurInMs();
+ System.currentTimeMillis() +
curStepStatus.getNormalDelayDurInMs();
}
}
break;
@@ -342,15 +345,16 @@ public class BrokerRunStatusInfo {
}
}
- private void execSyncDataToSub() {
+ private boolean execSyncDataToSub() {
if (isDoneDataSub) {
- return;
+ return true;
}
Tuple2<ManageStatus, Map<String, TopicInfo>> syncData =
- brokerSyncData.getBrokerPublishInfo(brokerInfo);
- brokerRunManager.updBrokerCsmConfInfo(brokerInfo.getBrokerId(),
- syncData.getF0(), syncData.getF1());
+ brokerSyncData.getBrokerPublishInfo();
+ boolean needFastSync = brokerRunManager.updBrokerCsmConfInfo(
+ brokerInfo.getBrokerId(), syncData.getF0(), syncData.getF1());
isDoneDataSub = true;
+ return needFastSync;
}
private void execSyncDataToPub() {
@@ -358,7 +362,7 @@ public class BrokerRunStatusInfo {
return;
}
Tuple2<ManageStatus, Map<String, TopicInfo>> syncData =
- brokerSyncData.getBrokerPublishInfo(brokerInfo);
+ brokerSyncData.getBrokerPublishInfo();
brokerRunManager.updBrokerPrdConfInfo(brokerInfo.getBrokerId(),
syncData.getF0(), syncData.getF1());
isDoneDataPub = true;
@@ -391,11 +395,8 @@ public class BrokerRunStatusInfo {
* @return true if need report data otherwise false
*/
private boolean needForceSyncData() {
- if (System.currentTimeMillis() - this.lastBrokerSyncTime
- > TServerConstants.CFG_REPORT_DEFAULT_SYNC_DURATION) {
- return true;
- }
- return false;
+ return System.currentTimeMillis() - this.lastBrokerSyncTime
+ > TServerConstants.CFG_REPORT_DEFAULT_SYNC_DURATION;
}
}
diff --git
a/tubemq-server/src/main/java/org/apache/tubemq/server/master/nodemanage/nodebroker/BrokerSyncData.java
b/tubemq-server/src/main/java/org/apache/tubemq/server/master/nodemanage/nodebroker/BrokerSyncData.java
index b320ed1..715ca46 100644
---
a/tubemq-server/src/main/java/org/apache/tubemq/server/master/nodemanage/nodebroker/BrokerSyncData.java
+++
b/tubemq-server/src/main/java/org/apache/tubemq/server/master/nodemanage/nodebroker/BrokerSyncData.java
@@ -34,7 +34,6 @@ import org.apache.tubemq.corebase.utils.CheckSum;
import org.apache.tubemq.corebase.utils.TStringUtils;
import org.apache.tubemq.corebase.utils.Tuple2;
import org.apache.tubemq.corebase.utils.Tuple4;
-import org.apache.tubemq.server.common.TServerConstants;
import org.apache.tubemq.server.common.statusdef.ManageStatus;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -47,7 +46,7 @@ public class BrokerSyncData {
// current data push id
private long dataPushId;
// data need to sync
- private AtomicLong syncDownDataConfId =
+ private final AtomicLong syncDownDataConfId =
new AtomicLong(System.currentTimeMillis());
private int syncDownDataChkSumId;
private ManageStatus mngStatus;
@@ -61,18 +60,12 @@ public class BrokerSyncData {
private int syncUpDataChkSumId = TBaseConstants.META_VALUE_UNDEFINED;
private String syncUpBrokerConfInfo;
private List<String> syncUpTopicConfInfos = new ArrayList<>();
+ Map<String, TopicInfo> syncUpTopicInfoMap = new HashMap<>();
private long lastDataUpTime = 0;
- public BrokerSyncData() {
- }
+ public BrokerSyncData() {
- public BrokerSyncData(long dataPushId,
- ManageStatus mngStatus,
- String brokerConfInfo,
- Map<String, String> topicConfInfoMap) {
- updBrokerSyncData(true, dataPushId,
- mngStatus, brokerConfInfo, topicConfInfoMap);
}
/**
@@ -123,6 +116,7 @@ public class BrokerSyncData {
/**
* Book the report data by broker
+ * @param brokerInfo broker info
* @param syncDataConfId data configure id
* @param syncDataChkSumId data check-sum id
* @param isTakeData if carry the data info
@@ -131,7 +125,8 @@ public class BrokerSyncData {
*
* @return whether the broker data synchronized
*/
- public boolean bookBrokerReportInfo(long syncDataConfId, int
syncDataChkSumId,
+ public boolean bookBrokerReportInfo(BrokerInfo brokerInfo,
+ long syncDataConfId, int
syncDataChkSumId,
boolean isTakeData, String
syncBrokerConfInfo,
List<String> syncTopicConfInfos) {
this.syncUpDataConfId = syncDataConfId;
@@ -143,6 +138,12 @@ public class BrokerSyncData {
} else {
this.syncUpTopicConfInfos = syncTopicConfInfos;
}
+ Map<String, TopicInfo> tmpInfoMap = parseTopicInfoConf(brokerInfo);
+ if (tmpInfoMap == null) {
+ this.syncUpTopicInfoMap.clear();
+ } else {
+ this.syncUpTopicInfoMap = tmpInfoMap;
+ }
this.lastDataUpTime = System.currentTimeMillis();
}
return isConfSynchronized();
@@ -185,15 +186,12 @@ public class BrokerSyncData {
/**
* Get the broker publish info
- * @param brokerInfo broker info
* @return need sync data
* f0 : manage status
* f1 : topic configure
*/
- public Tuple2<ManageStatus, Map<String, TopicInfo>> getBrokerPublishInfo(
- final BrokerInfo brokerInfo) {
- Map<String, TopicInfo> topicInfoMap = parseTopicInfoConf(brokerInfo);
- return new Tuple2<>(mngStatus, topicInfoMap);
+ public Tuple2<ManageStatus, Map<String, TopicInfo>> getBrokerPublishInfo()
{
+ return new Tuple2<>(mngStatus, syncUpTopicInfoMap);
}
public long getDataPushId() {
@@ -231,6 +229,7 @@ public class BrokerSyncData {
.append(",\"syncUpDataChkSumId\":").append(syncUpDataChkSumId)
.append(",\"syncUpBrokerConfInfo\":\"").append(syncUpBrokerConfInfo)
.append("\",\"syncUpTopicConfInfos\":\"").append(syncUpTopicConfInfos.toString())
+
.append("\",\"syncUpTopicInfoMap\":\"").append(syncUpTopicInfoMap.toString())
.append("\",\"lastDataUpTime\":").append(lastDataUpTime)
.append("}");
return sBuffer;
@@ -258,12 +257,6 @@ public class BrokerSyncData {
numTopicStores = Integer.parseInt(brokerConfInfoAttrs[7]);
}
}
- int unFlushDataHold = TServerConstants.CFG_DEFAULT_DATA_UNFLUSH_HOLD;
- if (brokerConfInfoAttrs.length > 8) {
- if (!TStringUtils.isBlank(brokerConfInfoAttrs[8])) {
- unFlushDataHold = Integer.parseInt(brokerConfInfoAttrs[8]);
- }
- }
Map<String, TopicInfo> topicInfoMap = new HashMap<>();
// make up topicInfo info according to broker default configure
for (String strTopicConfInfo : this.syncUpTopicConfInfos) {
diff --git
a/tubemq-server/src/main/java/org/apache/tubemq/server/master/nodemanage/nodebroker/BrokerTopicInfoView.java
b/tubemq-server/src/main/java/org/apache/tubemq/server/master/nodemanage/nodebroker/BrokerTopicInfoView.java
index 121a82f..17226fb 100644
---
a/tubemq-server/src/main/java/org/apache/tubemq/server/master/nodemanage/nodebroker/BrokerTopicInfoView.java
+++
b/tubemq-server/src/main/java/org/apache/tubemq/server/master/nodemanage/nodebroker/BrokerTopicInfoView.java
@@ -30,15 +30,16 @@ import org.apache.tubemq.corebase.TokenConstants;
import org.apache.tubemq.corebase.cluster.Partition;
import org.apache.tubemq.corebase.cluster.TopicInfo;
import org.apache.tubemq.corebase.utils.ConcurrentHashSet;
-
+import org.apache.tubemq.corebase.utils.Tuple2;
+import org.apache.tubemq.server.common.utils.SerialIdUtils;
public class BrokerTopicInfoView {
public AtomicLong topicChangeId = new AtomicLong(0);
- private ConcurrentHashMap<String/* topicName */,
+ private final ConcurrentHashMap<String/* topicName */,
ConcurrentHashMap<Integer/* brokerId */, TopicInfo>>
topicConfInfoMap =
new ConcurrentHashMap<>();
- private ConcurrentHashMap<Integer/* brokerId */,
ConcurrentHashSet<String/* topicName */>>
+ private final ConcurrentHashMap<Integer/* brokerId */,
ConcurrentHashSet<String/* topicName */>>
brokerIdIndexMap = new ConcurrentHashMap<>();
public BrokerTopicInfoView() {
@@ -65,7 +66,7 @@ public class BrokerTopicInfoView {
}
topicInfoView.remove(brokerId);
}
- topicChangeId.set(System.currentTimeMillis());
+ SerialIdUtils.updTimeStampSerialIdValue(this.topicChangeId);
}
/**
@@ -76,24 +77,39 @@ public class BrokerTopicInfoView {
* if topicInfoMap is null, reserve current configure;
* if topicInfoMap is empty, clear current configure.
*/
- public void updBrokerTopicConfInfo(int brokerId, Map<String, TopicInfo>
topicInfoMap) {
+ public void updBrokerTopicConfInfo(int brokerId,
+ Map<String, TopicInfo> topicInfoMap) {
if (topicInfoMap == null) {
return;
}
// get removed topic info
- Set<String> delTopicSet = new HashSet<>();
- ConcurrentHashSet<String> curTopicSet = brokerIdIndexMap.get(brokerId);
- if (curTopicSet != null) {
- for (String topic : curTopicSet) {
- if (!topicInfoMap.containsKey(topic)) {
- delTopicSet.add(topic);
- }
- }
- }
- rmvBrokerTopicInfo(brokerId, delTopicSet);
+ rmvBrokerTopicInfo(brokerId, topicInfoMap);
// add or update TopicInfo
repBrokerTopicInfo(brokerId, topicInfoMap);
- topicChangeId.set(System.currentTimeMillis());
+ SerialIdUtils.updTimeStampSerialIdValue(this.topicChangeId);
+ }
+
+ /**
+ * update broker's topicInfo configures
+ *
+ * @param brokerId broker id index
+ * @param topicInfoMap broker's topic configure info,
+ * if topicInfoMap is null, reserve current configure;
+ * if topicInfoMap is empty, clear current configure.
+ * @return if fast sync data
+ */
+ public boolean fastUpdBrokerTopicConfInfo(int brokerId,
+ Map<String, TopicInfo>
topicInfoMap) {
+ if (topicInfoMap == null) {
+ return true;
+ }
+ // get removed topic info
+ rmvBrokerTopicInfo(brokerId, topicInfoMap);
+ // update TopicInfo and judge if fast update
+ Tuple2<Boolean, Boolean> retTuple =
+ updBrokerTopicInfo(brokerId, topicInfoMap);
+ SerialIdUtils.updTimeStampSerialIdValue(this.topicChangeId);
+ return retTuple.getF1();
}
/**
@@ -152,7 +168,6 @@ public class BrokerTopicInfoView {
* @param topic need query topic set
*/
public List<Partition> getAcceptSubParts(String topic, Set<Integer>
enableSubBrokerIdSet) {
- Partition tmpPart;
TopicInfo topicInfo;
List<Partition> partList = new ArrayList<>();
if (topic == null) {
@@ -287,25 +302,40 @@ public class BrokerTopicInfoView {
}
// remove broker special topic info
- private void rmvBrokerTopicInfo(int brokerId,
- Set<String> delTopicSet) {
- if (delTopicSet == null || delTopicSet.isEmpty()) {
- return;
+ private boolean rmvBrokerTopicInfo(int brokerId,
+ Map<String, TopicInfo> topicInfoMap) {
+ boolean changed = false;
+ Set<String> delTopicSet = new HashSet<>();
+ ConcurrentHashSet<String> curTopicSet = brokerIdIndexMap.get(brokerId);
+ if (curTopicSet != null) {
+ for (String topic : curTopicSet) {
+ if (!topicInfoMap.containsKey(topic)) {
+ delTopicSet.add(topic);
+ }
+ }
+ }
+ if (delTopicSet.isEmpty()) {
+ return false;
}
ConcurrentHashMap<Integer, TopicInfo> topicInfoView;
ConcurrentHashSet<String> topicSet = brokerIdIndexMap.get(brokerId);
if (topicSet == null || topicSet.isEmpty()) {
- return;
+ return changed;
}
for (String topic : delTopicSet) {
- topicSet.remove(topic);
+ if (topicSet.remove(topic)) {
+ changed = true;
+ }
topicInfoView = topicConfInfoMap.get(topic);
if ((topicInfoView == null)
|| topicInfoView.isEmpty()) {
continue;
}
- topicInfoView.remove(brokerId);
+ if (topicInfoView.remove(brokerId) != null) {
+ changed = true;
+ }
}
+ return changed;
}
// add or update broker special topic info
@@ -344,4 +374,39 @@ public class BrokerTopicInfoView {
curTopicSet.addAll(topicInfoMap.keySet());
}
+ // update current broker special topic info
+ private Tuple2<Boolean, Boolean> updBrokerTopicInfo(int brokerId,
+ Map<String, TopicInfo>
topicInfoMap) {
+ boolean isChanged = false;
+ boolean isFastSync = true;
+ if (topicInfoMap == null || topicInfoMap.isEmpty()) {
+ return new Tuple2<>(isChanged, isFastSync);
+ }
+ Tuple2<Boolean, Boolean> retResult;
+ ConcurrentHashMap<Integer, TopicInfo> curTopicInfoView;
+ for (TopicInfo newTopicInfo : topicInfoMap.values()) {
+ if (newTopicInfo == null) {
+ continue;
+ }
+ curTopicInfoView = topicConfInfoMap.get(newTopicInfo.getTopic());
+ if (curTopicInfoView == null) {
+ isFastSync = false;
+ continue;
+ }
+ TopicInfo curTopicInfo = curTopicInfoView.get(brokerId);
+ if (curTopicInfo == null) {
+ isFastSync = false;
+ continue;
+ }
+ retResult = curTopicInfo.updAndJudgeTopicInfo(newTopicInfo);
+ if (retResult.getF0() && !isChanged) {
+ isChanged = true;
+ }
+ if (retResult.getF1() && isFastSync) {
+ isFastSync = false;
+ }
+ }
+ return new Tuple2<>(isChanged, isFastSync);
+ }
+
}
diff --git
a/tubemq-server/src/main/java/org/apache/tubemq/server/master/nodemanage/nodebroker/DefBrokerRunManager.java
b/tubemq-server/src/main/java/org/apache/tubemq/server/master/nodemanage/nodebroker/DefBrokerRunManager.java
index dbd068e..379c795 100644
---
a/tubemq-server/src/main/java/org/apache/tubemq/server/master/nodemanage/nodebroker/DefBrokerRunManager.java
+++
b/tubemq-server/src/main/java/org/apache/tubemq/server/master/nodemanage/nodebroker/DefBrokerRunManager.java
@@ -24,7 +24,6 @@ import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
-
import org.apache.tubemq.corebase.TBaseConstants;
import org.apache.tubemq.corebase.TErrCodeConstants;
import org.apache.tubemq.corebase.cluster.BrokerInfo;
@@ -40,6 +39,7 @@ import org.apache.tubemq.server.common.heartbeat.TimeoutInfo;
import org.apache.tubemq.server.common.heartbeat.TimeoutListener;
import org.apache.tubemq.server.common.statusdef.ManageStatus;
import org.apache.tubemq.server.common.utils.ProcessResult;
+import org.apache.tubemq.server.common.utils.SerialIdUtils;
import org.apache.tubemq.server.master.MasterConfig;
import org.apache.tubemq.server.master.TMaster;
import org.apache.tubemq.server.master.metamanage.MetaDataManager;
@@ -56,21 +56,23 @@ public class DefBrokerRunManager implements
BrokerRunManager {
private final MetaDataManager metaDataManager;
private final HeartbeatManager heartbeatManager;
// broker string info
- private AtomicLong brokerInfoCheckSum = new
AtomicLong(System.currentTimeMillis());
+ private final AtomicLong brokerInfoCheckSum =
+ new AtomicLong(System.currentTimeMillis());
private long lastBrokerUpdatedTime = System.currentTimeMillis();
private final ConcurrentHashMap<Integer, String> brokersMap =
new ConcurrentHashMap<>();
private final ConcurrentHashMap<Integer, String> brokersTLSMap =
new ConcurrentHashMap<>();
-
// broker sync FSM
- private AtomicInteger brokerTotalCount = new AtomicInteger(0);
- private ConcurrentHashMap<Integer/* brokerId */, BrokerRunStatusInfo>
brokerRunSyncManageMap =
+ private final AtomicInteger brokerTotalCount =
+ new AtomicInteger(0);
+ // brokerId -- broker run status info map
+ private final ConcurrentHashMap<Integer, BrokerRunStatusInfo>
brokerRunSyncManageMap =
new ConcurrentHashMap<>();
// broker abnormal holder
private final BrokerAbnHolder brokerAbnHolder;
// broker topic configure for consumer and producer
- private BrokerPSInfoHolder brokerPubSubInfo = new BrokerPSInfoHolder();
+ private final BrokerPSInfoHolder brokerPubSubInfo = new
BrokerPSInfoHolder();
public DefBrokerRunManager(TMaster tMaster) {
@@ -134,7 +136,7 @@ public class DefBrokerRunManager implements
BrokerRunManager {
&& !brokerTLSReg.equals(entity.getSimpleTLSBrokerInfo())) {
this.brokersTLSMap.put(entity.getBrokerId(),
entity.getSimpleTLSBrokerInfo());
}
- this.brokerInfoCheckSum.set(System.currentTimeMillis());
+ SerialIdUtils.updTimeStampSerialIdValue(this.brokerInfoCheckSum);
}
}
@@ -146,7 +148,7 @@ public class DefBrokerRunManager implements
BrokerRunManager {
String brokerReg = this.brokersMap.remove(brokerId);
String brokerTLSReg = this.brokersTLSMap.remove(brokerId);
if (brokerReg != null || brokerTLSReg != null) {
- this.brokerInfoCheckSum.set(System.currentTimeMillis());
+ SerialIdUtils.updTimeStampSerialIdValue(this.brokerInfoCheckSum);
}
}
@@ -374,8 +376,10 @@ public class DefBrokerRunManager implements
BrokerRunManager {
builder.setStopWrite(autoFbdTuple.getF0());
builder.setStopRead(autoFbdTuple.getF1());
if (retTuple.getF2() == null) {
+ builder.setNeedReportData(false);
builder.setTakeConfInfo(false);
} else {
+ builder.setNeedReportData(true);
builder.setTakeConfInfo(true);
builder.setBrokerDefaultConfInfo(retTuple.getF2());
builder.addAllBrokerTopicSetConfInfo(retTuple.getF3());
@@ -453,7 +457,7 @@ public class DefBrokerRunManager implements
BrokerRunManager {
}
brokerTotalCount.decrementAndGet();
brokerAbnHolder.removeBroker(brokerId);
- brokerPubSubInfo.rmvBrokerAllPushedInfo(brokerId, isTimeout);
+ brokerPubSubInfo.rmvBrokerAllPushedInfo(brokerId);
logger.info(sBuffer.append("[Broker Release]
brokerId=").append(brokerId)
.append(", isTimeout=").append(isTimeout)
.append(", release success!").toString());
@@ -461,17 +465,15 @@ public class DefBrokerRunManager implements
BrokerRunManager {
}
@Override
- public void updBrokerCsmConfInfo(int brokerId, ManageStatus mngStatus,
- Map<String, TopicInfo> topicInfoMap) {
+ public boolean updBrokerCsmConfInfo(int brokerId, ManageStatus mngStatus,
+ Map<String, TopicInfo> topicInfoMap) {
brokerPubSubInfo.updBrokerMangeStatus(brokerId, mngStatus);
- brokerPubSubInfo.updBrokerSubTopicConfInfo(brokerId, topicInfoMap);
-
+ return brokerPubSubInfo.updBrokerSubTopicConfInfo(brokerId,
topicInfoMap);
}
@Override
public void updBrokerPrdConfInfo(int brokerId, ManageStatus mngStatus,
Map<String, TopicInfo> topicInfoMap) {
- brokerPubSubInfo.updBrokerMangeStatus(brokerId, mngStatus);
brokerPubSubInfo.updBrokerPubTopicConfInfo(brokerId, topicInfoMap);
}
diff --git
a/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebTopicDeployHandler.java
b/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebTopicDeployHandler.java
index cc8145d..97eec4c 100644
---
a/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebTopicDeployHandler.java
+++
b/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebTopicDeployHandler.java
@@ -509,6 +509,12 @@ public class WebTopicDeployHandler extends
AbstractWebHandler {
enableAuthCtrl = false;
TopicCtrlEntity ctrlEntity =
metaDataManager.getTopicCtrlByTopicName(entry.getKey());
+ if (ctrlEntity == null) {
+ continue;
+ }
+ if (totalCnt++ > 0) {
+ sBuffer.append(",");
+ }
maxMsgSizeInMB = defSetting.getMaxMsgSizeInMB();
if (ctrlEntity != null) {
maxMsgSizeInMB = ctrlEntity.getMaxMsgSizeInMB();
@@ -645,6 +651,12 @@ public class WebTopicDeployHandler extends
AbstractWebHandler {
isAcceptSubscribe = false;
TopicCtrlEntity ctrlEntity =
metaDataManager.getTopicCtrlByTopicName(entry.getKey());
+ if (ctrlEntity == null) {
+ continue;
+ }
+ if (totalCnt++ > 0) {
+ sBuffer.append(",");
+ }
ctrlEntity.toWebJsonStr(sBuffer, true, false);
sBuffer.append(",\"deployInfo\":[");
int brokerCount = 0;
@@ -657,6 +669,7 @@ public class WebTopicDeployHandler extends
AbstractWebHandler {
sBuffer.append(",\"runInfo\":{");
BrokerConfEntity brokerConfEntity =
metaDataManager.getBrokerConfByBrokerId(entity.getBrokerId());
+
String strManageStatus = "-";
if (brokerConfEntity != null) {
manageStatus = brokerConfEntity.getManageStatus();
diff --git
a/tubemq-server/src/test/java/org/apache/tubemq/server/common/WebParameterUtilsTest.java
b/tubemq-server/src/test/java/org/apache/tubemq/server/common/WebParameterUtilsTest.java
index 2d13258..71d76d4 100644
---
a/tubemq-server/src/test/java/org/apache/tubemq/server/common/WebParameterUtilsTest.java
+++
b/tubemq-server/src/test/java/org/apache/tubemq/server/common/WebParameterUtilsTest.java
@@ -254,16 +254,45 @@ public class WebParameterUtilsTest {
public void getTopicPropInfoTest() {
boolean retValue;
TopicPropGroup retEntry;
- TopicPropGroup defOpEntity = new TopicPropGroup();
+ TopicPropGroup defOpEntity;
StringBuilder sBuffer = new StringBuilder(512);
ProcessResult result = new ProcessResult();
Map<String, String> paramCntrMap = new HashMap<>();
// case 1
+ paramCntrMap.put(WebFieldDef.NUMTOPICSTORES.name, "1");
+ paramCntrMap.put(WebFieldDef.NUMPARTITIONS.name, "2");
+ paramCntrMap.put(WebFieldDef.UNFLUSHTHRESHOLD.name, "3");
+ paramCntrMap.put(WebFieldDef.UNFLUSHINTERVAL.name, "4");
+ paramCntrMap.put(WebFieldDef.UNFLUSHDATAHOLD.name, "5");
+ paramCntrMap.put(WebFieldDef.MCACHESIZEINMB.name, "2");
+ paramCntrMap.put(WebFieldDef.UNFMCACHECNTINK.name, "7");
+ paramCntrMap.put(WebFieldDef.UNFMCACHEINTERVAL.name, "4000");
+ paramCntrMap.put(WebFieldDef.ACCEPTPUBLISH.name, "true");
+ paramCntrMap.put(WebFieldDef.ACCEPTSUBSCRIBE.name, "false");
+ paramCntrMap.put(WebFieldDef.DATASTORETYPE.name, "9");
+ paramCntrMap.put(WebFieldDef.DATAPATH.name, "test");
+ paramCntrMap.put(WebFieldDef.DELETEPOLICY.name, "delete,2h");
retValue = WebParameterUtils.getTopicPropInfo(paramCntrMap,
null, sBuffer, result);
Assert.assertTrue(retValue);
Assert.assertTrue(result.isSuccess());
retEntry = (TopicPropGroup) result.getRetData();
+ Assert.assertEquals(retEntry.getNumTopicStores(),
+
Integer.parseInt(paramCntrMap.get(WebFieldDef.NUMTOPICSTORES.name)));
+ // case 2
+ paramCntrMap.clear();
+ defOpEntity = new TopicPropGroup();
+ defOpEntity.fillDefaultValue();
+ paramCntrMap.put(WebFieldDef.ACCEPTPUBLISH.name, "0");
+ paramCntrMap.put(WebFieldDef.NUMTOPICSTORES.name, "9");
+ paramCntrMap.put(WebFieldDef.UNFMCACHECNTINK.name, "100");
+ retValue = WebParameterUtils.getTopicPropInfo(paramCntrMap,
+ defOpEntity, sBuffer, result);
+ Assert.assertTrue(retValue);
+ Assert.assertTrue(result.isSuccess());
+ retEntry = (TopicPropGroup) result.getRetData();
+
+
}
}