This is an automated email from the ASF dual-hosted git repository.
dongeforever pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 0375027b7 [ISSUE #4382]Namesrv nearby route (#4383)
0375027b7 is described below
commit 0375027b70c54bd33afd1680dee68ebcb2d1001f
Author: sunhangda <[email protected]>
AuthorDate: Thu Aug 11 17:31:45 2022 +0800
[ISSUE #4382]Namesrv nearby route (#4383)
* fix: when the broker is down, async send mode cannot retry
* fix Issue #3556
* fix Issue #3556
* fix Issue #3556
* fix Issue #3556
* async send mode retries successfully when an exception occurs
* test case testSendMessageAsync_WithException
* modify test case testSendMessageAsync_WithException
* add the broker address to the exception message when a broker connection
timeout occurs
* repair code style
* in async mode,set ThreadExecutorPool AbortPolicy
* Optimize asynchronous send timeout logic code
* Adjusting log information
* Delete SendMessageProcessor.java
* 恢复上一个版本 (restore the previous version)
* namesrv nearby route
* Revert to previous version code
* add license header to Network.java
* code style
* 根据broker和client配置的标识返回路由信息 (return route information according to the identifiers configured on the broker and client)
* Delete greetings.yml
* Delete .gitignore
* reset previous version
* reset to previous version
* code style
* Namesrv nearby route implements all by RPCHook
* reset to previous version
* ServiceProvider return to original position
* Nearby route make the code clear
* revert to ori version
* reset to ori version
* reset to ori version
* reset to ori version
* Keep same as the official version
* Keep same as the official version
* Keep same as the official version
Co-authored-by: sunhangda <[email protected]>
---
.../apache/rocketmq/broker/out/BrokerOuterAPI.java | 7 +-
.../rocketmq/client/impl/MQClientAPIImpl.java | 5 +-
.../java/org/apache/rocketmq/common/MixAll.java | 7 ++
.../rocketmq/common/protocol/route/BrokerData.java | 21 ++++-
.../common/rpchook/DynamicalExtFieldRPCHook.java | 42 +++++++++
.../apache/rocketmq/namesrv/NamesrvController.java | 6 ++
.../apache/rocketmq/namesrv/NamesrvStartup.java | 1 -
.../namesrv/processor/DefaultRequestProcessor.java | 6 +-
.../rocketmq/namesrv/route/ZoneRouteRPCHook.java | 99 ++++++++++++++++++++++
.../namesrv/routeinfo/RouteInfoManager.java | 32 ++++---
.../namesrv/processor/RequestProcessorTest.java | 3 +-
.../namesrv/routeinfo/GetRouteInfoBenchmark.java | 2 +-
.../namesrv/routeinfo/RegisterBrokerBenchmark.java | 2 +
.../namesrv/routeinfo/RouteInfoManagerTest.java | 4 +-
.../routeinfo/RouteInfoManagerTestBase.java | 1 +
.../routeinfo/RouteInfoManager_NewTest.java | 1 +
16 files changed, 210 insertions(+), 29 deletions(-)
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java
b/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java
index d836972cb..6f89d4a7d 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java
@@ -85,6 +85,9 @@ import
org.apache.rocketmq.common.protocol.header.namesrv.QueryDataVersionRespon
import
org.apache.rocketmq.common.protocol.header.namesrv.RegisterBrokerRequestHeader;
import
org.apache.rocketmq.common.protocol.header.namesrv.RegisterBrokerResponseHeader;
import
org.apache.rocketmq.common.protocol.header.namesrv.UnRegisterBrokerRequestHeader;
+import org.apache.rocketmq.common.rpchook.DynamicalExtFieldRPCHook;
+import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.logging.InternalLoggerFactory;
import
org.apache.rocketmq.common.protocol.header.namesrv.controller.AlterSyncStateSetRequestHeader;
import
org.apache.rocketmq.common.protocol.header.namesrv.controller.RegisterBrokerToControllerRequestHeader;
import
org.apache.rocketmq.common.protocol.header.namesrv.controller.RegisterBrokerToControllerResponseHeader;
@@ -97,8 +100,6 @@ import org.apache.rocketmq.common.rpc.ClientMetadata;
import org.apache.rocketmq.common.rpc.RpcClient;
import org.apache.rocketmq.common.rpc.RpcClientImpl;
import org.apache.rocketmq.common.topic.TopicValidator;
-import org.apache.rocketmq.logging.InternalLogger;
-import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.InvokeCallback;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.RemotingClient;
@@ -130,7 +131,7 @@ public class BrokerOuterAPI {
private RpcClient rpcClient;
public BrokerOuterAPI(final NettyClientConfig nettyClientConfig) {
- this(nettyClientConfig, null, new ClientMetadata());
+ this(nettyClientConfig, new DynamicalExtFieldRPCHook(), new
ClientMetadata());
}
private BrokerOuterAPI(final NettyClientConfig nettyClientConfig, RPCHook
rpcHook, ClientMetadata clientMetadata) {
diff --git
a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
index 7719b0ae6..7de364bcb 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
@@ -16,7 +16,6 @@
*/
package org.apache.rocketmq.client.impl;
-import com.alibaba.fastjson.JSON;
import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
@@ -193,6 +192,7 @@ import
org.apache.rocketmq.common.protocol.heartbeat.HeartbeatData;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
import org.apache.rocketmq.common.protocol.route.TopicRouteData;
+import org.apache.rocketmq.common.rpchook.DynamicalExtFieldRPCHook;
import org.apache.rocketmq.common.statictopic.TopicConfigAndQueueMapping;
import org.apache.rocketmq.common.statictopic.TopicQueueMappingDetail;
import org.apache.rocketmq.common.subscription.GroupForbidden;
@@ -217,8 +217,8 @@ import org.apache.rocketmq.remoting.netty.ResponseFuture;
import org.apache.rocketmq.remoting.protocol.LanguageCode;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
-
import static
org.apache.rocketmq.remoting.protocol.RemotingSysResponseCode.SUCCESS;
+import com.alibaba.fastjson.JSON;
public class MQClientAPIImpl implements NameServerUpdateCallback {
private final static InternalLogger log = ClientLogger.getLog();
@@ -249,6 +249,7 @@ public class MQClientAPIImpl implements
NameServerUpdateCallback {
this.remotingClient.registerRPCHook(new StreamTypeRPCHook());
}
this.remotingClient.registerRPCHook(rpcHook);
+ this.remotingClient.registerRPCHook(new DynamicalExtFieldRPCHook());
this.remotingClient.registerProcessor(RequestCode.CHECK_TRANSACTION_STATE,
this.clientRemotingProcessor, null);
this.remotingClient.registerProcessor(RequestCode.NOTIFY_CONSUMER_IDS_CHANGED,
this.clientRemotingProcessor, null);
diff --git a/common/src/main/java/org/apache/rocketmq/common/MixAll.java
b/common/src/main/java/org/apache/rocketmq/common/MixAll.java
index 322a78d30..5eca378e5 100644
--- a/common/src/main/java/org/apache/rocketmq/common/MixAll.java
+++ b/common/src/main/java/org/apache/rocketmq/common/MixAll.java
@@ -96,6 +96,13 @@ public class MixAll {
public static final String LMQ_PREFIX = "%LMQ%";
public static final String MULTI_DISPATCH_QUEUE_SPLITTER = ",";
public static final String REQ_T = "ReqT";
+ public static final String ROCKETMQ_ZONE_ENV = "ROCKETMQ_ZONE";
+ public static final String ROCKETMQ_ZONE_PROPERTY = "rocketmq.zone";
+ public static final String ROCKETMQ_ZONE_MODE_ENV = "ROCKETMQ_ZONE_MODE";
+ public static final String ROCKETMQ_ZONE_MODE_PROPERTY =
"rocketmq.zone.mode";
+ public static final String ZONE_NAME = "__ZONE_NAME";
+ public static final String ZONE_MODE = "__ZONE_MODE";
+
private static final InternalLogger log =
InternalLoggerFactory.getLogger(LoggerName.COMMON_LOGGER_NAME);
public static final String LOGICAL_QUEUE_MOCK_BROKER_PREFIX = "__syslo__";
public static final String METADATA_SCOPE_GLOBAL = "__global__";
diff --git
a/common/src/main/java/org/apache/rocketmq/common/protocol/route/BrokerData.java
b/common/src/main/java/org/apache/rocketmq/common/protocol/route/BrokerData.java
index 6bca94d8d..f13491350 100644
---
a/common/src/main/java/org/apache/rocketmq/common/protocol/route/BrokerData.java
+++
b/common/src/main/java/org/apache/rocketmq/common/protocol/route/BrokerData.java
@@ -30,7 +30,7 @@ public class BrokerData implements Comparable<BrokerData> {
private String cluster;
private String brokerName;
private HashMap<Long/* brokerId */, String/* broker address */>
brokerAddrs;
-
+ private String zoneName;
private final Random random = new Random();
/**
@@ -60,14 +60,21 @@ public class BrokerData implements Comparable<BrokerData> {
this.brokerAddrs = brokerAddrs;
}
- public BrokerData(String cluster, String brokerName, HashMap<Long, String>
brokerAddrs,
- boolean enableActingMaster) {
+ public BrokerData(String cluster, String brokerName, HashMap<Long, String>
brokerAddrs, boolean enableActingMaster) {
this.cluster = cluster;
this.brokerName = brokerName;
this.brokerAddrs = brokerAddrs;
this.enableActingMaster = enableActingMaster;
}
+ public BrokerData(String cluster, String brokerName, HashMap<Long, String>
brokerAddrs, boolean enableActingMaster, String zoneName) {
+ this.cluster = cluster;
+ this.brokerName = brokerName;
+ this.brokerAddrs = brokerAddrs;
+ this.enableActingMaster = enableActingMaster;
+ this.zoneName = zoneName;
+ }
+
/**
* Selects a (preferably master) broker address from the registered list.
If the master's address cannot be found, a
* slave broker address is selected in a random manner.
@@ -109,6 +116,14 @@ public class BrokerData implements Comparable<BrokerData> {
this.enableActingMaster = enableActingMaster;
}
+ public String getZoneName() {
+ return zoneName;
+ }
+
+ public void setZoneName(String zoneName) {
+ this.zoneName = zoneName;
+ }
+
@Override
public int hashCode() {
final int prime = 31;
diff --git
a/common/src/main/java/org/apache/rocketmq/common/rpchook/DynamicalExtFieldRPCHook.java
b/common/src/main/java/org/apache/rocketmq/common/rpchook/DynamicalExtFieldRPCHook.java
new file mode 100644
index 000000000..d40b6100a
--- /dev/null
+++
b/common/src/main/java/org/apache/rocketmq/common/rpchook/DynamicalExtFieldRPCHook.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.common.rpchook;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.remoting.RPCHook;
+import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+
+public class DynamicalExtFieldRPCHook implements RPCHook {
+
+ @Override
+ public void doBeforeRequest(String remoteAddr, RemotingCommand request) {
+ String zoneName = System.getProperty(MixAll.ROCKETMQ_ZONE_PROPERTY,
System.getenv(MixAll.ROCKETMQ_ZONE_ENV));
+ if (StringUtils.isNotBlank(zoneName)) {
+ request.addExtField(MixAll.ZONE_NAME, zoneName);
+ }
+ String zoneMode =
System.getProperty(MixAll.ROCKETMQ_ZONE_MODE_PROPERTY,
System.getenv(MixAll.ROCKETMQ_ZONE_MODE_ENV));
+ if (StringUtils.isNotBlank(zoneMode)) {
+ request.addExtField(MixAll.ZONE_MODE, zoneMode);
+ }
+ }
+
+ @Override
+ public void doAfterResponse(String remoteAddr, RemotingCommand request,
RemotingCommand response) {
+
+ }
+}
diff --git
a/namesrv/src/main/java/org/apache/rocketmq/namesrv/NamesrvController.java
b/namesrv/src/main/java/org/apache/rocketmq/namesrv/NamesrvController.java
index 784d6eb0f..9c96917c1 100644
--- a/namesrv/src/main/java/org/apache/rocketmq/namesrv/NamesrvController.java
+++ b/namesrv/src/main/java/org/apache/rocketmq/namesrv/NamesrvController.java
@@ -38,6 +38,7 @@ import org.apache.rocketmq.namesrv.kvconfig.KVConfigManager;
import org.apache.rocketmq.namesrv.processor.ClientRequestProcessor;
import org.apache.rocketmq.namesrv.processor.ClusterTestRequestProcessor;
import org.apache.rocketmq.namesrv.processor.DefaultRequestProcessor;
+import org.apache.rocketmq.namesrv.route.ZoneRouteRPCHook;
import org.apache.rocketmq.namesrv.routeinfo.BrokerHousekeepingService;
import org.apache.rocketmq.namesrv.routeinfo.RouteInfoManager;
import org.apache.rocketmq.remoting.RemotingClient;
@@ -198,6 +199,7 @@ public class NamesrvController {
}
}
+ initialRpcHooks();
return true;
}
@@ -240,6 +242,10 @@ public class NamesrvController {
}
}
+ private void initialRpcHooks() {
+ this.remotingServer.registerRPCHook(new ZoneRouteRPCHook());
+ }
+
public void start() throws Exception {
this.remotingServer.start();
this.remotingClient.start();
diff --git
a/namesrv/src/main/java/org/apache/rocketmq/namesrv/NamesrvStartup.java
b/namesrv/src/main/java/org/apache/rocketmq/namesrv/NamesrvStartup.java
index 43a738b16..8cc49caa9 100644
--- a/namesrv/src/main/java/org/apache/rocketmq/namesrv/NamesrvStartup.java
+++ b/namesrv/src/main/java/org/apache/rocketmq/namesrv/NamesrvStartup.java
@@ -241,7 +241,6 @@ public class NamesrvStartup {
opt = new Option("p", "printConfigItem", false, "Print all config
items");
opt.setRequired(false);
options.addOption(opt);
-
return options;
}
diff --git
a/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/DefaultRequestProcessor.java
b/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/DefaultRequestProcessor.java
index 315bf7ef2..d28b294b5 100644
---
a/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/DefaultRequestProcessor.java
+++
b/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/DefaultRequestProcessor.java
@@ -238,6 +238,7 @@ public class DefaultRequestProcessor implements
NettyRequestProcessor {
requestHeader.getBrokerName(),
requestHeader.getBrokerId(),
requestHeader.getHaServerAddr(),
+ request.getExtFields().get(MixAll.ZONE_NAME),
requestHeader.getHeartbeatTimeoutMillis(),
requestHeader.getEnableActingMaster(),
topicConfigWrapper,
@@ -350,10 +351,9 @@ public class DefaultRequestProcessor implements
NettyRequestProcessor {
}
public RemotingCommand unregisterBroker(ChannelHandlerContext ctx,
- RemotingCommand request) throws RemotingCommandException {
+ RemotingCommand request) throws RemotingCommandException {
final RemotingCommand response =
RemotingCommand.createResponseCommand(null);
- final UnRegisterBrokerRequestHeader requestHeader =
- (UnRegisterBrokerRequestHeader)
request.decodeCommandCustomHeader(UnRegisterBrokerRequestHeader.class);
+ final UnRegisterBrokerRequestHeader requestHeader =
(UnRegisterBrokerRequestHeader)
request.decodeCommandCustomHeader(UnRegisterBrokerRequestHeader.class);
if
(!this.namesrvController.getRouteInfoManager().submitUnRegisterBrokerRequest(requestHeader))
{
log.warn("Couldn't submit the unregister broker request to
handler, broker info: {}", requestHeader);
diff --git
a/namesrv/src/main/java/org/apache/rocketmq/namesrv/route/ZoneRouteRPCHook.java
b/namesrv/src/main/java/org/apache/rocketmq/namesrv/route/ZoneRouteRPCHook.java
new file mode 100644
index 000000000..98a50df6e
--- /dev/null
+++
b/namesrv/src/main/java/org/apache/rocketmq/namesrv/route/ZoneRouteRPCHook.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.namesrv.route;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.protocol.RequestCode;
+import org.apache.rocketmq.common.protocol.ResponseCode;
+import org.apache.rocketmq.common.protocol.route.BrokerData;
+import org.apache.rocketmq.common.protocol.route.QueueData;
+import org.apache.rocketmq.common.protocol.route.TopicRouteData;
+import org.apache.rocketmq.remoting.RPCHook;
+import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
+
+public class ZoneRouteRPCHook implements RPCHook {
+
+ @Override
+ public void doBeforeRequest(String remoteAddr, RemotingCommand request) {
+
+ }
+
+ @Override
+ public void doAfterResponse(String remoteAddr, RemotingCommand request,
RemotingCommand response) {
+ if (RequestCode.GET_ROUTEINFO_BY_TOPIC != request.getCode()) {
+ return;
+ }
+ if (response == null || response.getBody() == null ||
ResponseCode.SUCCESS != response.getCode()) {
+ return;
+ }
+ boolean zoneMode =
Boolean.valueOf(request.getExtFields().get(MixAll.ZONE_MODE));
+ if (!zoneMode) {
+ return;
+ }
+ String zoneName = request.getExtFields().get(MixAll.ZONE_NAME);
+ if (StringUtils.isBlank(zoneName)) {
+ return;
+ }
+ TopicRouteData topicRouteData =
RemotingSerializable.decode(response.getBody(), TopicRouteData.class);
+
+ response.setBody(filterByZoneName(topicRouteData, zoneName).encode());
+ }
+
+ private TopicRouteData filterByZoneName(TopicRouteData topicRouteData,
String zoneName) {
+ List<BrokerData> brokerDataReserved = new ArrayList<>();
+ Map<String, BrokerData> brokerDataRemoved = new HashMap<>();
+ for (BrokerData brokerData : topicRouteData.getBrokerDatas()) {
+ //master down, consume from slave. break nearby route rule.
+ if (brokerData.getBrokerAddrs().get(MixAll.MASTER_ID) == null
+ || StringUtils.equalsIgnoreCase(brokerData.getZoneName(),
zoneName)) {
+ brokerDataReserved.add(brokerData);
+ } else {
+ brokerDataRemoved.put(brokerData.getBrokerName(), brokerData);
+ }
+ }
+ topicRouteData.setBrokerDatas(brokerDataReserved);
+
+ List<QueueData> queueDataReserved = new ArrayList<>();
+ for (QueueData queueData : topicRouteData.getQueueDatas()) {
+ if (!brokerDataRemoved.containsKey(queueData.getBrokerName())) {
+ queueDataReserved.add(queueData);
+ }
+ }
+ topicRouteData.setQueueDatas(queueDataReserved);
+ // remove filter server table by broker address
+ if (topicRouteData.getFilterServerTable() != null &&
!topicRouteData.getFilterServerTable().isEmpty()) {
+ for (Entry<String, BrokerData> entry :
brokerDataRemoved.entrySet()) {
+ BrokerData brokerData = entry.getValue();
+ if (brokerData.getBrokerAddrs() == null) {
+ continue;
+ }
+ brokerData.getBrokerAddrs().values()
+ .stream()
+ .forEach(brokerAddr ->
topicRouteData.getFilterServerTable().remove(brokerAddr));
+ }
+ }
+ return topicRouteData;
+ }
+}
diff --git
a/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java
b/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java
index 79b826048..05cff6595 100644
---
a/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java
+++
b/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java
@@ -208,16 +208,17 @@ public class RouteInfoManager {
}
public RegisterBrokerResult registerBroker(
- final String clusterName,
- final String brokerAddr,
- final String brokerName,
- final long brokerId,
- final String haServerAddr,
- final Long timeoutMillis,
- final TopicConfigSerializeWrapper topicConfigWrapper,
- final List<String> filterServerList,
- final Channel channel) {
- return registerBroker(clusterName, brokerAddr, brokerName, brokerId,
haServerAddr, timeoutMillis, false, topicConfigWrapper, filterServerList,
channel);
+ final String clusterName,
+ final String brokerAddr,
+ final String brokerName,
+ final long brokerId,
+ final String haServerAddr,
+ final String zoneName,
+ final Long timeoutMillis,
+ final TopicConfigSerializeWrapper topicConfigWrapper,
+ final List<String> filterServerList,
+ final Channel channel) {
+ return registerBroker(clusterName, brokerAddr, brokerName, brokerId,
haServerAddr, zoneName, timeoutMillis, false, topicConfigWrapper,
filterServerList, channel);
}
public RegisterBrokerResult registerBroker(
@@ -226,6 +227,7 @@ public class RouteInfoManager {
final String brokerName,
final long brokerId,
final String haServerAddr,
+ final String zoneName,
final Long timeoutMillis,
final Boolean enableActingMaster,
final TopicConfigSerializeWrapper topicConfigWrapper,
@@ -250,7 +252,8 @@ public class RouteInfoManager {
boolean isOldVersionBroker = enableActingMaster == null;
brokerData.setEnableActingMaster(isOldVersionBroker ? false :
enableActingMaster);
-
+ brokerData.setZoneName(zoneName);
+
Map<Long, String> brokerAddrsMap = brokerData.getBrokerAddrs();
boolean isMinBrokerIdChanged = false;
@@ -688,8 +691,11 @@ public class RouteInfoManager {
for (String brokerName : brokerNameSet) {
BrokerData brokerData =
this.brokerAddrTable.get(brokerName);
if (null != brokerData) {
- BrokerData brokerDataClone = new
BrokerData(brokerData.getCluster(), brokerData.getBrokerName(), (HashMap<Long,
String>) brokerData
- .getBrokerAddrs().clone(),
brokerData.isEnableActingMaster());
+ BrokerData brokerDataClone = new
BrokerData(brokerData.getCluster(),
+ brokerData.getBrokerName(),
+ (HashMap<Long, String>)
brokerData.getBrokerAddrs().clone(),
+ brokerData.isEnableActingMaster(),
brokerData.getZoneName());
+
brokerDataList.add(brokerDataClone);
foundBrokerData = true;
if (!filterServerTable.isEmpty()) {
diff --git
a/namesrv/src/test/java/org/apache/rocketmq/namesrv/processor/RequestProcessorTest.java
b/namesrv/src/test/java/org/apache/rocketmq/namesrv/processor/RequestProcessorTest.java
index 02c30d55f..512d6b3d6 100644
---
a/namesrv/src/test/java/org/apache/rocketmq/namesrv/processor/RequestProcessorTest.java
+++
b/namesrv/src/test/java/org/apache/rocketmq/namesrv/processor/RequestProcessorTest.java
@@ -573,8 +573,9 @@ public class RequestProcessorTest {
}
topicConfigSerializeWrapper.setTopicConfigTable(topicConfigConcurrentHashMap);
Channel channel = mock(Channel.class);
- RegisterBrokerResult registerBrokerResult =
routeInfoManager.registerBroker("default-cluster", "127.0.0.1:10911",
"default-broker", 1234, "127.0.0.1:1001",
+ RegisterBrokerResult registerBrokerResult =
routeInfoManager.registerBroker("default-cluster", "127.0.0.1:10911",
"default-broker", 1234, "127.0.0.1:1001", "",
null, topicConfigSerializeWrapper, new ArrayList<String>(),
channel);
}
+
}
\ No newline at end of file
diff --git
a/namesrv/src/test/java/org/apache/rocketmq/namesrv/routeinfo/GetRouteInfoBenchmark.java
b/namesrv/src/test/java/org/apache/rocketmq/namesrv/routeinfo/GetRouteInfoBenchmark.java
index 0ea8b054a..8dfb88e72 100644
---
a/namesrv/src/test/java/org/apache/rocketmq/namesrv/routeinfo/GetRouteInfoBenchmark.java
+++
b/namesrv/src/test/java/org/apache/rocketmq/namesrv/routeinfo/GetRouteInfoBenchmark.java
@@ -116,7 +116,7 @@ public class GetRouteInfoBenchmark {
topicConfigSerializeWrapper.setTopicConfigTable(topicConfigConcurrentHashMap);
Channel channel = mock(Channel.class);
- routeInfoManager.registerBroker(clusterName,
brokerAddr, brokerName, 0, brokerAddr,
+ routeInfoManager.registerBroker(clusterName,
brokerAddr, brokerName, 0, brokerAddr, "",
null, topicConfigSerializeWrapper, new
ArrayList<String>(), channel);
}
}
diff --git
a/namesrv/src/test/java/org/apache/rocketmq/namesrv/routeinfo/RegisterBrokerBenchmark.java
b/namesrv/src/test/java/org/apache/rocketmq/namesrv/routeinfo/RegisterBrokerBenchmark.java
index f64211582..ba3e4530d 100644
---
a/namesrv/src/test/java/org/apache/rocketmq/namesrv/routeinfo/RegisterBrokerBenchmark.java
+++
b/namesrv/src/test/java/org/apache/rocketmq/namesrv/routeinfo/RegisterBrokerBenchmark.java
@@ -145,6 +145,7 @@ public class RegisterBrokerBenchmark {
routeInfoManager.registerBroker("DefaultCluster" + index,
"127.0.0.1:500" + index,
"DefaultBroker" + index, 0, "127.0.0.1:400" + index,
+ "",
null,
topicConfigSerializeWrapper, new ArrayList<String>(), channel);
}
@@ -167,6 +168,7 @@ public class RegisterBrokerBenchmark {
routeInfoManager.registerBroker("DefaultCluster" + index,
"127.0.0.1:500" + index,
"DefaultBroker" + index, 0, "127.0.0.1:400" + index,
+ "",
null,
topicConfigSerializeWrapper, new ArrayList<String>(), channel);
}
diff --git
a/namesrv/src/test/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManagerTest.java
b/namesrv/src/test/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManagerTest.java
index e4b8db78d..9c397ab02 100644
---
a/namesrv/src/test/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManagerTest.java
+++
b/namesrv/src/test/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManagerTest.java
@@ -89,7 +89,7 @@ public class RouteInfoManagerTest {
topicConfigSerializeWrapper.setDataVersion(targetVersion);
topicConfigSerializeWrapper.setTopicConfigTable(topicConfigConcurrentHashMap);
Channel channel = mock(Channel.class);
- RegisterBrokerResult registerBrokerResult =
routeInfoManager.registerBroker("default-cluster-1", "127.0.0.1:10911",
"default-broker-1", 1234, "127.0.0.1:1001",
+ RegisterBrokerResult registerBrokerResult =
routeInfoManager.registerBroker("default-cluster-1", "127.0.0.1:10911",
"default-broker-1", 1234, "127.0.0.1:1001", "",
null, topicConfigSerializeWrapper, new
ArrayList<String>(), channel);
assertThat(registerBrokerResult).isNotNull();
@@ -129,7 +129,7 @@ public class RouteInfoManagerTest {
topicConfigSerializeWrapper.setDataVersion(dataVersion);
topicConfigSerializeWrapper.setTopicConfigTable(topicConfigConcurrentHashMap);
Channel channel = mock(Channel.class);
- RegisterBrokerResult registerBrokerResult =
routeInfoManager.registerBroker("default-cluster", "127.0.0.1:10911",
"default-broker", 1234, "127.0.0.1:1001",
+ RegisterBrokerResult registerBrokerResult =
routeInfoManager.registerBroker("default-cluster", "127.0.0.1:10911",
"default-broker", 1234, "127.0.0.1:1001", "",
null, topicConfigSerializeWrapper, new ArrayList<String>(),
channel);
assertThat(registerBrokerResult).isNotNull();
}
diff --git
a/namesrv/src/test/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManagerTestBase.java
b/namesrv/src/test/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManagerTestBase.java
index e4ddaa3c3..55d730c54 100644
---
a/namesrv/src/test/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManagerTestBase.java
+++
b/namesrv/src/test/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManagerTestBase.java
@@ -148,6 +148,7 @@ public class RouteInfoManagerTestBase {
brokerAddr,
brokerName,
brokerId,
+ "",
haServerAddr,
null,
topicConfigSerializeWrapper,
diff --git
a/namesrv/src/test/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager_NewTest.java
b/namesrv/src/test/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager_NewTest.java
index c50acfd8e..e098f0ef9 100644
---
a/namesrv/src/test/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager_NewTest.java
+++
b/namesrv/src/test/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager_NewTest.java
@@ -707,6 +707,7 @@ public class RouteInfoManager_NewTest {
brokerInfo.brokerName,
brokerInfo.brokerId,
brokerInfo.haAddr,
+ "",
null,
brokerInfo.enableActingMaster,
topicConfigSerializeWrapper, new ArrayList<String>(), channel);