This is an automated email from the ASF dual-hosted git repository.
duhengforever pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 9b87205c6 [ISSUE #4522] Fix topic route info not found in some case (#4523)
9b87205c6 is described below
commit 9b87205c6d44e1c08aa5ae9aee6f2b28a60bb586
Author: cserwen <[email protected]>
AuthorDate: Wed Aug 10 13:42:12 2022 +0800
[ISSUE #4522] Fix topic route info not found in some case (#4523)
* Fix topic route info not being found in some cases
* Fix the unit tests for broker registration
Co-authored-by: dengzhiwen1 <[email protected]>
---
.../namesrv/processor/DefaultRequestProcessor.java | 7 +++++
.../namesrv/routeinfo/RouteInfoManager.java | 6 ++++
.../namesrv/processor/RequestProcessorTest.java | 33 ++++++++++++++++------
.../namesrv/routeinfo/RouteInfoManagerTest.java | 18 +++---------
.../routeinfo/RouteInfoManager_NewTest.java | 13 +++++++--
5 files changed, 52 insertions(+), 25 deletions(-)
diff --git
a/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/DefaultRequestProcessor.java
b/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/DefaultRequestProcessor.java
index 278499227..315bf7ef2 100644
---
a/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/DefaultRequestProcessor.java
+++
b/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/DefaultRequestProcessor.java
@@ -245,6 +245,13 @@ public class DefaultRequestProcessor implements
NettyRequestProcessor {
ctx.channel()
);
+ if (result == null) {
+ // Register single topic route info should be after the broker
completes the first registration.
+ response.setCode(ResponseCode.SYSTEM_ERROR);
+ response.setRemark("register broker failed");
+ return response;
+ }
+
responseHeader.setHaServerAddr(result.getHaServerAddr());
responseHeader.setMasterAddr(result.getMasterAddr());
diff --git
a/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java
b/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java
index 91f14a372..79b826048 100644
---
a/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java
+++
b/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java
@@ -286,6 +286,12 @@ public class RouteInfoManager {
}
}
+ if (!brokerAddrsMap.containsKey(brokerId) &&
topicConfigWrapper.getTopicConfigTable().size() == 1) {
+ log.warn("Can't register topicConfigWrapper={} because
broker[{}]={} has not registered.",
+ topicConfigWrapper.getTopicConfigTable(),
brokerId, brokerAddr);
+ return null;
+ }
+
String oldAddr = brokerAddrsMap.put(brokerId, brokerAddr);
registerFirst = registerFirst ||
(StringUtils.isEmpty(oldAddr));
diff --git
a/namesrv/src/test/java/org/apache/rocketmq/namesrv/processor/RequestProcessorTest.java
b/namesrv/src/test/java/org/apache/rocketmq/namesrv/processor/RequestProcessorTest.java
index 8f91d5b24..02c30d55f 100644
---
a/namesrv/src/test/java/org/apache/rocketmq/namesrv/processor/RequestProcessorTest.java
+++
b/namesrv/src/test/java/org/apache/rocketmq/namesrv/processor/RequestProcessorTest.java
@@ -27,10 +27,13 @@ import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.rocketmq.common.TopicConfig;
+import org.apache.rocketmq.common.UtilAll;
+import org.apache.rocketmq.common.protocol.body.RegisterBrokerBody;
import org.apache.rocketmq.common.namesrv.NamesrvConfig;
import org.apache.rocketmq.common.namesrv.RegisterBrokerResult;
import org.apache.rocketmq.common.protocol.RequestCode;
import org.apache.rocketmq.common.protocol.ResponseCode;
+import
org.apache.rocketmq.common.protocol.body.TopicConfigAndMappingSerializeWrapper;
import org.apache.rocketmq.common.protocol.body.TopicConfigSerializeWrapper;
import
org.apache.rocketmq.common.protocol.header.namesrv.DeleteKVConfigRequestHeader;
import
org.apache.rocketmq.common.protocol.header.namesrv.GetKVConfigRequestHeader;
@@ -519,12 +522,23 @@ public class RequestProcessorTest {
request.addExtField("clusterName", "cluster");
request.addExtField("haServerAddr", "10.10.2.1");
request.addExtField("brokerId", "2333");
- request.addExtField("topic", "unit-test");
+ request.addExtField("topic", "unit-test0");
return request;
}
private static RemotingCommand genSampleRegisterCmd(boolean reg) {
RegisterBrokerRequestHeader header = new RegisterBrokerRequestHeader();
+ byte[] body = null;
+ if (reg) {
+ TopicConfigAndMappingSerializeWrapper topicConfigWrapper = new
TopicConfigAndMappingSerializeWrapper();
+ topicConfigWrapper.getTopicConfigTable().put("unit-test1", new
TopicConfig());
+ topicConfigWrapper.getTopicConfigTable().put("unit-test2", new
TopicConfig());
+ RegisterBrokerBody requestBody = new RegisterBrokerBody();
+ requestBody.setTopicConfigSerializeWrapper(topicConfigWrapper);
+ body = requestBody.encode(false);
+ final int bodyCrc32 = UtilAll.crc32(body);
+ header.setBodyCrc32(bodyCrc32);
+ }
header.setBrokerName("broker");
RemotingCommand request = RemotingCommand.createRequestCommand(
reg ? RequestCode.REGISTER_BROKER : RequestCode.UNREGISTER_BROKER,
header);
@@ -533,6 +547,7 @@ public class RequestProcessorTest {
request.addExtField("clusterName", "cluster");
request.addExtField("haServerAddr", "10.10.2.1");
request.addExtField("brokerId", "2333");
+ request.setBody(body);
return request;
}
@@ -547,13 +562,15 @@ public class RequestProcessorTest {
private void registerRouteInfoManager() {
TopicConfigSerializeWrapper topicConfigSerializeWrapper = new
TopicConfigSerializeWrapper();
ConcurrentHashMap<String, TopicConfig> topicConfigConcurrentHashMap =
new ConcurrentHashMap<>();
- TopicConfig topicConfig = new TopicConfig();
- topicConfig.setWriteQueueNums(8);
- topicConfig.setTopicName("unit-test");
- topicConfig.setPerm(6);
- topicConfig.setReadQueueNums(8);
- topicConfig.setOrder(false);
- topicConfigConcurrentHashMap.put("unit-test", topicConfig);
+ for (int i = 0; i < 2; i++) {
+ TopicConfig topicConfig = new TopicConfig();
+ topicConfig.setWriteQueueNums(8);
+ topicConfig.setTopicName("unit-test" + i);
+ topicConfig.setPerm(6);
+ topicConfig.setReadQueueNums(8);
+ topicConfig.setOrder(false);
+ topicConfigConcurrentHashMap.put(topicConfig.getTopicName(),
topicConfig);
+ }
topicConfigSerializeWrapper.setTopicConfigTable(topicConfigConcurrentHashMap);
Channel channel = mock(Channel.class);
RegisterBrokerResult registerBrokerResult =
routeInfoManager.registerBroker("default-cluster", "127.0.0.1:10911",
"default-broker", 1234, "127.0.0.1:1001",
diff --git
a/namesrv/src/test/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManagerTest.java
b/namesrv/src/test/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManagerTest.java
index fdd321a83..e4b8db78d 100644
---
a/namesrv/src/test/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManagerTest.java
+++
b/namesrv/src/test/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManagerTest.java
@@ -82,13 +82,8 @@ public class RouteInfoManagerTest {
targetVersion.setTimestamp(200L);
ConcurrentHashMap<String, TopicConfig>
topicConfigConcurrentHashMap = new ConcurrentHashMap<>();
- TopicConfig topicConfig = new TopicConfig();
- topicConfig.setWriteQueueNums(8);
- topicConfig.setTopicName("unit-test");
- topicConfig.setPerm(6);
- topicConfig.setReadQueueNums(8);
- topicConfig.setOrder(false);
- topicConfigConcurrentHashMap.put("unit-test-1", topicConfig);
+ topicConfigConcurrentHashMap.put("unit-test-0", new
TopicConfig("unit-test-0"));
+ topicConfigConcurrentHashMap.put("unit-test-1", new
TopicConfig("unit-test-1"));
TopicConfigSerializeWrapper topicConfigSerializeWrapper = new
TopicConfigSerializeWrapper();
topicConfigSerializeWrapper.setDataVersion(targetVersion);
@@ -127,13 +122,8 @@ public class RouteInfoManagerTest {
dataVersion.setTimestamp(100L);
ConcurrentHashMap<String, TopicConfig> topicConfigConcurrentHashMap =
new ConcurrentHashMap<>();
- TopicConfig topicConfig = new TopicConfig();
- topicConfig.setWriteQueueNums(8);
- topicConfig.setTopicName("unit-test");
- topicConfig.setPerm(6);
- topicConfig.setReadQueueNums(8);
- topicConfig.setOrder(false);
- topicConfigConcurrentHashMap.put("unit-test", topicConfig);
+ topicConfigConcurrentHashMap.put("unit-test0", new
TopicConfig("unit-test0"));
+ topicConfigConcurrentHashMap.put("unit-test1", new
TopicConfig("unit-test1"));
TopicConfigSerializeWrapper topicConfigSerializeWrapper = new
TopicConfigSerializeWrapper();
topicConfigSerializeWrapper.setDataVersion(dataVersion);
diff --git
a/namesrv/src/test/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager_NewTest.java
b/namesrv/src/test/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager_NewTest.java
index b15ff542a..c50acfd8e 100644
---
a/namesrv/src/test/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager_NewTest.java
+++
b/namesrv/src/test/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager_NewTest.java
@@ -629,7 +629,8 @@ public class RouteInfoManager_NewTest {
private RegisterBrokerResult registerBrokerWithNormalTopic(BrokerBasicInfo
brokerInfo, String... topics) {
ConcurrentHashMap<String, TopicConfig> topicConfigConcurrentHashMap =
new ConcurrentHashMap<>();
-
+ TopicConfig baseTopic = new TopicConfig("baseTopic");
+ topicConfigConcurrentHashMap.put(baseTopic.getTopicName(), baseTopic);
for (final String topic : topics) {
TopicConfig topicConfig = new TopicConfig();
topicConfig.setWriteQueueNums(8);
@@ -646,6 +647,9 @@ public class RouteInfoManager_NewTest {
private RegisterBrokerResult registerBrokerWithOrderTopic(BrokerBasicInfo
brokerBasicInfo, String... topics) {
ConcurrentHashMap<String, TopicConfig> topicConfigConcurrentHashMap =
new ConcurrentHashMap<>();
+ TopicConfig baseTopic = new TopicConfig("baseTopic");
+ baseTopic.setOrder(true);
+ topicConfigConcurrentHashMap.put(baseTopic.getTopicName(), baseTopic);
for (final String topic : topics) {
TopicConfig topicConfig = new TopicConfig();
topicConfig.setWriteQueueNums(8);
@@ -660,7 +664,9 @@ public class RouteInfoManager_NewTest {
private RegisterBrokerResult
registerBrokerWithGlobalOrderTopic(BrokerBasicInfo brokerBasicInfo, String...
topics) {
ConcurrentHashMap<String, TopicConfig> topicConfigConcurrentHashMap =
new ConcurrentHashMap<>();
-
+ TopicConfig baseTopic = new TopicConfig("baseTopic", 1, 1);
+ baseTopic.setOrder(true);
+ topicConfigConcurrentHashMap.put(baseTopic.getTopicName(), baseTopic);
for (final String topic : topics) {
TopicConfig topicConfig = new TopicConfig();
topicConfig.setWriteQueueNums(1);
@@ -678,7 +684,8 @@ public class RouteInfoManager_NewTest {
if (topicConfigConcurrentHashMap == null) {
topicConfigConcurrentHashMap = new ConcurrentHashMap<>();
-
+ TopicConfig baseTopic = new TopicConfig("baseTopic");
+ topicConfigConcurrentHashMap.put(baseTopic.getTopicName(),
baseTopic);
for (final String topic : topics) {
TopicConfig topicConfig = new TopicConfig();
topicConfig.setWriteQueueNums(8);