This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
The following commit(s) were added to refs/heads/master by this push:
new fb9012b [INLONG-3268][TubeMQ] Fix some bugs when metadata is saved to
ZooKeeper (#3270)
fb9012b is described below
commit fb9012b97e8712cafb34c70e5b4b768e9cc5f4e9
Author: gosonzhang <[email protected]>
AuthorDate: Mon Mar 21 19:38:54 2022 +0800
[INLONG-3268][TubeMQ] Fix some bugs when metadata is saved to ZooKeeper
(#3270)
---
.../server/broker/offset/OffsetRecordService.java | 18 +++--
.../master/metamanage/DefaultMetaDataService.java | 36 ++++++----
.../metastore/dao/entity/BrokerConfEntity.java | 60 ++++++++++++----
.../metastore/dao/entity/TopicDeployEntity.java | 9 +++
.../metastore/impl/AbsMetaConfigMapperImpl.java | 26 +++++--
.../metastore/impl/zkimpl/TZKNodeKeys.java | 3 +-
.../impl/zkimpl/ZKMetaConfigMapperImpl.java | 25 ++++---
.../nodemanage/nodebroker/DefBrokerRunManager.java | 6 +-
.../master/web/handler/WebMasterInfoHandler.java | 19 ++---
.../master/web/handler/WebTopicDeployHandler.java | 80 +++++++++++++++++++++-
.../tubemq/server/tools/cli/CliMetaDataBRU.java | 4 +-
11 files changed, 226 insertions(+), 60 deletions(-)
diff --git
a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/offset/OffsetRecordService.java
b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/offset/OffsetRecordService.java
index 751bbe9..8d65ba8 100644
---
a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/offset/OffsetRecordService.java
+++
b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/offset/OffsetRecordService.java
@@ -20,8 +20,11 @@ package org.apache.inlong.tubemq.server.broker.offset;
import java.util.Map;
import org.apache.inlong.tubemq.corebase.daemon.AbstractDaemonService;
import org.apache.inlong.tubemq.corebase.utils.AddressUtils;
+import org.apache.inlong.tubemq.corebase.utils.ServiceStatusHolder;
import org.apache.inlong.tubemq.server.broker.TubeBroker;
+import org.apache.inlong.tubemq.server.broker.metadata.TopicMetadata;
import org.apache.inlong.tubemq.server.broker.msgstore.MessageStoreManager;
+import org.apache.inlong.tubemq.server.common.TServerConstants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -77,8 +80,16 @@ public class OffsetRecordService extends
AbstractDaemonService {
}
private void storeRecord2LocalTopic(StringBuilder strBuff) {
- // get query timestamp
- long recordStamp = System.currentTimeMillis();
+ // check node writable status
+ if (ServiceStatusHolder.isWriteServiceStop()) {
+ return;
+ }
+ // check topic writable status
+ TopicMetadata topicMetadata = storeManager.getMetadataManager()
+ .getTopicMetadata(TServerConstants.OFFSET_HISTORY_NAME);
+ if (!topicMetadata.isAcceptPublish()) {
+ return;
+ }
// get group offset information
Map<String, OffsetRecordInfo> groupOffsetMap =
offsetManager.getOnlineGroupOffsetInfo();
@@ -89,7 +100,6 @@ public class OffsetRecordService extends
AbstractDaemonService {
storeManager.getTopicPublishInfos(groupOffsetMap);
// store group offset records to offset storage topic
broker.getBrokerServiceServer().appendGroupOffsetInfo(groupOffsetMap,
- brokerAddrId, recordStamp, 10, 3, strBuff);
+ brokerAddrId, System.currentTimeMillis(), 10, 3, strBuff);
}
-
}
diff --git
a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/DefaultMetaDataService.java
b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/DefaultMetaDataService.java
index 3d06233..4347f77 100644
---
a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/DefaultMetaDataService.java
+++
b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/DefaultMetaDataService.java
@@ -271,8 +271,7 @@ public class DefaultMetaDataService implements
MetaDataService {
continue;
}
String allowedCondStr = ctrlEntity.getFilterCondStr();
- if (allowedCondStr.length() == 2
- &&
allowedCondStr.equals(TServerConstants.BLANK_FILTER_ITEM_STR)) {
+ if (allowedCondStr.equals(TServerConstants.BLANK_FILTER_ITEM_STR))
{
result.setFailResult(TErrCodeConstants.CONSUME_CONTENT_FORBIDDEN,
strBuff.append("[Restricted Group]
").append(consumerId)
.append(" : ").append(groupName)
@@ -724,59 +723,70 @@ public class DefaultMetaDataService implements
MetaDataService {
}
strBuff.append(topicEntity.getTopicName());
TopicPropGroup topicProps = topicEntity.getTopicProps();
- if (topicProps.getNumPartitions() ==
defTopicProps.getNumPartitions()) {
+ if (topicProps.getNumPartitions() ==
TBaseConstants.META_VALUE_UNDEFINED
+ || topicProps.getNumPartitions() ==
defTopicProps.getNumPartitions()) {
strBuff.append(TokenConstants.ATTR_SEP).append(" ");
} else {
strBuff.append(TokenConstants.ATTR_SEP).append(topicProps.getNumPartitions());
}
- if (topicProps.isAcceptPublish() ==
defTopicProps.isAcceptPublish()) {
+ if (topicProps.getAcceptPublish() == null
+ || topicProps.isAcceptPublish() ==
defTopicProps.isAcceptPublish()) {
strBuff.append(TokenConstants.ATTR_SEP).append(" ");
} else {
strBuff.append(TokenConstants.ATTR_SEP).append(topicProps.isAcceptPublish());
}
- if (topicProps.isAcceptSubscribe() ==
defTopicProps.isAcceptSubscribe()) {
+ if (topicProps.getAcceptSubscribe() == null
+ || topicProps.isAcceptSubscribe() ==
defTopicProps.isAcceptSubscribe()) {
strBuff.append(TokenConstants.ATTR_SEP).append(" ");
} else {
strBuff.append(TokenConstants.ATTR_SEP).append(topicProps.isAcceptSubscribe());
}
- if (topicProps.getUnflushThreshold() ==
defTopicProps.getUnflushThreshold()) {
+ if (topicProps.getUnflushThreshold() ==
TBaseConstants.META_VALUE_UNDEFINED
+ || topicProps.getUnflushThreshold() ==
defTopicProps.getUnflushThreshold()) {
strBuff.append(TokenConstants.ATTR_SEP).append(" ");
} else {
strBuff.append(TokenConstants.ATTR_SEP).append(topicProps.getUnflushThreshold());
}
- if (topicProps.getUnflushInterval() ==
defTopicProps.getUnflushInterval()) {
+ if (topicProps.getUnflushInterval() ==
TBaseConstants.META_VALUE_UNDEFINED
+ || topicProps.getUnflushInterval() ==
defTopicProps.getUnflushInterval()) {
strBuff.append(TokenConstants.ATTR_SEP).append(" ");
} else {
strBuff.append(TokenConstants.ATTR_SEP).append(topicProps.getUnflushInterval());
}
strBuff.append(TokenConstants.ATTR_SEP).append(" ");
- if
(topicProps.getDeletePolicy().equals(defTopicProps.getDeletePolicy())) {
+ if (TStringUtils.isEmpty(topicProps.getDeletePolicy())
+ ||
topicProps.getDeletePolicy().equals(defTopicProps.getDeletePolicy())) {
strBuff.append(TokenConstants.ATTR_SEP).append(" ");
} else {
strBuff.append(TokenConstants.ATTR_SEP).append(topicProps.getDeletePolicy());
}
- if (topicProps.getNumTopicStores() ==
defTopicProps.getNumTopicStores()) {
+ if (topicProps.getNumTopicStores() ==
TBaseConstants.META_VALUE_UNDEFINED
+ || topicProps.getNumTopicStores() ==
defTopicProps.getNumTopicStores()) {
strBuff.append(TokenConstants.ATTR_SEP).append(" ");
} else {
strBuff.append(TokenConstants.ATTR_SEP).append(topicProps.getNumTopicStores());
}
strBuff.append(TokenConstants.ATTR_SEP).append(topicEntity.getTopicStatusId());
- if (topicProps.getUnflushDataHold() ==
defTopicProps.getUnflushDataHold()) {
+ if (topicProps.getUnflushDataHold() ==
TBaseConstants.META_VALUE_UNDEFINED
+ || topicProps.getUnflushDataHold() ==
defTopicProps.getUnflushDataHold()) {
strBuff.append(TokenConstants.ATTR_SEP).append(" ");
} else {
strBuff.append(TokenConstants.ATTR_SEP).append(topicProps.getUnflushDataHold());
}
- if (topicProps.getMemCacheMsgSizeInMB() ==
defTopicProps.getMemCacheMsgSizeInMB()) {
+ if (topicProps.getMemCacheMsgSizeInMB() ==
TBaseConstants.META_VALUE_UNDEFINED
+ || topicProps.getMemCacheMsgSizeInMB() ==
defTopicProps.getMemCacheMsgSizeInMB()) {
strBuff.append(TokenConstants.ATTR_SEP).append(" ");
} else {
strBuff.append(TokenConstants.ATTR_SEP).append(topicProps.getMemCacheMsgSizeInMB());
}
- if (topicProps.getMemCacheMsgCntInK() ==
defTopicProps.getMemCacheMsgCntInK()) {
+ if (topicProps.getMemCacheMsgCntInK() ==
TBaseConstants.META_VALUE_UNDEFINED
+ || topicProps.getMemCacheMsgCntInK() ==
defTopicProps.getMemCacheMsgCntInK()) {
strBuff.append(TokenConstants.ATTR_SEP).append(" ");
} else {
strBuff.append(TokenConstants.ATTR_SEP).append(topicProps.getMemCacheMsgCntInK());
}
- if (topicProps.getMemCacheFlushIntvl() ==
defTopicProps.getMemCacheFlushIntvl()) {
+ if (topicProps.getMemCacheFlushIntvl() ==
TBaseConstants.META_VALUE_UNDEFINED
+ || topicProps.getMemCacheFlushIntvl() ==
defTopicProps.getMemCacheFlushIntvl()) {
strBuff.append(TokenConstants.ATTR_SEP).append(" ");
} else {
strBuff.append(TokenConstants.ATTR_SEP).append(topicProps.getMemCacheFlushIntvl());
diff --git
a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/dao/entity/BrokerConfEntity.java
b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/dao/entity/BrokerConfEntity.java
index 056ab2b..55691be 100644
---
a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/dao/entity/BrokerConfEntity.java
+++
b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/dao/entity/BrokerConfEntity.java
@@ -60,6 +60,15 @@ public class BrokerConfEntity extends BaseEntity implements
Cloneable {
this.brokerIp = brokerIp;
}
+ public BrokerConfEntity(BaseEntity opEntity, int brokerId,
+ String brokerIp, ClusterSettingEntity defSetting) {
+ super(opEntity);
+ this.brokerWebPort = defSetting.getBrokerWebPort();
+ this.topicProps.updModifyInfo(defSetting.getClsDefTopicProps());
+ setBrokerIpAndAllPort(brokerId, brokerIp,
+ defSetting.getBrokerPort(), defSetting.getBrokerTLSPort());
+ }
+
/**
* Initial Broker Configure entity by BdbBrokerConfEntity
*
@@ -383,21 +392,44 @@ public class BrokerConfEntity extends BaseEntity
implements Cloneable {
/**
* Get broker config string
*
- * @param strBuff the string buffer
+ * @param defSetting the default setting
+ * @param strBuff the string buffer
*/
- public void getBrokerDefaultConfInfo(StringBuilder strBuff) {
-
strBuff.append(topicProps.getNumPartitions()).append(TokenConstants.ATTR_SEP)
-
.append(topicProps.isAcceptPublish()).append(TokenConstants.ATTR_SEP)
-
.append(topicProps.isAcceptSubscribe()).append(TokenConstants.ATTR_SEP)
-
.append(topicProps.getUnflushThreshold()).append(TokenConstants.ATTR_SEP)
-
.append(topicProps.getUnflushInterval()).append(TokenConstants.ATTR_SEP)
- .append(" ").append(TokenConstants.ATTR_SEP)
-
.append(topicProps.getDeletePolicy()).append(TokenConstants.ATTR_SEP)
-
.append(topicProps.getNumTopicStores()).append(TokenConstants.ATTR_SEP)
-
.append(topicProps.getUnflushDataHold()).append(TokenConstants.ATTR_SEP)
-
.append(topicProps.getMemCacheMsgSizeInMB()).append(TokenConstants.ATTR_SEP)
-
.append(topicProps.getMemCacheMsgCntInK()).append(TokenConstants.ATTR_SEP)
- .append(topicProps.getMemCacheFlushIntvl());
+ public void getBrokerDefaultConfInfo(ClusterSettingEntity defSetting,
+ StringBuilder strBuff) {
+ TopicPropGroup defTopicProps = defSetting.getClsDefTopicProps();
+ strBuff.append((topicProps.getNumPartitions() ==
TBaseConstants.META_VALUE_UNDEFINED)
+ ? defTopicProps.getNumPartitions() :
topicProps.getNumPartitions())
+ .append(TokenConstants.ATTR_SEP)
+ .append((topicProps.getAcceptPublish() == null)
+ ? defTopicProps.isAcceptPublish() :
topicProps.isAcceptPublish())
+ .append(TokenConstants.ATTR_SEP)
+ .append((topicProps.getAcceptSubscribe() == null)
+ ? defTopicProps.isAcceptSubscribe() :
topicProps.isAcceptSubscribe())
+ .append(TokenConstants.ATTR_SEP)
+ .append((topicProps.getUnflushThreshold() ==
TBaseConstants.META_VALUE_UNDEFINED)
+ ? defTopicProps.getUnflushThreshold() :
topicProps.getUnflushThreshold())
+ .append(TokenConstants.ATTR_SEP)
+ .append((topicProps.getUnflushInterval() ==
TBaseConstants.META_VALUE_UNDEFINED)
+ ? defTopicProps.getUnflushInterval() :
topicProps.getUnflushInterval())
+ .append(TokenConstants.ATTR_SEP).append("
").append(TokenConstants.ATTR_SEP)
+ .append((TStringUtils.isEmpty(topicProps.getDeletePolicy()))
+ ? defTopicProps.getDeletePolicy() :
topicProps.getDeletePolicy())
+ .append(TokenConstants.ATTR_SEP)
+ .append((topicProps.getNumTopicStores() ==
TBaseConstants.META_VALUE_UNDEFINED)
+ ? defTopicProps.getNumTopicStores() :
topicProps.getNumTopicStores())
+ .append(TokenConstants.ATTR_SEP)
+ .append((topicProps.getUnflushDataHold() ==
TBaseConstants.META_VALUE_UNDEFINED)
+ ? defTopicProps.getUnflushDataHold() :
topicProps.getUnflushDataHold())
+ .append(TokenConstants.ATTR_SEP)
+ .append((topicProps.getMemCacheMsgSizeInMB() ==
TBaseConstants.META_VALUE_UNDEFINED)
+ ? defTopicProps.getMemCacheMsgSizeInMB() :
topicProps.getMemCacheMsgSizeInMB())
+ .append(TokenConstants.ATTR_SEP)
+ .append((topicProps.getMemCacheMsgCntInK() ==
TBaseConstants.META_VALUE_UNDEFINED)
+ ? defTopicProps.getMemCacheMsgCntInK() :
topicProps.getMemCacheMsgCntInK())
+ .append(TokenConstants.ATTR_SEP)
+ .append((topicProps.getMemCacheFlushIntvl() ==
TBaseConstants.META_VALUE_UNDEFINED)
+ ? defTopicProps.getMemCacheFlushIntvl() :
topicProps.getMemCacheFlushIntvl());
}
/**
diff --git
a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/dao/entity/TopicDeployEntity.java
b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/dao/entity/TopicDeployEntity.java
index 6df2cfe..c1b6a61 100644
---
a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/dao/entity/TopicDeployEntity.java
+++
b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/dao/entity/TopicDeployEntity.java
@@ -51,6 +51,15 @@ public class TopicDeployEntity extends BaseEntity implements
Cloneable {
this.recordKey = KeyBuilderUtils.buildTopicConfRecKey(brokerId,
topicName);
}
+ public TopicDeployEntity(BaseEntity opInfoEntity, int brokerId,
+ String topicName, TopicPropGroup topicProps) {
+ super(opInfoEntity);
+ this.brokerId = brokerId;
+ this.topicName = topicName;
+ this.recordKey = KeyBuilderUtils.buildTopicConfRecKey(brokerId,
topicName);
+ this.topicProps.updModifyInfo(topicProps);
+ }
+
/**
* Constructor by BdbTopicConfEntity
*
diff --git
a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/impl/AbsMetaConfigMapperImpl.java
b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/impl/AbsMetaConfigMapperImpl.java
index 84297fc..077a1ce 100644
---
a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/impl/AbsMetaConfigMapperImpl.java
+++
b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/impl/AbsMetaConfigMapperImpl.java
@@ -204,7 +204,13 @@ public abstract class AbsMetaConfigMapperImpl implements
MetaConfigMapper {
lid = metaRowLock.getLock(null,
StringUtils.getBytesUtf8(String.valueOf(entity.getBrokerId())), true);
if (isAddOp) {
- brokerConfigMapper.addBrokerConf(entity, strBuff, result);
+ newEntity = new BrokerConfEntity(entity, entity.getBrokerId(),
+ entity.getBrokerIp(), getClusterDefSetting(false));
+ newEntity.updModifyInfo(entity.getDataVerId(),
entity.getBrokerPort(),
+ entity.getBrokerTLSPort(), entity.getBrokerWebPort(),
+ entity.getRegionId(), entity.getGroupId(),
entity.getManageStatus(),
+ entity.getTopicProps());
+ brokerConfigMapper.addBrokerConf(newEntity, strBuff, result);
} else {
printPrefix = "[updBrokerConf], ";
curEntity =
brokerConfigMapper.getBrokerConfByBrokerId(entity.getBrokerId());
@@ -446,7 +452,9 @@ public abstract class AbsMetaConfigMapperImpl implements
MetaConfigMapper {
int maxMsgSizeInMB = clusterSettingEntity.getMaxMsgSizeInMB();
TopicCtrlEntity topicCtrlEntity =
topicCtrlMapper.getTopicCtrlConf(topicName);
if (topicCtrlEntity != null) {
- maxMsgSizeInMB = topicCtrlEntity.getMaxMsgSizeInMB();
+ if (topicCtrlEntity.getMaxMsgSizeInMB() !=
TBaseConstants.META_VALUE_UNDEFINED) {
+ maxMsgSizeInMB = topicCtrlEntity.getMaxMsgSizeInMB();
+ }
}
return maxMsgSizeInMB;
}
@@ -466,7 +474,7 @@ public abstract class AbsMetaConfigMapperImpl implements
MetaConfigMapper {
* Add if absent topic control configure info
*
* @param opEntity the operation info
- * @param topicName the topic name will be add
+ * @param topicName the topic name will be added
* @param strBuff the print info string buffer
* @param result the process result return
* @return true if success otherwise false
@@ -562,7 +570,7 @@ public abstract class AbsMetaConfigMapperImpl implements
MetaConfigMapper {
@Override
public void addSystemTopicDeploy(int brokerId, int brokerPort,
String brokerIp, StringBuilder strBuff) {
- BaseEntity opEntity = new BaseEntity("system-self", new Date());
+ BaseEntity opEntity = new BaseEntity("systemSelf", new Date());
TopicPropGroup topicPropInfo = new TopicPropGroup();
topicPropInfo.setNumTopicStores(TServerConstants.OFFSET_HISTORY_NUMSTORES);
topicPropInfo.setNumPartitions(TServerConstants.OFFSET_HISTORY_NUMPARTS);
@@ -621,7 +629,15 @@ public abstract class AbsMetaConfigMapperImpl implements
MetaConfigMapper {
return result.isSuccess();
}
// add record
- topicDeployMapper.addTopicDeployConf(entity, strBuff,
result);
+ TopicPropGroup newProps =
+
getClusterDefSetting(false).getClsDefTopicProps().clone();
+ newProps.updModifyInfo(brokerEntity.getTopicProps());
+ newEntity = new TopicDeployEntity(entity,
+ entity.getBrokerId(), entity.getTopicName(),
newProps);
+ newEntity.updModifyInfo(entity.getDataVerId(),
entity.getTopicId(),
+ brokerEntity.getBrokerPort(),
brokerEntity.getBrokerIp(),
+ entity.getTopicStatus(), entity.getTopicProps());
+ topicDeployMapper.addTopicDeployConf(newEntity, strBuff,
result);
} else {
printPrefix = "[updTopicDeployConf], ";
if (curEntity == null) {
diff --git
a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/impl/zkimpl/TZKNodeKeys.java
b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/impl/zkimpl/TZKNodeKeys.java
index a5b21d7..54a09e2 100644
---
a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/impl/zkimpl/TZKNodeKeys.java
+++
b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/impl/zkimpl/TZKNodeKeys.java
@@ -18,8 +18,9 @@
package
org.apache.inlong.tubemq.server.master.metamanage.metastore.impl.zkimpl;
public class TZKNodeKeys {
- public static final String ZK_BRANCH_HA = "masterHA";
public static final String ZK_BRANCH_META_DATA = "metaData";
+ public static final String ZK_BRANCH_MASTER_HA = "masterHA";
+ public static final String ZK_LEAF_MASTER_HA_NODEID = "nodeId";
public static final String ZK_LEAF_CLUSTER_CONFIG = "clusterConfig";
public static final String ZK_LEAF_BROKER_CONFIG = "brokerConfig";
public static final String ZK_LEAF_CONSUME_CTRL_CONFIG =
"consumeCtrlConfig";
diff --git
a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/impl/zkimpl/ZKMetaConfigMapperImpl.java
b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/impl/zkimpl/ZKMetaConfigMapperImpl.java
index 86529cd..79cf3aa 100644
---
a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/impl/zkimpl/ZKMetaConfigMapperImpl.java
+++
b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/impl/zkimpl/ZKMetaConfigMapperImpl.java
@@ -53,7 +53,10 @@ public class ZKMetaConfigMapperImpl extends
AbsMetaConfigMapperImpl {
// the meta data path in ZooKeeper
private final String metaZkRoot;
// the ha path in ZooKeeper
+ private final String haParentPath;
+ // the ha nodes path in ZooKeeper
private final String haNodesPath;
+
// whether is the first start
private volatile boolean isFirstChk = true;
// the ZooKeeper watcher
@@ -88,8 +91,11 @@ public class ZKMetaConfigMapperImpl extends
AbsMetaConfigMapperImpl {
this.metaZkRoot =
strBuff.append(tubeZkRoot).append(TokenConstants.SLASH)
.append(TZKNodeKeys.ZK_BRANCH_META_DATA).toString();
strBuff.delete(0, strBuff.length());
- this.haNodesPath =
strBuff.append(tubeZkRoot).append(TokenConstants.SLASH)
-
.append(TZKNodeKeys.ZK_BRANCH_HA).append("/nodeIds").toString();
+ this.haParentPath =
strBuff.append(tubeZkRoot).append(TokenConstants.SLASH)
+ .append(TZKNodeKeys.ZK_BRANCH_MASTER_HA).toString();
+ strBuff.delete(0, strBuff.length());
+ this.haNodesPath =
strBuff.append(this.haParentPath).append(TokenConstants.SLASH)
+
.append(TZKNodeKeys.ZK_LEAF_MASTER_HA_NODEID).append(TokenConstants.ATTR_SEP).toString();
strBuff.delete(0, strBuff.length());
try {
this.zkWatcher = new
ZooKeeperWatcher(masterConfig.getZkMetaConfig());
@@ -173,7 +179,7 @@ public class ZKMetaConfigMapperImpl extends
AbsMetaConfigMapperImpl {
return null;
}
if (!clusterNodeMap.isEmpty()) {
- return clusterNodeMap.get(queryResult.getF1()).split(":")[0];
+ return
clusterNodeMap.get(queryResult.getF1()).split(TokenConstants.ATTR_SEP)[0];
}
return null;
}
@@ -208,8 +214,9 @@ public class ZKMetaConfigMapperImpl extends
AbsMetaConfigMapperImpl {
List<ClusterNodeVO> clusterNodeVOs = new ArrayList<>();
for (Map.Entry<Long, String> entry : clusterNodeMap.entrySet()) {
nodeAdd = entry.getValue();
- clusterNodeVOs.add(new ClusterNodeVO(nodeAdd,
nodeAdd.split(":")[0],
- Integer.parseInt(nodeAdd.split(":")[1]),
+ clusterNodeVOs.add(new ClusterNodeVO(nodeAdd,
+ nodeAdd.split(TokenConstants.ATTR_SEP)[0],
+
Integer.parseInt(nodeAdd.split(TokenConstants.ATTR_SEP)[1]),
entry.getKey().equals(queryResult.getF1()) ? "Master" :
"Slave", 0));
}
if (clusterNodeMap.isEmpty()) {
@@ -245,9 +252,9 @@ public class ZKMetaConfigMapperImpl extends
AbsMetaConfigMapperImpl {
try {
if (isFirstChk) {
// check whether the HA directory already exists on ZK
- if (ZKUtil.checkExists(zkWatcher, haNodesPath) == -1) {
+ if (ZKUtil.checkExists(zkWatcher, haParentPath) == -1) {
// create path if not exists
- ZKUtil.createWithParents(zkWatcher, haNodesPath);
+ ZKUtil.createWithParents(zkWatcher, haParentPath);
} else {
isFirstChk = false;
}
@@ -320,14 +327,14 @@ public class ZKMetaConfigMapperImpl extends
AbsMetaConfigMapperImpl {
boolean foundSelf = false;
long minNodeId = Long.MAX_VALUE;
List<ZKUtil.NodeAndData> childNodes =
- ZKUtil.getChildDataAndWatchForNewChildren(zkWatcher,
haNodesPath);
+ ZKUtil.getChildDataAndWatchForNewChildren(zkWatcher,
haParentPath);
for (ZKUtil.NodeAndData child : childNodes) {
// select the first registered node as Master
if (child == null) {
continue;
}
nodeAdd = new String(child.getData());
- materNodeId = Long.parseLong(child.getNode().split(":")[1]);
+ materNodeId =
Long.parseLong(child.getNode().split(TokenConstants.ATTR_SEP)[1]);
if (minNodeId > materNodeId) {
minNodeId = materNodeId;
}
diff --git
a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodebroker/DefBrokerRunManager.java
b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodebroker/DefBrokerRunManager.java
index de1dafd..72729fe 100644
---
a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodebroker/DefBrokerRunManager.java
+++
b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodebroker/DefBrokerRunManager.java
@@ -227,7 +227,8 @@ public class DefBrokerRunManager implements
BrokerRunManager, ConfigObserver {
sBuffer.delete(0, sBuffer.length());
return result.isSuccess();
}
- brokerEntry.getBrokerDefaultConfInfo(sBuffer);
+ brokerEntry.getBrokerDefaultConfInfo(
+ metaDataService.getClusterDefSetting(false), sBuffer);
String brokerConfInfo = sBuffer.toString();
sBuffer.delete(0, sBuffer.length());
Map<String, String> topicConfInfoMap =
@@ -330,7 +331,8 @@ public class DefBrokerRunManager implements
BrokerRunManager, ConfigObserver {
BrokerConfEntity brokerConfEntity =
metaDataService.getBrokerConfByBrokerId(brokerId);
if (brokerConfEntity != null) {
- brokerConfEntity.getBrokerDefaultConfInfo(sBuffer);
+ brokerConfEntity.getBrokerDefaultConfInfo(
+ metaDataService.getClusterDefSetting(false), sBuffer);
brokerConfInfo = sBuffer.toString();
sBuffer.delete(0, sBuffer.length());
manageStatus = brokerConfEntity.getManageStatus();
diff --git
a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/web/handler/WebMasterInfoHandler.java
b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/web/handler/WebMasterInfoHandler.java
index d7b3f94..8be5baf 100644
---
a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/web/handler/WebMasterInfoHandler.java
+++
b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/web/handler/WebMasterInfoHandler.java
@@ -288,20 +288,21 @@ public class WebMasterInfoHandler extends
AbstractWebHandler {
isAcceptPublish = false;
isAcceptSubscribe = false;
for (TopicDeployEntity entity : entry.getValue()) {
+ BrokerConfEntity brokerConfEntity =
+
defMetaDataService.getBrokerConfByBrokerId(entity.getBrokerId());
+ if (brokerConfEntity == null) {
+ continue;
+ }
brokerCount++;
+ Tuple2<Boolean, Boolean> pubSubStatus =
+ WebParameterUtils.getPubSubStatusByManageStatus(
+ brokerConfEntity.getManageStatus().getCode());
+ isAcceptPublish = pubSubStatus.getF0();
+ isAcceptSubscribe = pubSubStatus.getF1();
TopicPropGroup topicProps = entity.getTopicProps();
totalCfgTopicStoreCount += topicProps.getNumTopicStores();
totalCfgNumPartCount +=
topicProps.getNumPartitions() *
topicProps.getNumTopicStores();
- BrokerConfEntity brokerConfEntity =
-
defMetaDataService.getBrokerConfByBrokerId(entity.getBrokerId());
- if (brokerConfEntity != null) {
- Tuple2<Boolean, Boolean> pubSubStatus =
- WebParameterUtils.getPubSubStatusByManageStatus(
-
brokerConfEntity.getManageStatus().getCode());
- isAcceptPublish = pubSubStatus.getF0();
- isAcceptSubscribe = pubSubStatus.getF1();
- }
TopicInfo topicInfo =
brokerRunManager.getPubBrokerTopicInfo(entity.getBrokerId(),
entity.getTopicName());
if (topicInfo != null) {
diff --git
a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/web/handler/WebTopicDeployHandler.java
b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/web/handler/WebTopicDeployHandler.java
index a37fe43..ef77e2a 100644
---
a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/web/handler/WebTopicDeployHandler.java
+++
b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/web/handler/WebTopicDeployHandler.java
@@ -61,6 +61,8 @@ public class WebTopicDeployHandler extends AbstractWebHandler
{
// register query method
registerQueryWebMethod("admin_query_topic_deploy_info",
"adminNewQueryTopicCfgAndRunInfo");
+ registerQueryWebMethod("admin_query_topic_deploy_configure",
+ "innQueryTopicDeployConfInfo");
registerQueryWebMethod("admin_query_broker_topic_config_info",
"adminQueryBrokerTopicCfgAndRunInfo");
registerQueryWebMethod("admin_query_topicName",
@@ -438,6 +440,80 @@ public class WebTopicDeployHandler extends
AbstractWebHandler {
* @param result process result
* @return process result
*/
+ private StringBuilder innQueryTopicDeployConfInfo(HttpServletRequest req,
+ StringBuilder sBuffer,
+ ProcessResult result) {
+ TopicDeployEntity qryEntity = new TopicDeployEntity();
+ // get queried operation info, for createUser, modifyUser,
dataVersionId
+ if (!WebParameterUtils.getQueriedOperateInfo(req, qryEntity, sBuffer,
result)) {
+ WebParameterUtils.buildFailResult(sBuffer, result.getErrMsg());
+ return sBuffer;
+ }
+ // check and get topicName field
+ if (!WebParameterUtils.getStringParamValue(req,
+ WebFieldDef.COMPSTOPICNAME, false, null, sBuffer, result)) {
+ WebParameterUtils.buildFailResult(sBuffer, result.getErrMsg());
+ return sBuffer;
+ }
+ final Set<String> topicNameSet = (Set<String>) result.getRetData();
+ // check and get brokerId field
+ if (!WebParameterUtils.getIntParamValue(req,
+ WebFieldDef.COMPSBROKERID, false, sBuffer, result)) {
+ WebParameterUtils.buildFailResult(sBuffer, result.getErrMsg());
+ return sBuffer;
+ }
+ final Set<Integer> brokerIdSet = (Set<Integer>) result.getRetData();
+ // get brokerPort field
+ if (!WebParameterUtils.getIntParamValue(req, WebFieldDef.BROKERPORT,
+ false, TBaseConstants.META_VALUE_UNDEFINED, 1, sBuffer,
result)) {
+ WebParameterUtils.buildFailResult(sBuffer, result.getErrMsg());
+ return sBuffer;
+ }
+ final int brokerPort = (int) result.getRetData();
+ // get and validate topicProps info
+ if (!WebParameterUtils.getTopicPropInfo(req, null, sBuffer, result)) {
+ WebParameterUtils.buildFailResult(sBuffer, result.getErrMsg());
+ return sBuffer;
+ }
+ TopicPropGroup topicProps = (TopicPropGroup) result.getRetData();
+ // get and validate TopicStatusId info
+ if (!WebParameterUtils.getTopicStatusParamValue(req,
+ false, TopicStatus.STATUS_TOPIC_UNDEFINED, sBuffer, result)) {
+ WebParameterUtils.buildFailResult(sBuffer, result.getErrMsg());
+ return sBuffer;
+ }
+ TopicStatus topicStatus = (TopicStatus) result.getRetData();
+ qryEntity.updModifyInfo(qryEntity.getDataVerId(),
+ TBaseConstants.META_VALUE_UNDEFINED,
+ brokerPort, null, topicStatus, topicProps);
+ Map<String, List<TopicDeployEntity>> topicDeployInfoMap =
+ defMetaDataService.getTopicDeployInfoMap(topicNameSet,
brokerIdSet, qryEntity);
+ // build query result
+ int totalCnt = 0;
+ WebParameterUtils.buildSuccessWithDataRetBegin(sBuffer);
+ for (Map.Entry<String, List<TopicDeployEntity>> entry :
topicDeployInfoMap.entrySet()) {
+ if (entry.getValue() == null || entry.getValue().isEmpty()) {
+ continue;
+ }
+ for (TopicDeployEntity entity : entry.getValue()) {
+ if (totalCnt++ > 0) {
+ sBuffer.append(",");
+ }
+ entity.toWebJsonStr(sBuffer, true, true);
+ }
+ }
+ WebParameterUtils.buildSuccessWithDataRetEnd(sBuffer, totalCnt);
+ return sBuffer;
+ }
+
+ /**
+ * Query topic info
+ *
+ * @param req Http Servlet Request
+ * @param sBuffer string buffer
+ * @param result process result
+ * @return process result
+ */
private StringBuilder innQueryTopicConfAndRunInfo(HttpServletRequest req,
StringBuilder sBuffer,
ProcessResult result,
@@ -536,7 +612,9 @@ public class WebTopicDeployHandler extends
AbstractWebHandler {
sBuffer.append(",");
}
maxMsgSizeInMB = defSetting.getMaxMsgSizeInMB();
- maxMsgSizeInMB = ctrlEntity.getMaxMsgSizeInMB();
+ if (ctrlEntity.getMaxMsgSizeInMB() !=
TBaseConstants.META_VALUE_UNDEFINED) {
+ maxMsgSizeInMB = ctrlEntity.getMaxMsgSizeInMB();
+ }
enableAuthCtrl = ctrlEntity.getAuthCtrlStatus().isEnable();
sBuffer.append("{\"topicName\":\"").append(entry.getKey())
.append("\",\"maxMsgSizeInMB\":").append(maxMsgSizeInMB)
diff --git
a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/tools/cli/CliMetaDataBRU.java
b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/tools/cli/CliMetaDataBRU.java
index 5eaf707..e903a9f 100644
---
a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/tools/cli/CliMetaDataBRU.java
+++
b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/tools/cli/CliMetaDataBRU.java
@@ -972,9 +972,9 @@ public class CliMetaDataBRU extends CliAbstractBase {
* @return the query result, null if query failure
*/
private Map<String, TopicDeployEntity> getTopicDeployInfos(StringBuilder
strBuff) {
- //
http://127.0.0.1:8080/webapi.htm?method=admin_query_topic_deploy_info
+ //
http://127.0.0.1:8080/webapi.htm?method=admin_query_topic_deploy_configure
JsonObject jsonRes = qryDataFromMaster(
- "admin_query_topic_deploy_info", new HashMap<>(), strBuff);
+ "admin_query_topic_deploy_configure", new HashMap<>(),
strBuff);
// check return result
if (!jsonRes.get("result").getAsBoolean()) {
logger.info(strBuff.append("Query topic deploy configurations info
failure:")