This is an automated email from the ASF dual-hosted git repository.

lizhanhui pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new ff60d5c24 [ISSUE #5322] improving SDK topic route availability (#5323)
ff60d5c24 is described below

commit ff60d5c24a0fc83dc19e329d647136e98fc03bd0
Author: fuyou001 <[email protected]>
AuthorDate: Mon Oct 17 14:53:07 2022 +0800

    [ISSUE #5322] improving SDK topic route availability (#5323)
    
    * [ISSUE #5322] improving SDK topic route availability
    
    * [ISSUE #5322] refactor
    
    * [ISSUE #5322] refactor
---
 .../rocketmq/common/namesrv/NamesrvConfig.java     |  19 ++++
 .../namesrv/processor/ClientRequestProcessor.java  |  30 ++++++
 .../processor/ClusterTestRequestProcessorTest.java | 106 ++++++++++++++++++++-
 3 files changed, 150 insertions(+), 5 deletions(-)

diff --git a/common/src/main/java/org/apache/rocketmq/common/namesrv/NamesrvConfig.java b/common/src/main/java/org/apache/rocketmq/common/namesrv/NamesrvConfig.java
index 4724e1c05..700febfe2 100644
--- a/common/src/main/java/org/apache/rocketmq/common/namesrv/NamesrvConfig.java
+++ b/common/src/main/java/org/apache/rocketmq/common/namesrv/NamesrvConfig.java
@@ -78,6 +78,9 @@ public class NamesrvConfig {
      */
     private boolean enableControllerInNamesrv = false;
 
+    private volatile boolean needWaitForService = false;
+
+    private int waitSecondsForService = 45;
 
     public boolean isOrderMessageEnable() {
         return orderMessageEnable;
@@ -222,4 +225,20 @@ public class NamesrvConfig {
     public void setEnableControllerInNamesrv(boolean 
enableControllerInNamesrv) {
         this.enableControllerInNamesrv = enableControllerInNamesrv;
     }
+
+    public boolean isNeedWaitForService() {
+        return needWaitForService;
+    }
+
+    public void setNeedWaitForService(boolean needWaitForService) {
+        this.needWaitForService = needWaitForService;
+    }
+
+    public int getWaitSecondsForService() {
+        return waitSecondsForService;
+    }
+
+    public void setWaitSecondsForService(int waitSecondsForService) {
+        this.waitSecondsForService = waitSecondsForService;
+    }
 }
diff --git a/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/ClientRequestProcessor.java b/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/ClientRequestProcessor.java
index 3642d5f59..54eef8807 100644
--- a/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/ClientRequestProcessor.java
+++ b/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/ClientRequestProcessor.java
@@ -19,22 +19,35 @@ package org.apache.rocketmq.namesrv.processor;
 
 import com.alibaba.fastjson.serializer.SerializerFeature;
 import io.netty.channel.ChannelHandlerContext;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.rocketmq.common.MQVersion;
+import org.apache.rocketmq.common.constant.LoggerName;
 import org.apache.rocketmq.common.help.FAQUrl;
 import org.apache.rocketmq.common.namesrv.NamesrvUtil;
+import org.apache.rocketmq.common.protocol.RequestCode;
 import org.apache.rocketmq.common.protocol.ResponseCode;
 import 
org.apache.rocketmq.common.protocol.header.namesrv.GetRouteInfoRequestHeader;
 import org.apache.rocketmq.common.protocol.route.TopicRouteData;
+import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.logging.InternalLoggerFactory;
 import org.apache.rocketmq.namesrv.NamesrvController;
 import org.apache.rocketmq.remoting.exception.RemotingCommandException;
 import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
 import org.apache.rocketmq.remoting.protocol.RemotingCommand;
 
 public class ClientRequestProcessor implements NettyRequestProcessor {
+
+    private static InternalLogger log = 
InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);
+
     protected NamesrvController namesrvController;
+    private long startupTimeMillis;
+
+    private  AtomicBoolean needCheckNamesrvReady = new AtomicBoolean(true);
 
     public ClientRequestProcessor(final NamesrvController namesrvController) {
         this.namesrvController = namesrvController;
+        this.startupTimeMillis = System.currentTimeMillis();
     }
 
     @Override
@@ -49,9 +62,26 @@ public class ClientRequestProcessor implements NettyRequestProcessor {
         final GetRouteInfoRequestHeader requestHeader =
             (GetRouteInfoRequestHeader) 
request.decodeCommandCustomHeader(GetRouteInfoRequestHeader.class);
 
+        boolean namesrvReady = needCheckNamesrvReady.get()  && 
System.currentTimeMillis() - startupTimeMillis >= 
TimeUnit.SECONDS.toMillis(namesrvController.getNamesrvConfig().getWaitSecondsForService());
+
+        if (namesrvController.getNamesrvConfig().isNeedWaitForService() && 
!namesrvReady) {
+            //protect  logic
+            if (request.getCode() != RequestCode.REGISTER_BROKER && 
request.getCode() != RequestCode.UNREGISTER_BROKER) {
+                log.warn("name server not ready. request code {} ", 
request.getCode());
+                response.setCode(ResponseCode.SYSTEM_ERROR);
+                response.setRemark("name server not ready");
+                return response;
+            }
+        }
+
         TopicRouteData topicRouteData = 
this.namesrvController.getRouteInfoManager().pickupTopicRouteData(requestHeader.getTopic());
 
         if (topicRouteData != null) {
+            //topic route info register success ,so disable namesrvReady check
+            if (needCheckNamesrvReady.get()) {
+                needCheckNamesrvReady.set(false);
+            }
+
             if 
(this.namesrvController.getNamesrvConfig().isOrderMessageEnable()) {
                 String orderTopicConf =
                     
this.namesrvController.getKvConfigManager().getKVConfig(NamesrvUtil.NAMESPACE_ORDER_TOPIC_CONFIG,
diff --git a/namesrv/src/test/java/org/apache/rocketmq/namesrv/processor/ClusterTestRequestProcessorTest.java b/namesrv/src/test/java/org/apache/rocketmq/namesrv/processor/ClusterTestRequestProcessorTest.java
index 0ed452d3a..e1bac3077 100644
--- a/namesrv/src/test/java/org/apache/rocketmq/namesrv/processor/ClusterTestRequestProcessorTest.java
+++ b/namesrv/src/test/java/org/apache/rocketmq/namesrv/processor/ClusterTestRequestProcessorTest.java
@@ -22,16 +22,20 @@ import java.lang.reflect.Field;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
+import java.util.concurrent.TimeUnit;
 import org.apache.rocketmq.client.ClientConfig;
 import org.apache.rocketmq.client.exception.MQClientException;
 import org.apache.rocketmq.client.impl.MQClientAPIImpl;
 import org.apache.rocketmq.client.impl.MQClientManager;
 import org.apache.rocketmq.client.impl.factory.MQClientInstance;
 import org.apache.rocketmq.common.namesrv.NamesrvConfig;
+import org.apache.rocketmq.common.protocol.RequestCode;
 import org.apache.rocketmq.common.protocol.ResponseCode;
+import 
org.apache.rocketmq.common.protocol.header.namesrv.GetRouteInfoRequestHeader;
 import org.apache.rocketmq.common.protocol.route.BrokerData;
 import org.apache.rocketmq.common.protocol.route.TopicRouteData;
 import org.apache.rocketmq.namesrv.NamesrvController;
+import org.apache.rocketmq.namesrv.routeinfo.RouteInfoManager;
 import org.apache.rocketmq.remoting.CommandCustomHeader;
 import org.apache.rocketmq.remoting.exception.RemotingCommandException;
 import org.apache.rocketmq.remoting.exception.RemotingException;
@@ -44,6 +48,7 @@ import org.junit.Before;
 import org.junit.Test;
 
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyLong;
 import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.Mockito.mock;
@@ -52,16 +57,17 @@ import static org.mockito.Mockito.when;
 public class ClusterTestRequestProcessorTest {
     private ClusterTestRequestProcessor clusterTestProcessor;
     private DefaultMQAdminExtImpl defaultMQAdminExtImpl;
-    private MQClientInstance mqClientInstance = 
MQClientManager.getInstance().getOrCreateMQClientInstance(new ClientConfig());
+    private MQClientInstance mqClientInstance = MQClientManager.getInstance()
+            .getOrCreateMQClientInstance(new ClientConfig());
     private MQClientAPIImpl mQClientAPIImpl;
     private ChannelHandlerContext ctx;
 
     @Before
-    public void init() throws NoSuchFieldException, IllegalAccessException, 
RemotingException, MQClientException, InterruptedException {
+    public void init() throws NoSuchFieldException, IllegalAccessException, 
RemotingException, MQClientException,
+            InterruptedException {
         NamesrvController namesrvController = new NamesrvController(
-            new NamesrvConfig(),
-            new NettyServerConfig()
-        );
+                new NamesrvConfig(),
+                new NettyServerConfig());
 
         clusterTestProcessor = new 
ClusterTestRequestProcessor(namesrvController, "default-producer");
         mQClientAPIImpl = mock(MQClientAPIImpl.class);
@@ -110,4 +116,94 @@ public class ClusterTestRequestProcessorTest {
         assertThat(remoting.getRemark()).isNotNull();
     }
 
+    @Test
+    public void testNamesrvReady() throws Exception {
+        String topicName = "rocketmq-topic-test-ready";
+        RouteInfoManager routeInfoManager = mockRouteInfoManager();
+        NamesrvController namesrvController = 
mockNamesrvController(routeInfoManager, true, -1,true);
+        ClientRequestProcessor clientRequestProcessor = new 
ClientRequestProcessor(namesrvController);
+        GetRouteInfoRequestHeader routeInfoRequestHeader = 
mockRouteInfoRequestHeader(topicName);
+        RemotingCommand remotingCommand = 
mockTopicRouteCommand(routeInfoRequestHeader);
+        RemotingCommand response = 
clientRequestProcessor.processRequest(mock(ChannelHandlerContext.class),
+                remotingCommand);
+        assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS);
+    }
+
+    @Test
+    public void testNamesrvNoNeedWaitForService() throws Exception {
+        String topicName = "rocketmq-topic-test-ready";
+        RouteInfoManager routeInfoManager = mockRouteInfoManager();
+        NamesrvController namesrvController = 
mockNamesrvController(routeInfoManager, true, 45,false);
+        ClientRequestProcessor clientRequestProcessor = new 
ClientRequestProcessor(namesrvController);
+        GetRouteInfoRequestHeader routeInfoRequestHeader = 
mockRouteInfoRequestHeader(topicName);
+        RemotingCommand remotingCommand = 
mockTopicRouteCommand(routeInfoRequestHeader);
+        RemotingCommand response = 
clientRequestProcessor.processRequest(mock(ChannelHandlerContext.class),
+            remotingCommand);
+        assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS);
+    }
+
+    @Test
+    public void testNamesrvNotReady() throws Exception {
+        String topicName = "rocketmq-topic-test";
+        RouteInfoManager routeInfoManager = mockRouteInfoManager();
+        NamesrvController namesrvController = 
mockNamesrvController(routeInfoManager, false, 45,true);
+        GetRouteInfoRequestHeader routeInfoRequestHeader = 
mockRouteInfoRequestHeader(topicName);
+        RemotingCommand remotingCommand = 
mockTopicRouteCommand(routeInfoRequestHeader);
+        ClientRequestProcessor clientRequestProcessor = new 
ClientRequestProcessor(namesrvController);
+        RemotingCommand response = 
clientRequestProcessor.processRequest(mock(ChannelHandlerContext.class),
+                remotingCommand);
+        assertThat(response.getCode()).isEqualTo(ResponseCode.SYSTEM_ERROR);
+    }
+
+    @Test
+    public void testNamesrv() throws Exception {
+        int waitSecondsForService = 3;
+        String topicName = "rocketmq-topic-test";
+        RouteInfoManager routeInfoManager = mockRouteInfoManager();
+        NamesrvController namesrvController = 
mockNamesrvController(routeInfoManager, false, waitSecondsForService,true);
+        GetRouteInfoRequestHeader routeInfoRequestHeader = 
mockRouteInfoRequestHeader(topicName);
+        RemotingCommand remotingCommand = 
mockTopicRouteCommand(routeInfoRequestHeader);
+        ClientRequestProcessor clientRequestProcessor = new 
ClientRequestProcessor(namesrvController);
+        RemotingCommand response = 
clientRequestProcessor.processRequest(mock(ChannelHandlerContext.class),
+                remotingCommand);
+        assertThat(response.getCode()).isEqualTo(ResponseCode.SYSTEM_ERROR);
+        TimeUnit.SECONDS.sleep(waitSecondsForService + 1);
+        response = 
clientRequestProcessor.processRequest(mock(ChannelHandlerContext.class), 
remotingCommand);
+        assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS);
+    }
+
+    private RemotingCommand mockTopicRouteCommand(
+            GetRouteInfoRequestHeader routeInfoRequestHeader) throws 
RemotingCommandException {
+        RemotingCommand remotingCommand = mock(RemotingCommand.class);
+        
when(remotingCommand.decodeCommandCustomHeader(any())).thenReturn(routeInfoRequestHeader);
+        
when(remotingCommand.getCode()).thenReturn(RequestCode.GET_ROUTEINFO_BY_TOPIC);
+        return remotingCommand;
+    }
+
+    public NamesrvController mockNamesrvController(RouteInfoManager 
routeInfoManager, boolean ready,
+            int waitSecondsForService,boolean needWaitForService) {
+        NamesrvConfig namesrvConfig = mock(NamesrvConfig.class);
+        
when(namesrvConfig.isNeedWaitForService()).thenReturn(needWaitForService);
+        when(namesrvConfig.getUnRegisterBrokerQueueCapacity()).thenReturn(10);
+        when(namesrvConfig.getWaitSecondsForService()).thenReturn(ready ? 0 : 
waitSecondsForService);
+        NamesrvController namesrvController = mock(NamesrvController.class);
+        when(namesrvController.getNamesrvConfig()).thenReturn(namesrvConfig);
+        
when(namesrvController.getRouteInfoManager()).thenReturn(routeInfoManager);
+
+        return namesrvController;
+    }
+
+    public RouteInfoManager mockRouteInfoManager() {
+        RouteInfoManager routeInfoManager = mock(RouteInfoManager.class);
+        TopicRouteData topicRouteData = mock(TopicRouteData.class);
+        
when(routeInfoManager.pickupTopicRouteData(any())).thenReturn(topicRouteData);
+        return routeInfoManager;
+    }
+
+    public GetRouteInfoRequestHeader mockRouteInfoRequestHeader(String 
topicName) {
+        GetRouteInfoRequestHeader routeInfoRequestHeader = 
mock(GetRouteInfoRequestHeader.class);
+        when(routeInfoRequestHeader.getTopic()).thenReturn(topicName);
+        return routeInfoRequestHeader;
+    }
+
 }
\ No newline at end of file

Reply via email to