This is an automated email from the ASF dual-hosted git repository.
dinglei pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new d9d53d58cf add some tests for nameserver (#8349)
d9d53d58cf is described below
commit d9d53d58cf7f32143485f12d4851d5c119d0855a
Author: Tan Xiang <[email protected]>
AuthorDate: Thu Jul 18 14:22:34 2024 +0800
add some tests for nameserver (#8349)
---
.../rocketmq/namesrv/route/ZoneRouteRPCHook.java | 7 +-
.../namesrv/processor/RequestProcessorTest.java | 36 +++++
.../namesrv/route/ZoneRouteRPCHookTest.java | 164 +++++++++++++++++++++
.../namesrv/routeinfo/RouteInfoManagerNewTest.java | 69 ++++++++-
4 files changed, 269 insertions(+), 7 deletions(-)
diff --git
a/namesrv/src/main/java/org/apache/rocketmq/namesrv/route/ZoneRouteRPCHook.java
b/namesrv/src/main/java/org/apache/rocketmq/namesrv/route/ZoneRouteRPCHook.java
index 4983c88c8a..a740a0f1b4 100644
---
a/namesrv/src/main/java/org/apache/rocketmq/namesrv/route/ZoneRouteRPCHook.java
+++
b/namesrv/src/main/java/org/apache/rocketmq/namesrv/route/ZoneRouteRPCHook.java
@@ -56,7 +56,6 @@ public class ZoneRouteRPCHook implements RPCHook {
return;
}
TopicRouteData topicRouteData =
RemotingSerializable.decode(response.getBody(), TopicRouteData.class);
-
response.setBody(filterByZoneName(topicRouteData, zoneName).encode());
}
@@ -64,6 +63,9 @@ public class ZoneRouteRPCHook implements RPCHook {
List<BrokerData> brokerDataReserved = new ArrayList<>();
Map<String, BrokerData> brokerDataRemoved = new HashMap<>();
for (BrokerData brokerData : topicRouteData.getBrokerDatas()) {
+ if (brokerData.getBrokerAddrs() == null) {
+ continue;
+ }
//master down, consume from slave. break nearby route rule.
if (brokerData.getBrokerAddrs().get(MixAll.MASTER_ID) == null
|| StringUtils.equalsIgnoreCase(brokerData.getZoneName(),
zoneName)) {
@@ -85,9 +87,6 @@ public class ZoneRouteRPCHook implements RPCHook {
if (topicRouteData.getFilterServerTable() != null &&
!topicRouteData.getFilterServerTable().isEmpty()) {
for (Entry<String, BrokerData> entry :
brokerDataRemoved.entrySet()) {
BrokerData brokerData = entry.getValue();
- if (brokerData.getBrokerAddrs() == null) {
- continue;
- }
brokerData.getBrokerAddrs().values()
.forEach(brokerAddr ->
topicRouteData.getFilterServerTable().remove(brokerAddr));
}
diff --git
a/namesrv/src/test/java/org/apache/rocketmq/namesrv/processor/RequestProcessorTest.java
b/namesrv/src/test/java/org/apache/rocketmq/namesrv/processor/RequestProcessorTest.java
index 2b2cf62949..831558a0f6 100644
---
a/namesrv/src/test/java/org/apache/rocketmq/namesrv/processor/RequestProcessorTest.java
+++
b/namesrv/src/test/java/org/apache/rocketmq/namesrv/processor/RequestProcessorTest.java
@@ -448,6 +448,42 @@ public class RequestProcessorTest {
assertThat(remotingCommand.getCode()).isEqualTo(ResponseCode.SUCCESS);
}
+ @Test
+ public void testQueryDataVersion()throws RemotingCommandException {
+ ChannelHandlerContext ctx = mock(ChannelHandlerContext.class);
+ when(ctx.channel()).thenReturn(null);
+ RemotingCommand request =
getRemotingCommand(RequestCode.QUERY_DATA_VERSION);
+ RemotingCommand remotingCommand =
defaultRequestProcessor.processRequest(ctx, request);
+ assertThat(remotingCommand.getCode()).isEqualTo(ResponseCode.SUCCESS);
+ }
+
+ @Test
+ public void testGetBrokerMemberBroker() throws RemotingCommandException {
+ ChannelHandlerContext ctx = mock(ChannelHandlerContext.class);
+ when(ctx.channel()).thenReturn(null);
+ RemotingCommand request =
getRemotingCommand(RequestCode.GET_BROKER_MEMBER_GROUP);
+ RemotingCommand remotingCommand =
defaultRequestProcessor.processRequest(ctx, request);
+ assertThat(remotingCommand.getCode()).isEqualTo(ResponseCode.SUCCESS);
+ }
+
+ @Test
+ public void testBrokerHeartBeat() throws RemotingCommandException {
+ ChannelHandlerContext ctx = mock(ChannelHandlerContext.class);
+ when(ctx.channel()).thenReturn(null);
+ RemotingCommand request =
getRemotingCommand(RequestCode.BROKER_HEARTBEAT);
+ RemotingCommand remotingCommand =
defaultRequestProcessor.processRequest(ctx, request);
+ assertThat(remotingCommand.getCode()).isEqualTo(ResponseCode.SUCCESS);
+ }
+
+ @Test
+ public void testAddWritePermOfBroker() throws RemotingCommandException {
+ ChannelHandlerContext ctx = mock(ChannelHandlerContext.class);
+ when(ctx.channel()).thenReturn(null);
+ RemotingCommand request =
getRemotingCommand(RequestCode.ADD_WRITE_PERM_OF_BROKER);
+ RemotingCommand remotingCommand =
defaultRequestProcessor.processRequest(ctx, request);
+ assertThat(remotingCommand.getCode()).isEqualTo(ResponseCode.SUCCESS);
+ }
+
@Test
public void testWipeWritePermOfBroker() throws RemotingCommandException {
ChannelHandlerContext ctx = mock(ChannelHandlerContext.class);
diff --git
a/namesrv/src/test/java/org/apache/rocketmq/namesrv/route/ZoneRouteRPCHookTest.java
b/namesrv/src/test/java/org/apache/rocketmq/namesrv/route/ZoneRouteRPCHookTest.java
new file mode 100644
index 0000000000..1bf4a6c677
--- /dev/null
+++
b/namesrv/src/test/java/org/apache/rocketmq/namesrv/route/ZoneRouteRPCHookTest.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.namesrv.route;
+
+import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
+import org.apache.rocketmq.remoting.protocol.ResponseCode;
+import org.apache.rocketmq.remoting.protocol.route.BrokerData;
+import org.apache.rocketmq.remoting.protocol.route.QueueData;
+import org.apache.rocketmq.remoting.protocol.route.TopicRouteData;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+
+
+public class ZoneRouteRPCHookTest {
+
+ private ZoneRouteRPCHook zoneRouteRPCHook;
+
+ @Before
+ public void setup() {
+ zoneRouteRPCHook = new ZoneRouteRPCHook();
+ }
+
+ @Test
+ public void testDoAfterResponseWithNoZoneMode() {
+ RemotingCommand request1 =
RemotingCommand.createRequestCommand(106,null);
+ zoneRouteRPCHook.doAfterResponse("", request1, null);
+
+ HashMap<String, String> extFields = new HashMap<>();
+ extFields.put(MixAll.ZONE_MODE, "false");
+ RemotingCommand request =
RemotingCommand.createRequestCommand(105,null);
+ request.setExtFields(extFields);
+ RemotingCommand response = RemotingCommand.createResponseCommand(null);
+ response.setCode(ResponseCode.SUCCESS);
+
response.setBody(RemotingSerializable.encode(createSampleTopicRouteData()));
+ zoneRouteRPCHook.doAfterResponse("", request, response);
+ }
+
+ @Test
+ public void testDoAfterResponseWithNoZoneName() {
+ HashMap<String, String> extFields = new HashMap<>();
+ extFields.put(MixAll.ZONE_MODE, "true");
+ RemotingCommand request =
RemotingCommand.createRequestCommand(105,null);
+ request.setExtFields(extFields);
+ RemotingCommand response = RemotingCommand.createResponseCommand(null);
+ response.setCode(ResponseCode.SUCCESS);
+
response.setBody(RemotingSerializable.encode(createSampleTopicRouteData()));
+ zoneRouteRPCHook.doAfterResponse("", request, response);
+ }
+
+ @Test
+ public void testDoAfterResponseWithNoResponse() {
+ HashMap<String, String> extFields = new HashMap<>();
+ extFields.put(MixAll.ZONE_MODE, "true");
+ RemotingCommand request =
RemotingCommand.createRequestCommand(105,null);
+ request.setExtFields(extFields);
+ zoneRouteRPCHook.doAfterResponse("", request, null);
+
+ RemotingCommand response = RemotingCommand.createResponseCommand(null);
+ response.setCode(ResponseCode.SUCCESS);
+ zoneRouteRPCHook.doAfterResponse("", request, response);
+
+
response.setBody(RemotingSerializable.encode(createSampleTopicRouteData()));
+ response.setCode(ResponseCode.NO_PERMISSION);
+ zoneRouteRPCHook.doAfterResponse("", request, response);
+ }
+
+
+ @Test
+ public void testDoAfterResponseWithValidZoneFiltering() throws Exception {
+ HashMap<String, String> extFields = new HashMap<>();
+ extFields.put(MixAll.ZONE_MODE, "true");
+ extFields.put(MixAll.ZONE_NAME,"zone1");
+ RemotingCommand request =
RemotingCommand.createRequestCommand(105,null);
+ request.setExtFields(extFields);
+ RemotingCommand response = RemotingCommand.createResponseCommand(null);
+ response.setCode(ResponseCode.SUCCESS);
+ TopicRouteData topicRouteData = createSampleTopicRouteData();
+ response.setBody(RemotingSerializable.encode(topicRouteData));
+ zoneRouteRPCHook.doAfterResponse("", request, response);
+
+ HashMap<Long,String> brokeraddrs = new HashMap<>();
+ brokeraddrs.put(MixAll.MASTER_ID,"127.0.0.1:10911");
+ topicRouteData.getBrokerDatas().get(0).setBrokerAddrs(brokeraddrs);
+ response.setBody(RemotingSerializable.encode(topicRouteData));
+ zoneRouteRPCHook.doAfterResponse("", request, response);
+
+ topicRouteData.getQueueDatas().add(createQueueData("BrokerB"));
+ HashMap<Long,String> brokeraddrsB = new HashMap<>();
+ brokeraddrsB.put(MixAll.MASTER_ID,"127.0.0.1:10912");
+ BrokerData brokerData1 =
createBrokerData("BrokerB","zone2",brokeraddrsB);
+ BrokerData brokerData2 = createBrokerData("BrokerC","zone1",null);
+ topicRouteData.getBrokerDatas().add(brokerData1);
+ topicRouteData.getBrokerDatas().add(brokerData2);
+ response.setBody(RemotingSerializable.encode(topicRouteData));
+ zoneRouteRPCHook.doAfterResponse("", request, response);
+
+ topicRouteData.getFilterServerTable().put("127.0.0.1:10911",new
ArrayList<>());
+ response.setBody(RemotingSerializable.encode(topicRouteData));
+ zoneRouteRPCHook.doAfterResponse("", request, response);
+ Assert.assertEquals(1,RemotingSerializable
+ .decode(response.getBody(), TopicRouteData.class)
+ .getFilterServerTable()
+ .size());
+
+ topicRouteData.getFilterServerTable().put("127.0.0.1:10912",new
ArrayList<>());
+ response.setBody(RemotingSerializable.encode(topicRouteData));
+ zoneRouteRPCHook.doAfterResponse("", request, response);
+ Assert.assertEquals(1,RemotingSerializable
+ .decode(response.getBody(), TopicRouteData.class)
+ .getFilterServerTable()
+ .size());
+ }
+
+ private TopicRouteData createSampleTopicRouteData() {
+ TopicRouteData topicRouteData = new TopicRouteData();
+ List<BrokerData> brokerDatas = new ArrayList<>();
+ BrokerData brokerData = createBrokerData("BrokerA","zone1",new
HashMap<>());
+ List<QueueData> queueDatas = new ArrayList<>();
+ QueueData queueData = createQueueData("BrokerA");
+ queueDatas.add(queueData);
+ brokerDatas.add(brokerData);
+ topicRouteData.setBrokerDatas(brokerDatas);
+ topicRouteData.setQueueDatas(queueDatas);
+ return topicRouteData;
+ }
+
+ private BrokerData createBrokerData(String brokerName,String
zoneName,HashMap<Long,String> brokerAddrs) {
+ BrokerData brokerData = new BrokerData();
+ brokerData.setBrokerName(brokerName);
+ brokerData.setZoneName(zoneName);
+ brokerData.setBrokerAddrs(brokerAddrs);
+ return brokerData;
+ }
+
+ private QueueData createQueueData(String brokerName) {
+ QueueData queueData = new QueueData();
+ queueData.setBrokerName(brokerName);
+ queueData.setReadQueueNums(8);
+ queueData.setWriteQueueNums(8);
+ queueData.setPerm(6);
+ return queueData;
+ }
+}
diff --git
a/namesrv/src/test/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManagerNewTest.java
b/namesrv/src/test/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManagerNewTest.java
index b52cf50740..5e58cfc124 100644
---
a/namesrv/src/test/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManagerNewTest.java
+++
b/namesrv/src/test/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManagerNewTest.java
@@ -416,9 +416,12 @@ public class RouteInfoManagerNewTest {
}
@Test
- public void scanNotActiveBroker() {
+ public void scanNotActiveBroker() throws InterruptedException {
registerBrokerWithNormalTopic(BrokerBasicInfo.defaultBroker(),
"TestTopic");
routeInfoManager.scanNotActiveBroker();
+
registerBrokerWithNormalTopicAndExpire(BrokerBasicInfo.defaultBroker(),"TestTopic");
+ Thread.sleep(30000);
+ routeInfoManager.scanNotActiveBroker();
}
@Test
@@ -589,6 +592,16 @@ public class RouteInfoManagerNewTest {
assertThat(routeInfoManager.pickupTopicRouteData("TestTopic")).isNull();
}
+ @Test
+ public void onChannelDestroyByBrokerInfo() {
+ registerBroker(BrokerBasicInfo.defaultBroker(), mock(Channel.class),
null, "TestTopic", "TestTopic1");
+ BrokerAddrInfo brokerAddrInfo = new BrokerAddrInfo(DEFAULT_CLUSTER,
DEFAULT_ADDR);
+ routeInfoManager.onChannelDestroy(brokerAddrInfo);
+ await().atMost(Duration.ofSeconds(5)).until(() ->
routeInfoManager.blockedUnRegisterRequests() == 0);
+
assertThat(routeInfoManager.pickupTopicRouteData("TestTopic")).isNull();
+
assertThat(routeInfoManager.pickupTopicRouteData("TestTopic1")).isNull();
+ }
+
@Test
public void switchBrokerRole_ChannelDestroy() {
final BrokerBasicInfo masterBroker =
BrokerBasicInfo.defaultBroker().enableActingMaster(false);
@@ -728,6 +741,23 @@ public class RouteInfoManagerNewTest {
return registerBroker(brokerInfo, mock(Channel.class),
topicConfigConcurrentHashMap, topics);
}
+ private RegisterBrokerResult
registerBrokerWithNormalTopicAndExpire(BrokerBasicInfo brokerInfo, String...
topics) {
+ ConcurrentHashMap<String, TopicConfig> topicConfigConcurrentHashMap =
new ConcurrentHashMap<>();
+ TopicConfig baseTopic = new TopicConfig("baseTopic");
+ topicConfigConcurrentHashMap.put(baseTopic.getTopicName(), baseTopic);
+ for (final String topic : topics) {
+ TopicConfig topicConfig = new TopicConfig();
+ topicConfig.setWriteQueueNums(8);
+ topicConfig.setTopicName(topic);
+ topicConfig.setPerm(6);
+ topicConfig.setReadQueueNums(8);
+ topicConfig.setOrder(false);
+ topicConfigConcurrentHashMap.put(topic, topicConfig);
+ }
+
+ return registerBrokerWithExpiredTime(brokerInfo, mock(Channel.class),
topicConfigConcurrentHashMap, topics);
+ }
+
private RegisterBrokerResult registerBrokerWithOrderTopic(BrokerBasicInfo
brokerBasicInfo, String... topics) {
ConcurrentHashMap<String, TopicConfig> topicConfigConcurrentHashMap =
new ConcurrentHashMap<>();
@@ -785,7 +815,7 @@ public class RouteInfoManagerNewTest {
topicConfigSerializeWrapper.setDataVersion(brokerInfo.dataVersion);
topicConfigSerializeWrapper.setTopicConfigTable(topicConfigConcurrentHashMap);
- RegisterBrokerResult registerBrokerResult =
routeInfoManager.registerBroker(
+ return routeInfoManager.registerBroker(
brokerInfo.clusterName,
brokerInfo.brokerAddr,
brokerInfo.brokerName,
@@ -795,7 +825,40 @@ public class RouteInfoManagerNewTest {
null,
brokerInfo.enableActingMaster,
topicConfigSerializeWrapper, new ArrayList<>(), channel);
- return registerBrokerResult;
+ }
+
+ private RegisterBrokerResult registerBrokerWithExpiredTime(BrokerBasicInfo
brokerInfo, Channel channel,
+
ConcurrentMap<String, TopicConfig> topicConfigConcurrentHashMap, String...
topics) {
+
+ if (topicConfigConcurrentHashMap == null) {
+ topicConfigConcurrentHashMap = new ConcurrentHashMap<>();
+ TopicConfig baseTopic = new TopicConfig("baseTopic");
+ topicConfigConcurrentHashMap.put(baseTopic.getTopicName(),
baseTopic);
+ for (final String topic : topics) {
+ TopicConfig topicConfig = new TopicConfig();
+ topicConfig.setWriteQueueNums(8);
+ topicConfig.setTopicName(topic);
+ topicConfig.setPerm(6);
+ topicConfig.setReadQueueNums(8);
+ topicConfig.setOrder(false);
+ topicConfigConcurrentHashMap.put(topic, topicConfig);
+ }
+ }
+
+ TopicConfigSerializeWrapper topicConfigSerializeWrapper = new
TopicConfigSerializeWrapper();
+ topicConfigSerializeWrapper.setDataVersion(brokerInfo.dataVersion);
+
topicConfigSerializeWrapper.setTopicConfigTable(topicConfigConcurrentHashMap);
+
+ return routeInfoManager.registerBroker(
+ brokerInfo.clusterName,
+ brokerInfo.brokerAddr,
+ brokerInfo.brokerName,
+ brokerInfo.brokerId,
+ brokerInfo.haAddr,
+ "",
+ 30000L,
+ brokerInfo.enableActingMaster,
+ topicConfigSerializeWrapper, new ArrayList<>(), channel);
}
private void registerSingleTopicWithBrokerName(String brokerName,
String... topics) {