This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new fb9012b  [INLONG-3268][TubeMQ] Fix some bugs when metadata is saved to 
ZooKeeper (#3270)
fb9012b is described below

commit fb9012b97e8712cafb34c70e5b4b768e9cc5f4e9
Author: gosonzhang <[email protected]>
AuthorDate: Mon Mar 21 19:38:54 2022 +0800

    [INLONG-3268][TubeMQ] Fix some bugs when metadata is saved to ZooKeeper 
(#3270)
---
 .../server/broker/offset/OffsetRecordService.java  | 18 +++--
 .../master/metamanage/DefaultMetaDataService.java  | 36 ++++++----
 .../metastore/dao/entity/BrokerConfEntity.java     | 60 ++++++++++++----
 .../metastore/dao/entity/TopicDeployEntity.java    |  9 +++
 .../metastore/impl/AbsMetaConfigMapperImpl.java    | 26 +++++--
 .../metastore/impl/zkimpl/TZKNodeKeys.java         |  3 +-
 .../impl/zkimpl/ZKMetaConfigMapperImpl.java        | 25 ++++---
 .../nodemanage/nodebroker/DefBrokerRunManager.java |  6 +-
 .../master/web/handler/WebMasterInfoHandler.java   | 19 ++---
 .../master/web/handler/WebTopicDeployHandler.java  | 80 +++++++++++++++++++++-
 .../tubemq/server/tools/cli/CliMetaDataBRU.java    |  4 +-
 11 files changed, 226 insertions(+), 60 deletions(-)

diff --git 
a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/offset/OffsetRecordService.java
 
b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/offset/OffsetRecordService.java
index 751bbe9..8d65ba8 100644
--- 
a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/offset/OffsetRecordService.java
+++ 
b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/offset/OffsetRecordService.java
@@ -20,8 +20,11 @@ package org.apache.inlong.tubemq.server.broker.offset;
 import java.util.Map;
 import org.apache.inlong.tubemq.corebase.daemon.AbstractDaemonService;
 import org.apache.inlong.tubemq.corebase.utils.AddressUtils;
+import org.apache.inlong.tubemq.corebase.utils.ServiceStatusHolder;
 import org.apache.inlong.tubemq.server.broker.TubeBroker;
+import org.apache.inlong.tubemq.server.broker.metadata.TopicMetadata;
 import org.apache.inlong.tubemq.server.broker.msgstore.MessageStoreManager;
+import org.apache.inlong.tubemq.server.common.TServerConstants;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -77,8 +80,16 @@ public class OffsetRecordService extends 
AbstractDaemonService {
     }
 
     private void storeRecord2LocalTopic(StringBuilder strBuff) {
-        // get query timestamp
-        long recordStamp = System.currentTimeMillis();
+        // check node writable status
+        if (ServiceStatusHolder.isWriteServiceStop()) {
+            return;
+        }
+        // check topic writable status
+        TopicMetadata topicMetadata = storeManager.getMetadataManager()
+                .getTopicMetadata(TServerConstants.OFFSET_HISTORY_NAME);
+        if (!topicMetadata.isAcceptPublish()) {
+            return;
+        }
         // get group offset information
         Map<String, OffsetRecordInfo> groupOffsetMap =
                 offsetManager.getOnlineGroupOffsetInfo();
@@ -89,7 +100,6 @@ public class OffsetRecordService extends 
AbstractDaemonService {
         storeManager.getTopicPublishInfos(groupOffsetMap);
         // store group offset records to offset storage topic
         broker.getBrokerServiceServer().appendGroupOffsetInfo(groupOffsetMap,
-                brokerAddrId, recordStamp, 10, 3, strBuff);
+                brokerAddrId, System.currentTimeMillis(), 10, 3, strBuff);
     }
-
 }
diff --git 
a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/DefaultMetaDataService.java
 
b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/DefaultMetaDataService.java
index 3d06233..4347f77 100644
--- 
a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/DefaultMetaDataService.java
+++ 
b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/DefaultMetaDataService.java
@@ -271,8 +271,7 @@ public class DefaultMetaDataService implements 
MetaDataService {
                 continue;
             }
             String allowedCondStr = ctrlEntity.getFilterCondStr();
-            if (allowedCondStr.length() == 2
-                    && 
allowedCondStr.equals(TServerConstants.BLANK_FILTER_ITEM_STR)) {
+            if (allowedCondStr.equals(TServerConstants.BLANK_FILTER_ITEM_STR)) 
{
                 
result.setFailResult(TErrCodeConstants.CONSUME_CONTENT_FORBIDDEN,
                         strBuff.append("[Restricted Group] 
").append(consumerId)
                                 .append(" : ").append(groupName)
@@ -724,59 +723,70 @@ public class DefaultMetaDataService implements 
MetaDataService {
             }
             strBuff.append(topicEntity.getTopicName());
             TopicPropGroup topicProps = topicEntity.getTopicProps();
-            if (topicProps.getNumPartitions() == 
defTopicProps.getNumPartitions()) {
+            if (topicProps.getNumPartitions() == 
TBaseConstants.META_VALUE_UNDEFINED
+                    || topicProps.getNumPartitions() == 
defTopicProps.getNumPartitions()) {
                 strBuff.append(TokenConstants.ATTR_SEP).append(" ");
             } else {
                 
strBuff.append(TokenConstants.ATTR_SEP).append(topicProps.getNumPartitions());
             }
-            if (topicProps.isAcceptPublish() == 
defTopicProps.isAcceptPublish()) {
+            if (topicProps.getAcceptPublish() == null
+                    || topicProps.isAcceptPublish() == 
defTopicProps.isAcceptPublish()) {
                 strBuff.append(TokenConstants.ATTR_SEP).append(" ");
             } else {
                 
strBuff.append(TokenConstants.ATTR_SEP).append(topicProps.isAcceptPublish());
             }
-            if (topicProps.isAcceptSubscribe() == 
defTopicProps.isAcceptSubscribe()) {
+            if (topicProps.getAcceptSubscribe() == null
+                    || topicProps.isAcceptSubscribe() == 
defTopicProps.isAcceptSubscribe()) {
                 strBuff.append(TokenConstants.ATTR_SEP).append(" ");
             } else {
                 
strBuff.append(TokenConstants.ATTR_SEP).append(topicProps.isAcceptSubscribe());
             }
-            if (topicProps.getUnflushThreshold() == 
defTopicProps.getUnflushThreshold()) {
+            if (topicProps.getUnflushThreshold() == 
TBaseConstants.META_VALUE_UNDEFINED
+                    || topicProps.getUnflushThreshold() == 
defTopicProps.getUnflushThreshold()) {
                 strBuff.append(TokenConstants.ATTR_SEP).append(" ");
             } else {
                 
strBuff.append(TokenConstants.ATTR_SEP).append(topicProps.getUnflushThreshold());
             }
-            if (topicProps.getUnflushInterval() == 
defTopicProps.getUnflushInterval()) {
+            if (topicProps.getUnflushInterval() == 
TBaseConstants.META_VALUE_UNDEFINED
+                    || topicProps.getUnflushInterval() == 
defTopicProps.getUnflushInterval()) {
                 strBuff.append(TokenConstants.ATTR_SEP).append(" ");
             } else {
                 
strBuff.append(TokenConstants.ATTR_SEP).append(topicProps.getUnflushInterval());
             }
             strBuff.append(TokenConstants.ATTR_SEP).append(" ");
-            if 
(topicProps.getDeletePolicy().equals(defTopicProps.getDeletePolicy())) {
+            if (TStringUtils.isEmpty(topicProps.getDeletePolicy())
+                    || 
topicProps.getDeletePolicy().equals(defTopicProps.getDeletePolicy())) {
                 strBuff.append(TokenConstants.ATTR_SEP).append(" ");
             } else {
                 
strBuff.append(TokenConstants.ATTR_SEP).append(topicProps.getDeletePolicy());
             }
-            if (topicProps.getNumTopicStores() == 
defTopicProps.getNumTopicStores()) {
+            if (topicProps.getNumTopicStores() == 
TBaseConstants.META_VALUE_UNDEFINED
+                    || topicProps.getNumTopicStores() == 
defTopicProps.getNumTopicStores()) {
                 strBuff.append(TokenConstants.ATTR_SEP).append(" ");
             } else {
                 
strBuff.append(TokenConstants.ATTR_SEP).append(topicProps.getNumTopicStores());
             }
             
strBuff.append(TokenConstants.ATTR_SEP).append(topicEntity.getTopicStatusId());
-            if (topicProps.getUnflushDataHold() == 
defTopicProps.getUnflushDataHold()) {
+            if (topicProps.getUnflushDataHold() == 
TBaseConstants.META_VALUE_UNDEFINED
+                    || topicProps.getUnflushDataHold() == 
defTopicProps.getUnflushDataHold()) {
                 strBuff.append(TokenConstants.ATTR_SEP).append(" ");
             } else {
                 
strBuff.append(TokenConstants.ATTR_SEP).append(topicProps.getUnflushDataHold());
             }
-            if (topicProps.getMemCacheMsgSizeInMB() == 
defTopicProps.getMemCacheMsgSizeInMB()) {
+            if (topicProps.getMemCacheMsgSizeInMB() == 
TBaseConstants.META_VALUE_UNDEFINED
+                    || topicProps.getMemCacheMsgSizeInMB() == 
defTopicProps.getMemCacheMsgSizeInMB()) {
                 strBuff.append(TokenConstants.ATTR_SEP).append(" ");
             } else {
                 
strBuff.append(TokenConstants.ATTR_SEP).append(topicProps.getMemCacheMsgSizeInMB());
             }
-            if (topicProps.getMemCacheMsgCntInK() == 
defTopicProps.getMemCacheMsgCntInK()) {
+            if (topicProps.getMemCacheMsgCntInK() == 
TBaseConstants.META_VALUE_UNDEFINED
+                    || topicProps.getMemCacheMsgCntInK() == 
defTopicProps.getMemCacheMsgCntInK()) {
                 strBuff.append(TokenConstants.ATTR_SEP).append(" ");
             } else {
                 
strBuff.append(TokenConstants.ATTR_SEP).append(topicProps.getMemCacheMsgCntInK());
             }
-            if (topicProps.getMemCacheFlushIntvl() == 
defTopicProps.getMemCacheFlushIntvl()) {
+            if (topicProps.getMemCacheFlushIntvl() == 
TBaseConstants.META_VALUE_UNDEFINED
+                    || topicProps.getMemCacheFlushIntvl() == 
defTopicProps.getMemCacheFlushIntvl()) {
                 strBuff.append(TokenConstants.ATTR_SEP).append(" ");
             } else {
                 
strBuff.append(TokenConstants.ATTR_SEP).append(topicProps.getMemCacheFlushIntvl());
diff --git 
a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/dao/entity/BrokerConfEntity.java
 
b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/dao/entity/BrokerConfEntity.java
index 056ab2b..55691be 100644
--- 
a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/dao/entity/BrokerConfEntity.java
+++ 
b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/dao/entity/BrokerConfEntity.java
@@ -60,6 +60,15 @@ public class BrokerConfEntity extends BaseEntity implements 
Cloneable {
         this.brokerIp = brokerIp;
     }
 
+    public BrokerConfEntity(BaseEntity opEntity, int brokerId,
+                            String brokerIp, ClusterSettingEntity defSetting) {
+        super(opEntity);
+        this.brokerWebPort = defSetting.getBrokerWebPort();
+        this.topicProps.updModifyInfo(defSetting.getClsDefTopicProps());
+        setBrokerIpAndAllPort(brokerId, brokerIp,
+                defSetting.getBrokerPort(), defSetting.getBrokerTLSPort());
+    }
+
     /**
      * Initial Broker Configure entity by BdbBrokerConfEntity
      *
@@ -383,21 +392,44 @@ public class BrokerConfEntity extends BaseEntity 
implements Cloneable {
     /**
      * Get broker config string
      *
-     * @param strBuff  the string buffer
+     * @param defSetting  the default setting
+     * @param strBuff     the string buffer
      */
-    public void getBrokerDefaultConfInfo(StringBuilder strBuff) {
-        
strBuff.append(topicProps.getNumPartitions()).append(TokenConstants.ATTR_SEP)
-                
.append(topicProps.isAcceptPublish()).append(TokenConstants.ATTR_SEP)
-                
.append(topicProps.isAcceptSubscribe()).append(TokenConstants.ATTR_SEP)
-                
.append(topicProps.getUnflushThreshold()).append(TokenConstants.ATTR_SEP)
-                
.append(topicProps.getUnflushInterval()).append(TokenConstants.ATTR_SEP)
-                .append(" ").append(TokenConstants.ATTR_SEP)
-                
.append(topicProps.getDeletePolicy()).append(TokenConstants.ATTR_SEP)
-                
.append(topicProps.getNumTopicStores()).append(TokenConstants.ATTR_SEP)
-                
.append(topicProps.getUnflushDataHold()).append(TokenConstants.ATTR_SEP)
-                
.append(topicProps.getMemCacheMsgSizeInMB()).append(TokenConstants.ATTR_SEP)
-                
.append(topicProps.getMemCacheMsgCntInK()).append(TokenConstants.ATTR_SEP)
-                .append(topicProps.getMemCacheFlushIntvl());
+    public void getBrokerDefaultConfInfo(ClusterSettingEntity defSetting,
+                                         StringBuilder strBuff) {
+        TopicPropGroup defTopicProps = defSetting.getClsDefTopicProps();
+        strBuff.append((topicProps.getNumPartitions() == 
TBaseConstants.META_VALUE_UNDEFINED)
+                ? defTopicProps.getNumPartitions() : 
topicProps.getNumPartitions())
+                .append(TokenConstants.ATTR_SEP)
+                .append((topicProps.getAcceptPublish() == null)
+                        ? defTopicProps.isAcceptPublish() : 
topicProps.isAcceptPublish())
+                .append(TokenConstants.ATTR_SEP)
+                .append((topicProps.getAcceptSubscribe() == null)
+                        ? defTopicProps.isAcceptSubscribe() : 
topicProps.isAcceptSubscribe())
+                .append(TokenConstants.ATTR_SEP)
+                .append((topicProps.getUnflushThreshold() == 
TBaseConstants.META_VALUE_UNDEFINED)
+                        ? defTopicProps.getUnflushThreshold() : 
topicProps.getUnflushThreshold())
+                .append(TokenConstants.ATTR_SEP)
+                .append((topicProps.getUnflushInterval() == 
TBaseConstants.META_VALUE_UNDEFINED)
+                        ? defTopicProps.getUnflushInterval() : 
topicProps.getUnflushInterval())
+                .append(TokenConstants.ATTR_SEP).append(" 
").append(TokenConstants.ATTR_SEP)
+                .append((TStringUtils.isEmpty(topicProps.getDeletePolicy()))
+                        ? defTopicProps.getDeletePolicy() : 
topicProps.getDeletePolicy())
+                .append(TokenConstants.ATTR_SEP)
+                .append((topicProps.getNumTopicStores() == 
TBaseConstants.META_VALUE_UNDEFINED)
+                        ? defTopicProps.getNumTopicStores() : 
topicProps.getNumTopicStores())
+                .append(TokenConstants.ATTR_SEP)
+                .append((topicProps.getUnflushDataHold() == 
TBaseConstants.META_VALUE_UNDEFINED)
+                        ? defTopicProps.getUnflushDataHold() : 
topicProps.getUnflushDataHold())
+                .append(TokenConstants.ATTR_SEP)
+                .append((topicProps.getMemCacheMsgSizeInMB() == 
TBaseConstants.META_VALUE_UNDEFINED)
+                        ? defTopicProps.getMemCacheMsgSizeInMB() : 
topicProps.getMemCacheMsgSizeInMB())
+                .append(TokenConstants.ATTR_SEP)
+                .append((topicProps.getMemCacheMsgCntInK() == 
TBaseConstants.META_VALUE_UNDEFINED)
+                        ? defTopicProps.getMemCacheMsgCntInK() : 
topicProps.getMemCacheMsgCntInK())
+                .append(TokenConstants.ATTR_SEP)
+                .append((topicProps.getMemCacheFlushIntvl() == 
TBaseConstants.META_VALUE_UNDEFINED)
+                        ? defTopicProps.getMemCacheFlushIntvl() : 
topicProps.getMemCacheFlushIntvl());
     }
 
     /**
diff --git 
a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/dao/entity/TopicDeployEntity.java
 
b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/dao/entity/TopicDeployEntity.java
index 6df2cfe..c1b6a61 100644
--- 
a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/dao/entity/TopicDeployEntity.java
+++ 
b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/dao/entity/TopicDeployEntity.java
@@ -51,6 +51,15 @@ public class TopicDeployEntity extends BaseEntity implements 
Cloneable {
         this.recordKey = KeyBuilderUtils.buildTopicConfRecKey(brokerId, 
topicName);
     }
 
+    public TopicDeployEntity(BaseEntity opInfoEntity, int brokerId,
+                             String topicName, TopicPropGroup topicProps) {
+        super(opInfoEntity);
+        this.brokerId = brokerId;
+        this.topicName = topicName;
+        this.recordKey = KeyBuilderUtils.buildTopicConfRecKey(brokerId, 
topicName);
+        this.topicProps.updModifyInfo(topicProps);
+    }
+
     /**
      * Constructor by BdbTopicConfEntity
      *
diff --git 
a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/impl/AbsMetaConfigMapperImpl.java
 
b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/impl/AbsMetaConfigMapperImpl.java
index 84297fc..077a1ce 100644
--- 
a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/impl/AbsMetaConfigMapperImpl.java
+++ 
b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/impl/AbsMetaConfigMapperImpl.java
@@ -204,7 +204,13 @@ public abstract class AbsMetaConfigMapperImpl implements 
MetaConfigMapper {
             lid = metaRowLock.getLock(null,
                     
StringUtils.getBytesUtf8(String.valueOf(entity.getBrokerId())), true);
             if (isAddOp) {
-                brokerConfigMapper.addBrokerConf(entity, strBuff, result);
+                newEntity = new BrokerConfEntity(entity, entity.getBrokerId(),
+                        entity.getBrokerIp(), getClusterDefSetting(false));
+                newEntity.updModifyInfo(entity.getDataVerId(), 
entity.getBrokerPort(),
+                        entity.getBrokerTLSPort(), entity.getBrokerWebPort(),
+                        entity.getRegionId(), entity.getGroupId(), 
entity.getManageStatus(),
+                        entity.getTopicProps());
+                brokerConfigMapper.addBrokerConf(newEntity, strBuff, result);
             } else {
                 printPrefix = "[updBrokerConf], ";
                 curEntity = 
brokerConfigMapper.getBrokerConfByBrokerId(entity.getBrokerId());
@@ -446,7 +452,9 @@ public abstract class AbsMetaConfigMapperImpl implements 
MetaConfigMapper {
         int maxMsgSizeInMB = clusterSettingEntity.getMaxMsgSizeInMB();
         TopicCtrlEntity topicCtrlEntity = 
topicCtrlMapper.getTopicCtrlConf(topicName);
         if (topicCtrlEntity != null) {
-            maxMsgSizeInMB = topicCtrlEntity.getMaxMsgSizeInMB();
+            if (topicCtrlEntity.getMaxMsgSizeInMB() != 
TBaseConstants.META_VALUE_UNDEFINED) {
+                maxMsgSizeInMB = topicCtrlEntity.getMaxMsgSizeInMB();
+            }
         }
         return maxMsgSizeInMB;
     }
@@ -466,7 +474,7 @@ public abstract class AbsMetaConfigMapperImpl implements 
MetaConfigMapper {
      * Add if absent topic control configure info
      *
      * @param opEntity  the operation info
-     * @param topicName the topic name will be add
+     * @param topicName the topic name will be added
      * @param strBuff   the print info string buffer
      * @param result    the process result return
      * @return true if success otherwise false
@@ -562,7 +570,7 @@ public abstract class AbsMetaConfigMapperImpl implements 
MetaConfigMapper {
     @Override
     public void addSystemTopicDeploy(int brokerId, int brokerPort,
                                      String brokerIp, StringBuilder strBuff) {
-        BaseEntity opEntity = new BaseEntity("system-self", new Date());
+        BaseEntity opEntity = new BaseEntity("systemSelf", new Date());
         TopicPropGroup topicPropInfo = new TopicPropGroup();
         
topicPropInfo.setNumTopicStores(TServerConstants.OFFSET_HISTORY_NUMSTORES);
         
topicPropInfo.setNumPartitions(TServerConstants.OFFSET_HISTORY_NUMPARTS);
@@ -621,7 +629,15 @@ public abstract class AbsMetaConfigMapperImpl implements 
MetaConfigMapper {
                         return result.isSuccess();
                     }
                     // add record
-                    topicDeployMapper.addTopicDeployConf(entity, strBuff, 
result);
+                    TopicPropGroup newProps =
+                            
getClusterDefSetting(false).getClsDefTopicProps().clone();
+                    newProps.updModifyInfo(brokerEntity.getTopicProps());
+                    newEntity = new TopicDeployEntity(entity,
+                            entity.getBrokerId(), entity.getTopicName(), 
newProps);
+                    newEntity.updModifyInfo(entity.getDataVerId(), 
entity.getTopicId(),
+                            brokerEntity.getBrokerPort(), 
brokerEntity.getBrokerIp(),
+                            entity.getTopicStatus(), entity.getTopicProps());
+                    topicDeployMapper.addTopicDeployConf(newEntity, strBuff, 
result);
                 } else {
                     printPrefix = "[updTopicDeployConf], ";
                     if (curEntity == null) {
diff --git 
a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/impl/zkimpl/TZKNodeKeys.java
 
b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/impl/zkimpl/TZKNodeKeys.java
index a5b21d7..54a09e2 100644
--- 
a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/impl/zkimpl/TZKNodeKeys.java
+++ 
b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/impl/zkimpl/TZKNodeKeys.java
@@ -18,8 +18,9 @@
 package 
org.apache.inlong.tubemq.server.master.metamanage.metastore.impl.zkimpl;
 
 public class TZKNodeKeys {
-    public static final String ZK_BRANCH_HA = "masterHA";
     public static final String ZK_BRANCH_META_DATA = "metaData";
+    public static final String ZK_BRANCH_MASTER_HA = "masterHA";
+    public static final String ZK_LEAF_MASTER_HA_NODEID = "nodeId";
     public static final String ZK_LEAF_CLUSTER_CONFIG = "clusterConfig";
     public static final String ZK_LEAF_BROKER_CONFIG = "brokerConfig";
     public static final String ZK_LEAF_CONSUME_CTRL_CONFIG = 
"consumeCtrlConfig";
diff --git 
a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/impl/zkimpl/ZKMetaConfigMapperImpl.java
 
b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/impl/zkimpl/ZKMetaConfigMapperImpl.java
index 86529cd..79cf3aa 100644
--- 
a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/impl/zkimpl/ZKMetaConfigMapperImpl.java
+++ 
b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/impl/zkimpl/ZKMetaConfigMapperImpl.java
@@ -53,7 +53,10 @@ public class ZKMetaConfigMapperImpl extends 
AbsMetaConfigMapperImpl {
     // the meta data path in ZooKeeper
     private final String metaZkRoot;
     // the ha path in ZooKeeper
+    private final String haParentPath;
+    // the ha path in ZooKeeper
     private final String haNodesPath;
+
     // whether is the first start
     private volatile boolean isFirstChk = true;
     // the ZooKeeper watcher
@@ -88,8 +91,11 @@ public class ZKMetaConfigMapperImpl extends 
AbsMetaConfigMapperImpl {
         this.metaZkRoot = 
strBuff.append(tubeZkRoot).append(TokenConstants.SLASH)
                 .append(TZKNodeKeys.ZK_BRANCH_META_DATA).toString();
         strBuff.delete(0, strBuff.length());
-        this.haNodesPath = 
strBuff.append(tubeZkRoot).append(TokenConstants.SLASH)
-                
.append(TZKNodeKeys.ZK_BRANCH_HA).append("/nodeIds").toString();
+        this.haParentPath = 
strBuff.append(tubeZkRoot).append(TokenConstants.SLASH)
+                .append(TZKNodeKeys.ZK_BRANCH_MASTER_HA).toString();
+        strBuff.delete(0, strBuff.length());
+        this.haNodesPath = 
strBuff.append(this.haParentPath).append(TokenConstants.SLASH)
+                
.append(TZKNodeKeys.ZK_LEAF_MASTER_HA_NODEID).append(TokenConstants.ATTR_SEP).toString();
         strBuff.delete(0, strBuff.length());
         try {
             this.zkWatcher = new 
ZooKeeperWatcher(masterConfig.getZkMetaConfig());
@@ -173,7 +179,7 @@ public class ZKMetaConfigMapperImpl extends 
AbsMetaConfigMapperImpl {
             return null;
         }
         if (!clusterNodeMap.isEmpty()) {
-            return clusterNodeMap.get(queryResult.getF1()).split(":")[0];
+            return 
clusterNodeMap.get(queryResult.getF1()).split(TokenConstants.ATTR_SEP)[0];
         }
         return null;
     }
@@ -208,8 +214,9 @@ public class ZKMetaConfigMapperImpl extends 
AbsMetaConfigMapperImpl {
         List<ClusterNodeVO> clusterNodeVOs = new ArrayList<>();
         for (Map.Entry<Long, String> entry : clusterNodeMap.entrySet()) {
             nodeAdd = entry.getValue();
-            clusterNodeVOs.add(new ClusterNodeVO(nodeAdd, 
nodeAdd.split(":")[0],
-                    Integer.parseInt(nodeAdd.split(":")[1]),
+            clusterNodeVOs.add(new ClusterNodeVO(nodeAdd,
+                    nodeAdd.split(TokenConstants.ATTR_SEP)[0],
+                    
Integer.parseInt(nodeAdd.split(TokenConstants.ATTR_SEP)[1]),
                     entry.getKey().equals(queryResult.getF1()) ? "Master" : 
"Slave", 0));
         }
         if (clusterNodeMap.isEmpty()) {
@@ -245,9 +252,9 @@ public class ZKMetaConfigMapperImpl extends 
AbsMetaConfigMapperImpl {
             try {
                 if (isFirstChk) {
                     // check whether the HA directory already exists on ZK
-                    if (ZKUtil.checkExists(zkWatcher, haNodesPath) == -1) {
+                    if (ZKUtil.checkExists(zkWatcher, haParentPath) == -1) {
                         // create path if not exists
-                        ZKUtil.createWithParents(zkWatcher, haNodesPath);
+                        ZKUtil.createWithParents(zkWatcher, haParentPath);
                     } else {
                         isFirstChk = false;
                     }
@@ -320,14 +327,14 @@ public class ZKMetaConfigMapperImpl extends 
AbsMetaConfigMapperImpl {
         boolean foundSelf = false;
         long minNodeId = Long.MAX_VALUE;
         List<ZKUtil.NodeAndData> childNodes =
-                ZKUtil.getChildDataAndWatchForNewChildren(zkWatcher, 
haNodesPath);
+                ZKUtil.getChildDataAndWatchForNewChildren(zkWatcher, 
haParentPath);
         for (ZKUtil.NodeAndData child : childNodes) {
             // select the first registered node as Master
             if (child == null) {
                 continue;
             }
             nodeAdd = new String(child.getData());
-            materNodeId = Long.parseLong(child.getNode().split(":")[1]);
+            materNodeId = 
Long.parseLong(child.getNode().split(TokenConstants.ATTR_SEP)[1]);
             if (minNodeId > materNodeId) {
                 minNodeId = materNodeId;
             }
diff --git 
a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodebroker/DefBrokerRunManager.java
 
b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodebroker/DefBrokerRunManager.java
index de1dafd..72729fe 100644
--- 
a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodebroker/DefBrokerRunManager.java
+++ 
b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodebroker/DefBrokerRunManager.java
@@ -227,7 +227,8 @@ public class DefBrokerRunManager implements 
BrokerRunManager, ConfigObserver {
             sBuffer.delete(0, sBuffer.length());
             return result.isSuccess();
         }
-        brokerEntry.getBrokerDefaultConfInfo(sBuffer);
+        brokerEntry.getBrokerDefaultConfInfo(
+                metaDataService.getClusterDefSetting(false), sBuffer);
         String brokerConfInfo = sBuffer.toString();
         sBuffer.delete(0, sBuffer.length());
         Map<String, String> topicConfInfoMap =
@@ -330,7 +331,8 @@ public class DefBrokerRunManager implements 
BrokerRunManager, ConfigObserver {
         BrokerConfEntity brokerConfEntity =
                 metaDataService.getBrokerConfByBrokerId(brokerId);
         if (brokerConfEntity != null) {
-            brokerConfEntity.getBrokerDefaultConfInfo(sBuffer);
+            brokerConfEntity.getBrokerDefaultConfInfo(
+                    metaDataService.getClusterDefSetting(false), sBuffer);
             brokerConfInfo = sBuffer.toString();
             sBuffer.delete(0, sBuffer.length());
             manageStatus = brokerConfEntity.getManageStatus();
diff --git 
a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/web/handler/WebMasterInfoHandler.java
 
b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/web/handler/WebMasterInfoHandler.java
index d7b3f94..8be5baf 100644
--- 
a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/web/handler/WebMasterInfoHandler.java
+++ 
b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/web/handler/WebMasterInfoHandler.java
@@ -288,20 +288,21 @@ public class WebMasterInfoHandler extends 
AbstractWebHandler {
             isAcceptPublish = false;
             isAcceptSubscribe = false;
             for (TopicDeployEntity entity : entry.getValue()) {
+                BrokerConfEntity brokerConfEntity =
+                        
defMetaDataService.getBrokerConfByBrokerId(entity.getBrokerId());
+                if (brokerConfEntity == null) {
+                    continue;
+                }
                 brokerCount++;
+                Tuple2<Boolean, Boolean> pubSubStatus =
+                        WebParameterUtils.getPubSubStatusByManageStatus(
+                                brokerConfEntity.getManageStatus().getCode());
+                isAcceptPublish = pubSubStatus.getF0();
+                isAcceptSubscribe = pubSubStatus.getF1();
                 TopicPropGroup topicProps = entity.getTopicProps();
                 totalCfgTopicStoreCount += topicProps.getNumTopicStores();
                 totalCfgNumPartCount +=
                         topicProps.getNumPartitions() * 
topicProps.getNumTopicStores();
-                BrokerConfEntity brokerConfEntity =
-                        
defMetaDataService.getBrokerConfByBrokerId(entity.getBrokerId());
-                if (brokerConfEntity != null) {
-                    Tuple2<Boolean, Boolean> pubSubStatus =
-                            WebParameterUtils.getPubSubStatusByManageStatus(
-                                    
brokerConfEntity.getManageStatus().getCode());
-                    isAcceptPublish = pubSubStatus.getF0();
-                    isAcceptSubscribe = pubSubStatus.getF1();
-                }
                 TopicInfo topicInfo =
                         
brokerRunManager.getPubBrokerTopicInfo(entity.getBrokerId(), 
entity.getTopicName());
                 if (topicInfo != null) {
diff --git 
a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/web/handler/WebTopicDeployHandler.java
 
b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/web/handler/WebTopicDeployHandler.java
index a37fe43..ef77e2a 100644
--- 
a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/web/handler/WebTopicDeployHandler.java
+++ 
b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/web/handler/WebTopicDeployHandler.java
@@ -61,6 +61,8 @@ public class WebTopicDeployHandler extends AbstractWebHandler 
{
         // register query method
         registerQueryWebMethod("admin_query_topic_deploy_info",
                 "adminNewQueryTopicCfgAndRunInfo");
+        registerQueryWebMethod("admin_query_topic_deploy_configure",
+                "innQueryTopicDeployConfInfo");
         registerQueryWebMethod("admin_query_broker_topic_config_info",
                 "adminQueryBrokerTopicCfgAndRunInfo");
         registerQueryWebMethod("admin_query_topicName",
@@ -438,6 +440,80 @@ public class WebTopicDeployHandler extends 
AbstractWebHandler {
      * @param result    process result
      * @return    process result
      */
+    private StringBuilder innQueryTopicDeployConfInfo(HttpServletRequest req,
+                                                      StringBuilder sBuffer,
+                                                      ProcessResult result) {
+        TopicDeployEntity qryEntity = new TopicDeployEntity();
+        // get queried operation info, for createUser, modifyUser, 
dataVersionId
+        if (!WebParameterUtils.getQueriedOperateInfo(req, qryEntity, sBuffer, 
result)) {
+            WebParameterUtils.buildFailResult(sBuffer, result.getErrMsg());
+            return sBuffer;
+        }
+        // check and get topicName field
+        if (!WebParameterUtils.getStringParamValue(req,
+                WebFieldDef.COMPSTOPICNAME, false, null, sBuffer, result)) {
+            WebParameterUtils.buildFailResult(sBuffer, result.getErrMsg());
+            return sBuffer;
+        }
+        final Set<String> topicNameSet = (Set<String>) result.getRetData();
+        // check and get brokerId field
+        if (!WebParameterUtils.getIntParamValue(req,
+                WebFieldDef.COMPSBROKERID, false, sBuffer, result)) {
+            WebParameterUtils.buildFailResult(sBuffer, result.getErrMsg());
+            return sBuffer;
+        }
+        final Set<Integer> brokerIdSet = (Set<Integer>) result.getRetData();
+        // get brokerPort field
+        if (!WebParameterUtils.getIntParamValue(req, WebFieldDef.BROKERPORT,
+                false, TBaseConstants.META_VALUE_UNDEFINED, 1, sBuffer, 
result)) {
+            WebParameterUtils.buildFailResult(sBuffer, result.getErrMsg());
+            return sBuffer;
+        }
+        final int brokerPort = (int) result.getRetData();
+        // get and valid topicProps info
+        if (!WebParameterUtils.getTopicPropInfo(req, null, sBuffer, result)) {
+            WebParameterUtils.buildFailResult(sBuffer, result.getErrMsg());
+            return sBuffer;
+        }
+        TopicPropGroup topicProps = (TopicPropGroup) result.getRetData();
+        // get and valid TopicStatusId info
+        if (!WebParameterUtils.getTopicStatusParamValue(req,
+                false, TopicStatus.STATUS_TOPIC_UNDEFINED, sBuffer, result)) {
+            WebParameterUtils.buildFailResult(sBuffer, result.getErrMsg());
+            return sBuffer;
+        }
+        TopicStatus topicStatus = (TopicStatus) result.getRetData();
+        qryEntity.updModifyInfo(qryEntity.getDataVerId(),
+                TBaseConstants.META_VALUE_UNDEFINED,
+                brokerPort, null, topicStatus, topicProps);
+        Map<String, List<TopicDeployEntity>> topicDeployInfoMap =
+                defMetaDataService.getTopicDeployInfoMap(topicNameSet, 
brokerIdSet, qryEntity);
+        // build query result
+        int totalCnt = 0;
+        WebParameterUtils.buildSuccessWithDataRetBegin(sBuffer);
+        for (Map.Entry<String, List<TopicDeployEntity>> entry : 
topicDeployInfoMap.entrySet()) {
+            if (entry.getValue() == null || entry.getValue().isEmpty()) {
+                continue;
+            }
+            for (TopicDeployEntity entity : entry.getValue()) {
+                if (totalCnt++ > 0) {
+                    sBuffer.append(",");
+                }
+                entity.toWebJsonStr(sBuffer, true, true);
+            }
+        }
+        WebParameterUtils.buildSuccessWithDataRetEnd(sBuffer, totalCnt);
+        return sBuffer;
+    }
+
+    /**
+     * Query topic info
+     *
+     * @param req       Http Servlet Request
+     * @param sBuffer   string buffer
+     * @param result    process result
+     * @return    process result
+     */
     private StringBuilder innQueryTopicConfAndRunInfo(HttpServletRequest req,
                                                       StringBuilder sBuffer,
                                                       ProcessResult result,
@@ -536,7 +612,9 @@ public class WebTopicDeployHandler extends 
AbstractWebHandler {
                 sBuffer.append(",");
             }
             maxMsgSizeInMB = defSetting.getMaxMsgSizeInMB();
-            maxMsgSizeInMB = ctrlEntity.getMaxMsgSizeInMB();
+            if (ctrlEntity.getMaxMsgSizeInMB() != 
TBaseConstants.META_VALUE_UNDEFINED) {
+                maxMsgSizeInMB = ctrlEntity.getMaxMsgSizeInMB();
+            }
             enableAuthCtrl = ctrlEntity.getAuthCtrlStatus().isEnable();
             sBuffer.append("{\"topicName\":\"").append(entry.getKey())
                     .append("\",\"maxMsgSizeInMB\":").append(maxMsgSizeInMB)
diff --git 
a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/tools/cli/CliMetaDataBRU.java
 
b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/tools/cli/CliMetaDataBRU.java
index 5eaf707..e903a9f 100644
--- 
a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/tools/cli/CliMetaDataBRU.java
+++ 
b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/tools/cli/CliMetaDataBRU.java
@@ -972,9 +972,9 @@ public class CliMetaDataBRU extends CliAbstractBase {
      * @return         the query result, null if query failure
      */
     private Map<String, TopicDeployEntity> getTopicDeployInfos(StringBuilder 
strBuff) {
-        // 
http://127.0.0.1:8080/webapi.htm?method=admin_query_topic_deploy_info
+        // 
http://127.0.0.1:8080/webapi.htm?method=admin_query_topic_deploy_configure
         JsonObject jsonRes = qryDataFromMaster(
-                "admin_query_topic_deploy_info", new HashMap<>(), strBuff);
+                "admin_query_topic_deploy_configure", new HashMap<>(), 
strBuff);
         // check return result
         if (!jsonRes.get("result").getAsBoolean()) {
             logger.info(strBuff.append("Query topic deploy configurations info 
failure:")

Reply via email to