joeCarf commented on code in PR #7188:
URL: https://github.com/apache/rocketmq/pull/7188#discussion_r1302903236
##########
namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java:
##########
@@ -334,28 +328,42 @@ public RegisterBrokerResult registerBroker(
}
}
- for (Map.Entry<String, TopicConfig> entry :
tcTable.entrySet()) {
- if (registerFirst ||
this.isTopicConfigChanged(clusterName, brokerAddr,
- topicConfigWrapper.getDataVersion(), brokerName,
- entry.getValue().getTopicName())) {
- final TopicConfig topicConfig = entry.getValue();
- if (isPrimeSlave) {
+ if (registerFirst ||
this.isBrokerTopicConfigChanged(clusterName, brokerAddr,
topicConfigWrapper.getDataVersion())) {
+ if (isPrimeSlave) {
+ for (TopicConfig topicConfig : tcTable.values()) {
// Wipe write perm for prime slave
topicConfig.setPerm(topicConfig.getPerm() &
(~PermName.PERM_WRITE));
+ this.createAndUpdateQueueData(brokerName,
topicConfig);
+ }
+ } else {
+ for (TopicConfig topicConfig : tcTable.values()) {
+ this.createAndUpdateQueueData(brokerName,
topicConfig);
}
- this.createAndUpdateQueueData(brokerName,
topicConfig);
}
- }
- if (this.isBrokerTopicConfigChanged(clusterName,
brokerAddr, topicConfigWrapper.getDataVersion()) || registerFirst) {
//the topicQueueMappingInfoMap should never be null,
but can be empty
for (Map.Entry<String, TopicQueueMappingInfo> entry :
topicQueueMappingInfoMap.entrySet()) {
- if
(!topicQueueMappingInfoTable.containsKey(entry.getKey())) {
- topicQueueMappingInfoTable.put(entry.getKey(),
new HashMap<>());
- }
+ Map<String, TopicQueueMappingInfo>
topicQueueMappingInfo =
ConcurrentHashMapUtils.computeIfAbsent((ConcurrentHashMap<String, Map<String,
TopicQueueMappingInfo>>) topicQueueMappingInfoTable, entry.getKey(), k -> new
HashMap<>());
//Note asset brokerName equal
entry.getValue().getBname()
//here use the mappingDetail.bname
-
topicQueueMappingInfoTable.get(entry.getKey()).put(entry.getValue().getBname(),
entry.getValue());
+ TopicQueueMappingInfo value = entry.getValue();
+ topicQueueMappingInfo.put(value.getBname(), value);
+ }
+ } else {
+ if (isPrimeSlave) {
+ for (TopicConfig topicConfig : tcTable.values()) {
+ if (this.isTopicConfigChanged(false,
brokerName, topicConfig.getTopicName())) {
+ // Wipe write perm for prime slave
+ topicConfig.setPerm(topicConfig.getPerm()
& (~PermName.PERM_WRITE));
+ this.createAndUpdateQueueData(brokerName,
topicConfig);
+ }
+ }
+ } else {
Review Comment:
suggest not using too much if..else.., replace with just `if` clause
##########
namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java:
##########
@@ -245,71 +245,65 @@ public RegisterBrokerResult registerBroker(
boolean registerFirst = false;
- BrokerData brokerData = this.brokerAddrTable.get(brokerName);
- if (null == brokerData) {
+ BrokerData brokerData =
ConcurrentHashMapUtils.computeIfAbsent((ConcurrentHashMap<String, BrokerData>)
this.brokerAddrTable, brokerName, k -> new BrokerData(clusterName, brokerName,
new HashMap<>()));
+ Map<Long, String> brokerAddrsMap = brokerData.getBrokerAddrs();
+ if (brokerAddrsMap.isEmpty()) {
registerFirst = true;
- brokerData = new BrokerData(clusterName, brokerName, new
HashMap<>());
- this.brokerAddrTable.put(brokerName, brokerData);
}
boolean isOldVersionBroker = enableActingMaster == null;
brokerData.setEnableActingMaster(!isOldVersionBroker &&
enableActingMaster);
brokerData.setZoneName(zoneName);
- Map<Long, String> brokerAddrsMap = brokerData.getBrokerAddrs();
-
- boolean isMinBrokerIdChanged = false;
long prevMinBrokerId = 0;
- if (!brokerAddrsMap.isEmpty()) {
+ if (!registerFirst) {
prevMinBrokerId = Collections.min(brokerAddrsMap.keySet());
}
- if (brokerId < prevMinBrokerId) {
- isMinBrokerIdChanged = true;
- }
+ boolean isMinBrokerIdChanged = brokerId < prevMinBrokerId;
//Switch slave to master: first remove <1, IP:PORT> in namesrv,
then add <0, IP:PORT>
//The same IP:PORT must only have one record in brokerAddrTable
- brokerAddrsMap.entrySet().removeIf(item -> null != brokerAddr &&
brokerAddr.equals(item.getValue()) && brokerId != item.getKey());
+ if (brokerAddr != null) {
+ brokerAddrsMap.entrySet().removeIf(item ->
brokerAddr.equals(item.getValue()) && brokerId != item.getKey());
+ }
//If Local brokerId stateVersion bigger than the registering one,
String oldBrokerAddr = brokerAddrsMap.get(brokerId);
- if (null != oldBrokerAddr && !oldBrokerAddr.equals(brokerAddr)) {
- BrokerLiveInfo oldBrokerInfo = brokerLiveTable.get(new
BrokerAddrInfo(clusterName, oldBrokerAddr));
-
- if (null != oldBrokerInfo) {
- long oldStateVersion =
oldBrokerInfo.getDataVersion().getStateVersion();
- long newStateVersion =
topicConfigWrapper.getDataVersion().getStateVersion();
- if (oldStateVersion > newStateVersion) {
- log.warn("Registered Broker conflicts with the existed
one, just ignore.: Cluster:{}, BrokerName:{}, BrokerId:{}, " +
- "Old BrokerAddr:{}, Old Version:{}, New
BrokerAddr:{}, New Version:{}.",
- clusterName, brokerName, brokerId, oldBrokerAddr,
oldStateVersion, brokerAddr, newStateVersion);
- //Remove the rejected brokerAddr from brokerLiveTable.
- brokerLiveTable.remove(new BrokerAddrInfo(clusterName,
brokerAddr));
- return result;
+ if (null != oldBrokerAddr) {
+
+ if (!oldBrokerAddr.equals(brokerAddr)) {
+ BrokerLiveInfo oldBrokerInfo = brokerLiveTable.get(new
BrokerAddrInfo(clusterName, oldBrokerAddr));
+
+ if (null != oldBrokerInfo) {
+ long oldStateVersion =
oldBrokerInfo.getDataVersion().getStateVersion();
+ long newStateVersion =
topicConfigWrapper.getDataVersion().getStateVersion();
+ if (oldStateVersion > newStateVersion) {
+ log.warn("Registered Broker conflicts with the
existed one, just ignore.: Cluster:{}, BrokerName:{}, BrokerId:{}, " +
+ "Old BrokerAddr:{}, Old
Version:{}, New BrokerAddr:{}, New Version:{}.",
+ clusterName, brokerName, brokerId,
oldBrokerAddr, oldStateVersion, brokerAddr, newStateVersion);
+ //Remove the rejected brokerAddr from
brokerLiveTable.
+ brokerLiveTable.remove(new
BrokerAddrInfo(clusterName, brokerAddr));
+ return result;
+ }
}
}
- }
-
- if (!brokerAddrsMap.containsKey(brokerId) &&
topicConfigWrapper.getTopicConfigTable().size() == 1) {
+ } else if (topicConfigWrapper.getTopicConfigTable().size() == 1) {
Review Comment:
I think the former style is better
##########
namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java:
##########
@@ -245,71 +245,65 @@ public RegisterBrokerResult registerBroker(
boolean registerFirst = false;
- BrokerData brokerData = this.brokerAddrTable.get(brokerName);
- if (null == brokerData) {
+ BrokerData brokerData =
ConcurrentHashMapUtils.computeIfAbsent((ConcurrentHashMap<String, BrokerData>)
this.brokerAddrTable, brokerName, k -> new BrokerData(clusterName, brokerName,
new HashMap<>()));
Review Comment:
`this.brokerAddrTable.put(brokerName, brokerData);` this line is removed,
the new BrokerData will not be put in brokerAddrTable
##########
namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java:
##########
@@ -171,7 +171,7 @@ public void deleteTopic(final String topic, final String
clusterName) {
this.lock.writeLock().lockInterruptibly();
//get all the brokerNames fot the specified cluster
Set<String> brokerNames = this.clusterAddrTable.get(clusterName);
- if (brokerNames == null || brokerNames.isEmpty()) {
+ if (brokerNames == null) {
Review Comment:
why remove this?
suggest `CollectionUtils.isEmpty(brokerName)`
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]