This is an automated email from the ASF dual-hosted git repository.

dongeforever pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 0375027b7 [ISSUE #4382]Namesrv nearby route (#4383)
0375027b7 is described below

commit 0375027b70c54bd33afd1680dee68ebcb2d1001f
Author: sunhangda <[email protected]>
AuthorDate: Thu Aug 11 17:31:45 2022 +0800

    [ISSUE #4382]Namesrv nearby route (#4383)
    
    * fix:when broker is down,async send model can not retry
    
    * fix Issue #3556
    
    * fix Issue #3556
    
    * fix Issue #3556
    
    * fix Issue #3556
    
    * async send model success retry when occurs Exception
    
    * test case testSendMessageAsync_WithException
    
    * modify test case testSendMessageAsync_WithException
    
    * adding exception message with broker addr when occuring broker connect
    timeout
    
    * repair code style
    
    * in async mode,set ThreadExecutorPool AbortPolicy
    
    * Optimize asynchronous send timeout logic code
    
    * Adjusting log information
    
    * Delete SendMessageProcessor.java
    
    * 恢复上一个版本
    
    * namesrv nearby route
    
    * Revert to previous version code
    
    * add lisence header to Network.java
    
    * code style
    
    * 根据broker和client配置的标识返回路由信息
    
    * Delete greetings.yml
    
    * Delete .gitignore
    
    * reset previous version
    
    * reset to previous version
    
    * code style
    
    * Namesrv nearby route implements all by RPCHook
    
    * reset to prevous version
    
    * ServiceProvider return to original position
    
    * Nearby route make the code clear
    
    * revert to ori version
    
    * reset to ori version
    
    * reset to ori version
    
    * reset to ori version
    
    * Keep same as the official version
    
    * Keep same as the official version
    
    * Keep same as the official version
    
    Co-authored-by: sunhangda <[email protected]>
---
 .../apache/rocketmq/broker/out/BrokerOuterAPI.java |  7 +-
 .../rocketmq/client/impl/MQClientAPIImpl.java      |  5 +-
 .../java/org/apache/rocketmq/common/MixAll.java    |  7 ++
 .../rocketmq/common/protocol/route/BrokerData.java | 21 ++++-
 .../common/rpchook/DynamicalExtFieldRPCHook.java   | 42 +++++++++
 .../apache/rocketmq/namesrv/NamesrvController.java |  6 ++
 .../apache/rocketmq/namesrv/NamesrvStartup.java    |  1 -
 .../namesrv/processor/DefaultRequestProcessor.java |  6 +-
 .../rocketmq/namesrv/route/ZoneRouteRPCHook.java   | 99 ++++++++++++++++++++++
 .../namesrv/routeinfo/RouteInfoManager.java        | 32 ++++---
 .../namesrv/processor/RequestProcessorTest.java    |  3 +-
 .../namesrv/routeinfo/GetRouteInfoBenchmark.java   |  2 +-
 .../namesrv/routeinfo/RegisterBrokerBenchmark.java |  2 +
 .../namesrv/routeinfo/RouteInfoManagerTest.java    |  4 +-
 .../routeinfo/RouteInfoManagerTestBase.java        |  1 +
 .../routeinfo/RouteInfoManager_NewTest.java        |  1 +
 16 files changed, 210 insertions(+), 29 deletions(-)

diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java 
b/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java
index d836972cb..6f89d4a7d 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java
@@ -85,6 +85,9 @@ import 
org.apache.rocketmq.common.protocol.header.namesrv.QueryDataVersionRespon
 import 
org.apache.rocketmq.common.protocol.header.namesrv.RegisterBrokerRequestHeader;
 import 
org.apache.rocketmq.common.protocol.header.namesrv.RegisterBrokerResponseHeader;
 import 
org.apache.rocketmq.common.protocol.header.namesrv.UnRegisterBrokerRequestHeader;
+import org.apache.rocketmq.common.rpchook.DynamicalExtFieldRPCHook;
+import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.logging.InternalLoggerFactory;
 import 
org.apache.rocketmq.common.protocol.header.namesrv.controller.AlterSyncStateSetRequestHeader;
 import 
org.apache.rocketmq.common.protocol.header.namesrv.controller.RegisterBrokerToControllerRequestHeader;
 import 
org.apache.rocketmq.common.protocol.header.namesrv.controller.RegisterBrokerToControllerResponseHeader;
@@ -97,8 +100,6 @@ import org.apache.rocketmq.common.rpc.ClientMetadata;
 import org.apache.rocketmq.common.rpc.RpcClient;
 import org.apache.rocketmq.common.rpc.RpcClientImpl;
 import org.apache.rocketmq.common.topic.TopicValidator;
-import org.apache.rocketmq.logging.InternalLogger;
-import org.apache.rocketmq.logging.InternalLoggerFactory;
 import org.apache.rocketmq.remoting.InvokeCallback;
 import org.apache.rocketmq.remoting.RPCHook;
 import org.apache.rocketmq.remoting.RemotingClient;
@@ -130,7 +131,7 @@ public class BrokerOuterAPI {
     private RpcClient rpcClient;
 
     public BrokerOuterAPI(final NettyClientConfig nettyClientConfig) {
-        this(nettyClientConfig, null, new ClientMetadata());
+        this(nettyClientConfig, new DynamicalExtFieldRPCHook(), new 
ClientMetadata());
     }
 
     private BrokerOuterAPI(final NettyClientConfig nettyClientConfig, RPCHook 
rpcHook, ClientMetadata clientMetadata) {
diff --git 
a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java 
b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
index 7719b0ae6..7de364bcb 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
@@ -16,7 +16,6 @@
  */
 package org.apache.rocketmq.client.impl;
 
-import com.alibaba.fastjson.JSON;
 import java.io.UnsupportedEncodingException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
@@ -193,6 +192,7 @@ import 
org.apache.rocketmq.common.protocol.heartbeat.HeartbeatData;
 import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
 import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
 import org.apache.rocketmq.common.protocol.route.TopicRouteData;
+import org.apache.rocketmq.common.rpchook.DynamicalExtFieldRPCHook;
 import org.apache.rocketmq.common.statictopic.TopicConfigAndQueueMapping;
 import org.apache.rocketmq.common.statictopic.TopicQueueMappingDetail;
 import org.apache.rocketmq.common.subscription.GroupForbidden;
@@ -217,8 +217,8 @@ import org.apache.rocketmq.remoting.netty.ResponseFuture;
 import org.apache.rocketmq.remoting.protocol.LanguageCode;
 import org.apache.rocketmq.remoting.protocol.RemotingCommand;
 import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
-
 import static 
org.apache.rocketmq.remoting.protocol.RemotingSysResponseCode.SUCCESS;
+import com.alibaba.fastjson.JSON;
 
 public class MQClientAPIImpl implements NameServerUpdateCallback {
     private final static InternalLogger log = ClientLogger.getLog();
@@ -249,6 +249,7 @@ public class MQClientAPIImpl implements 
NameServerUpdateCallback {
             this.remotingClient.registerRPCHook(new StreamTypeRPCHook());
         }
         this.remotingClient.registerRPCHook(rpcHook);
+        this.remotingClient.registerRPCHook(new DynamicalExtFieldRPCHook());
         
this.remotingClient.registerProcessor(RequestCode.CHECK_TRANSACTION_STATE, 
this.clientRemotingProcessor, null);
 
         
this.remotingClient.registerProcessor(RequestCode.NOTIFY_CONSUMER_IDS_CHANGED, 
this.clientRemotingProcessor, null);
diff --git a/common/src/main/java/org/apache/rocketmq/common/MixAll.java 
b/common/src/main/java/org/apache/rocketmq/common/MixAll.java
index 322a78d30..5eca378e5 100644
--- a/common/src/main/java/org/apache/rocketmq/common/MixAll.java
+++ b/common/src/main/java/org/apache/rocketmq/common/MixAll.java
@@ -96,6 +96,13 @@ public class MixAll {
     public static final String LMQ_PREFIX = "%LMQ%";
     public static final String MULTI_DISPATCH_QUEUE_SPLITTER = ",";
     public static final String REQ_T = "ReqT";
+    public static final String ROCKETMQ_ZONE_ENV = "ROCKETMQ_ZONE";
+    public static final String ROCKETMQ_ZONE_PROPERTY = "rocketmq.zone";
+    public static final String ROCKETMQ_ZONE_MODE_ENV = "ROCKETMQ_ZONE_MODE";
+    public static final String ROCKETMQ_ZONE_MODE_PROPERTY = 
"rocketmq.zone.mode";
+    public static final String ZONE_NAME = "__ZONE_NAME"; 
+    public static final String ZONE_MODE = "__ZONE_MODE";
+
     private static final InternalLogger log = 
InternalLoggerFactory.getLogger(LoggerName.COMMON_LOGGER_NAME);
     public static final String LOGICAL_QUEUE_MOCK_BROKER_PREFIX = "__syslo__";
     public static final String METADATA_SCOPE_GLOBAL = "__global__";
diff --git 
a/common/src/main/java/org/apache/rocketmq/common/protocol/route/BrokerData.java
 
b/common/src/main/java/org/apache/rocketmq/common/protocol/route/BrokerData.java
index 6bca94d8d..f13491350 100644
--- 
a/common/src/main/java/org/apache/rocketmq/common/protocol/route/BrokerData.java
+++ 
b/common/src/main/java/org/apache/rocketmq/common/protocol/route/BrokerData.java
@@ -30,7 +30,7 @@ public class BrokerData implements Comparable<BrokerData> {
     private String cluster;
     private String brokerName;
     private HashMap<Long/* brokerId */, String/* broker address */> 
brokerAddrs;
-
+    private String zoneName;
     private final Random random = new Random();
 
     /**
@@ -60,14 +60,21 @@ public class BrokerData implements Comparable<BrokerData> {
         this.brokerAddrs = brokerAddrs;
     }
 
-    public BrokerData(String cluster, String brokerName, HashMap<Long, String> 
brokerAddrs,
-        boolean enableActingMaster) {
+    public BrokerData(String cluster, String brokerName, HashMap<Long, String> 
brokerAddrs, boolean enableActingMaster) {
         this.cluster = cluster;
         this.brokerName = brokerName;
         this.brokerAddrs = brokerAddrs;
         this.enableActingMaster = enableActingMaster;
     }
 
+    public BrokerData(String cluster, String brokerName, HashMap<Long, String> 
brokerAddrs, boolean enableActingMaster, String zoneName) {
+        this.cluster = cluster;
+        this.brokerName = brokerName;
+        this.brokerAddrs = brokerAddrs;
+        this.enableActingMaster = enableActingMaster;
+        this.zoneName = zoneName;
+    }
+
     /**
      * Selects a (preferably master) broker address from the registered list. 
If the master's address cannot be found, a
      * slave broker address is selected in a random manner.
@@ -109,6 +116,14 @@ public class BrokerData implements Comparable<BrokerData> {
         this.enableActingMaster = enableActingMaster;
     }
 
+    public String getZoneName() {
+        return zoneName;
+    }
+
+    public void setZoneName(String zoneName) {
+        this.zoneName = zoneName;
+    }
+
     @Override
     public int hashCode() {
         final int prime = 31;
diff --git 
a/common/src/main/java/org/apache/rocketmq/common/rpchook/DynamicalExtFieldRPCHook.java
 
b/common/src/main/java/org/apache/rocketmq/common/rpchook/DynamicalExtFieldRPCHook.java
new file mode 100644
index 000000000..d40b6100a
--- /dev/null
+++ 
b/common/src/main/java/org/apache/rocketmq/common/rpchook/DynamicalExtFieldRPCHook.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.common.rpchook;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.remoting.RPCHook;
+import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+
+public class DynamicalExtFieldRPCHook implements RPCHook {
+
+    @Override
+    public void doBeforeRequest(String remoteAddr, RemotingCommand request) {
+        String zoneName = System.getProperty(MixAll.ROCKETMQ_ZONE_PROPERTY, 
System.getenv(MixAll.ROCKETMQ_ZONE_ENV));
+        if (StringUtils.isNotBlank(zoneName)) {
+            request.addExtField(MixAll.ZONE_NAME, zoneName);
+        }
+        String zoneMode = 
System.getProperty(MixAll.ROCKETMQ_ZONE_MODE_PROPERTY, 
System.getenv(MixAll.ROCKETMQ_ZONE_MODE_ENV));
+        if (StringUtils.isNotBlank(zoneMode)) {
+            request.addExtField(MixAll.ZONE_MODE, zoneMode);
+        }
+    }
+
+    @Override
+    public void doAfterResponse(String remoteAddr, RemotingCommand request, 
RemotingCommand response) {
+        
+    }
+}
diff --git 
a/namesrv/src/main/java/org/apache/rocketmq/namesrv/NamesrvController.java 
b/namesrv/src/main/java/org/apache/rocketmq/namesrv/NamesrvController.java
index 784d6eb0f..9c96917c1 100644
--- a/namesrv/src/main/java/org/apache/rocketmq/namesrv/NamesrvController.java
+++ b/namesrv/src/main/java/org/apache/rocketmq/namesrv/NamesrvController.java
@@ -38,6 +38,7 @@ import org.apache.rocketmq.namesrv.kvconfig.KVConfigManager;
 import org.apache.rocketmq.namesrv.processor.ClientRequestProcessor;
 import org.apache.rocketmq.namesrv.processor.ClusterTestRequestProcessor;
 import org.apache.rocketmq.namesrv.processor.DefaultRequestProcessor;
+import org.apache.rocketmq.namesrv.route.ZoneRouteRPCHook;
 import org.apache.rocketmq.namesrv.routeinfo.BrokerHousekeepingService;
 import org.apache.rocketmq.namesrv.routeinfo.RouteInfoManager;
 import org.apache.rocketmq.remoting.RemotingClient;
@@ -198,6 +199,7 @@ public class NamesrvController {
             }
         }
 
+        initialRpcHooks();
         return true;
     }
 
@@ -240,6 +242,10 @@ public class NamesrvController {
         }
     }
 
+    private void initialRpcHooks() {
+        this.remotingServer.registerRPCHook(new ZoneRouteRPCHook());
+    }
+    
     public void start() throws Exception {
         this.remotingServer.start();
         this.remotingClient.start();
diff --git 
a/namesrv/src/main/java/org/apache/rocketmq/namesrv/NamesrvStartup.java 
b/namesrv/src/main/java/org/apache/rocketmq/namesrv/NamesrvStartup.java
index 43a738b16..8cc49caa9 100644
--- a/namesrv/src/main/java/org/apache/rocketmq/namesrv/NamesrvStartup.java
+++ b/namesrv/src/main/java/org/apache/rocketmq/namesrv/NamesrvStartup.java
@@ -241,7 +241,6 @@ public class NamesrvStartup {
         opt = new Option("p", "printConfigItem", false, "Print all config 
items");
         opt.setRequired(false);
         options.addOption(opt);
-
         return options;
     }
 
diff --git 
a/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/DefaultRequestProcessor.java
 
b/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/DefaultRequestProcessor.java
index 315bf7ef2..d28b294b5 100644
--- 
a/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/DefaultRequestProcessor.java
+++ 
b/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/DefaultRequestProcessor.java
@@ -238,6 +238,7 @@ public class DefaultRequestProcessor implements 
NettyRequestProcessor {
             requestHeader.getBrokerName(),
             requestHeader.getBrokerId(),
             requestHeader.getHaServerAddr(),
+            request.getExtFields().get(MixAll.ZONE_NAME),
             requestHeader.getHeartbeatTimeoutMillis(),
             requestHeader.getEnableActingMaster(),
             topicConfigWrapper,
@@ -350,10 +351,9 @@ public class DefaultRequestProcessor implements 
NettyRequestProcessor {
     }
 
     public RemotingCommand unregisterBroker(ChannelHandlerContext ctx,
-        RemotingCommand request) throws RemotingCommandException {
+            RemotingCommand request) throws RemotingCommandException {
         final RemotingCommand response = 
RemotingCommand.createResponseCommand(null);
-        final UnRegisterBrokerRequestHeader requestHeader =
-            (UnRegisterBrokerRequestHeader) 
request.decodeCommandCustomHeader(UnRegisterBrokerRequestHeader.class);
+        final UnRegisterBrokerRequestHeader requestHeader = 
(UnRegisterBrokerRequestHeader) 
request.decodeCommandCustomHeader(UnRegisterBrokerRequestHeader.class);
 
         if 
(!this.namesrvController.getRouteInfoManager().submitUnRegisterBrokerRequest(requestHeader))
 {
             log.warn("Couldn't submit the unregister broker request to 
handler, broker info: {}", requestHeader);
diff --git 
a/namesrv/src/main/java/org/apache/rocketmq/namesrv/route/ZoneRouteRPCHook.java 
b/namesrv/src/main/java/org/apache/rocketmq/namesrv/route/ZoneRouteRPCHook.java
new file mode 100644
index 000000000..98a50df6e
--- /dev/null
+++ 
b/namesrv/src/main/java/org/apache/rocketmq/namesrv/route/ZoneRouteRPCHook.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.namesrv.route;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.protocol.RequestCode;
+import org.apache.rocketmq.common.protocol.ResponseCode;
+import org.apache.rocketmq.common.protocol.route.BrokerData;
+import org.apache.rocketmq.common.protocol.route.QueueData;
+import org.apache.rocketmq.common.protocol.route.TopicRouteData;
+import org.apache.rocketmq.remoting.RPCHook;
+import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
+
+public class ZoneRouteRPCHook implements RPCHook {
+
+    @Override
+    public void doBeforeRequest(String remoteAddr, RemotingCommand request) {
+
+    }
+
+    @Override
+    public void doAfterResponse(String remoteAddr, RemotingCommand request, 
RemotingCommand response) {
+        if (RequestCode.GET_ROUTEINFO_BY_TOPIC != request.getCode()) {
+            return;
+        }
+        if (response == null || response.getBody() == null || 
ResponseCode.SUCCESS != response.getCode()) {
+            return;
+        }
+        boolean zoneMode = 
Boolean.valueOf(request.getExtFields().get(MixAll.ZONE_MODE));
+        if (!zoneMode) {
+            return;
+        }
+        String zoneName = request.getExtFields().get(MixAll.ZONE_NAME);
+        if (StringUtils.isBlank(zoneName)) {
+            return;
+        }
+        TopicRouteData topicRouteData = 
RemotingSerializable.decode(response.getBody(), TopicRouteData.class);
+
+        response.setBody(filterByZoneName(topicRouteData, zoneName).encode());
+    }
+    
+    private TopicRouteData filterByZoneName(TopicRouteData topicRouteData, 
String zoneName) {
+        List<BrokerData> brokerDataReserved = new ArrayList<>();
+        Map<String, BrokerData> brokerDataRemoved = new HashMap<>();
+        for (BrokerData brokerData : topicRouteData.getBrokerDatas()) {
+            //master down, consume from slave. break nearby route rule.
+            if (brokerData.getBrokerAddrs().get(MixAll.MASTER_ID) == null
+                || StringUtils.equalsIgnoreCase(brokerData.getZoneName(), 
zoneName)) {
+                brokerDataReserved.add(brokerData);
+            } else {
+                brokerDataRemoved.put(brokerData.getBrokerName(), brokerData);
+            }
+        }
+        topicRouteData.setBrokerDatas(brokerDataReserved);
+
+        List<QueueData> queueDataReserved = new ArrayList<>();
+        for (QueueData queueData : topicRouteData.getQueueDatas()) {
+            if (!brokerDataRemoved.containsKey(queueData.getBrokerName())) {
+                queueDataReserved.add(queueData);
+            }
+        }
+        topicRouteData.setQueueDatas(queueDataReserved);
+        // remove filter server table by broker address
+        if (topicRouteData.getFilterServerTable() != null && 
!topicRouteData.getFilterServerTable().isEmpty()) {
+            for (Entry<String, BrokerData> entry : 
brokerDataRemoved.entrySet()) {
+                BrokerData brokerData = entry.getValue();
+                if (brokerData.getBrokerAddrs() == null) {
+                    continue;
+                }
+                brokerData.getBrokerAddrs().values()
+                    .stream()
+                    .forEach(brokerAddr -> 
topicRouteData.getFilterServerTable().remove(brokerAddr));
+            }
+        }
+        return topicRouteData;
+    }
+}
diff --git 
a/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java
 
b/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java
index 79b826048..05cff6595 100644
--- 
a/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java
+++ 
b/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java
@@ -208,16 +208,17 @@ public class RouteInfoManager {
     }
 
     public RegisterBrokerResult registerBroker(
-        final String clusterName,
-        final String brokerAddr,
-        final String brokerName,
-        final long brokerId,
-        final String haServerAddr,
-        final Long timeoutMillis,
-        final TopicConfigSerializeWrapper topicConfigWrapper,
-        final List<String> filterServerList,
-        final Channel channel) {
-        return registerBroker(clusterName, brokerAddr, brokerName, brokerId, 
haServerAddr, timeoutMillis, false, topicConfigWrapper, filterServerList, 
channel);
+            final String clusterName,
+            final String brokerAddr,
+            final String brokerName,
+            final long brokerId,
+            final String haServerAddr,
+            final String zoneName,
+            final Long timeoutMillis,
+            final TopicConfigSerializeWrapper topicConfigWrapper,
+            final List<String> filterServerList,
+            final Channel channel) {
+        return registerBroker(clusterName, brokerAddr, brokerName, brokerId, 
haServerAddr, zoneName, timeoutMillis, false, topicConfigWrapper, 
filterServerList, channel);
     }
 
     public RegisterBrokerResult registerBroker(
@@ -226,6 +227,7 @@ public class RouteInfoManager {
         final String brokerName,
         final long brokerId,
         final String haServerAddr,
+        final String zoneName,
         final Long timeoutMillis,
         final Boolean enableActingMaster,
         final TopicConfigSerializeWrapper topicConfigWrapper,
@@ -250,7 +252,8 @@ public class RouteInfoManager {
 
                 boolean isOldVersionBroker = enableActingMaster == null;
                 brokerData.setEnableActingMaster(isOldVersionBroker ? false : 
enableActingMaster);
-
+                brokerData.setZoneName(zoneName);
+                
                 Map<Long, String> brokerAddrsMap = brokerData.getBrokerAddrs();
 
                 boolean isMinBrokerIdChanged = false;
@@ -688,8 +691,11 @@ public class RouteInfoManager {
                     for (String brokerName : brokerNameSet) {
                         BrokerData brokerData = 
this.brokerAddrTable.get(brokerName);
                         if (null != brokerData) {
-                            BrokerData brokerDataClone = new 
BrokerData(brokerData.getCluster(), brokerData.getBrokerName(), (HashMap<Long, 
String>) brokerData
-                                .getBrokerAddrs().clone(), 
brokerData.isEnableActingMaster());
+                            BrokerData brokerDataClone = new 
BrokerData(brokerData.getCluster(), 
+                                brokerData.getBrokerName(),
+                                (HashMap<Long, String>) 
brokerData.getBrokerAddrs().clone(),
+                                brokerData.isEnableActingMaster(), 
brokerData.getZoneName());
+
                             brokerDataList.add(brokerDataClone);
                             foundBrokerData = true;
                             if (!filterServerTable.isEmpty()) {
diff --git 
a/namesrv/src/test/java/org/apache/rocketmq/namesrv/processor/RequestProcessorTest.java
 
b/namesrv/src/test/java/org/apache/rocketmq/namesrv/processor/RequestProcessorTest.java
index 02c30d55f..512d6b3d6 100644
--- 
a/namesrv/src/test/java/org/apache/rocketmq/namesrv/processor/RequestProcessorTest.java
+++ 
b/namesrv/src/test/java/org/apache/rocketmq/namesrv/processor/RequestProcessorTest.java
@@ -573,8 +573,9 @@ public class RequestProcessorTest {
         }
         
topicConfigSerializeWrapper.setTopicConfigTable(topicConfigConcurrentHashMap);
         Channel channel = mock(Channel.class);
-        RegisterBrokerResult registerBrokerResult = 
routeInfoManager.registerBroker("default-cluster", "127.0.0.1:10911", 
"default-broker", 1234, "127.0.0.1:1001",
+        RegisterBrokerResult registerBrokerResult = 
routeInfoManager.registerBroker("default-cluster", "127.0.0.1:10911", 
"default-broker", 1234, "127.0.0.1:1001", "",
             null, topicConfigSerializeWrapper, new ArrayList<String>(), 
channel);
 
     }
+
 }
\ No newline at end of file
diff --git 
a/namesrv/src/test/java/org/apache/rocketmq/namesrv/routeinfo/GetRouteInfoBenchmark.java
 
b/namesrv/src/test/java/org/apache/rocketmq/namesrv/routeinfo/GetRouteInfoBenchmark.java
index 0ea8b054a..8dfb88e72 100644
--- 
a/namesrv/src/test/java/org/apache/rocketmq/namesrv/routeinfo/GetRouteInfoBenchmark.java
+++ 
b/namesrv/src/test/java/org/apache/rocketmq/namesrv/routeinfo/GetRouteInfoBenchmark.java
@@ -116,7 +116,7 @@ public class GetRouteInfoBenchmark {
                             
topicConfigSerializeWrapper.setTopicConfigTable(topicConfigConcurrentHashMap);
                             Channel channel = mock(Channel.class);
 
-                            routeInfoManager.registerBroker(clusterName, 
brokerAddr, brokerName, 0, brokerAddr,
+                            routeInfoManager.registerBroker(clusterName, 
brokerAddr, brokerName, 0, brokerAddr, "",
                                 null, topicConfigSerializeWrapper, new 
ArrayList<String>(), channel);
                         }
                     }
diff --git 
a/namesrv/src/test/java/org/apache/rocketmq/namesrv/routeinfo/RegisterBrokerBenchmark.java
 
b/namesrv/src/test/java/org/apache/rocketmq/namesrv/routeinfo/RegisterBrokerBenchmark.java
index f64211582..ba3e4530d 100644
--- 
a/namesrv/src/test/java/org/apache/rocketmq/namesrv/routeinfo/RegisterBrokerBenchmark.java
+++ 
b/namesrv/src/test/java/org/apache/rocketmq/namesrv/routeinfo/RegisterBrokerBenchmark.java
@@ -145,6 +145,7 @@ public class RegisterBrokerBenchmark {
         routeInfoManager.registerBroker("DefaultCluster" + index,
             "127.0.0.1:500" + index,
             "DefaultBroker" + index, 0, "127.0.0.1:400" + index,
+            "",
             null,
             topicConfigSerializeWrapper, new ArrayList<String>(), channel);
     }
@@ -167,6 +168,7 @@ public class RegisterBrokerBenchmark {
         routeInfoManager.registerBroker("DefaultCluster" + index,
             "127.0.0.1:500" + index,
             "DefaultBroker" + index, 0, "127.0.0.1:400" + index,
+            "",
             null,
             topicConfigSerializeWrapper, new ArrayList<String>(), channel);
     }
diff --git 
a/namesrv/src/test/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManagerTest.java
 
b/namesrv/src/test/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManagerTest.java
index e4b8db78d..9c397ab02 100644
--- 
a/namesrv/src/test/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManagerTest.java
+++ 
b/namesrv/src/test/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManagerTest.java
@@ -89,7 +89,7 @@ public class RouteInfoManagerTest {
             topicConfigSerializeWrapper.setDataVersion(targetVersion);
             
topicConfigSerializeWrapper.setTopicConfigTable(topicConfigConcurrentHashMap);
             Channel channel = mock(Channel.class);
-            RegisterBrokerResult registerBrokerResult = 
routeInfoManager.registerBroker("default-cluster-1", "127.0.0.1:10911", 
"default-broker-1", 1234, "127.0.0.1:1001",
+            RegisterBrokerResult registerBrokerResult = 
routeInfoManager.registerBroker("default-cluster-1", "127.0.0.1:10911", 
"default-broker-1", 1234, "127.0.0.1:1001", "", 
                     null, topicConfigSerializeWrapper, new 
ArrayList<String>(), channel);
             assertThat(registerBrokerResult).isNotNull();
 
@@ -129,7 +129,7 @@ public class RouteInfoManagerTest {
         topicConfigSerializeWrapper.setDataVersion(dataVersion);
         
topicConfigSerializeWrapper.setTopicConfigTable(topicConfigConcurrentHashMap);
         Channel channel = mock(Channel.class);
-        RegisterBrokerResult registerBrokerResult = 
routeInfoManager.registerBroker("default-cluster", "127.0.0.1:10911", 
"default-broker", 1234, "127.0.0.1:1001",
+        RegisterBrokerResult registerBrokerResult = 
routeInfoManager.registerBroker("default-cluster", "127.0.0.1:10911", 
"default-broker", 1234, "127.0.0.1:1001", "", 
                 null, topicConfigSerializeWrapper, new ArrayList<String>(), 
channel);
         assertThat(registerBrokerResult).isNotNull();
     }
diff --git 
a/namesrv/src/test/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManagerTestBase.java
 
b/namesrv/src/test/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManagerTestBase.java
index e4ddaa3c3..55d730c54 100644
--- 
a/namesrv/src/test/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManagerTestBase.java
+++ 
b/namesrv/src/test/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManagerTestBase.java
@@ -148,6 +148,7 @@ public class RouteInfoManagerTestBase {
                 brokerAddr,
                 brokerName,
                 brokerId,
+                "",
                 haServerAddr,
                 null,
                 topicConfigSerializeWrapper,
diff --git 
a/namesrv/src/test/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager_NewTest.java
 
b/namesrv/src/test/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager_NewTest.java
index c50acfd8e..e098f0ef9 100644
--- 
a/namesrv/src/test/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager_NewTest.java
+++ 
b/namesrv/src/test/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager_NewTest.java
@@ -707,6 +707,7 @@ public class RouteInfoManager_NewTest {
             brokerInfo.brokerName,
             brokerInfo.brokerId,
             brokerInfo.haAddr,
+            "",
             null,
             brokerInfo.enableActingMaster,
             topicConfigSerializeWrapper, new ArrayList<String>(), channel);

Reply via email to