This is an automated email from the ASF dual-hosted git repository.

duhengforever pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 9b87205c6 [ISSUE #4522] Fix topic route info not found in some case (#4523)
9b87205c6 is described below

commit 9b87205c6d44e1c08aa5ae9aee6f2b28a60bb586
Author: cserwen <[email protected]>
AuthorDate: Wed Aug 10 13:42:12 2022 +0800

    [ISSUE #4522] Fix topic route info not found in some case (#4523)
    
    * fix topic route info not found in some case
    
    * fix unit test about broker register
    
    Co-authored-by: dengzhiwen1 <[email protected]>
---
 .../namesrv/processor/DefaultRequestProcessor.java |  7 +++++
 .../namesrv/routeinfo/RouteInfoManager.java        |  6 ++++
 .../namesrv/processor/RequestProcessorTest.java    | 33 ++++++++++++++++------
 .../namesrv/routeinfo/RouteInfoManagerTest.java    | 18 +++---------
 .../routeinfo/RouteInfoManager_NewTest.java        | 13 +++++++--
 5 files changed, 52 insertions(+), 25 deletions(-)

diff --git a/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/DefaultRequestProcessor.java b/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/DefaultRequestProcessor.java
index 278499227..315bf7ef2 100644
--- a/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/DefaultRequestProcessor.java
+++ b/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/DefaultRequestProcessor.java
@@ -245,6 +245,13 @@ public class DefaultRequestProcessor implements NettyRequestProcessor {
             ctx.channel()
         );
 
+        if (result == null) {
+            // Register single topic route info should be after the broker completes the first registration.
+            response.setCode(ResponseCode.SYSTEM_ERROR);
+            response.setRemark("register broker failed");
+            return response;
+        }
+
         responseHeader.setHaServerAddr(result.getHaServerAddr());
         responseHeader.setMasterAddr(result.getMasterAddr());
 
diff --git a/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java b/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java
index 91f14a372..79b826048 100644
--- a/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java
+++ b/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java
@@ -286,6 +286,12 @@ public class RouteInfoManager {
                     }
                 }
 
+                if (!brokerAddrsMap.containsKey(brokerId) && topicConfigWrapper.getTopicConfigTable().size() == 1) {
+                    log.warn("Can't register topicConfigWrapper={} because broker[{}]={} has not registered.",
+                            topicConfigWrapper.getTopicConfigTable(), brokerId, brokerAddr);
+                    return null;
+                }
+
                 String oldAddr = brokerAddrsMap.put(brokerId, brokerAddr);
                 registerFirst = registerFirst || (StringUtils.isEmpty(oldAddr));
 
diff --git 
a/namesrv/src/test/java/org/apache/rocketmq/namesrv/processor/RequestProcessorTest.java
 
b/namesrv/src/test/java/org/apache/rocketmq/namesrv/processor/RequestProcessorTest.java
index 8f91d5b24..02c30d55f 100644
--- 
a/namesrv/src/test/java/org/apache/rocketmq/namesrv/processor/RequestProcessorTest.java
+++ 
b/namesrv/src/test/java/org/apache/rocketmq/namesrv/processor/RequestProcessorTest.java
@@ -27,10 +27,13 @@ import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.rocketmq.common.TopicConfig;
+import org.apache.rocketmq.common.UtilAll;
+import org.apache.rocketmq.common.protocol.body.RegisterBrokerBody;
 import org.apache.rocketmq.common.namesrv.NamesrvConfig;
 import org.apache.rocketmq.common.namesrv.RegisterBrokerResult;
 import org.apache.rocketmq.common.protocol.RequestCode;
 import org.apache.rocketmq.common.protocol.ResponseCode;
+import 
org.apache.rocketmq.common.protocol.body.TopicConfigAndMappingSerializeWrapper;
 import org.apache.rocketmq.common.protocol.body.TopicConfigSerializeWrapper;
 import 
org.apache.rocketmq.common.protocol.header.namesrv.DeleteKVConfigRequestHeader;
 import 
org.apache.rocketmq.common.protocol.header.namesrv.GetKVConfigRequestHeader;
@@ -519,12 +522,23 @@ public class RequestProcessorTest {
         request.addExtField("clusterName", "cluster");
         request.addExtField("haServerAddr", "10.10.2.1");
         request.addExtField("brokerId", "2333");
-        request.addExtField("topic", "unit-test");
+        request.addExtField("topic", "unit-test0");
         return request;
     }
 
     private static RemotingCommand genSampleRegisterCmd(boolean reg) {
         RegisterBrokerRequestHeader header = new RegisterBrokerRequestHeader();
+        byte[] body = null;
+        if (reg) {
+            TopicConfigAndMappingSerializeWrapper topicConfigWrapper = new 
TopicConfigAndMappingSerializeWrapper();
+            topicConfigWrapper.getTopicConfigTable().put("unit-test1", new 
TopicConfig());
+            topicConfigWrapper.getTopicConfigTable().put("unit-test2", new 
TopicConfig());
+            RegisterBrokerBody requestBody = new RegisterBrokerBody();
+            requestBody.setTopicConfigSerializeWrapper(topicConfigWrapper);
+            body = requestBody.encode(false);
+            final int bodyCrc32 = UtilAll.crc32(body);
+            header.setBodyCrc32(bodyCrc32);
+        }
         header.setBrokerName("broker");
         RemotingCommand request = RemotingCommand.createRequestCommand(
             reg ? RequestCode.REGISTER_BROKER : RequestCode.UNREGISTER_BROKER, 
header);
@@ -533,6 +547,7 @@ public class RequestProcessorTest {
         request.addExtField("clusterName", "cluster");
         request.addExtField("haServerAddr", "10.10.2.1");
         request.addExtField("brokerId", "2333");
+        request.setBody(body);
         return request;
     }
 
@@ -547,13 +562,15 @@ public class RequestProcessorTest {
     private void registerRouteInfoManager() {
         TopicConfigSerializeWrapper topicConfigSerializeWrapper = new 
TopicConfigSerializeWrapper();
         ConcurrentHashMap<String, TopicConfig> topicConfigConcurrentHashMap = 
new ConcurrentHashMap<>();
-        TopicConfig topicConfig = new TopicConfig();
-        topicConfig.setWriteQueueNums(8);
-        topicConfig.setTopicName("unit-test");
-        topicConfig.setPerm(6);
-        topicConfig.setReadQueueNums(8);
-        topicConfig.setOrder(false);
-        topicConfigConcurrentHashMap.put("unit-test", topicConfig);
+        for (int i = 0; i < 2; i++) {
+            TopicConfig topicConfig = new TopicConfig();
+            topicConfig.setWriteQueueNums(8);
+            topicConfig.setTopicName("unit-test" + i);
+            topicConfig.setPerm(6);
+            topicConfig.setReadQueueNums(8);
+            topicConfig.setOrder(false);
+            topicConfigConcurrentHashMap.put(topicConfig.getTopicName(), 
topicConfig);
+        }
         
topicConfigSerializeWrapper.setTopicConfigTable(topicConfigConcurrentHashMap);
         Channel channel = mock(Channel.class);
         RegisterBrokerResult registerBrokerResult = 
routeInfoManager.registerBroker("default-cluster", "127.0.0.1:10911", 
"default-broker", 1234, "127.0.0.1:1001",
diff --git 
a/namesrv/src/test/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManagerTest.java
 
b/namesrv/src/test/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManagerTest.java
index fdd321a83..e4b8db78d 100644
--- 
a/namesrv/src/test/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManagerTest.java
+++ 
b/namesrv/src/test/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManagerTest.java
@@ -82,13 +82,8 @@ public class RouteInfoManagerTest {
             targetVersion.setTimestamp(200L);
 
             ConcurrentHashMap<String, TopicConfig> 
topicConfigConcurrentHashMap = new ConcurrentHashMap<>();
-            TopicConfig topicConfig = new TopicConfig();
-            topicConfig.setWriteQueueNums(8);
-            topicConfig.setTopicName("unit-test");
-            topicConfig.setPerm(6);
-            topicConfig.setReadQueueNums(8);
-            topicConfig.setOrder(false);
-            topicConfigConcurrentHashMap.put("unit-test-1", topicConfig);
+            topicConfigConcurrentHashMap.put("unit-test-0", new 
TopicConfig("unit-test-0"));
+            topicConfigConcurrentHashMap.put("unit-test-1", new 
TopicConfig("unit-test-1"));
 
             TopicConfigSerializeWrapper topicConfigSerializeWrapper = new 
TopicConfigSerializeWrapper();
             topicConfigSerializeWrapper.setDataVersion(targetVersion);
@@ -127,13 +122,8 @@ public class RouteInfoManagerTest {
         dataVersion.setTimestamp(100L);
 
         ConcurrentHashMap<String, TopicConfig> topicConfigConcurrentHashMap = 
new ConcurrentHashMap<>();
-        TopicConfig topicConfig = new TopicConfig();
-        topicConfig.setWriteQueueNums(8);
-        topicConfig.setTopicName("unit-test");
-        topicConfig.setPerm(6);
-        topicConfig.setReadQueueNums(8);
-        topicConfig.setOrder(false);
-        topicConfigConcurrentHashMap.put("unit-test", topicConfig);
+        topicConfigConcurrentHashMap.put("unit-test0", new 
TopicConfig("unit-test0"));
+        topicConfigConcurrentHashMap.put("unit-test1", new 
TopicConfig("unit-test1"));
 
         TopicConfigSerializeWrapper topicConfigSerializeWrapper = new 
TopicConfigSerializeWrapper();
         topicConfigSerializeWrapper.setDataVersion(dataVersion);
diff --git 
a/namesrv/src/test/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager_NewTest.java
 
b/namesrv/src/test/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager_NewTest.java
index b15ff542a..c50acfd8e 100644
--- 
a/namesrv/src/test/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager_NewTest.java
+++ 
b/namesrv/src/test/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager_NewTest.java
@@ -629,7 +629,8 @@ public class RouteInfoManager_NewTest {
 
     private RegisterBrokerResult registerBrokerWithNormalTopic(BrokerBasicInfo 
brokerInfo, String... topics) {
         ConcurrentHashMap<String, TopicConfig> topicConfigConcurrentHashMap = 
new ConcurrentHashMap<>();
-
+        TopicConfig baseTopic = new TopicConfig("baseTopic");
+        topicConfigConcurrentHashMap.put(baseTopic.getTopicName(), baseTopic);
         for (final String topic : topics) {
             TopicConfig topicConfig = new TopicConfig();
             topicConfig.setWriteQueueNums(8);
@@ -646,6 +647,9 @@ public class RouteInfoManager_NewTest {
     private RegisterBrokerResult registerBrokerWithOrderTopic(BrokerBasicInfo 
brokerBasicInfo, String... topics) {
         ConcurrentHashMap<String, TopicConfig> topicConfigConcurrentHashMap = 
new ConcurrentHashMap<>();
 
+        TopicConfig baseTopic = new TopicConfig("baseTopic");
+        baseTopic.setOrder(true);
+        topicConfigConcurrentHashMap.put(baseTopic.getTopicName(), baseTopic);
         for (final String topic : topics) {
             TopicConfig topicConfig = new TopicConfig();
             topicConfig.setWriteQueueNums(8);
@@ -660,7 +664,9 @@ public class RouteInfoManager_NewTest {
 
     private RegisterBrokerResult 
registerBrokerWithGlobalOrderTopic(BrokerBasicInfo brokerBasicInfo, String... 
topics) {
         ConcurrentHashMap<String, TopicConfig> topicConfigConcurrentHashMap = 
new ConcurrentHashMap<>();
-
+        TopicConfig baseTopic = new TopicConfig("baseTopic", 1, 1);
+        baseTopic.setOrder(true);
+        topicConfigConcurrentHashMap.put(baseTopic.getTopicName(), baseTopic);
         for (final String topic : topics) {
             TopicConfig topicConfig = new TopicConfig();
             topicConfig.setWriteQueueNums(1);
@@ -678,7 +684,8 @@ public class RouteInfoManager_NewTest {
 
         if (topicConfigConcurrentHashMap == null) {
             topicConfigConcurrentHashMap = new ConcurrentHashMap<>();
-
+            TopicConfig baseTopic = new TopicConfig("baseTopic");
+            topicConfigConcurrentHashMap.put(baseTopic.getTopicName(), 
baseTopic);
             for (final String topic : topics) {
                 TopicConfig topicConfig = new TopicConfig();
                 topicConfig.setWriteQueueNums(8);

Reply via email to