This is an automated email from the ASF dual-hosted git repository.
lizhanhui pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new ff60d5c24 [ISSUE #5322] improving SDK topic route availability (#5323)
ff60d5c24 is described below
commit ff60d5c24a0fc83dc19e329d647136e98fc03bd0
Author: fuyou001 <[email protected]>
AuthorDate: Mon Oct 17 14:53:07 2022 +0800
[ISSUE #5322] improving SDK topic route availability (#5323)
* [ISSUE #5322] improving SDK topic route availability
* [ISSUE #5322] refactor
* [ISSUE #5322] refactor
---
.../rocketmq/common/namesrv/NamesrvConfig.java | 19 ++++
.../namesrv/processor/ClientRequestProcessor.java | 30 ++++++
.../processor/ClusterTestRequestProcessorTest.java | 106 ++++++++++++++++++++-
3 files changed, 150 insertions(+), 5 deletions(-)
diff --git
a/common/src/main/java/org/apache/rocketmq/common/namesrv/NamesrvConfig.java
b/common/src/main/java/org/apache/rocketmq/common/namesrv/NamesrvConfig.java
index 4724e1c05..700febfe2 100644
--- a/common/src/main/java/org/apache/rocketmq/common/namesrv/NamesrvConfig.java
+++ b/common/src/main/java/org/apache/rocketmq/common/namesrv/NamesrvConfig.java
@@ -78,6 +78,9 @@ public class NamesrvConfig {
*/
private boolean enableControllerInNamesrv = false;
+ private volatile boolean needWaitForService = false;
+
+ private int waitSecondsForService = 45;
public boolean isOrderMessageEnable() {
return orderMessageEnable;
@@ -222,4 +225,20 @@ public class NamesrvConfig {
public void setEnableControllerInNamesrv(boolean
enableControllerInNamesrv) {
this.enableControllerInNamesrv = enableControllerInNamesrv;
}
+
+ public boolean isNeedWaitForService() {
+ return needWaitForService;
+ }
+
+ public void setNeedWaitForService(boolean needWaitForService) {
+ this.needWaitForService = needWaitForService;
+ }
+
+ public int getWaitSecondsForService() {
+ return waitSecondsForService;
+ }
+
+ public void setWaitSecondsForService(int waitSecondsForService) {
+ this.waitSecondsForService = waitSecondsForService;
+ }
}
diff --git
a/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/ClientRequestProcessor.java
b/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/ClientRequestProcessor.java
index 3642d5f59..54eef8807 100644
---
a/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/ClientRequestProcessor.java
+++
b/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/ClientRequestProcessor.java
@@ -19,22 +19,35 @@ package org.apache.rocketmq.namesrv.processor;
import com.alibaba.fastjson.serializer.SerializerFeature;
import io.netty.channel.ChannelHandlerContext;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.rocketmq.common.MQVersion;
+import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.help.FAQUrl;
import org.apache.rocketmq.common.namesrv.NamesrvUtil;
+import org.apache.rocketmq.common.protocol.RequestCode;
import org.apache.rocketmq.common.protocol.ResponseCode;
import
org.apache.rocketmq.common.protocol.header.namesrv.GetRouteInfoRequestHeader;
import org.apache.rocketmq.common.protocol.route.TopicRouteData;
+import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.namesrv.NamesrvController;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
public class ClientRequestProcessor implements NettyRequestProcessor {
+
+ private static InternalLogger log =
InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);
+
protected NamesrvController namesrvController;
+ private long startupTimeMillis;
+
+ private AtomicBoolean needCheckNamesrvReady = new AtomicBoolean(true);
public ClientRequestProcessor(final NamesrvController namesrvController) {
this.namesrvController = namesrvController;
+ this.startupTimeMillis = System.currentTimeMillis();
}
@Override
@@ -49,9 +62,26 @@ public class ClientRequestProcessor implements
NettyRequestProcessor {
final GetRouteInfoRequestHeader requestHeader =
(GetRouteInfoRequestHeader)
request.decodeCommandCustomHeader(GetRouteInfoRequestHeader.class);
+ boolean namesrvReady = needCheckNamesrvReady.get() &&
System.currentTimeMillis() - startupTimeMillis >=
TimeUnit.SECONDS.toMillis(namesrvController.getNamesrvConfig().getWaitSecondsForService());
+
+ if (namesrvController.getNamesrvConfig().isNeedWaitForService() &&
!namesrvReady) {
+ //protect logic
+ if (request.getCode() != RequestCode.REGISTER_BROKER &&
request.getCode() != RequestCode.UNREGISTER_BROKER) {
+ log.warn("name server not ready. request code {} ",
request.getCode());
+ response.setCode(ResponseCode.SYSTEM_ERROR);
+ response.setRemark("name server not ready");
+ return response;
+ }
+ }
+
TopicRouteData topicRouteData =
this.namesrvController.getRouteInfoManager().pickupTopicRouteData(requestHeader.getTopic());
if (topicRouteData != null) {
+ //topic route info register success ,so disable namesrvReady check
+ if (needCheckNamesrvReady.get()) {
+ needCheckNamesrvReady.set(false);
+ }
+
if
(this.namesrvController.getNamesrvConfig().isOrderMessageEnable()) {
String orderTopicConf =
this.namesrvController.getKvConfigManager().getKVConfig(NamesrvUtil.NAMESPACE_ORDER_TOPIC_CONFIG,
diff --git
a/namesrv/src/test/java/org/apache/rocketmq/namesrv/processor/ClusterTestRequestProcessorTest.java
b/namesrv/src/test/java/org/apache/rocketmq/namesrv/processor/ClusterTestRequestProcessorTest.java
index 0ed452d3a..e1bac3077 100644
---
a/namesrv/src/test/java/org/apache/rocketmq/namesrv/processor/ClusterTestRequestProcessorTest.java
+++
b/namesrv/src/test/java/org/apache/rocketmq/namesrv/processor/ClusterTestRequestProcessorTest.java
@@ -22,16 +22,20 @@ import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
+import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.client.ClientConfig;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.impl.MQClientAPIImpl;
import org.apache.rocketmq.client.impl.MQClientManager;
import org.apache.rocketmq.client.impl.factory.MQClientInstance;
import org.apache.rocketmq.common.namesrv.NamesrvConfig;
+import org.apache.rocketmq.common.protocol.RequestCode;
import org.apache.rocketmq.common.protocol.ResponseCode;
+import
org.apache.rocketmq.common.protocol.header.namesrv.GetRouteInfoRequestHeader;
import org.apache.rocketmq.common.protocol.route.BrokerData;
import org.apache.rocketmq.common.protocol.route.TopicRouteData;
import org.apache.rocketmq.namesrv.NamesrvController;
+import org.apache.rocketmq.namesrv.routeinfo.RouteInfoManager;
import org.apache.rocketmq.remoting.CommandCustomHeader;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
import org.apache.rocketmq.remoting.exception.RemotingException;
@@ -44,6 +48,7 @@ import org.junit.Before;
import org.junit.Test;
import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.mock;
@@ -52,16 +57,17 @@ import static org.mockito.Mockito.when;
public class ClusterTestRequestProcessorTest {
private ClusterTestRequestProcessor clusterTestProcessor;
private DefaultMQAdminExtImpl defaultMQAdminExtImpl;
- private MQClientInstance mqClientInstance =
MQClientManager.getInstance().getOrCreateMQClientInstance(new ClientConfig());
+ private MQClientInstance mqClientInstance = MQClientManager.getInstance()
+ .getOrCreateMQClientInstance(new ClientConfig());
private MQClientAPIImpl mQClientAPIImpl;
private ChannelHandlerContext ctx;
@Before
- public void init() throws NoSuchFieldException, IllegalAccessException,
RemotingException, MQClientException, InterruptedException {
+ public void init() throws NoSuchFieldException, IllegalAccessException,
RemotingException, MQClientException,
+ InterruptedException {
NamesrvController namesrvController = new NamesrvController(
- new NamesrvConfig(),
- new NettyServerConfig()
- );
+ new NamesrvConfig(),
+ new NettyServerConfig());
clusterTestProcessor = new
ClusterTestRequestProcessor(namesrvController, "default-producer");
mQClientAPIImpl = mock(MQClientAPIImpl.class);
@@ -110,4 +116,94 @@ public class ClusterTestRequestProcessorTest {
assertThat(remoting.getRemark()).isNotNull();
}
+ @Test
+ public void testNamesrvReady() throws Exception {
+ String topicName = "rocketmq-topic-test-ready";
+ RouteInfoManager routeInfoManager = mockRouteInfoManager();
+ NamesrvController namesrvController =
mockNamesrvController(routeInfoManager, true, -1,true);
+ ClientRequestProcessor clientRequestProcessor = new
ClientRequestProcessor(namesrvController);
+ GetRouteInfoRequestHeader routeInfoRequestHeader =
mockRouteInfoRequestHeader(topicName);
+ RemotingCommand remotingCommand =
mockTopicRouteCommand(routeInfoRequestHeader);
+ RemotingCommand response =
clientRequestProcessor.processRequest(mock(ChannelHandlerContext.class),
+ remotingCommand);
+ assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS);
+ }
+
+ @Test
+ public void testNamesrvNoNeedWaitForService() throws Exception {
+ String topicName = "rocketmq-topic-test-ready";
+ RouteInfoManager routeInfoManager = mockRouteInfoManager();
+ NamesrvController namesrvController =
mockNamesrvController(routeInfoManager, true, 45,false);
+ ClientRequestProcessor clientRequestProcessor = new
ClientRequestProcessor(namesrvController);
+ GetRouteInfoRequestHeader routeInfoRequestHeader =
mockRouteInfoRequestHeader(topicName);
+ RemotingCommand remotingCommand =
mockTopicRouteCommand(routeInfoRequestHeader);
+ RemotingCommand response =
clientRequestProcessor.processRequest(mock(ChannelHandlerContext.class),
+ remotingCommand);
+ assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS);
+ }
+
+ @Test
+ public void testNamesrvNotReady() throws Exception {
+ String topicName = "rocketmq-topic-test";
+ RouteInfoManager routeInfoManager = mockRouteInfoManager();
+ NamesrvController namesrvController =
mockNamesrvController(routeInfoManager, false, 45,true);
+ GetRouteInfoRequestHeader routeInfoRequestHeader =
mockRouteInfoRequestHeader(topicName);
+ RemotingCommand remotingCommand =
mockTopicRouteCommand(routeInfoRequestHeader);
+ ClientRequestProcessor clientRequestProcessor = new
ClientRequestProcessor(namesrvController);
+ RemotingCommand response =
clientRequestProcessor.processRequest(mock(ChannelHandlerContext.class),
+ remotingCommand);
+ assertThat(response.getCode()).isEqualTo(ResponseCode.SYSTEM_ERROR);
+ }
+
+ @Test
+ public void testNamesrv() throws Exception {
+ int waitSecondsForService = 3;
+ String topicName = "rocketmq-topic-test";
+ RouteInfoManager routeInfoManager = mockRouteInfoManager();
+ NamesrvController namesrvController =
mockNamesrvController(routeInfoManager, false, waitSecondsForService,true);
+ GetRouteInfoRequestHeader routeInfoRequestHeader =
mockRouteInfoRequestHeader(topicName);
+ RemotingCommand remotingCommand =
mockTopicRouteCommand(routeInfoRequestHeader);
+ ClientRequestProcessor clientRequestProcessor = new
ClientRequestProcessor(namesrvController);
+ RemotingCommand response =
clientRequestProcessor.processRequest(mock(ChannelHandlerContext.class),
+ remotingCommand);
+ assertThat(response.getCode()).isEqualTo(ResponseCode.SYSTEM_ERROR);
+ TimeUnit.SECONDS.sleep(waitSecondsForService + 1);
+ response =
clientRequestProcessor.processRequest(mock(ChannelHandlerContext.class),
remotingCommand);
+ assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS);
+ }
+
+ private RemotingCommand mockTopicRouteCommand(
+ GetRouteInfoRequestHeader routeInfoRequestHeader) throws
RemotingCommandException {
+ RemotingCommand remotingCommand = mock(RemotingCommand.class);
+
when(remotingCommand.decodeCommandCustomHeader(any())).thenReturn(routeInfoRequestHeader);
+
when(remotingCommand.getCode()).thenReturn(RequestCode.GET_ROUTEINFO_BY_TOPIC);
+ return remotingCommand;
+ }
+
+ public NamesrvController mockNamesrvController(RouteInfoManager
routeInfoManager, boolean ready,
+ int waitSecondsForService,boolean needWaitForService) {
+ NamesrvConfig namesrvConfig = mock(NamesrvConfig.class);
+
when(namesrvConfig.isNeedWaitForService()).thenReturn(needWaitForService);
+ when(namesrvConfig.getUnRegisterBrokerQueueCapacity()).thenReturn(10);
+ when(namesrvConfig.getWaitSecondsForService()).thenReturn(ready ? 0 :
waitSecondsForService);
+ NamesrvController namesrvController = mock(NamesrvController.class);
+ when(namesrvController.getNamesrvConfig()).thenReturn(namesrvConfig);
+
when(namesrvController.getRouteInfoManager()).thenReturn(routeInfoManager);
+
+ return namesrvController;
+ }
+
+ public RouteInfoManager mockRouteInfoManager() {
+ RouteInfoManager routeInfoManager = mock(RouteInfoManager.class);
+ TopicRouteData topicRouteData = mock(TopicRouteData.class);
+
when(routeInfoManager.pickupTopicRouteData(any())).thenReturn(topicRouteData);
+ return routeInfoManager;
+ }
+
+ public GetRouteInfoRequestHeader mockRouteInfoRequestHeader(String
topicName) {
+ GetRouteInfoRequestHeader routeInfoRequestHeader =
mock(GetRouteInfoRequestHeader.class);
+ when(routeInfoRequestHeader.getTopic()).thenReturn(topicName);
+ return routeInfoRequestHeader;
+ }
+
}
\ No newline at end of file