This is an automated email from the ASF dual-hosted git repository.

yuanbo pushed a commit to branch TUBEMQ-570
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git


The following commit(s) were added to refs/heads/TUBEMQ-570 by this push:
     new f24ea23  [INLONG-626] Fix broker and topic confiugre implement bugs 
(#479)
f24ea23 is described below

commit f24ea23b23ddb028e8b4dafe7fc550c7229b3cba
Author: gosonzhang <[email protected]>
AuthorDate: Tue Jun 8 14:03:10 2021 +0800

    [INLONG-626] Fix broker and topic confiugre implement bugs (#479)
    
    * [INLONG-626] Fix broker and topic confiugre implement bugs
    
    * [INLONG-626] Fix broker and topic confiugre implement bugs
    
    * [INLONG-626] Fix broker and topic confiugre implement bugs
    
    * [INLONG-626] Fix broker and topic confiugre implement bugs
    
    * [INLONG-626] Fix broker and topic confiugre implement bugs
    
    Co-authored-by: gosonzhang <[email protected]>
---
 .../apache/tubemq/corebase/cluster/TopicInfo.java  |  16 +++
 .../server/broker/msgstore/MessageStore.java       |   8 +-
 .../server/common/heartbeat/HeartbeatManager.java  |   2 +-
 .../tubemq/server/common/statusdef/StepStatus.java |  30 ++++--
 .../tubemq/server/common/utils/SerialIdUtils.java  |  39 +++++++
 .../server/common/utils/WebParameterUtils.java     |   2 +-
 .../server/master/metamanage/MetaDataManager.java  |  35 ++++---
 .../metastore/dao/entity/BaseEntity.java           |  11 +-
 .../metastore/dao/entity/BrokerConfEntity.java     |   5 +-
 .../metastore/dao/entity/ClusterSettingEntity.java |   5 +-
 .../metastore/dao/entity/TopicDeployEntity.java    |   5 +-
 .../metastore/dao/entity/TopicPropGroup.java       |  12 ---
 .../nodemanage/nodebroker/BrokerAbnHolder.java     |   6 +-
 .../nodemanage/nodebroker/BrokerPSInfoHolder.java  |  45 +++-----
 .../nodemanage/nodebroker/BrokerRunManager.java    |   6 +-
 .../nodemanage/nodebroker/BrokerRunStatusInfo.java |  53 +++++-----
 .../nodemanage/nodebroker/BrokerSyncData.java      |  37 +++----
 .../nodemanage/nodebroker/BrokerTopicInfoView.java | 113 ++++++++++++++++-----
 .../nodemanage/nodebroker/DefBrokerRunManager.java |  30 +++---
 .../master/web/handler/WebTopicDeployHandler.java  |  13 +++
 .../server/common/WebParameterUtilsTest.java       |  31 +++++-
 21 files changed, 320 insertions(+), 184 deletions(-)

diff --git 
a/tubemq-core/src/main/java/org/apache/tubemq/corebase/cluster/TopicInfo.java 
b/tubemq-core/src/main/java/org/apache/tubemq/corebase/cluster/TopicInfo.java
index 2e4e075..43400b7 100644
--- 
a/tubemq-core/src/main/java/org/apache/tubemq/corebase/cluster/TopicInfo.java
+++ 
b/tubemq-core/src/main/java/org/apache/tubemq/corebase/cluster/TopicInfo.java
@@ -19,6 +19,7 @@ package org.apache.tubemq.corebase.cluster;
 
 import java.io.Serializable;
 import org.apache.tubemq.corebase.TokenConstants;
+import org.apache.tubemq.corebase.utils.Tuple2;
 
 
 public class TopicInfo implements Serializable {
@@ -74,6 +75,21 @@ public class TopicInfo implements Serializable {
         this.acceptSubscribe = acceptSubscribe;
     }
 
+    // return result <isChanged, isScaleOut>
+    public Tuple2<Boolean, Boolean> updAndJudgeTopicInfo(TopicInfo 
newTopicInfo) {
+        boolean isChanged = false;
+        if (this.acceptPublish != newTopicInfo.acceptPublish) {
+            isChanged = true;
+            this.acceptPublish = newTopicInfo.acceptPublish;
+        }
+        if (this.acceptSubscribe != newTopicInfo.acceptSubscribe) {
+            isChanged = true;
+            this.acceptSubscribe = newTopicInfo.acceptSubscribe;
+        }
+        return new Tuple2<>(isChanged, (this.partitionNum != 
newTopicInfo.partitionNum
+                || this.topicStoreNum != newTopicInfo.topicStoreNum));
+    }
+
     public int getTopicStoreNum() {
         return topicStoreNum;
     }
diff --git 
a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/msgstore/MessageStore.java
 
b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/msgstore/MessageStore.java
index a025f7c..8a88e6f 100644
--- 
a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/msgstore/MessageStore.java
+++ 
b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/msgstore/MessageStore.java
@@ -38,7 +38,6 @@ import 
org.apache.tubemq.corebase.protobuf.generated.ClientBroker;
 import org.apache.tubemq.corebase.utils.MixedUtils;
 import org.apache.tubemq.corebase.utils.ThreadUtils;
 import org.apache.tubemq.server.broker.BrokerConfig;
-import org.apache.tubemq.server.broker.metadata.ClusterConfigHolder;
 import org.apache.tubemq.server.broker.metadata.TopicMetadata;
 import org.apache.tubemq.server.broker.msgstore.disk.GetMessageResult;
 import org.apache.tubemq.server.broker.msgstore.disk.MsgFileStatisInfo;
@@ -603,12 +602,7 @@ public class MessageStore implements Closeable {
 
     private int validAndGetMemCacheSize(TopicMetadata topicMetadata) {
         int memCacheSize = topicMetadata.getMemCacheMsgSize();
-        if (memCacheSize <= topicMetadata.getMinMemCacheSize()) {
-            logger.info(new StringBuilder(512)
-                    .append("[Data Store] ").append(getTopic())
-                    .append(" writeCacheMaxSize changed, from ")
-                    .append(memCacheSize).append(" to ")
-                    
.append(ClusterConfigHolder.getMinMemCacheSize()).toString());
+        if (memCacheSize < topicMetadata.getMinMemCacheSize()) {
             memCacheSize = topicMetadata.getMinMemCacheSize();
         }
         return memCacheSize;
diff --git 
a/tubemq-server/src/main/java/org/apache/tubemq/server/common/heartbeat/HeartbeatManager.java
 
b/tubemq-server/src/main/java/org/apache/tubemq/server/common/heartbeat/HeartbeatManager.java
index dc83412..93f4c7b 100644
--- 
a/tubemq-server/src/main/java/org/apache/tubemq/server/common/heartbeat/HeartbeatManager.java
+++ 
b/tubemq-server/src/main/java/org/apache/tubemq/server/common/heartbeat/HeartbeatManager.java
@@ -263,7 +263,7 @@ public class HeartbeatManager {
             sBuffer.delete(0, sBuffer.length());
             return result.isSuccess();
         }
-        if (createId.equals(timeoutInfo.getSecondKey())) {
+        if (!createId.equals(timeoutInfo.getSecondKey())) {
             result.setFailResult(TErrCodeConstants.HB_NO_NODE,
                     sBuffer.append("Invalid node block id:").append(nodeId)
                             .append(", you have to append node 
first!").toString());
diff --git 
a/tubemq-server/src/main/java/org/apache/tubemq/server/common/statusdef/StepStatus.java
 
b/tubemq-server/src/main/java/org/apache/tubemq/server/common/statusdef/StepStatus.java
index cfa4038..5903fc6 100644
--- 
a/tubemq-server/src/main/java/org/apache/tubemq/server/common/statusdef/StepStatus.java
+++ 
b/tubemq-server/src/main/java/org/apache/tubemq/server/common/statusdef/StepStatus.java
@@ -20,23 +20,27 @@ package org.apache.tubemq.server.common.statusdef;
 
 public enum StepStatus {
 
-    STEP_STATUS_UNDEFINED(-2, "idle", 0),
-    STEP_STATUS_LOAD_DATA(1, "load_data", 0),
-    STEP_STATUS_WAIT_ONLINE(2, "wait_online", 0),
-    STEP_STATUS_WAIT_SYNC(3, "wait_sync", 0),
-    STEP_STATUS_WAIT_SUBSCRIBE(4, "wait_sub", 60000),
-    STEP_STATUS_WAIT_PUBLISH(5, "wait_pub", 30000);
+    STEP_STATUS_UNDEFINED(-2, "idle", 0, 0),
+    STEP_STATUS_LOAD_DATA(1, "load_data", 0, 0),
+    STEP_STATUS_WAIT_ONLINE(2, "wait_online", 0, 0),
+    STEP_STATUS_WAIT_SYNC(3, "wait_sync", 0, 0),
+    STEP_STATUS_WAIT_SUBSCRIBE(4, "wait_sub", 55000, 40000),
+    STEP_STATUS_WAIT_PUBLISH(5, "wait_pub", 25000, 10000);
 
     private int code;
     private String description;
-    private long delayDurInMs;
+    private long normalDelayDurIdnMs;
+    private long shortDelayDurIdnMs;
 
 
 
-    StepStatus(int code, String description, long delayDurInMs) {
+
+    StepStatus(int code, String description,
+               long normalDelayDurIdnMs, long shortDelayDurIdnMs) {
         this.code = code;
         this.description = description;
-        this.delayDurInMs = delayDurInMs;
+        this.normalDelayDurIdnMs = normalDelayDurIdnMs;
+        this.shortDelayDurIdnMs = shortDelayDurIdnMs;
     }
 
     public int getCode() {
@@ -47,8 +51,12 @@ public enum StepStatus {
         return description;
     }
 
-    public long getDelayDurInMs() {
-        return delayDurInMs;
+    public long getNormalDelayDurInMs() {
+        return normalDelayDurIdnMs;
+    }
+
+    public long getShortDelayDurIdnMs() {
+        return shortDelayDurIdnMs;
     }
 
     public static StepStatus valueOf(int code) {
diff --git 
a/tubemq-server/src/main/java/org/apache/tubemq/server/common/utils/SerialIdUtils.java
 
b/tubemq-server/src/main/java/org/apache/tubemq/server/common/utils/SerialIdUtils.java
new file mode 100644
index 0000000..7f771f3
--- /dev/null
+++ 
b/tubemq-server/src/main/java/org/apache/tubemq/server/common/utils/SerialIdUtils.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tubemq.server.common.utils;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+
+
+public class SerialIdUtils {
+
+    public static void updTimeStampSerialIdValue(final AtomicLong serialId) {
+        long curSerialId = serialId.get();
+        long newSerialId = System.currentTimeMillis();
+        do {
+            if (newSerialId > curSerialId) {
+                if (serialId.compareAndSet(curSerialId, newSerialId)) {
+                    break;
+                }
+            }
+            curSerialId = serialId.get();
+            newSerialId = curSerialId + 10;
+        } while (true);
+    }
+}
diff --git 
a/tubemq-server/src/main/java/org/apache/tubemq/server/common/utils/WebParameterUtils.java
 
b/tubemq-server/src/main/java/org/apache/tubemq/server/common/utils/WebParameterUtils.java
index 52b7dbc..81d7a2a 100644
--- 
a/tubemq-server/src/main/java/org/apache/tubemq/server/common/utils/WebParameterUtils.java
+++ 
b/tubemq-server/src/main/java/org/apache/tubemq/server/common/utils/WebParameterUtils.java
@@ -388,7 +388,7 @@ public class WebParameterUtils {
         }
         newConf.setUnflushInterval((int) result.retData1);
         // get unflushDataHold parameter value
-        if (!WebParameterUtils.getIntParamValue(paramCntr, 
WebFieldDef.UNFLUSHINTERVAL, false,
+        if (!WebParameterUtils.getIntParamValue(paramCntr, 
WebFieldDef.UNFLUSHDATAHOLD, false,
                 (defVal == null ? TBaseConstants.META_VALUE_UNDEFINED : 
defVal.getUnflushDataHold()),
                 TServerConstants.TOPIC_DSK_UNFLUSHDATAHOLD_MIN, sBuffer, 
result)) {
             return result.isSuccess();
diff --git 
a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/MetaDataManager.java
 
b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/MetaDataManager.java
index e6ee213..8fc6683 100644
--- 
a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/MetaDataManager.java
+++ 
b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/MetaDataManager.java
@@ -839,7 +839,7 @@ public class MetaDataManager implements Server {
             if (topicEntity != null
                     && topicEntity.getTopicStatus() == 
TopicStatus.STATUS_TOPIC_SOFT_REMOVE) {
                 confDelTopicConfInfo(topicEntity.getModifyUser(),
-                        topicEntity.getRecordKey(), strBuffer, result);
+                        topicEntity.getRecordKey(), topicEntity.getBrokerId(), 
strBuffer, result);
             }
         }
         result.setSuccResult(null);
@@ -874,7 +874,8 @@ public class MetaDataManager implements Server {
             if (topicEntity == null) {
                 continue;
             }
-            confDelTopicConfInfo(operator, topicEntity.getRecordKey(), 
strBuffer, result);
+            confDelTopicConfInfo(operator, topicEntity.getRecordKey(),
+                    topicEntity.getBrokerId(), strBuffer, result);
         }
         result.setSuccResult(null);
         return result.isSuccess();
@@ -949,7 +950,6 @@ public class MetaDataManager implements Server {
         }
         TopicDeployEntity deployConf =
                 new TopicDeployEntity(opEntity, brokerId, topicName);
-        deployConf.setTopicProps(brokerConf.getTopicProps());
         deployConf.updModifyInfo(opEntity.getDataVerId(),
                 TBaseConstants.META_VALUE_UNDEFINED, 
brokerConf.getBrokerPort(),
                 brokerConf.getBrokerIp(), deployStatus, newProps);
@@ -982,7 +982,9 @@ public class MetaDataManager implements Server {
                 
metaStoreService.getTopicConfByeRecKey(deployEntity.getRecordKey());
         if (isAddOp) {
             if (curEntity == null) {
-                metaStoreService.addTopicConf(deployEntity, sBuffer, result);
+                if (metaStoreService.addTopicConf(deployEntity, sBuffer, 
result)) {
+                    triggerBrokerConfDataSync(deployEntity.getBrokerId(), 
sBuffer, result);
+                }
             } else {
                 if (curEntity.isValidTopicStatus()) {
                     result.setFailResult(DataOpErrCode.DERR_EXISTED.getCode(),
@@ -1071,7 +1073,9 @@ public class MetaDataManager implements Server {
                                 .append(curEntity.getTopicName()).toString());
                 sBuffer.delete(0, sBuffer.length());
             } else {
-                metaStoreService.updTopicConf(newEntity, sBuffer, result);
+                if (metaStoreService.updTopicConf(newEntity, sBuffer, result)) 
{
+                    triggerBrokerConfDataSync(deployEntity.getBrokerId(), 
sBuffer, result);
+                }
             }
             return new TopicProcessResult(deployEntity.getBrokerId(),
                     deployEntity.getTopicName(), result);
@@ -1152,7 +1156,9 @@ public class MetaDataManager implements Server {
         if (newEntity.updModifyInfo(opEntity.getDataVerId(),
                 curEntity.getTopicId(), brokerConf.getBrokerPort(),
                 brokerConf.getBrokerIp(), topicStatus, null)) {
-            metaStoreService.updTopicConf(newEntity, sBuffer, result);
+            if (metaStoreService.updTopicConf(newEntity, sBuffer, result)) {
+                triggerBrokerConfDataSync(newEntity.getBrokerId(), sBuffer, 
result);
+            }
         } else {
             result.setFailResult(DataOpErrCode.DERR_UNCHANGED.getCode(),
                     sBuffer.append("Data not changed for brokerId=")
@@ -1196,15 +1202,15 @@ public class MetaDataManager implements Server {
         return metaStoreService.getTopicDepInfoByTopicBrokerId(topicNameSet, 
brokerIdSet);
     }
 
-
-
-
     private boolean confDelTopicConfInfo(String operator,
                                          String recordKey,
-                                         StringBuilder strBuffer,
+                                         int brokerId,
+                                         StringBuilder sBuffer,
                                          ProcessResult result) {
-        return metaStoreService.delTopicConf(operator,
-                recordKey, strBuffer, result);
+        if (metaStoreService.delTopicConf(operator, recordKey, sBuffer, 
result)) {
+            return triggerBrokerConfDataSync(brokerId, sBuffer, result);
+        }
+        return false;
     }
 
     public Map<String, String> getBrokerTopicStrConfigInfo(
@@ -1227,8 +1233,7 @@ public class MetaDataManager implements Server {
             return topicConfStrMap;
         }
         TopicPropGroup defTopicProps = brokerEntity.getTopicProps();
-        ClusterSettingEntity clusterDefConf =
-                metaStoreService.getClusterConfig();
+        ClusterSettingEntity clusterDefConf = getClusterDefSetting(false);
         int defMsgSizeInB = clusterDefConf.getMaxMsgSizeInB();
         for (TopicDeployEntity topicEntity : topicEntityMap.values()) {
             /*
@@ -1464,7 +1469,7 @@ public class MetaDataManager implements Server {
     public boolean addIfAbsentTopicCtrlConf(String topicName, String operator,
                                             StringBuilder sBuffer, 
ProcessResult result) {
         int maxMsgSizeInMB = TBaseConstants.META_MIN_ALLOWED_MESSAGE_SIZE_MB;
-        ClusterSettingEntity defSetting = metaStoreService.getClusterConfig();
+        ClusterSettingEntity defSetting = getClusterDefSetting(false);
         if (defSetting != null) {
             maxMsgSizeInMB = defSetting.getMaxMsgSizeInMB();
         }
diff --git 
a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/dao/entity/BaseEntity.java
 
b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/dao/entity/BaseEntity.java
index 2601083..42592b1 100644
--- 
a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/dao/entity/BaseEntity.java
+++ 
b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/dao/entity/BaseEntity.java
@@ -26,6 +26,7 @@ import java.util.concurrent.atomic.AtomicLong;
 import org.apache.tubemq.corebase.TBaseConstants;
 import org.apache.tubemq.corebase.utils.TStringUtils;
 import org.apache.tubemq.server.common.TServerConstants;
+import org.apache.tubemq.server.common.utils.SerialIdUtils;
 import org.apache.tubemq.server.common.utils.WebParameterUtils;
 
 
@@ -176,15 +177,7 @@ public class BaseEntity implements Serializable, Cloneable 
{
     }
 
     protected void updSerialId() {
-        long curSerialId;
-        long newSerialId;
-        do {
-            curSerialId = this.serialId.get();
-            newSerialId = System.currentTimeMillis();
-            if (newSerialId == curSerialId) {
-                newSerialId++;
-            }
-        } while (!this.serialId.compareAndSet(curSerialId, newSerialId));
+        SerialIdUtils.updTimeStampSerialIdValue(this.serialId);
     }
 
     public String getModifyUser() {
diff --git 
a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/dao/entity/BrokerConfEntity.java
 
b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/dao/entity/BrokerConfEntity.java
index 7f49e68..90fb38d 100644
--- 
a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/dao/entity/BrokerConfEntity.java
+++ 
b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/dao/entity/BrokerConfEntity.java
@@ -285,8 +285,9 @@ public class BrokerConfEntity extends BaseEntity implements 
Cloneable {
         // check and set topicProps info
         if (topicProps != null
                 && !topicProps.isDataEquals(this.topicProps)) {
-            changed = true;
-            this.topicProps = topicProps;
+            if (this.topicProps.updModifyInfo(topicProps)) {
+                changed = true;
+            }
         }
         if (changed) {
             updSerialId();
diff --git 
a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/dao/entity/ClusterSettingEntity.java
 
b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/dao/entity/ClusterSettingEntity.java
index 39a889d..f09f419 100644
--- 
a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/dao/entity/ClusterSettingEntity.java
+++ 
b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/dao/entity/ClusterSettingEntity.java
@@ -194,8 +194,9 @@ public class ClusterSettingEntity extends BaseEntity 
implements Cloneable {
         // check and set clsDefTopicProps info
         if (defTopicProps != null
                 && !defTopicProps.isDataEquals(clsDefTopicProps)) {
-            changed = true;
-            clsDefTopicProps = defTopicProps;
+            if (clsDefTopicProps.updModifyInfo(defTopicProps)) {
+                changed = true;
+            }
         }
         if (changed) {
             updSerialId();
diff --git 
a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/dao/entity/TopicDeployEntity.java
 
b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/dao/entity/TopicDeployEntity.java
index c110c0a..3096c1b 100644
--- 
a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/dao/entity/TopicDeployEntity.java
+++ 
b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/dao/entity/TopicDeployEntity.java
@@ -233,8 +233,9 @@ public class TopicDeployEntity extends BaseEntity 
implements Cloneable {
         // check and set topicProps info
         if (topicProps != null
                 && !topicProps.isDataEquals(this.topicProps)) {
-            changed = true;
-            this.topicProps = topicProps;
+            if (this.topicProps.updModifyInfo(topicProps)) {
+                changed = true;
+            }
         }
         if (changed) {
             updSerialId();
diff --git 
a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/dao/entity/TopicPropGroup.java
 
b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/dao/entity/TopicPropGroup.java
index 180ac8f..77042d6 100644
--- 
a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/dao/entity/TopicPropGroup.java
+++ 
b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/dao/entity/TopicPropGroup.java
@@ -294,18 +294,6 @@ public class TopicPropGroup implements Serializable, 
Cloneable {
     }
 
     /**
-     * check TopicPropGroup's partition or storeblock changed
-     * @return if changed
-     */
-    public boolean isPartOrStoreChanged(TopicPropGroup other) {
-        if (this.numPartitions != other.numPartitions
-                || this.numTopicStores != other.numTopicStores) {
-            return true;
-        }
-        return false;
-    }
-
-    /**
      * check if subclass fields is equals
      *
      * @param other  check object
diff --git 
a/tubemq-server/src/main/java/org/apache/tubemq/server/master/nodemanage/nodebroker/BrokerAbnHolder.java
 
b/tubemq-server/src/main/java/org/apache/tubemq/server/master/nodemanage/nodebroker/BrokerAbnHolder.java
index 51f886d..e1e7590 100644
--- 
a/tubemq-server/src/main/java/org/apache/tubemq/server/master/nodemanage/nodebroker/BrokerAbnHolder.java
+++ 
b/tubemq-server/src/main/java/org/apache/tubemq/server/master/nodemanage/nodebroker/BrokerAbnHolder.java
@@ -46,7 +46,8 @@ public class BrokerAbnHolder {
             new ConcurrentHashMap<>();
     private final int maxAutoForbiddenCnt;
     private final MetaDataManager metaDataManager;
-    private AtomicInteger brokerForbiddenCount = new AtomicInteger(0);
+    private final AtomicInteger brokerForbiddenCount =
+            new AtomicInteger(0);
 
 
     public BrokerAbnHolder(final int maxAutoForbiddenCnt,
@@ -154,8 +155,7 @@ public class BrokerAbnHolder {
     /**
      * Remove broker info and decrease total broker count and forbidden broker 
count
      *
-     * @param brokerId
-     * @return the deleted broker info
+     * @param brokerId the deleted broker id
      */
     public void removeBroker(Integer brokerId) {
         brokerAbnormalMap.remove(brokerId);
diff --git 
a/tubemq-server/src/main/java/org/apache/tubemq/server/master/nodemanage/nodebroker/BrokerPSInfoHolder.java
 
b/tubemq-server/src/main/java/org/apache/tubemq/server/master/nodemanage/nodebroker/BrokerPSInfoHolder.java
index dcdbdb7..a225430 100644
--- 
a/tubemq-server/src/main/java/org/apache/tubemq/server/master/nodemanage/nodebroker/BrokerPSInfoHolder.java
+++ 
b/tubemq-server/src/main/java/org/apache/tubemq/server/master/nodemanage/nodebroker/BrokerPSInfoHolder.java
@@ -29,12 +29,14 @@ import 
org.apache.tubemq.server.common.statusdef.ManageStatus;
 
 public class BrokerPSInfoHolder {
     // broker manage status
-    private ConcurrentHashSet<Integer/* brokerId */> enablePubBrokerIdSet = 
new ConcurrentHashSet<>();
-    private ConcurrentHashSet<Integer/* brokerId */> enableSubBrokerIdSet = 
new ConcurrentHashSet<>();
+    private final ConcurrentHashSet<Integer/* brokerId */> 
enablePubBrokerIdSet =
+            new ConcurrentHashSet<>();
+    private final ConcurrentHashSet<Integer/* brokerId */> 
enableSubBrokerIdSet =
+            new ConcurrentHashSet<>();
     // broker subscribe topic view info
-    private BrokerTopicInfoView subTopicInfoView = new BrokerTopicInfoView();
+    private final BrokerTopicInfoView subTopicInfoView = new 
BrokerTopicInfoView();
     // broker publish topic view info
-    private BrokerTopicInfoView pubTopicInfoView = new BrokerTopicInfoView();
+    private final BrokerTopicInfoView pubTopicInfoView = new 
BrokerTopicInfoView();
 
 
     public BrokerPSInfoHolder() {
@@ -45,17 +47,14 @@ public class BrokerPSInfoHolder {
      * remove broker all configure info
      *
      * @param brokerId broker id index
-     * @param isTimeout if broker is timeout
      */
-    public void rmvBrokerAllPushedInfo(int brokerId, boolean isTimeout) {
+    public void rmvBrokerAllPushedInfo(int brokerId) {
         // remove broker status Info
         enablePubBrokerIdSet.remove(brokerId);
         enableSubBrokerIdSet.remove(brokerId);
-        if (!isTimeout) {
-            // remove broker topic info
-            subTopicInfoView.rmvBrokerTopicInfo(brokerId);
-            pubTopicInfoView.rmvBrokerTopicInfo(brokerId);
-        }
+        // remove broker topic info
+        subTopicInfoView.rmvBrokerTopicInfo(brokerId);
+        pubTopicInfoView.rmvBrokerTopicInfo(brokerId);
     }
 
     /**
@@ -66,12 +65,12 @@ public class BrokerPSInfoHolder {
      */
     public void updBrokerMangeStatus(int brokerId, ManageStatus mngStatus) {
         Tuple2<Boolean, Boolean> pubSubStatus = mngStatus.getPubSubStatus();
-        if (pubSubStatus.getF0() == Boolean.TRUE) {
+        if (pubSubStatus.getF0()) {
             enablePubBrokerIdSet.add(brokerId);
         } else {
             enablePubBrokerIdSet.remove(brokerId);
         }
-        if (pubSubStatus.getF1() == Boolean.TRUE) {
+        if (pubSubStatus.getF1()) {
             enableSubBrokerIdSet.add(brokerId);
         } else {
             enableSubBrokerIdSet.remove(brokerId);
@@ -90,13 +89,15 @@ public class BrokerPSInfoHolder {
      * @param topicInfoMap broker's topic configure info,
      *                    if topicInfoMap is null, reserve current configure;
      *                    if topicInfoMap is empty, clear current configure.
+     * @return if fast sync data
      */
-    public void updBrokerSubTopicConfInfo(int brokerId,
+    public boolean updBrokerSubTopicConfInfo(int brokerId,
                                           Map<String, TopicInfo> topicInfoMap) 
{
         if (topicInfoMap == null) {
-            return;
+            return true;
         }
         subTopicInfoView.updBrokerTopicConfInfo(brokerId, topicInfoMap);
+        return pubTopicInfoView.fastUpdBrokerTopicConfInfo(brokerId, 
topicInfoMap);
     }
 
     /**
@@ -116,20 +117,6 @@ public class BrokerPSInfoHolder {
     }
 
     /**
-     * update broker manage status and topicInfo configures
-     *
-     * @param brokerId broker id index
-     * @param mngStatus broker's manage status
-     * @param topicInfoMap broker's topic configure info
-     */
-    public void updateBrokerPushedInfo(int brokerId, ManageStatus mngStatus,
-                                       Map<String, TopicInfo> topicInfoMap) {
-        updBrokerMangeStatus(brokerId, mngStatus);
-        updBrokerSubTopicConfInfo(brokerId, topicInfoMap);
-        updBrokerPubTopicConfInfo(brokerId, topicInfoMap);
-    }
-
-    /**
      * Get the maximum number of broker distributions of topic
      *
      * @param topicSet need query topic set
diff --git 
a/tubemq-server/src/main/java/org/apache/tubemq/server/master/nodemanage/nodebroker/BrokerRunManager.java
 
b/tubemq-server/src/main/java/org/apache/tubemq/server/master/nodemanage/nodebroker/BrokerRunManager.java
index 973435b..af24f78 100644
--- 
a/tubemq-server/src/main/java/org/apache/tubemq/server/master/nodemanage/nodebroker/BrokerRunManager.java
+++ 
b/tubemq-server/src/main/java/org/apache/tubemq/server/master/nodemanage/nodebroker/BrokerRunManager.java
@@ -77,9 +77,9 @@ public interface BrokerRunManager {
 
     Map<Integer, BrokerInfo> getBrokerInfoMap(List<Integer> brokerIds);
 
-    void updBrokerCsmConfInfo(int brokerId,
-                              ManageStatus mngStatus,
-                              Map<String, TopicInfo> topicInfoMap);
+    boolean updBrokerCsmConfInfo(int brokerId,
+                                 ManageStatus mngStatus,
+                                 Map<String, TopicInfo> topicInfoMap);
 
     void updBrokerPrdConfInfo(int brokerId,
                               ManageStatus mngStatus,
diff --git 
a/tubemq-server/src/main/java/org/apache/tubemq/server/master/nodemanage/nodebroker/BrokerRunStatusInfo.java
 
b/tubemq-server/src/main/java/org/apache/tubemq/server/master/nodemanage/nodebroker/BrokerRunStatusInfo.java
index 80ffd80..e928819 100644
--- 
a/tubemq-server/src/main/java/org/apache/tubemq/server/master/nodemanage/nodebroker/BrokerRunStatusInfo.java
+++ 
b/tubemq-server/src/main/java/org/apache/tubemq/server/master/nodemanage/nodebroker/BrokerRunStatusInfo.java
@@ -50,14 +50,15 @@ public class BrokerRunStatusInfo {
     private BrokerInfo brokerInfo;
     private String createId;
     // config change flag
-    private AtomicBoolean isConfChanged = new AtomicBoolean(false);
-    private AtomicLong confChangeNo =
+    private final AtomicBoolean isConfChanged =
+            new AtomicBoolean(false);
+    private final AtomicLong confChangeNo =
             new AtomicLong(TBaseConstants.META_VALUE_UNDEFINED);
     // data sync status
     private StepStatus curStepStatus;
     private volatile long nextStepOpTimeInMills = 0;
     // current loaded change No.
-    private AtomicLong confLoadedNo =
+    private final AtomicLong confLoadedNo =
             new AtomicLong(TBaseConstants.META_VALUE_UNDEFINED);
     // broker sync data info
     BrokerSyncData brokerSyncData = new BrokerSyncData();
@@ -94,7 +95,7 @@ public class BrokerRunStatusInfo {
         this.brokerSyncData.updBrokerSyncData(true,
                 confChangeNo.get(), mngStatus, brokerConfInfo, 
topicConfInfoMap);
         this.curStepStatus = StepStatus.STEP_STATUS_WAIT_ONLINE;
-        this.nextStepOpTimeInMills = curTime + curStepStatus.getDelayDurInMs();
+        this.nextStepOpTimeInMills = curTime + 
curStepStatus.getNormalDelayDurInMs();
     }
 
     public void notifyDataChanged() {
@@ -164,7 +165,7 @@ public class BrokerRunStatusInfo {
                                      List<String> repTopicConfs,
                                      StringBuilder sBuffer) {
         boolean isSynchronized =
-                brokerSyncData.bookBrokerReportInfo(repConfigId,
+                brokerSyncData.bookBrokerReportInfo(brokerInfo, repConfigId,
                         repCheckSumId, isTackData, repBrokerConfInfo, 
repTopicConfs);
         this.isOnline = isOnline;
         goNextStatus(isRegister, isSynchronized, sBuffer);
@@ -186,7 +187,6 @@ public class BrokerRunStatusInfo {
                 .append(",\"isDoneDataLoad\":").append(isDoneDataLoad)
                 .append(",\"isDoneDataSub\":").append(isDoneDataSub)
                 .append(",\"isDoneDataPub\":").append(isDoneDataPub)
-                .append(",\"isDoneDataPub\":").append(isDoneDataPub)
                 .append(",\"isOverTLS\":").append(isOverTLS)
                 .append(",\"lastBrokerSyncTime\":").append(lastBrokerSyncTime)
                 
.append(",\"maxConfLoadedTimeInMs\":").append(maxConfLoadedTimeInMs)
@@ -214,7 +214,12 @@ public class BrokerRunStatusInfo {
             break;
 
             case STEP_STATUS_WAIT_SUBSCRIBE: {
-                execSyncDataToSub();
+                if (execSyncDataToSub()) {
+                    execSyncDataToPub();
+                    curStepStatus = StepStatus.STEP_STATUS_WAIT_PUBLISH;
+                    nextStepOpTimeInMills =
+                            System.currentTimeMillis() + 
curStepStatus.getShortDelayDurIdnMs();
+                }
             }
             break;
 
@@ -251,7 +256,7 @@ public class BrokerRunStatusInfo {
                     resetStatusInfo();
                     curStepStatus = StepStatus.STEP_STATUS_LOAD_DATA;
                     nextStepOpTimeInMills =
-                            System.currentTimeMillis() + 
curStepStatus.getDelayDurInMs();
+                            System.currentTimeMillis() + 
curStepStatus.getNormalDelayDurInMs();
                 }
             }
             break;
@@ -265,7 +270,7 @@ public class BrokerRunStatusInfo {
                             curStepStatus = StepStatus.STEP_STATUS_WAIT_SYNC;
                         }
                         nextStepOpTimeInMills =
-                                System.currentTimeMillis() + 
curStepStatus.getDelayDurInMs();
+                                System.currentTimeMillis() + 
curStepStatus.getNormalDelayDurInMs();
                     }
                 }
             }
@@ -279,7 +284,7 @@ public class BrokerRunStatusInfo {
                         curStepStatus = StepStatus.STEP_STATUS_WAIT_SYNC;
                     }
                     nextStepOpTimeInMills =
-                            System.currentTimeMillis() + 
curStepStatus.getDelayDurInMs();
+                            System.currentTimeMillis() + 
curStepStatus.getNormalDelayDurInMs();
                 }
             }
             break;
@@ -287,11 +292,9 @@ public class BrokerRunStatusInfo {
             case STEP_STATUS_WAIT_SYNC: {
                 if (isSynchronized) {
                     curStepStatus = StepStatus.STEP_STATUS_WAIT_SUBSCRIBE;
-                } else {
-                    curStepStatus = StepStatus.STEP_STATUS_WAIT_SYNC;
+                    nextStepOpTimeInMills =
+                            System.currentTimeMillis() + 
curStepStatus.getNormalDelayDurInMs();
                 }
-                nextStepOpTimeInMills =
-                        System.currentTimeMillis() + 
curStepStatus.getDelayDurInMs();
             }
             break;
 
@@ -299,7 +302,7 @@ public class BrokerRunStatusInfo {
                 if (isDoneDataSub && System.currentTimeMillis() > 
nextStepOpTimeInMills) {
                     curStepStatus = StepStatus.STEP_STATUS_WAIT_PUBLISH;
                     nextStepOpTimeInMills =
-                            System.currentTimeMillis() + 
curStepStatus.getDelayDurInMs();
+                            System.currentTimeMillis() + 
curStepStatus.getNormalDelayDurInMs();
                 }
             }
             break;
@@ -342,15 +345,16 @@ public class BrokerRunStatusInfo {
         }
     }
 
-    private void execSyncDataToSub() {
+    private boolean execSyncDataToSub() {
         if (isDoneDataSub) {
-            return;
+            return true;
         }
         Tuple2<ManageStatus, Map<String, TopicInfo>> syncData =
-                brokerSyncData.getBrokerPublishInfo(brokerInfo);
-        brokerRunManager.updBrokerCsmConfInfo(brokerInfo.getBrokerId(),
-                syncData.getF0(), syncData.getF1());
+                brokerSyncData.getBrokerPublishInfo();
+        boolean needFastSync = brokerRunManager.updBrokerCsmConfInfo(
+                brokerInfo.getBrokerId(), syncData.getF0(), syncData.getF1());
         isDoneDataSub = true;
+        return needFastSync;
     }
 
     private void execSyncDataToPub() {
@@ -358,7 +362,7 @@ public class BrokerRunStatusInfo {
             return;
         }
         Tuple2<ManageStatus, Map<String, TopicInfo>> syncData =
-                brokerSyncData.getBrokerPublishInfo(brokerInfo);
+                brokerSyncData.getBrokerPublishInfo();
         brokerRunManager.updBrokerPrdConfInfo(brokerInfo.getBrokerId(),
                 syncData.getF0(), syncData.getF1());
         isDoneDataPub = true;
@@ -391,11 +395,8 @@ public class BrokerRunStatusInfo {
      * @return true if need report data otherwise false
      */
     private boolean needForceSyncData() {
-        if (System.currentTimeMillis() - this.lastBrokerSyncTime
-                > TServerConstants.CFG_REPORT_DEFAULT_SYNC_DURATION) {
-            return true;
-        }
-        return false;
+        return System.currentTimeMillis() - this.lastBrokerSyncTime
+                > TServerConstants.CFG_REPORT_DEFAULT_SYNC_DURATION;
     }
 
 }
diff --git 
a/tubemq-server/src/main/java/org/apache/tubemq/server/master/nodemanage/nodebroker/BrokerSyncData.java
 
b/tubemq-server/src/main/java/org/apache/tubemq/server/master/nodemanage/nodebroker/BrokerSyncData.java
index b320ed1..715ca46 100644
--- 
a/tubemq-server/src/main/java/org/apache/tubemq/server/master/nodemanage/nodebroker/BrokerSyncData.java
+++ 
b/tubemq-server/src/main/java/org/apache/tubemq/server/master/nodemanage/nodebroker/BrokerSyncData.java
@@ -34,7 +34,6 @@ import org.apache.tubemq.corebase.utils.CheckSum;
 import org.apache.tubemq.corebase.utils.TStringUtils;
 import org.apache.tubemq.corebase.utils.Tuple2;
 import org.apache.tubemq.corebase.utils.Tuple4;
-import org.apache.tubemq.server.common.TServerConstants;
 import org.apache.tubemq.server.common.statusdef.ManageStatus;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -47,7 +46,7 @@ public class BrokerSyncData {
     // current data push id
     private long dataPushId;
     // data need to sync
-    private AtomicLong syncDownDataConfId =
+    private final AtomicLong syncDownDataConfId =
             new AtomicLong(System.currentTimeMillis());
     private int syncDownDataChkSumId;
     private ManageStatus mngStatus;
@@ -61,18 +60,12 @@ public class BrokerSyncData {
     private int syncUpDataChkSumId = TBaseConstants.META_VALUE_UNDEFINED;
     private String syncUpBrokerConfInfo;
     private List<String> syncUpTopicConfInfos = new ArrayList<>();
+    Map<String, TopicInfo> syncUpTopicInfoMap = new HashMap<>();
     private long lastDataUpTime = 0;
 
-    public BrokerSyncData() {
 
-    }
+    public BrokerSyncData() {
 
-    public BrokerSyncData(long dataPushId,
-                          ManageStatus mngStatus,
-                          String brokerConfInfo,
-                          Map<String, String> topicConfInfoMap) {
-        updBrokerSyncData(true, dataPushId,
-                mngStatus, brokerConfInfo, topicConfInfoMap);
     }
 
     /**
@@ -123,6 +116,7 @@ public class BrokerSyncData {
 
     /**
      * Book the report data by broker
+     * @param brokerInfo      broker info
      * @param syncDataConfId   data configure id
      * @param syncDataChkSumId data check-sum id
      * @param isTakeData  if carry the data info
@@ -131,7 +125,8 @@ public class BrokerSyncData {
      *
      * @return whether the broker data synchronized
      */
-    public boolean bookBrokerReportInfo(long syncDataConfId, int 
syncDataChkSumId,
+    public boolean bookBrokerReportInfo(BrokerInfo brokerInfo,
+                                        long syncDataConfId, int 
syncDataChkSumId,
                                         boolean isTakeData, String 
syncBrokerConfInfo,
                                         List<String> syncTopicConfInfos) {
         this.syncUpDataConfId = syncDataConfId;
@@ -143,6 +138,12 @@ public class BrokerSyncData {
             } else {
                 this.syncUpTopicConfInfos = syncTopicConfInfos;
             }
+            Map<String, TopicInfo> tmpInfoMap = parseTopicInfoConf(brokerInfo);
+            if (tmpInfoMap == null) {
+                this.syncUpTopicInfoMap.clear();
+            } else {
+                this.syncUpTopicInfoMap = tmpInfoMap;
+            }
             this.lastDataUpTime = System.currentTimeMillis();
         }
         return isConfSynchronized();
@@ -185,15 +186,12 @@ public class BrokerSyncData {
 
     /**
      * Get the broker publish info
-     * @param brokerInfo broker info
      * @return need sync data
      *         f0 : manage status
      *         f1 : topic configure
      */
-    public Tuple2<ManageStatus, Map<String, TopicInfo>> getBrokerPublishInfo(
-            final BrokerInfo brokerInfo) {
-        Map<String, TopicInfo> topicInfoMap = parseTopicInfoConf(brokerInfo);
-        return new Tuple2<>(mngStatus, topicInfoMap);
+    public Tuple2<ManageStatus, Map<String, TopicInfo>> getBrokerPublishInfo() 
{
+        return new Tuple2<>(mngStatus, syncUpTopicInfoMap);
     }
 
     public long getDataPushId() {
@@ -231,6 +229,7 @@ public class BrokerSyncData {
                 .append(",\"syncUpDataChkSumId\":").append(syncUpDataChkSumId)
                 
.append(",\"syncUpBrokerConfInfo\":\"").append(syncUpBrokerConfInfo)
                 
.append("\",\"syncUpTopicConfInfos\":\"").append(syncUpTopicConfInfos.toString())
+                
.append("\",\"syncUpTopicInfoMap\":\"").append(syncUpTopicInfoMap.toString())
                 .append("\",\"lastDataUpTime\":").append(lastDataUpTime)
                 .append("}");
         return sBuffer;
@@ -258,12 +257,6 @@ public class BrokerSyncData {
                 numTopicStores = Integer.parseInt(brokerConfInfoAttrs[7]);
             }
         }
-        int unFlushDataHold = TServerConstants.CFG_DEFAULT_DATA_UNFLUSH_HOLD;
-        if (brokerConfInfoAttrs.length > 8) {
-            if (!TStringUtils.isBlank(brokerConfInfoAttrs[8])) {
-                unFlushDataHold = Integer.parseInt(brokerConfInfoAttrs[8]);
-            }
-        }
         Map<String, TopicInfo> topicInfoMap = new HashMap<>();
         // make up topicInfo info according to broker default configure
         for (String strTopicConfInfo : this.syncUpTopicConfInfos) {
diff --git 
a/tubemq-server/src/main/java/org/apache/tubemq/server/master/nodemanage/nodebroker/BrokerTopicInfoView.java
 
b/tubemq-server/src/main/java/org/apache/tubemq/server/master/nodemanage/nodebroker/BrokerTopicInfoView.java
index 121a82f..17226fb 100644
--- 
a/tubemq-server/src/main/java/org/apache/tubemq/server/master/nodemanage/nodebroker/BrokerTopicInfoView.java
+++ 
b/tubemq-server/src/main/java/org/apache/tubemq/server/master/nodemanage/nodebroker/BrokerTopicInfoView.java
@@ -30,15 +30,16 @@ import org.apache.tubemq.corebase.TokenConstants;
 import org.apache.tubemq.corebase.cluster.Partition;
 import org.apache.tubemq.corebase.cluster.TopicInfo;
 import org.apache.tubemq.corebase.utils.ConcurrentHashSet;
-
+import org.apache.tubemq.corebase.utils.Tuple2;
+import org.apache.tubemq.server.common.utils.SerialIdUtils;
 
 
 public class BrokerTopicInfoView {
     public AtomicLong topicChangeId = new AtomicLong(0);
-    private ConcurrentHashMap<String/* topicName */,
+    private final ConcurrentHashMap<String/* topicName */,
             ConcurrentHashMap<Integer/* brokerId */, TopicInfo>> 
topicConfInfoMap =
             new ConcurrentHashMap<>();
-    private ConcurrentHashMap<Integer/* brokerId */, 
ConcurrentHashSet<String/* topicName */>>
+    private final ConcurrentHashMap<Integer/* brokerId */, 
ConcurrentHashSet<String/* topicName */>>
             brokerIdIndexMap = new ConcurrentHashMap<>();
 
     public BrokerTopicInfoView() {
@@ -65,7 +66,7 @@ public class BrokerTopicInfoView {
             }
             topicInfoView.remove(brokerId);
         }
-        topicChangeId.set(System.currentTimeMillis());
+        SerialIdUtils.updTimeStampSerialIdValue(this.topicChangeId);
     }
 
     /**
@@ -76,24 +77,39 @@ public class BrokerTopicInfoView {
      *                    if topicInfoMap is null, reserve current configure;
      *                    if topicInfoMap is empty, clear current configure.
      */
-    public void updBrokerTopicConfInfo(int brokerId, Map<String, TopicInfo> 
topicInfoMap) {
+    public void updBrokerTopicConfInfo(int brokerId,
+                                       Map<String, TopicInfo> topicInfoMap) {
         if (topicInfoMap == null) {
             return;
         }
         // get removed topic info
-        Set<String> delTopicSet = new HashSet<>();
-        ConcurrentHashSet<String> curTopicSet = brokerIdIndexMap.get(brokerId);
-        if (curTopicSet != null) {
-            for (String topic : curTopicSet) {
-                if (!topicInfoMap.containsKey(topic)) {
-                    delTopicSet.add(topic);
-                }
-            }
-        }
-        rmvBrokerTopicInfo(brokerId, delTopicSet);
+        rmvBrokerTopicInfo(brokerId, topicInfoMap);
         // add or update TopicInfo
         repBrokerTopicInfo(brokerId, topicInfoMap);
-        topicChangeId.set(System.currentTimeMillis());
+        SerialIdUtils.updTimeStampSerialIdValue(this.topicChangeId);
+    }
+
+    /**
+     * update broker's topicInfo configures
+     *
+     * @param brokerId broker id index
+     * @param topicInfoMap broker's topic configure info,
+     *                    if topicInfoMap is null, reserve current configure;
+     *                    if topicInfoMap is empty, clear current configure.
+     * @return if fast sync data
+     */
+    public boolean fastUpdBrokerTopicConfInfo(int brokerId,
+                                              Map<String, TopicInfo> 
topicInfoMap) {
+        if (topicInfoMap == null) {
+            return true;
+        }
+        // get removed topic info
+        rmvBrokerTopicInfo(brokerId, topicInfoMap);
+        // update TopicInfo and judge if fast update
+        Tuple2<Boolean, Boolean> retTuple =
+                updBrokerTopicInfo(brokerId, topicInfoMap);
+        SerialIdUtils.updTimeStampSerialIdValue(this.topicChangeId);
+        return retTuple.getF1();
     }
 
     /**
@@ -152,7 +168,6 @@ public class BrokerTopicInfoView {
      * @param topic need query topic set
      */
     public List<Partition> getAcceptSubParts(String topic, Set<Integer> 
enableSubBrokerIdSet) {
-        Partition tmpPart;
         TopicInfo topicInfo;
         List<Partition> partList = new ArrayList<>();
         if (topic == null) {
@@ -287,25 +302,40 @@ public class BrokerTopicInfoView {
     }
 
     // remove broker special topic info
-    private void rmvBrokerTopicInfo(int brokerId,
-                                    Set<String> delTopicSet) {
-        if (delTopicSet == null || delTopicSet.isEmpty()) {
-            return;
+    private boolean rmvBrokerTopicInfo(int brokerId,
+                                       Map<String, TopicInfo> topicInfoMap) {
+        boolean changed = false;
+        Set<String> delTopicSet = new HashSet<>();
+        ConcurrentHashSet<String> curTopicSet = brokerIdIndexMap.get(brokerId);
+        if (curTopicSet != null) {
+            for (String topic : curTopicSet) {
+                if (!topicInfoMap.containsKey(topic)) {
+                    delTopicSet.add(topic);
+                }
+            }
+        }
+        if (delTopicSet.isEmpty()) {
+            return false;
         }
         ConcurrentHashMap<Integer, TopicInfo> topicInfoView;
         ConcurrentHashSet<String> topicSet = brokerIdIndexMap.get(brokerId);
         if (topicSet == null || topicSet.isEmpty()) {
-            return;
+            return changed;
         }
         for (String topic : delTopicSet) {
-            topicSet.remove(topic);
+            if (topicSet.remove(topic)) {
+                changed = true;
+            }
             topicInfoView = topicConfInfoMap.get(topic);
             if ((topicInfoView == null)
                     || topicInfoView.isEmpty()) {
                 continue;
             }
-            topicInfoView.remove(brokerId);
+            if (topicInfoView.remove(brokerId) != null) {
+                changed = true;
+            }
         }
+        return changed;
     }
 
     // add or update broker special topic info
@@ -344,4 +374,39 @@ public class BrokerTopicInfoView {
         curTopicSet.addAll(topicInfoMap.keySet());
     }
 
+    // update current broker special topic info
+    private Tuple2<Boolean, Boolean> updBrokerTopicInfo(int brokerId,
+                                                        Map<String, TopicInfo> 
topicInfoMap) {
+        boolean isChanged = false;
+        boolean isFastSync = true;
+        if (topicInfoMap == null || topicInfoMap.isEmpty()) {
+            return new Tuple2<>(isChanged, isFastSync);
+        }
+        Tuple2<Boolean, Boolean> retResult;
+        ConcurrentHashMap<Integer, TopicInfo> curTopicInfoView;
+        for (TopicInfo newTopicInfo : topicInfoMap.values()) {
+            if (newTopicInfo == null) {
+                continue;
+            }
+            curTopicInfoView = topicConfInfoMap.get(newTopicInfo.getTopic());
+            if (curTopicInfoView == null) {
+                isFastSync = false;
+                continue;
+            }
+            TopicInfo curTopicInfo = curTopicInfoView.get(brokerId);
+            if (curTopicInfo == null) {
+                isFastSync = false;
+                continue;
+            }
+            retResult = curTopicInfo.updAndJudgeTopicInfo(newTopicInfo);
+            if (retResult.getF0() && !isChanged) {
+                isChanged = true;
+            }
+            if (retResult.getF1() && isFastSync) {
+                isFastSync = false;
+            }
+        }
+        return new Tuple2<>(isChanged, isFastSync);
+    }
+
 }
diff --git 
a/tubemq-server/src/main/java/org/apache/tubemq/server/master/nodemanage/nodebroker/DefBrokerRunManager.java
 
b/tubemq-server/src/main/java/org/apache/tubemq/server/master/nodemanage/nodebroker/DefBrokerRunManager.java
index dbd068e..379c795 100644
--- 
a/tubemq-server/src/main/java/org/apache/tubemq/server/master/nodemanage/nodebroker/DefBrokerRunManager.java
+++ 
b/tubemq-server/src/main/java/org/apache/tubemq/server/master/nodemanage/nodebroker/DefBrokerRunManager.java
@@ -24,7 +24,6 @@ import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
-
 import org.apache.tubemq.corebase.TBaseConstants;
 import org.apache.tubemq.corebase.TErrCodeConstants;
 import org.apache.tubemq.corebase.cluster.BrokerInfo;
@@ -40,6 +39,7 @@ import org.apache.tubemq.server.common.heartbeat.TimeoutInfo;
 import org.apache.tubemq.server.common.heartbeat.TimeoutListener;
 import org.apache.tubemq.server.common.statusdef.ManageStatus;
 import org.apache.tubemq.server.common.utils.ProcessResult;
+import org.apache.tubemq.server.common.utils.SerialIdUtils;
 import org.apache.tubemq.server.master.MasterConfig;
 import org.apache.tubemq.server.master.TMaster;
 import org.apache.tubemq.server.master.metamanage.MetaDataManager;
@@ -56,21 +56,23 @@ public class DefBrokerRunManager implements 
BrokerRunManager {
     private final MetaDataManager metaDataManager;
     private final HeartbeatManager heartbeatManager;
     // broker string info
-    private AtomicLong brokerInfoCheckSum = new 
AtomicLong(System.currentTimeMillis());
+    private final AtomicLong brokerInfoCheckSum =
+            new AtomicLong(System.currentTimeMillis());
     private long lastBrokerUpdatedTime = System.currentTimeMillis();
     private final ConcurrentHashMap<Integer, String> brokersMap =
             new ConcurrentHashMap<>();
     private final ConcurrentHashMap<Integer, String> brokersTLSMap =
             new ConcurrentHashMap<>();
-
     // broker sync FSM
-    private AtomicInteger brokerTotalCount = new AtomicInteger(0);
-    private ConcurrentHashMap<Integer/* brokerId */, BrokerRunStatusInfo> 
brokerRunSyncManageMap =
+    private final AtomicInteger brokerTotalCount =
+            new AtomicInteger(0);
+    // brokerId -- broker run status info map
+    private final ConcurrentHashMap<Integer, BrokerRunStatusInfo> 
brokerRunSyncManageMap =
             new ConcurrentHashMap<>();
     // broker abnormal holder
     private final BrokerAbnHolder brokerAbnHolder;
     // broker topic configure for consumer and producer
-    private BrokerPSInfoHolder brokerPubSubInfo = new BrokerPSInfoHolder();
+    private final BrokerPSInfoHolder brokerPubSubInfo = new 
BrokerPSInfoHolder();
 
 
     public DefBrokerRunManager(TMaster tMaster) {
@@ -134,7 +136,7 @@ public class DefBrokerRunManager implements 
BrokerRunManager {
                     && !brokerTLSReg.equals(entity.getSimpleTLSBrokerInfo())) {
                 this.brokersTLSMap.put(entity.getBrokerId(), 
entity.getSimpleTLSBrokerInfo());
             }
-            this.brokerInfoCheckSum.set(System.currentTimeMillis());
+            SerialIdUtils.updTimeStampSerialIdValue(this.brokerInfoCheckSum);
         }
     }
 
@@ -146,7 +148,7 @@ public class DefBrokerRunManager implements 
BrokerRunManager {
         String brokerReg = this.brokersMap.remove(brokerId);
         String brokerTLSReg = this.brokersTLSMap.remove(brokerId);
         if (brokerReg != null || brokerTLSReg != null) {
-            this.brokerInfoCheckSum.set(System.currentTimeMillis());
+            SerialIdUtils.updTimeStampSerialIdValue(this.brokerInfoCheckSum);
         }
     }
 
@@ -374,8 +376,10 @@ public class DefBrokerRunManager implements 
BrokerRunManager {
         builder.setStopWrite(autoFbdTuple.getF0());
         builder.setStopRead(autoFbdTuple.getF1());
         if (retTuple.getF2() == null) {
+            builder.setNeedReportData(false);
             builder.setTakeConfInfo(false);
         } else {
+            builder.setNeedReportData(true);
             builder.setTakeConfInfo(true);
             builder.setBrokerDefaultConfInfo(retTuple.getF2());
             builder.addAllBrokerTopicSetConfInfo(retTuple.getF3());
@@ -453,7 +457,7 @@ public class DefBrokerRunManager implements 
BrokerRunManager {
         }
         brokerTotalCount.decrementAndGet();
         brokerAbnHolder.removeBroker(brokerId);
-        brokerPubSubInfo.rmvBrokerAllPushedInfo(brokerId, isTimeout);
+        brokerPubSubInfo.rmvBrokerAllPushedInfo(brokerId);
         logger.info(sBuffer.append("[Broker Release] 
brokerId=").append(brokerId)
                 .append(", isTimeout=").append(isTimeout)
                 .append(", release success!").toString());
@@ -461,17 +465,15 @@ public class DefBrokerRunManager implements 
BrokerRunManager {
     }
 
     @Override
-    public void updBrokerCsmConfInfo(int brokerId, ManageStatus mngStatus,
-                                     Map<String, TopicInfo> topicInfoMap) {
+    public boolean updBrokerCsmConfInfo(int brokerId, ManageStatus mngStatus,
+                                        Map<String, TopicInfo> topicInfoMap) {
         brokerPubSubInfo.updBrokerMangeStatus(brokerId, mngStatus);
-        brokerPubSubInfo.updBrokerSubTopicConfInfo(brokerId, topicInfoMap);
-
+        return brokerPubSubInfo.updBrokerSubTopicConfInfo(brokerId, 
topicInfoMap);
     }
 
     @Override
     public void updBrokerPrdConfInfo(int brokerId, ManageStatus mngStatus,
                                      Map<String, TopicInfo> topicInfoMap) {
-        brokerPubSubInfo.updBrokerMangeStatus(brokerId, mngStatus);
         brokerPubSubInfo.updBrokerPubTopicConfInfo(brokerId, topicInfoMap);
     }
 
diff --git 
a/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebTopicDeployHandler.java
 
b/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebTopicDeployHandler.java
index cc8145d..97eec4c 100644
--- 
a/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebTopicDeployHandler.java
+++ 
b/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebTopicDeployHandler.java
@@ -509,6 +509,12 @@ public class WebTopicDeployHandler extends 
AbstractWebHandler {
             enableAuthCtrl = false;
             TopicCtrlEntity ctrlEntity =
                     metaDataManager.getTopicCtrlByTopicName(entry.getKey());
+            if (ctrlEntity == null) {
+                continue;
+            }
+            if (totalCnt++ > 0) {
+                sBuffer.append(",");
+            }
             maxMsgSizeInMB = defSetting.getMaxMsgSizeInMB();
             if (ctrlEntity != null) {
                 maxMsgSizeInMB = ctrlEntity.getMaxMsgSizeInMB();
@@ -645,6 +651,12 @@ public class WebTopicDeployHandler extends 
AbstractWebHandler {
             isAcceptSubscribe = false;
             TopicCtrlEntity ctrlEntity =
                     metaDataManager.getTopicCtrlByTopicName(entry.getKey());
+            if (ctrlEntity == null) {
+                continue;
+            }
+            if (totalCnt++ > 0) {
+                sBuffer.append(",");
+            }
             ctrlEntity.toWebJsonStr(sBuffer, true, false);
             sBuffer.append(",\"deployInfo\":[");
             int brokerCount = 0;
@@ -657,6 +669,7 @@ public class WebTopicDeployHandler extends 
AbstractWebHandler {
                 sBuffer.append(",\"runInfo\":{");
                 BrokerConfEntity brokerConfEntity =
                         
metaDataManager.getBrokerConfByBrokerId(entity.getBrokerId());
+
                 String strManageStatus = "-";
                 if (brokerConfEntity != null) {
                     manageStatus = brokerConfEntity.getManageStatus();
diff --git 
a/tubemq-server/src/test/java/org/apache/tubemq/server/common/WebParameterUtilsTest.java
 
b/tubemq-server/src/test/java/org/apache/tubemq/server/common/WebParameterUtilsTest.java
index 2d13258..71d76d4 100644
--- 
a/tubemq-server/src/test/java/org/apache/tubemq/server/common/WebParameterUtilsTest.java
+++ 
b/tubemq-server/src/test/java/org/apache/tubemq/server/common/WebParameterUtilsTest.java
@@ -254,16 +254,45 @@ public class WebParameterUtilsTest {
     public void getTopicPropInfoTest() {
         boolean retValue;
         TopicPropGroup retEntry;
-        TopicPropGroup defOpEntity = new TopicPropGroup();
+        TopicPropGroup defOpEntity;
         StringBuilder sBuffer = new StringBuilder(512);
         ProcessResult result = new ProcessResult();
         Map<String, String> paramCntrMap = new HashMap<>();
         // case 1
+        paramCntrMap.put(WebFieldDef.NUMTOPICSTORES.name, "1");
+        paramCntrMap.put(WebFieldDef.NUMPARTITIONS.name, "2");
+        paramCntrMap.put(WebFieldDef.UNFLUSHTHRESHOLD.name, "3");
+        paramCntrMap.put(WebFieldDef.UNFLUSHINTERVAL.name, "4");
+        paramCntrMap.put(WebFieldDef.UNFLUSHDATAHOLD.name, "5");
+        paramCntrMap.put(WebFieldDef.MCACHESIZEINMB.name, "2");
+        paramCntrMap.put(WebFieldDef.UNFMCACHECNTINK.name, "7");
+        paramCntrMap.put(WebFieldDef.UNFMCACHEINTERVAL.name, "4000");
+        paramCntrMap.put(WebFieldDef.ACCEPTPUBLISH.name, "true");
+        paramCntrMap.put(WebFieldDef.ACCEPTSUBSCRIBE.name, "false");
+        paramCntrMap.put(WebFieldDef.DATASTORETYPE.name, "9");
+        paramCntrMap.put(WebFieldDef.DATAPATH.name, "test");
+        paramCntrMap.put(WebFieldDef.DELETEPOLICY.name, "delete,2h");
         retValue = WebParameterUtils.getTopicPropInfo(paramCntrMap,
                 null, sBuffer, result);
         Assert.assertTrue(retValue);
         Assert.assertTrue(result.isSuccess());
         retEntry = (TopicPropGroup) result.getRetData();
+        Assert.assertEquals(retEntry.getNumTopicStores(),
+                
Integer.parseInt(paramCntrMap.get(WebFieldDef.NUMTOPICSTORES.name)));
+        // case 2
+        paramCntrMap.clear();
+        defOpEntity = new TopicPropGroup();
+        defOpEntity.fillDefaultValue();
+        paramCntrMap.put(WebFieldDef.ACCEPTPUBLISH.name, "0");
+        paramCntrMap.put(WebFieldDef.NUMTOPICSTORES.name, "9");
+        paramCntrMap.put(WebFieldDef.UNFMCACHECNTINK.name, "100");
+        retValue = WebParameterUtils.getTopicPropInfo(paramCntrMap,
+                defOpEntity, sBuffer, result);
+        Assert.assertTrue(retValue);
+        Assert.assertTrue(result.isSuccess());
+        retEntry = (TopicPropGroup) result.getRetData();
+
+
     }
 
 }

Reply via email to