This is an automated email from the ASF dual-hosted git repository. zhouxzhan pushed a commit to branch develop in repository https://gitbox.apache.org/repos/asf/rocketmq.git
commit bdf5c95ce686ff2721d67b0ef040262b57691132 Author: zhouxiang <[email protected]> AuthorDate: Fri Dec 9 17:08:32 2022 +0800 [ISSUE #5485] Fix ClusterGrpcIT --- .../rocketmq/proxy/service/ClusterServiceManager.java | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/ClusterServiceManager.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/ClusterServiceManager.java index 24b27aaa2..bc3c58ed0 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/service/ClusterServiceManager.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/ClusterServiceManager.java @@ -70,8 +70,6 @@ public class ClusterServiceManager extends AbstractStartAndShutdown implements S public ClusterServiceManager(RPCHook rpcHook) { ProxyConfig proxyConfig = ConfigurationManager.getProxyConfig(); this.scheduledExecutorService = Executors.newScheduledThreadPool(3); - this.producerManager = new ProducerManager(); - this.consumerManager = new ClusterConsumerManager(this.topicRouteService, this.adminService, this.operationClientAPIFactory, new ConsumerIdsChangeListenerImpl(), proxyConfig.getChannelExpiredTimeout()); this.messagingClientAPIFactory = new MQClientAPIFactory( "ClusterMQClient_", @@ -86,20 +84,24 @@ public class ClusterServiceManager extends AbstractStartAndShutdown implements S rpcHook, this.scheduledExecutorService ); + + this.topicRouteService = new ClusterTopicRouteService(operationClientAPIFactory); + this.messageService = new ClusterMessageService(this.topicRouteService, this.messagingClientAPIFactory); + this.metadataService = new ClusterMetadataService(topicRouteService, operationClientAPIFactory); + this.adminService = new DefaultAdminService(this.operationClientAPIFactory); + + this.producerManager = new ProducerManager(); + this.consumerManager = new ClusterConsumerManager(this.topicRouteService, this.adminService, this.operationClientAPIFactory, new ConsumerIdsChangeListenerImpl(), proxyConfig.getChannelExpiredTimeout()); + this.transactionClientAPIFactory = new MQClientAPIFactory( "ClusterTransaction_", 1, new ProxyClientRemotingProcessor(producerManager), rpcHook, scheduledExecutorService); - - this.topicRouteService = new ClusterTopicRouteService(operationClientAPIFactory); - this.messageService = new ClusterMessageService(this.topicRouteService, this.messagingClientAPIFactory); this.clusterTransactionService = new ClusterTransactionService(this.topicRouteService, this.producerManager, rpcHook, this.transactionClientAPIFactory); this.proxyRelayService = new ClusterProxyRelayService(this.clusterTransactionService); - this.metadataService = new ClusterMetadataService(topicRouteService, operationClientAPIFactory); - this.adminService = new DefaultAdminService(this.operationClientAPIFactory); this.init(); }
