This is an automated email from the ASF dual-hosted git repository.
jinrongtong pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 14156b4f0a [ISSUE #9191] Provide the ability to replace the remoting
layer implementation for Proxy and Broker (#9192)
14156b4f0a is described below
commit 14156b4f0aacfa9d2a11a69dc4d58686a666d07c
Author: Quan <[email protected]>
AuthorDate: Mon Feb 24 12:27:30 2025 +0800
[ISSUE #9191] Provide the ability to replace the remoting layer
implementation for Proxy and Broker (#9192)
* remoting replacement for proxy and broker
* add unit test
* fix code style
---
.../apache/rocketmq/broker/BrokerController.java | 202 ++++++++++++---------
.../rocketmq/broker/BrokerControllerTest.java | 46 +++++
.../rocketmq/client/impl/MQClientAPIImpl.java | 35 +++-
.../client/impl/mqclient/MQClientAPIExt.java | 14 +-
.../client/impl/mqclient/MQClientAPIFactory.java | 35 +++-
.../rocketmq/client/impl/MQClientAPIImplTest.java | 44 +++++
.../org/apache/rocketmq/common/ObjectCreator.java | 23 +--
.../rocketmq/container/InnerBrokerController.java | 12 +-
.../proxy/service/ClusterServiceManager.java | 19 +-
.../proxy/service/ServiceManagerFactory.java | 10 +-
.../remoting/netty/NettyRemotingAbstract.java | 4 +
.../remoting/netty/NettyRemotingClient.java | 24 +--
.../remoting/netty/NettyRemotingServer.java | 88 +++++----
13 files changed, 373 insertions(+), 183 deletions(-)
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
index 006695c6bc..4031dce8d6 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
@@ -244,10 +244,10 @@ public class BrokerController {
protected final List<SendMessageHook> sendMessageHookList = new
ArrayList<>();
protected final List<ConsumeMessageHook> consumeMessageHookList = new
ArrayList<>();
protected MessageStore messageStore;
- protected RemotingServer remotingServer;
+ protected static final String TCP_REMOTING_SERVER = "TCP_REMOTING_SERVER";
+ protected static final String FAST_REMOTING_SERVER =
"FAST_REMOTING_SERVER";
+ protected final Map<String, RemotingServer> remotingServerMap = new
ConcurrentHashMap<>();
protected CountDownLatch remotingServerStartLatch;
- protected RemotingServer fastRemotingServer;
-
/**
* If {Topic, SubscriptionGroup, Offset}ManagerV2 are used, config entries
are stored in RocksDB.
*/
@@ -494,7 +494,7 @@ public class BrokerController {
}
protected void initializeRemotingServer() throws
CloneNotSupportedException {
- this.remotingServer = new NettyRemotingServer(this.nettyServerConfig,
this.clientHousekeepingService);
+ RemotingServer tcpRemotingServer = new
NettyRemotingServer(this.nettyServerConfig, this.clientHousekeepingService);
NettyServerConfig fastConfig = (NettyServerConfig)
this.nettyServerConfig.clone();
int listeningPort = nettyServerConfig.getListenPort() - 2;
@@ -503,7 +503,10 @@ public class BrokerController {
}
fastConfig.setListenPort(listeningPort);
- this.fastRemotingServer = new NettyRemotingServer(fastConfig,
this.clientHousekeepingService);
+ RemotingServer fastRemotingServer = new
NettyRemotingServer(fastConfig, this.clientHousekeepingService);
+
+ remotingServerMap.put(TCP_REMOTING_SERVER, tcpRemotingServer);
+ remotingServerMap.put(FAST_REMOTING_SERVER, fastRemotingServer);
}
/**
@@ -939,8 +942,12 @@ public class BrokerController {
}
private void reloadServerSslContext() {
- ((NettyRemotingServer)
remotingServer).loadSslContext();
- ((NettyRemotingServer)
fastRemotingServer).loadSslContext();
+ for (Map.Entry<String, RemotingServer> entry :
remotingServerMap.entrySet()) {
+ RemotingServer remotingServer =
entry.getValue();
+ if (remotingServer instanceof
NettyRemotingServer) {
+ ((NettyRemotingServer)
remotingServer).loadSslContext();
+ }
+ }
}
});
} catch (Exception e) {
@@ -1092,59 +1099,62 @@ public class BrokerController {
}
public void registerProcessor() {
+ RemotingServer remotingServer =
remotingServerMap.get(TCP_REMOTING_SERVER);
+ RemotingServer fastRemotingServer =
remotingServerMap.get(FAST_REMOTING_SERVER);
+
/*
* SendMessageProcessor
*/
sendMessageProcessor.registerSendMessageHook(sendMessageHookList);
sendMessageProcessor.registerConsumeMessageHook(consumeMessageHookList);
- this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE,
sendMessageProcessor, this.sendMessageExecutor);
- this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE_V2,
sendMessageProcessor, this.sendMessageExecutor);
- this.remotingServer.registerProcessor(RequestCode.SEND_BATCH_MESSAGE,
sendMessageProcessor, this.sendMessageExecutor);
-
this.remotingServer.registerProcessor(RequestCode.CONSUMER_SEND_MSG_BACK,
sendMessageProcessor, this.sendMessageExecutor);
- this.remotingServer.registerProcessor(RequestCode.RECALL_MESSAGE,
recallMessageProcessor, this.sendMessageExecutor);
- this.fastRemotingServer.registerProcessor(RequestCode.SEND_MESSAGE,
sendMessageProcessor, this.sendMessageExecutor);
- this.fastRemotingServer.registerProcessor(RequestCode.SEND_MESSAGE_V2,
sendMessageProcessor, this.sendMessageExecutor);
-
this.fastRemotingServer.registerProcessor(RequestCode.SEND_BATCH_MESSAGE,
sendMessageProcessor, this.sendMessageExecutor);
-
this.fastRemotingServer.registerProcessor(RequestCode.CONSUMER_SEND_MSG_BACK,
sendMessageProcessor, this.sendMessageExecutor);
- this.fastRemotingServer.registerProcessor(RequestCode.RECALL_MESSAGE,
recallMessageProcessor, this.sendMessageExecutor);
+ remotingServer.registerProcessor(RequestCode.SEND_MESSAGE,
sendMessageProcessor, this.sendMessageExecutor);
+ remotingServer.registerProcessor(RequestCode.SEND_MESSAGE_V2,
sendMessageProcessor, this.sendMessageExecutor);
+ remotingServer.registerProcessor(RequestCode.SEND_BATCH_MESSAGE,
sendMessageProcessor, this.sendMessageExecutor);
+ remotingServer.registerProcessor(RequestCode.CONSUMER_SEND_MSG_BACK,
sendMessageProcessor, this.sendMessageExecutor);
+ remotingServer.registerProcessor(RequestCode.RECALL_MESSAGE,
recallMessageProcessor, this.sendMessageExecutor);
+ fastRemotingServer.registerProcessor(RequestCode.SEND_MESSAGE,
sendMessageProcessor, this.sendMessageExecutor);
+ fastRemotingServer.registerProcessor(RequestCode.SEND_MESSAGE_V2,
sendMessageProcessor, this.sendMessageExecutor);
+ fastRemotingServer.registerProcessor(RequestCode.SEND_BATCH_MESSAGE,
sendMessageProcessor, this.sendMessageExecutor);
+
fastRemotingServer.registerProcessor(RequestCode.CONSUMER_SEND_MSG_BACK,
sendMessageProcessor, this.sendMessageExecutor);
+ fastRemotingServer.registerProcessor(RequestCode.RECALL_MESSAGE,
recallMessageProcessor, this.sendMessageExecutor);
/**
* PullMessageProcessor
*/
- this.remotingServer.registerProcessor(RequestCode.PULL_MESSAGE,
this.pullMessageProcessor, this.pullMessageExecutor);
- this.remotingServer.registerProcessor(RequestCode.LITE_PULL_MESSAGE,
this.pullMessageProcessor, this.litePullMessageExecutor);
+ remotingServer.registerProcessor(RequestCode.PULL_MESSAGE,
this.pullMessageProcessor, this.pullMessageExecutor);
+ remotingServer.registerProcessor(RequestCode.LITE_PULL_MESSAGE,
this.pullMessageProcessor, this.litePullMessageExecutor);
this.pullMessageProcessor.registerConsumeMessageHook(consumeMessageHookList);
/**
* PeekMessageProcessor
*/
- this.remotingServer.registerProcessor(RequestCode.PEEK_MESSAGE,
this.peekMessageProcessor, this.pullMessageExecutor);
+ remotingServer.registerProcessor(RequestCode.PEEK_MESSAGE,
this.peekMessageProcessor, this.pullMessageExecutor);
/**
* PopMessageProcessor
*/
- this.remotingServer.registerProcessor(RequestCode.POP_MESSAGE,
this.popMessageProcessor, this.pullMessageExecutor);
+ remotingServer.registerProcessor(RequestCode.POP_MESSAGE,
this.popMessageProcessor, this.pullMessageExecutor);
/**
* AckMessageProcessor
*/
- this.remotingServer.registerProcessor(RequestCode.ACK_MESSAGE,
this.ackMessageProcessor, this.ackMessageExecutor);
- this.fastRemotingServer.registerProcessor(RequestCode.ACK_MESSAGE,
this.ackMessageProcessor, this.ackMessageExecutor);
+ remotingServer.registerProcessor(RequestCode.ACK_MESSAGE,
this.ackMessageProcessor, this.ackMessageExecutor);
+ fastRemotingServer.registerProcessor(RequestCode.ACK_MESSAGE,
this.ackMessageProcessor, this.ackMessageExecutor);
- this.remotingServer.registerProcessor(RequestCode.BATCH_ACK_MESSAGE,
this.ackMessageProcessor, this.ackMessageExecutor);
-
this.fastRemotingServer.registerProcessor(RequestCode.BATCH_ACK_MESSAGE,
this.ackMessageProcessor, this.ackMessageExecutor);
+ remotingServer.registerProcessor(RequestCode.BATCH_ACK_MESSAGE,
this.ackMessageProcessor, this.ackMessageExecutor);
+ fastRemotingServer.registerProcessor(RequestCode.BATCH_ACK_MESSAGE,
this.ackMessageProcessor, this.ackMessageExecutor);
/**
* ChangeInvisibleTimeProcessor
*/
-
this.remotingServer.registerProcessor(RequestCode.CHANGE_MESSAGE_INVISIBLETIME,
this.changeInvisibleTimeProcessor, this.ackMessageExecutor);
-
this.fastRemotingServer.registerProcessor(RequestCode.CHANGE_MESSAGE_INVISIBLETIME,
this.changeInvisibleTimeProcessor, this.ackMessageExecutor);
+
remotingServer.registerProcessor(RequestCode.CHANGE_MESSAGE_INVISIBLETIME,
this.changeInvisibleTimeProcessor, this.ackMessageExecutor);
+
fastRemotingServer.registerProcessor(RequestCode.CHANGE_MESSAGE_INVISIBLETIME,
this.changeInvisibleTimeProcessor, this.ackMessageExecutor);
/**
* notificationProcessor
*/
- this.remotingServer.registerProcessor(RequestCode.NOTIFICATION,
this.notificationProcessor, this.pullMessageExecutor);
+ remotingServer.registerProcessor(RequestCode.NOTIFICATION,
this.notificationProcessor, this.pullMessageExecutor);
/**
* pollingInfoProcessor
*/
- this.remotingServer.registerProcessor(RequestCode.POLLING_INFO,
this.pollingInfoProcessor, this.pullMessageExecutor);
+ remotingServer.registerProcessor(RequestCode.POLLING_INFO,
this.pollingInfoProcessor, this.pullMessageExecutor);
/**
* ReplyMessageProcessor
@@ -1152,64 +1162,64 @@ public class BrokerController {
replyMessageProcessor.registerSendMessageHook(sendMessageHookList);
- this.remotingServer.registerProcessor(RequestCode.SEND_REPLY_MESSAGE,
replyMessageProcessor, replyMessageExecutor);
-
this.remotingServer.registerProcessor(RequestCode.SEND_REPLY_MESSAGE_V2,
replyMessageProcessor, replyMessageExecutor);
-
this.fastRemotingServer.registerProcessor(RequestCode.SEND_REPLY_MESSAGE,
replyMessageProcessor, replyMessageExecutor);
-
this.fastRemotingServer.registerProcessor(RequestCode.SEND_REPLY_MESSAGE_V2,
replyMessageProcessor, replyMessageExecutor);
+ remotingServer.registerProcessor(RequestCode.SEND_REPLY_MESSAGE,
replyMessageProcessor, replyMessageExecutor);
+ remotingServer.registerProcessor(RequestCode.SEND_REPLY_MESSAGE_V2,
replyMessageProcessor, replyMessageExecutor);
+ fastRemotingServer.registerProcessor(RequestCode.SEND_REPLY_MESSAGE,
replyMessageProcessor, replyMessageExecutor);
+
fastRemotingServer.registerProcessor(RequestCode.SEND_REPLY_MESSAGE_V2,
replyMessageProcessor, replyMessageExecutor);
/**
* QueryMessageProcessor
*/
NettyRequestProcessor queryProcessor = new QueryMessageProcessor(this);
- this.remotingServer.registerProcessor(RequestCode.QUERY_MESSAGE,
queryProcessor, this.queryMessageExecutor);
- this.remotingServer.registerProcessor(RequestCode.VIEW_MESSAGE_BY_ID,
queryProcessor, this.queryMessageExecutor);
+ remotingServer.registerProcessor(RequestCode.QUERY_MESSAGE,
queryProcessor, this.queryMessageExecutor);
+ remotingServer.registerProcessor(RequestCode.VIEW_MESSAGE_BY_ID,
queryProcessor, this.queryMessageExecutor);
- this.fastRemotingServer.registerProcessor(RequestCode.QUERY_MESSAGE,
queryProcessor, this.queryMessageExecutor);
-
this.fastRemotingServer.registerProcessor(RequestCode.VIEW_MESSAGE_BY_ID,
queryProcessor, this.queryMessageExecutor);
+ fastRemotingServer.registerProcessor(RequestCode.QUERY_MESSAGE,
queryProcessor, this.queryMessageExecutor);
+ fastRemotingServer.registerProcessor(RequestCode.VIEW_MESSAGE_BY_ID,
queryProcessor, this.queryMessageExecutor);
/**
* ClientManageProcessor
*/
- this.remotingServer.registerProcessor(RequestCode.HEART_BEAT,
clientManageProcessor, this.heartbeatExecutor);
- this.remotingServer.registerProcessor(RequestCode.UNREGISTER_CLIENT,
clientManageProcessor, this.clientManageExecutor);
- this.remotingServer.registerProcessor(RequestCode.CHECK_CLIENT_CONFIG,
clientManageProcessor, this.clientManageExecutor);
+ remotingServer.registerProcessor(RequestCode.HEART_BEAT,
clientManageProcessor, this.heartbeatExecutor);
+ remotingServer.registerProcessor(RequestCode.UNREGISTER_CLIENT,
clientManageProcessor, this.clientManageExecutor);
+ remotingServer.registerProcessor(RequestCode.CHECK_CLIENT_CONFIG,
clientManageProcessor, this.clientManageExecutor);
- this.fastRemotingServer.registerProcessor(RequestCode.HEART_BEAT,
clientManageProcessor, this.heartbeatExecutor);
-
this.fastRemotingServer.registerProcessor(RequestCode.UNREGISTER_CLIENT,
clientManageProcessor, this.clientManageExecutor);
-
this.fastRemotingServer.registerProcessor(RequestCode.CHECK_CLIENT_CONFIG,
clientManageProcessor, this.clientManageExecutor);
+ fastRemotingServer.registerProcessor(RequestCode.HEART_BEAT,
clientManageProcessor, this.heartbeatExecutor);
+ fastRemotingServer.registerProcessor(RequestCode.UNREGISTER_CLIENT,
clientManageProcessor, this.clientManageExecutor);
+ fastRemotingServer.registerProcessor(RequestCode.CHECK_CLIENT_CONFIG,
clientManageProcessor, this.clientManageExecutor);
/**
* ConsumerManageProcessor
*/
ConsumerManageProcessor consumerManageProcessor = new
ConsumerManageProcessor(this);
-
this.remotingServer.registerProcessor(RequestCode.GET_CONSUMER_LIST_BY_GROUP,
consumerManageProcessor, this.consumerManageExecutor);
-
this.remotingServer.registerProcessor(RequestCode.UPDATE_CONSUMER_OFFSET,
consumerManageProcessor, this.consumerManageExecutor);
-
this.remotingServer.registerProcessor(RequestCode.QUERY_CONSUMER_OFFSET,
consumerManageProcessor, this.consumerManageExecutor);
+
remotingServer.registerProcessor(RequestCode.GET_CONSUMER_LIST_BY_GROUP,
consumerManageProcessor, this.consumerManageExecutor);
+ remotingServer.registerProcessor(RequestCode.UPDATE_CONSUMER_OFFSET,
consumerManageProcessor, this.consumerManageExecutor);
+ remotingServer.registerProcessor(RequestCode.QUERY_CONSUMER_OFFSET,
consumerManageProcessor, this.consumerManageExecutor);
-
this.fastRemotingServer.registerProcessor(RequestCode.GET_CONSUMER_LIST_BY_GROUP,
consumerManageProcessor, this.consumerManageExecutor);
-
this.fastRemotingServer.registerProcessor(RequestCode.UPDATE_CONSUMER_OFFSET,
consumerManageProcessor, this.consumerManageExecutor);
-
this.fastRemotingServer.registerProcessor(RequestCode.QUERY_CONSUMER_OFFSET,
consumerManageProcessor, this.consumerManageExecutor);
+
fastRemotingServer.registerProcessor(RequestCode.GET_CONSUMER_LIST_BY_GROUP,
consumerManageProcessor, this.consumerManageExecutor);
+
fastRemotingServer.registerProcessor(RequestCode.UPDATE_CONSUMER_OFFSET,
consumerManageProcessor, this.consumerManageExecutor);
+
fastRemotingServer.registerProcessor(RequestCode.QUERY_CONSUMER_OFFSET,
consumerManageProcessor, this.consumerManageExecutor);
/**
* QueryAssignmentProcessor
*/
- this.remotingServer.registerProcessor(RequestCode.QUERY_ASSIGNMENT,
queryAssignmentProcessor, loadBalanceExecutor);
-
this.fastRemotingServer.registerProcessor(RequestCode.QUERY_ASSIGNMENT,
queryAssignmentProcessor, loadBalanceExecutor);
-
this.remotingServer.registerProcessor(RequestCode.SET_MESSAGE_REQUEST_MODE,
queryAssignmentProcessor, loadBalanceExecutor);
-
this.fastRemotingServer.registerProcessor(RequestCode.SET_MESSAGE_REQUEST_MODE,
queryAssignmentProcessor, loadBalanceExecutor);
+ remotingServer.registerProcessor(RequestCode.QUERY_ASSIGNMENT,
queryAssignmentProcessor, loadBalanceExecutor);
+ fastRemotingServer.registerProcessor(RequestCode.QUERY_ASSIGNMENT,
queryAssignmentProcessor, loadBalanceExecutor);
+ remotingServer.registerProcessor(RequestCode.SET_MESSAGE_REQUEST_MODE,
queryAssignmentProcessor, loadBalanceExecutor);
+
fastRemotingServer.registerProcessor(RequestCode.SET_MESSAGE_REQUEST_MODE,
queryAssignmentProcessor, loadBalanceExecutor);
/**
* EndTransactionProcessor
*/
- this.remotingServer.registerProcessor(RequestCode.END_TRANSACTION,
endTransactionProcessor, this.endTransactionExecutor);
- this.fastRemotingServer.registerProcessor(RequestCode.END_TRANSACTION,
endTransactionProcessor, this.endTransactionExecutor);
+ remotingServer.registerProcessor(RequestCode.END_TRANSACTION,
endTransactionProcessor, this.endTransactionExecutor);
+ fastRemotingServer.registerProcessor(RequestCode.END_TRANSACTION,
endTransactionProcessor, this.endTransactionExecutor);
/*
* Default
*/
AdminBrokerProcessor adminProcessor = new AdminBrokerProcessor(this);
- this.remotingServer.registerDefaultProcessor(adminProcessor,
this.adminBrokerExecutor);
- this.fastRemotingServer.registerDefaultProcessor(adminProcessor,
this.adminBrokerExecutor);
+ remotingServer.registerDefaultProcessor(adminProcessor,
this.adminBrokerExecutor);
+ fastRemotingServer.registerDefaultProcessor(adminProcessor,
this.adminBrokerExecutor);
/*
* Initialize the mapping of request codes to request headers.
@@ -1342,14 +1352,6 @@ public class BrokerController {
return producerManager;
}
- public void setFastRemotingServer(RemotingServer fastRemotingServer) {
- this.fastRemotingServer = fastRemotingServer;
- }
-
- public RemotingServer getFastRemotingServer() {
- return fastRemotingServer;
- }
-
public PullMessageProcessor getPullMessageProcessor() {
return pullMessageProcessor;
}
@@ -1400,12 +1402,11 @@ public class BrokerController {
this.shutdownHook.beforeShutdown(this);
}
- if (this.remotingServer != null) {
- this.remotingServer.shutdown();
- }
-
- if (this.fastRemotingServer != null) {
- this.fastRemotingServer.shutdown();
+ for (Map.Entry<String, RemotingServer> entry :
remotingServerMap.entrySet()) {
+ RemotingServer remotingServer = entry.getValue();
+ if (remotingServer != null) {
+ remotingServer.shutdown();
+ }
}
if (this.brokerMetricsManager != null) {
@@ -1658,19 +1659,20 @@ public class BrokerController {
remotingServerStartLatch.await();
}
- if (this.remotingServer != null) {
- this.remotingServer.start();
+ for (Map.Entry<String, RemotingServer> entry :
remotingServerMap.entrySet()) {
+ RemotingServer remotingServer = entry.getValue();
+ if (remotingServer != null) {
+ remotingServer.start();
- // In test scenarios where it is up to OS to pick up an available
port, set the listening port back to config
- if (null != nettyServerConfig && 0 ==
nettyServerConfig.getListenPort()) {
-
nettyServerConfig.setListenPort(remotingServer.localListenPort());
+ if (TCP_REMOTING_SERVER.equals(entry.getKey())) {
+ // In test scenarios where it is up to OS to pick up an
available port, set the listening port back to config
+ if (null != nettyServerConfig && 0 ==
nettyServerConfig.getListenPort()) {
+
nettyServerConfig.setListenPort(remotingServer.localListenPort());
+ }
+ }
}
}
- if (this.fastRemotingServer != null) {
- this.fastRemotingServer.start();
- }
-
this.storeHost = new
InetSocketAddress(this.getBrokerConfig().getBrokerIP1(),
this.getNettyServerConfig().getListenPort());
for (BrokerAttachedPlugin brokerAttachedPlugin :
brokerAttachedPlugins) {
@@ -2353,21 +2355,49 @@ public class BrokerController {
}
public void registerServerRPCHook(RPCHook rpcHook) {
- getRemotingServer().registerRPCHook(rpcHook);
- this.fastRemotingServer.registerRPCHook(rpcHook);
+ for (Map.Entry<String, RemotingServer> entry :
remotingServerMap.entrySet()) {
+ RemotingServer remotingServer = entry.getValue();
+ if (remotingServer != null) {
+ remotingServer.registerRPCHook(rpcHook);
+ }
+ }
}
public void setRequestPipeline(RequestPipeline pipeline) {
- this.getRemotingServer().setRequestPipeline(pipeline);
- this.fastRemotingServer.setRequestPipeline(pipeline);
+ for (Map.Entry<String, RemotingServer> entry :
remotingServerMap.entrySet()) {
+ RemotingServer remotingServer = entry.getValue();
+ if (remotingServer != null) {
+ remotingServer.setRequestPipeline(pipeline);
+ }
+ }
}
public RemotingServer getRemotingServer() {
- return remotingServer;
+ return remotingServerMap.get(TCP_REMOTING_SERVER);
}
public void setRemotingServer(RemotingServer remotingServer) {
- this.remotingServer = remotingServer;
+ remotingServerMap.put(TCP_REMOTING_SERVER, remotingServer);
+ }
+
+ public RemotingServer getFastRemotingServer() {
+ return remotingServerMap.get(FAST_REMOTING_SERVER);
+ }
+
+ public void setFastRemotingServer(RemotingServer fastRemotingServer) {
+ remotingServerMap.put(FAST_REMOTING_SERVER, fastRemotingServer);
+ }
+
+ public RemotingServer getRemotingServerByName(String name) {
+ return remotingServerMap.get(name);
+ }
+
+ public void setRemotingServerByName(String name, RemotingServer
remotingServer) {
+ remotingServerMap.put(name, remotingServer);
+ }
+
+ public ClientHousekeepingService getClientHousekeepingService() {
+ return clientHousekeepingService;
}
public CountDownLatch getRemotingServerStartLatch() {
diff --git
a/broker/src/test/java/org/apache/rocketmq/broker/BrokerControllerTest.java
b/broker/src/test/java/org/apache/rocketmq/broker/BrokerControllerTest.java
index 6035a20acb..3ce1fe3dbd 100644
--- a/broker/src/test/java/org/apache/rocketmq/broker/BrokerControllerTest.java
+++ b/broker/src/test/java/org/apache/rocketmq/broker/BrokerControllerTest.java
@@ -26,11 +26,18 @@ import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.common.BrokerConfig;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.future.FutureTaskExt;
+import org.apache.rocketmq.remoting.RPCHook;
+import org.apache.rocketmq.remoting.RemotingServer;
import org.apache.rocketmq.remoting.netty.NettyClientConfig;
+import org.apache.rocketmq.remoting.netty.NettyRemotingAbstract;
+import org.apache.rocketmq.remoting.netty.NettyRemotingServer;
import org.apache.rocketmq.remoting.netty.NettyServerConfig;
import org.apache.rocketmq.remoting.netty.RequestTask;
+import org.apache.rocketmq.remoting.pipeline.RequestPipeline;
+import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.store.config.MessageStoreConfig;
import org.junit.After;
+import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
@@ -94,4 +101,43 @@ public class BrokerControllerTest {
TimeUnit.MILLISECONDS.sleep(headSlowTimeMills);
assertThat(brokerController.headSlowTimeMills(queue)).isGreaterThanOrEqualTo(headSlowTimeMills);
}
+
+ @Test
+ public void testCustomRemotingServer() throws CloneNotSupportedException {
+ final RemotingServer mockRemotingServer = new
NettyRemotingServer(nettyServerConfig);
+ final String mockRemotingServerName = "MOCK_REMOTING_SERVER";
+
+ BrokerController brokerController = new BrokerController(brokerConfig,
nettyServerConfig, new NettyClientConfig(), messageStoreConfig);
+ brokerController.setRemotingServerByName(mockRemotingServerName,
mockRemotingServer);
+ brokerController.initializeRemotingServer();
+
+ final RPCHook rpcHook = new RPCHook() {
+ @Override
+ public void doBeforeRequest(String remoteAddr, RemotingCommand
request) {
+
+ }
+
+ @Override
+ public void doAfterResponse(String remoteAddr, RemotingCommand
request, RemotingCommand response) {
+
+ }
+ };
+ brokerController.registerServerRPCHook(rpcHook);
+
+ // setRequestPipelineTest
+ final RequestPipeline requestPipeline = (ctx, request) -> {
+
+ };
+ brokerController.setRequestPipeline(requestPipeline);
+
+ NettyRemotingAbstract tcpRemotingServer = (NettyRemotingAbstract)
brokerController.getRemotingServer();
+ Assert.assertTrue(tcpRemotingServer.getRPCHook().contains(rpcHook));
+
+ NettyRemotingAbstract fastRemotingServer = (NettyRemotingAbstract)
brokerController.getFastRemotingServer();
+ Assert.assertTrue(fastRemotingServer.getRPCHook().contains(rpcHook));
+
+ NettyRemotingAbstract mockRemotingServer1 = (NettyRemotingAbstract)
brokerController.getRemotingServerByName(mockRemotingServerName);
+ Assert.assertTrue(mockRemotingServer1.getRPCHook().contains(rpcHook));
+ Assert.assertSame(mockRemotingServer, mockRemotingServer1);
+ }
}
diff --git
a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
index bed6c1c476..30d7b0a1d5 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
@@ -59,6 +59,7 @@ import org.apache.rocketmq.common.BoundaryType;
import org.apache.rocketmq.common.CheckRocksdbCqWriteResult;
import org.apache.rocketmq.common.MQVersion;
import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.ObjectCreator;
import org.apache.rocketmq.common.Pair;
import org.apache.rocketmq.common.PlainAccessConfig;
import org.apache.rocketmq.common.TopicConfig;
@@ -268,19 +269,43 @@ public class MQClientAPIImpl implements
NameServerUpdateCallback, StartAndShutdo
private String nameSrvAddr = null;
private ClientConfig clientConfig;
- public MQClientAPIImpl(final NettyClientConfig nettyClientConfig,
+ public MQClientAPIImpl(
+ final NettyClientConfig nettyClientConfig,
final ClientRemotingProcessor clientRemotingProcessor,
- RPCHook rpcHook, final ClientConfig clientConfig) {
+ final RPCHook rpcHook,
+ final ClientConfig clientConfig
+ ) {
this(nettyClientConfig, clientRemotingProcessor, rpcHook,
clientConfig, null);
}
- public MQClientAPIImpl(final NettyClientConfig nettyClientConfig,
+ public MQClientAPIImpl(
+ final NettyClientConfig nettyClientConfig,
final ClientRemotingProcessor clientRemotingProcessor,
- RPCHook rpcHook, final ClientConfig clientConfig, final
ChannelEventListener channelEventListener) {
+ final RPCHook rpcHook,
+ final ClientConfig clientConfig,
+ final ChannelEventListener channelEventListener
+ ) {
+ this(
+ nettyClientConfig,
+ clientRemotingProcessor,
+ rpcHook,
+ clientConfig,
+ channelEventListener,
+ null
+ );
+ }
+
+ public MQClientAPIImpl(final NettyClientConfig nettyClientConfig,
+ final ClientRemotingProcessor
clientRemotingProcessor,
+ RPCHook rpcHook, final ClientConfig clientConfig,
+ final ChannelEventListener channelEventListener,
+ final ObjectCreator<RemotingClient>
remotingClientCreator) {
this.clientConfig = clientConfig;
topAddressing = new DefaultTopAddressing(MixAll.getWSAddr(),
clientConfig.getUnitName());
topAddressing.registerChangeCallBack(this);
- this.remotingClient = new NettyRemotingClient(nettyClientConfig,
channelEventListener);
+ this.remotingClient = remotingClientCreator != null
+ ? remotingClientCreator.create(nettyClientConfig,
channelEventListener)
+ : new NettyRemotingClient(nettyClientConfig, channelEventListener);
this.clientRemotingProcessor = clientRemotingProcessor;
this.remotingClient.registerRPCHook(new
NamespaceRpcHook(clientConfig));
diff --git
a/client/src/main/java/org/apache/rocketmq/client/impl/mqclient/MQClientAPIExt.java
b/client/src/main/java/org/apache/rocketmq/client/impl/mqclient/MQClientAPIExt.java
index 6624b3100d..c22f453477 100644
---
a/client/src/main/java/org/apache/rocketmq/client/impl/mqclient/MQClientAPIExt.java
+++
b/client/src/main/java/org/apache/rocketmq/client/impl/mqclient/MQClientAPIExt.java
@@ -37,6 +37,7 @@ import org.apache.rocketmq.client.impl.MQClientAPIImpl;
import org.apache.rocketmq.client.impl.admin.MqClientAdminImpl;
import org.apache.rocketmq.client.impl.consumer.PullResultExt;
import org.apache.rocketmq.client.producer.SendResult;
+import org.apache.rocketmq.common.ObjectCreator;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageBatch;
@@ -48,6 +49,7 @@ import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
import org.apache.rocketmq.remoting.InvokeCallback;
import org.apache.rocketmq.remoting.RPCHook;
+import org.apache.rocketmq.remoting.RemotingClient;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
import org.apache.rocketmq.remoting.netty.NettyClientConfig;
import org.apache.rocketmq.remoting.netty.ResponseFuture;
@@ -97,7 +99,17 @@ public class MQClientAPIExt extends MQClientAPIImpl {
ClientRemotingProcessor clientRemotingProcessor,
RPCHook rpcHook
) {
- super(nettyClientConfig, clientRemotingProcessor, rpcHook,
clientConfig);
+ this(clientConfig, nettyClientConfig, clientRemotingProcessor,
rpcHook, null);
+ }
+
+ public MQClientAPIExt(
+ ClientConfig clientConfig,
+ NettyClientConfig nettyClientConfig,
+ ClientRemotingProcessor clientRemotingProcessor,
+ RPCHook rpcHook,
+ ObjectCreator<RemotingClient> remotingClientCreator
+ ) {
+ super(nettyClientConfig, clientRemotingProcessor, rpcHook,
clientConfig, null, remotingClientCreator);
this.clientConfig = clientConfig;
this.mqClientAdmin = new MqClientAdminImpl(getRemotingClient());
}
diff --git
a/client/src/main/java/org/apache/rocketmq/client/impl/mqclient/MQClientAPIFactory.java
b/client/src/main/java/org/apache/rocketmq/client/impl/mqclient/MQClientAPIFactory.java
index 0fa31b6640..d85dcc70a5 100644
---
a/client/src/main/java/org/apache/rocketmq/client/impl/mqclient/MQClientAPIFactory.java
+++
b/client/src/main/java/org/apache/rocketmq/client/impl/mqclient/MQClientAPIFactory.java
@@ -17,18 +17,22 @@
package org.apache.rocketmq.client.impl.mqclient;
import com.google.common.base.Strings;
+
import java.time.Duration;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
+
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.ClientConfig;
import org.apache.rocketmq.client.common.NameserverAccessConfig;
import org.apache.rocketmq.client.impl.ClientRemotingProcessor;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.utils.AsyncShutdownHelper;
+import org.apache.rocketmq.common.ObjectCreator;
import org.apache.rocketmq.common.utils.StartAndShutdown;
import org.apache.rocketmq.remoting.RPCHook;
+import org.apache.rocketmq.remoting.RemotingClient;
import org.apache.rocketmq.remoting.netty.NettyClientConfig;
public class MQClientAPIFactory implements StartAndShutdown {
@@ -40,16 +44,35 @@ public class MQClientAPIFactory implements StartAndShutdown
{
private final RPCHook rpcHook;
private final ScheduledExecutorService scheduledExecutorService;
private final NameserverAccessConfig nameserverAccessConfig;
+ private final ObjectCreator<RemotingClient> remotingClientCreator;
+
+ public MQClientAPIFactory(
+ NameserverAccessConfig nameserverAccessConfig,
+ String namePrefix,
+ int clientNum,
+ ClientRemotingProcessor clientRemotingProcessor,
+ RPCHook rpcHook,
+ ScheduledExecutorService scheduledExecutorService
+ ) {
+ this(nameserverAccessConfig, namePrefix, clientNum,
clientRemotingProcessor, rpcHook, scheduledExecutorService, null);
+ }
- public MQClientAPIFactory(NameserverAccessConfig nameserverAccessConfig,
String namePrefix, int clientNum,
+ public MQClientAPIFactory(
+ NameserverAccessConfig nameserverAccessConfig,
+ String namePrefix,
+ int clientNum,
ClientRemotingProcessor clientRemotingProcessor,
- RPCHook rpcHook, ScheduledExecutorService scheduledExecutorService) {
+ RPCHook rpcHook,
+ ScheduledExecutorService scheduledExecutorService,
+ ObjectCreator<RemotingClient> remotingClientCreator
+ ) {
this.nameserverAccessConfig = nameserverAccessConfig;
this.namePrefix = namePrefix;
this.clientNum = clientNum;
this.clientRemotingProcessor = clientRemotingProcessor;
this.rpcHook = rpcHook;
this.scheduledExecutorService = scheduledExecutorService;
+ this.remotingClientCreator = remotingClientCreator;
this.init();
}
@@ -102,9 +125,13 @@ public class MQClientAPIFactory implements
StartAndShutdown {
NettyClientConfig nettyClientConfig = new NettyClientConfig();
nettyClientConfig.setDisableCallbackExecutor(true);
- MQClientAPIExt mqClientAPIExt = new MQClientAPIExt(clientConfig,
nettyClientConfig,
+ MQClientAPIExt mqClientAPIExt = new MQClientAPIExt(
+ clientConfig,
+ nettyClientConfig,
clientRemotingProcessor,
- rpcHook);
+ rpcHook,
+ remotingClientCreator
+ );
if (!mqClientAPIExt.updateNameServerAddressList()) {
mqClientAPIExt.fetchNameServerAddr();
diff --git
a/client/src/test/java/org/apache/rocketmq/client/impl/MQClientAPIImplTest.java
b/client/src/test/java/org/apache/rocketmq/client/impl/MQClientAPIImplTest.java
index c76d0c734a..6cb96df05f 100644
---
a/client/src/test/java/org/apache/rocketmq/client/impl/MQClientAPIImplTest.java
+++
b/client/src/test/java/org/apache/rocketmq/client/impl/MQClientAPIImplTest.java
@@ -35,6 +35,7 @@ import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.ObjectCreator;
import org.apache.rocketmq.common.Pair;
import org.apache.rocketmq.common.PlainAccessConfig;
import org.apache.rocketmq.common.TopicConfig;
@@ -59,6 +60,7 @@ import
org.apache.rocketmq.remoting.exception.RemotingException;
import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
import org.apache.rocketmq.remoting.netty.NettyClientConfig;
+import org.apache.rocketmq.remoting.netty.NettyRemotingClient;
import org.apache.rocketmq.remoting.netty.ResponseFuture;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
@@ -2104,6 +2106,48 @@ public class MQClientAPIImplTest {
done.await();
}
+ @Test
+ public void testMQClientAPIImplWithoutObjectCreator() {
+ MQClientAPIImpl clientAPI = new MQClientAPIImpl(
+ new NettyClientConfig(),
+ null,
+ null,
+ new ClientConfig(),
+ null,
+ null
+ );
+ RemotingClient remotingClient1 = clientAPI.getRemotingClient();
+ Assert.assertTrue(remotingClient1 instanceof NettyRemotingClient);
+ }
+
+ @Test
+ public void testMQClientAPIImplWithObjectCreator() {
+ ObjectCreator<RemotingClient> clientObjectCreator = args -> new
MockRemotingClientTest((NettyClientConfig) args[0]);
+ final NettyClientConfig nettyClientConfig = new NettyClientConfig();
+ MQClientAPIImpl clientAPI = new MQClientAPIImpl(
+ nettyClientConfig,
+ null,
+ null,
+ new ClientConfig(),
+ null,
+ clientObjectCreator
+ );
+ RemotingClient remotingClient1 = clientAPI.getRemotingClient();
+ Assert.assertTrue(remotingClient1 instanceof MockRemotingClientTest);
+ MockRemotingClientTest remotingClientTest = (MockRemotingClientTest)
remotingClient1;
+ Assert.assertSame(remotingClientTest.getNettyClientConfig(),
nettyClientConfig);
+ }
+
+ private static class MockRemotingClientTest extends NettyRemotingClient {
+ public MockRemotingClientTest(NettyClientConfig nettyClientConfig) {
+ super(nettyClientConfig);
+ }
+
+ public NettyClientConfig getNettyClientConfig() {
+ return nettyClientConfig;
+ }
+ }
+
private Properties createProperties() {
Properties result = new Properties();
result.put("key", "value");
diff --git
a/proxy/src/main/java/org/apache/rocketmq/proxy/service/ServiceManagerFactory.java
b/common/src/main/java/org/apache/rocketmq/common/ObjectCreator.java
similarity index 51%
copy from
proxy/src/main/java/org/apache/rocketmq/proxy/service/ServiceManagerFactory.java
copy to common/src/main/java/org/apache/rocketmq/common/ObjectCreator.java
index c186752788..14c645424f 100644
---
a/proxy/src/main/java/org/apache/rocketmq/proxy/service/ServiceManagerFactory.java
+++ b/common/src/main/java/org/apache/rocketmq/common/ObjectCreator.java
@@ -14,25 +14,8 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.rocketmq.proxy.service;
+package org.apache.rocketmq.common;
-import org.apache.rocketmq.broker.BrokerController;
-import org.apache.rocketmq.remoting.RPCHook;
-
-public class ServiceManagerFactory {
- public static ServiceManager createForLocalMode(BrokerController
brokerController) {
- return createForLocalMode(brokerController, null);
- }
-
- public static ServiceManager createForLocalMode(BrokerController
brokerController, RPCHook rpcHook) {
- return new LocalServiceManager(brokerController, rpcHook);
- }
-
- public static ServiceManager createForClusterMode() {
- return createForClusterMode(null);
- }
-
- public static ServiceManager createForClusterMode(RPCHook rpcHook) {
- return new ClusterServiceManager(rpcHook);
- }
+public interface ObjectCreator<T> {
+ T create(Object... args);
}
diff --git
a/container/src/main/java/org/apache/rocketmq/container/InnerBrokerController.java
b/container/src/main/java/org/apache/rocketmq/container/InnerBrokerController.java
index a1c1eecf59..616188e52d 100644
---
a/container/src/main/java/org/apache/rocketmq/container/InnerBrokerController.java
+++
b/container/src/main/java/org/apache/rocketmq/container/InnerBrokerController.java
@@ -23,6 +23,7 @@ import org.apache.rocketmq.broker.out.BrokerOuterAPI;
import org.apache.rocketmq.common.AbstractBrokerRunnable;
import org.apache.rocketmq.common.BrokerConfig;
import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.remoting.RemotingServer;
import org.apache.rocketmq.remoting.netty.NettyClientConfig;
import org.apache.rocketmq.remoting.netty.NettyServerConfig;
import org.apache.rocketmq.store.MessageStore;
@@ -43,8 +44,11 @@ public class InnerBrokerController extends BrokerController {
@Override
protected void initializeRemotingServer() {
- this.remotingServer =
this.brokerContainer.getRemotingServer().newRemotingServer(brokerConfig.getListenPort());
- this.fastRemotingServer =
this.brokerContainer.getRemotingServer().newRemotingServer(brokerConfig.getListenPort()
- 2);
+ RemotingServer remotingServer =
this.brokerContainer.getRemotingServer().newRemotingServer(brokerConfig.getListenPort());
+ RemotingServer fastRemotingServer =
this.brokerContainer.getRemotingServer().newRemotingServer(brokerConfig.getListenPort()
- 2);
+
+ setRemotingServer(remotingServer);
+ setFastRemotingServer(fastRemotingServer);
}
@Override
@@ -119,11 +123,11 @@ public class InnerBrokerController extends
BrokerController {
scheduledFuture.cancel(true);
}
- if (this.remotingServer != null) {
+ if (getRemotingServer() != null) {
this.brokerContainer.getRemotingServer().removeRemotingServer(brokerConfig.getListenPort());
}
- if (this.fastRemotingServer != null) {
+ if (getFastRemotingServer() != null) {
this.brokerContainer.getRemotingServer().removeRemotingServer(brokerConfig.getListenPort()
- 2);
}
}
diff --git
a/proxy/src/main/java/org/apache/rocketmq/proxy/service/ClusterServiceManager.java
b/proxy/src/main/java/org/apache/rocketmq/proxy/service/ClusterServiceManager.java
index 9786cec557..33b65d2550 100644
---
a/proxy/src/main/java/org/apache/rocketmq/proxy/service/ClusterServiceManager.java
+++
b/proxy/src/main/java/org/apache/rocketmq/proxy/service/ClusterServiceManager.java
@@ -28,6 +28,7 @@ import org.apache.rocketmq.broker.client.ProducerManager;
import org.apache.rocketmq.client.common.NameserverAccessConfig;
import
org.apache.rocketmq.client.impl.mqclient.DoNothingClientRemotingProcessor;
import org.apache.rocketmq.client.impl.mqclient.MQClientAPIFactory;
+import org.apache.rocketmq.common.ObjectCreator;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.utils.AbstractStartAndShutdown;
import org.apache.rocketmq.common.utils.ThreadUtils;
@@ -51,6 +52,7 @@ import
org.apache.rocketmq.proxy.service.route.TopicRouteService;
import org.apache.rocketmq.proxy.service.transaction.ClusterTransactionService;
import org.apache.rocketmq.proxy.service.transaction.TransactionService;
import org.apache.rocketmq.remoting.RPCHook;
+import org.apache.rocketmq.remoting.RemotingClient;
public class ClusterServiceManager extends AbstractStartAndShutdown implements
ServiceManager {
private static final Logger log =
LoggerFactory.getLogger(LoggerName.PROXY_LOGGER_NAME);
@@ -70,6 +72,10 @@ public class ClusterServiceManager extends
AbstractStartAndShutdown implements S
protected MQClientAPIFactory transactionClientAPIFactory;
public ClusterServiceManager(RPCHook rpcHook) {
+ this(rpcHook, null);
+ }
+
+ public ClusterServiceManager(RPCHook rpcHook,
ObjectCreator<RemotingClient> remotingClientCreator) {
ProxyConfig proxyConfig = ConfigurationManager.getProxyConfig();
NameserverAccessConfig nameserverAccessConfig = new
NameserverAccessConfig(proxyConfig.getNamesrvAddr(),
proxyConfig.getNamesrvDomain(),
proxyConfig.getNamesrvDomainSubgroup());
@@ -81,14 +87,18 @@ public class ClusterServiceManager extends
AbstractStartAndShutdown implements S
proxyConfig.getRocketmqMQClientNum(),
new DoNothingClientRemotingProcessor(null),
rpcHook,
- scheduledExecutorService);
+ scheduledExecutorService,
+ remotingClientCreator
+ );
+
this.operationClientAPIFactory = new MQClientAPIFactory(
nameserverAccessConfig,
"OperationClient_",
1,
new DoNothingClientRemotingProcessor(null),
rpcHook,
- this.scheduledExecutorService
+ this.scheduledExecutorService,
+ remotingClientCreator
);
this.topicRouteService = new
ClusterTopicRouteService(operationClientAPIFactory);
@@ -105,7 +115,10 @@ public class ClusterServiceManager extends
AbstractStartAndShutdown implements S
1,
new ProxyClientRemotingProcessor(producerManager),
rpcHook,
- scheduledExecutorService);
+ scheduledExecutorService,
+ remotingClientCreator
+ );
+
this.clusterTransactionService = new
ClusterTransactionService(this.topicRouteService, this.producerManager,
this.transactionClientAPIFactory);
this.proxyRelayService = new
ClusterProxyRelayService(this.clusterTransactionService);
diff --git
a/proxy/src/main/java/org/apache/rocketmq/proxy/service/ServiceManagerFactory.java
b/proxy/src/main/java/org/apache/rocketmq/proxy/service/ServiceManagerFactory.java
index c186752788..e1252fe31f 100644
---
a/proxy/src/main/java/org/apache/rocketmq/proxy/service/ServiceManagerFactory.java
+++
b/proxy/src/main/java/org/apache/rocketmq/proxy/service/ServiceManagerFactory.java
@@ -17,7 +17,9 @@
package org.apache.rocketmq.proxy.service;
import org.apache.rocketmq.broker.BrokerController;
+import org.apache.rocketmq.common.ObjectCreator;
import org.apache.rocketmq.remoting.RPCHook;
+import org.apache.rocketmq.remoting.RemotingClient;
public class ServiceManagerFactory {
public static ServiceManager createForLocalMode(BrokerController
brokerController) {
@@ -29,10 +31,14 @@ public class ServiceManagerFactory {
}
public static ServiceManager createForClusterMode() {
- return createForClusterMode(null);
+ return createForClusterMode(null, null);
}
public static ServiceManager createForClusterMode(RPCHook rpcHook) {
- return new ClusterServiceManager(rpcHook);
+ return createForClusterMode(rpcHook, null);
+ }
+
+ public static ServiceManager createForClusterMode(RPCHook rpcHook,
ObjectCreator<RemotingClient> remotingClientCreator) {
+ return new ClusterServiceManager(rpcHook, remotingClientCreator);
}
}
diff --git
a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java
b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java
index d3f5a88cf2..a4f23f181a 100644
---
a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java
+++
b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java
@@ -673,6 +673,10 @@ public abstract class NettyRemotingAbstract {
}
}
+ public HashMap<Integer, Pair<NettyRequestProcessor, ExecutorService>>
getProcessorTable() {
+ return processorTable;
+ }
+
class NettyEventExecutor extends ServiceThread {
private final LinkedBlockingQueue<NettyEvent> eventQueue = new
LinkedBlockingQueue<>();
diff --git
a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java
b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java
index 6ac54aed6d..e92809ccdf 100644
---
a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java
+++
b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java
@@ -98,7 +98,7 @@ public class NettyRemotingClient extends
NettyRemotingAbstract implements Remoti
private static final long LOCK_TIMEOUT_MILLIS = 3000;
private static final long MIN_CLOSE_TIMEOUT_MILLIS = 100;
- private final NettyClientConfig nettyClientConfig;
+ protected final NettyClientConfig nettyClientConfig;
private final Bootstrap bootstrap = new Bootstrap();
private final EventLoopGroup eventLoopGroupWorker;
private final Lock lockChannelTables = new ReentrantLock();
@@ -288,6 +288,13 @@ public class NettyRemotingClient extends
NettyRemotingAbstract implements Remoti
return null;
}
+ protected ChannelFuture doConnect(String addr) {
+ String[] hostAndPort = getHostAndPort(addr);
+ String host = hostAndPort[0];
+ int port = Integer.parseInt(hostAndPort[1]);
+ return fetchBootstrap(addr).connect(host, port);
+ }
+
private Bootstrap fetchBootstrap(String addr) {
Map.Entry<String, SocksProxyConfig> proxyEntry = getProxy(addr);
if (proxyEntry == null) {
@@ -359,7 +366,7 @@ public class NettyRemotingClient extends
NettyRemotingAbstract implements Remoti
}
// Do not use RemotingHelper.string2SocketAddress(), it will directly
resolve the domain
- private String[] getHostAndPort(String address) {
+ protected String[] getHostAndPort(String address) {
int split = address.lastIndexOf(":");
return split < 0 ? new String[]{address} : new
String[]{address.substring(0, split), address.substring(split + 1)};
}
@@ -712,9 +719,7 @@ public class NettyRemotingClient extends
NettyRemotingAbstract implements Remoti
}
private ChannelWrapper createChannel(String addr) {
- String[] hostAndPort = getHostAndPort(addr);
- ChannelFuture channelFuture = fetchBootstrap(addr)
- .connect(hostAndPort[0], Integer.parseInt(hostAndPort[1]));
+ ChannelFuture channelFuture = doConnect(addr);
LOGGER.info("createChannel: begin to connect remote host[{}]
asynchronously", addr);
ChannelWrapper cw = new ChannelWrapper(addr, channelFuture);
this.channelTables.put(addr, cw);
@@ -1047,9 +1052,7 @@ public class NettyRemotingClient extends
NettyRemotingAbstract implements Remoti
try {
if (isWrapperOf(channel)) {
channelToClose = channelFuture;
- String[] hostAndPort = getHostAndPort(channelAddress);
- channelFuture = fetchBootstrap(channelAddress)
- .connect(hostAndPort[0],
Integer.parseInt(hostAndPort[1]));
+ channelFuture = doConnect(channelAddress);
return true;
} else {
LOGGER.warn("channelWrapper has reconnect, so do
nothing, now channelId={}, input channelId={}",getChannel().id(), channel.id());
@@ -1119,15 +1122,14 @@ public class NettyRemotingClient extends
NettyRemotingAbstract implements Remoti
}
}
- class NettyClientHandler extends
SimpleChannelInboundHandler<RemotingCommand> {
-
+ public class NettyClientHandler extends
SimpleChannelInboundHandler<RemotingCommand> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, RemotingCommand
msg) throws Exception {
processMessageReceived(ctx, msg);
}
}
- class NettyConnectManageHandler extends ChannelDuplexHandler {
+ public class NettyConnectManageHandler extends ChannelDuplexHandler {
@Override
public void connect(ChannelHandlerContext ctx, SocketAddress
remoteAddress, SocketAddress localAddress,
ChannelPromise promise) throws Exception {
diff --git
a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java
b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java
index cbf25c23c6..7ed804483b 100644
---
a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java
+++
b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java
@@ -54,19 +54,6 @@ import io.netty.util.HashedWheelTimer;
import io.netty.util.Timeout;
import io.netty.util.TimerTask;
import io.netty.util.concurrent.DefaultEventExecutorGroup;
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.security.cert.CertificateException;
-import java.time.Duration;
-import java.util.List;
-import java.util.NoSuchElementException;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.common.Pair;
@@ -88,15 +75,28 @@ import
org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
import org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
-@SuppressWarnings("NullableProblems")
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.security.cert.CertificateException;
+import java.time.Duration;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
public class NettyRemotingServer extends NettyRemotingAbstract implements
RemotingServer {
private static final Logger log =
LoggerFactory.getLogger(LoggerName.ROCKETMQ_REMOTING_NAME);
private static final Logger TRAFFIC_LOGGER =
LoggerFactory.getLogger(LoggerName.ROCKETMQ_TRAFFIC_NAME);
private final ServerBootstrap serverBootstrap;
- private final EventLoopGroup eventLoopGroupSelector;
- private final EventLoopGroup eventLoopGroupBoss;
- private final NettyServerConfig nettyServerConfig;
+ protected final EventLoopGroup eventLoopGroupSelector;
+ protected final EventLoopGroup eventLoopGroupBoss;
+ protected final NettyServerConfig nettyServerConfig;
private final ExecutorService publicExecutor;
private final ScheduledExecutorService scheduledExecutorService;
@@ -120,18 +120,18 @@ public class NettyRemotingServer extends
NettyRemotingAbstract implements Remoti
public static final String FILE_REGION_ENCODER_NAME = "fileRegionEncoder";
// sharable handlers
- private TlsModeHandler tlsModeHandler;
- private NettyEncoder encoder;
- private NettyConnectManageHandler connectionManageHandler;
- private NettyServerHandler serverHandler;
- private RemotingCodeDistributionHandler distributionHandler;
+ protected final TlsModeHandler tlsModeHandler = new
TlsModeHandler(TlsSystemConfig.tlsMode);
+ protected final NettyEncoder encoder = new NettyEncoder();
+ protected final NettyConnectManageHandler connectionManageHandler = new
NettyConnectManageHandler();
+ protected final NettyServerHandler serverHandler = new
NettyServerHandler();
+ protected final RemotingCodeDistributionHandler distributionHandler = new
RemotingCodeDistributionHandler();
public NettyRemotingServer(final NettyServerConfig nettyServerConfig) {
this(nettyServerConfig, null);
}
public NettyRemotingServer(final NettyServerConfig nettyServerConfig,
- final ChannelEventListener channelEventListener) {
+ final ChannelEventListener
channelEventListener) {
super(nettyServerConfig.getServerOnewaySemaphoreValue(),
nettyServerConfig.getServerAsyncSemaphoreValue());
this.serverBootstrap = new ServerBootstrap();
this.nettyServerConfig = nettyServerConfig;
@@ -140,13 +140,13 @@ public class NettyRemotingServer extends
NettyRemotingAbstract implements Remoti
this.publicExecutor = buildPublicExecutor(nettyServerConfig);
this.scheduledExecutorService = buildScheduleExecutor();
- this.eventLoopGroupBoss = buildBossEventLoopGroup();
+ this.eventLoopGroupBoss = buildEventLoopGroupBoss();
this.eventLoopGroupSelector = buildEventLoopGroupSelector();
loadSslContext();
}
- private EventLoopGroup buildEventLoopGroupSelector() {
+ protected EventLoopGroup buildEventLoopGroupSelector() {
if (useEpoll()) {
return new
EpollEventLoopGroup(nettyServerConfig.getServerSelectorThreads(), new
ThreadFactoryImpl("NettyServerEPOLLSelector_"));
} else {
@@ -154,7 +154,7 @@ public class NettyRemotingServer extends
NettyRemotingAbstract implements Remoti
}
}
- private EventLoopGroup buildBossEventLoopGroup() {
+ protected EventLoopGroup buildEventLoopGroupBoss() {
if (useEpoll()) {
return new EpollEventLoopGroup(1, new
ThreadFactoryImpl("NettyEPOLLBoss_"));
} else {
@@ -197,13 +197,7 @@ public class NettyRemotingServer extends
NettyRemotingAbstract implements Remoti
&& Epoll.isAvailable();
}
- @Override
- public void start() {
- this.defaultEventExecutorGroup = new
DefaultEventExecutorGroup(nettyServerConfig.getServerWorkerThreads(),
- new ThreadFactoryImpl("NettyServerCodecThread_"));
-
- prepareSharableHandlers();
-
+ protected void initServerBootstrap(ServerBootstrap serverBootstrap) {
serverBootstrap.group(this.eventLoopGroupBoss,
this.eventLoopGroupSelector)
.channel(useEpoll() ? EpollServerSocketChannel.class :
NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 1024)
@@ -220,6 +214,14 @@ public class NettyRemotingServer extends
NettyRemotingAbstract implements Remoti
});
addCustomConfig(serverBootstrap);
+ }
+
+ @Override
+ public void start() {
+ this.defaultEventExecutorGroup = new
DefaultEventExecutorGroup(nettyServerConfig.getServerWorkerThreads(),
+ new ThreadFactoryImpl("NettyServerCodecThread_"));
+
+ initServerBootstrap(serverBootstrap);
try {
ChannelFuture sync = serverBootstrap.bind().sync();
@@ -411,14 +413,6 @@ public class NettyRemotingServer extends
NettyRemotingAbstract implements Remoti
return this.publicExecutor;
}
- private void prepareSharableHandlers() {
- tlsModeHandler = new TlsModeHandler(TlsSystemConfig.tlsMode);
- encoder = new NettyEncoder();
- connectionManageHandler = new NettyConnectManageHandler();
- serverHandler = new NettyServerHandler();
- distributionHandler = new RemotingCodeDistributionHandler();
- }
-
private void printRemotingCodeDistribution() {
if (distributionHandler != null) {
String inBoundSnapshotString =
distributionHandler.getInBoundSnapshotString();
@@ -469,8 +463,8 @@ public class NettyRemotingServer extends
NettyRemotingAbstract implements Remoti
}
if (detectionResult.state() ==
ProtocolDetectionState.DETECTED) {
ctx.pipeline().addAfter(defaultEventExecutorGroup,
ctx.name(), HA_PROXY_DECODER, new HAProxyMessageDecoder())
- .addAfter(defaultEventExecutorGroup,
HA_PROXY_DECODER, HA_PROXY_HANDLER, new HAProxyMessageHandler())
- .addAfter(defaultEventExecutorGroup,
HA_PROXY_HANDLER, TLS_MODE_HANDLER, tlsModeHandler);
+ .addAfter(defaultEventExecutorGroup, HA_PROXY_DECODER,
HA_PROXY_HANDLER, new HAProxyMessageHandler())
+ .addAfter(defaultEventExecutorGroup, HA_PROXY_HANDLER,
TLS_MODE_HANDLER, tlsModeHandler);
} else {
ctx.pipeline().addAfter(defaultEventExecutorGroup,
ctx.name(), TLS_MODE_HANDLER, tlsModeHandler);
}
@@ -664,7 +658,7 @@ public class NettyRemotingServer extends
NettyRemotingAbstract implements Remoti
@Override
public void registerProcessor(final int requestCode, final
NettyRequestProcessor processor,
- final ExecutorService executor) {
+ final ExecutorService executor) {
ExecutorService executorThis = executor;
if (null == executor) {
executorThis = NettyRemotingServer.this.publicExecutor;
@@ -708,19 +702,19 @@ public class NettyRemotingServer extends
NettyRemotingAbstract implements Remoti
@Override
public RemotingCommand invokeSync(final Channel channel, final
RemotingCommand request,
- final long timeoutMillis) throws InterruptedException,
RemotingSendRequestException, RemotingTimeoutException {
+ final long timeoutMillis) throws
InterruptedException, RemotingSendRequestException, RemotingTimeoutException {
return this.invokeSyncImpl(channel, request, timeoutMillis);
}
@Override
public void invokeAsync(final Channel channel, final RemotingCommand
request, final long timeoutMillis,
- final InvokeCallback invokeCallback) throws InterruptedException,
RemotingTooMuchRequestException, RemotingTimeoutException,
RemotingSendRequestException {
+ final InvokeCallback invokeCallback) throws
InterruptedException, RemotingTooMuchRequestException,
RemotingTimeoutException, RemotingSendRequestException {
this.invokeAsyncImpl(channel, request, timeoutMillis,
invokeCallback);
}
@Override
public void invokeOneway(final Channel channel, final RemotingCommand
request,
- final long timeoutMillis) throws InterruptedException,
RemotingTooMuchRequestException, RemotingTimeoutException,
RemotingSendRequestException {
+ final long timeoutMillis) throws
InterruptedException, RemotingTooMuchRequestException,
RemotingTimeoutException, RemotingSendRequestException {
this.invokeOnewayImpl(channel, request, timeoutMillis);
}