This is an automated email from the ASF dual-hosted git repository.

zhouxzhan pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git

commit bdf5c95ce686ff2721d67b0ef040262b57691132
Author: zhouxiang <[email protected]>
AuthorDate: Fri Dec 9 17:08:32 2022 +0800

    [ISSUE #5485] Fix ClusterGrpcIT
---
 .../rocketmq/proxy/service/ClusterServiceManager.java    | 16 +++++++++-------
 1 file changed, 9 insertions(+), 7 deletions(-)

diff --git 
a/proxy/src/main/java/org/apache/rocketmq/proxy/service/ClusterServiceManager.java
 
b/proxy/src/main/java/org/apache/rocketmq/proxy/service/ClusterServiceManager.java
index 24b27aaa2..bc3c58ed0 100644
--- 
a/proxy/src/main/java/org/apache/rocketmq/proxy/service/ClusterServiceManager.java
+++ 
b/proxy/src/main/java/org/apache/rocketmq/proxy/service/ClusterServiceManager.java
@@ -70,8 +70,6 @@ public class ClusterServiceManager extends 
AbstractStartAndShutdown implements S
     public ClusterServiceManager(RPCHook rpcHook) {
         ProxyConfig proxyConfig = ConfigurationManager.getProxyConfig();
         this.scheduledExecutorService = Executors.newScheduledThreadPool(3);
-        this.producerManager = new ProducerManager();
-        this.consumerManager = new 
ClusterConsumerManager(this.topicRouteService, this.adminService, 
this.operationClientAPIFactory, new ConsumerIdsChangeListenerImpl(), 
proxyConfig.getChannelExpiredTimeout());
 
         this.messagingClientAPIFactory = new MQClientAPIFactory(
             "ClusterMQClient_",
@@ -86,20 +84,24 @@ public class ClusterServiceManager extends 
AbstractStartAndShutdown implements S
             rpcHook,
             this.scheduledExecutorService
         );
+
+        this.topicRouteService = new 
ClusterTopicRouteService(operationClientAPIFactory);
+        this.messageService = new 
ClusterMessageService(this.topicRouteService, this.messagingClientAPIFactory);
+        this.metadataService = new ClusterMetadataService(topicRouteService, 
operationClientAPIFactory);
+        this.adminService = new 
DefaultAdminService(this.operationClientAPIFactory);
+
+        this.producerManager = new ProducerManager();
+        this.consumerManager = new 
ClusterConsumerManager(this.topicRouteService, this.adminService, 
this.operationClientAPIFactory, new ConsumerIdsChangeListenerImpl(), 
proxyConfig.getChannelExpiredTimeout());
+
         this.transactionClientAPIFactory = new MQClientAPIFactory(
             "ClusterTransaction_",
             1,
             new ProxyClientRemotingProcessor(producerManager),
             rpcHook,
             scheduledExecutorService);
-
-        this.topicRouteService = new 
ClusterTopicRouteService(operationClientAPIFactory);
-        this.messageService = new 
ClusterMessageService(this.topicRouteService, this.messagingClientAPIFactory);
         this.clusterTransactionService = new 
ClusterTransactionService(this.topicRouteService, this.producerManager, rpcHook,
             this.transactionClientAPIFactory);
         this.proxyRelayService = new 
ClusterProxyRelayService(this.clusterTransactionService);
-        this.metadataService = new ClusterMetadataService(topicRouteService, 
operationClientAPIFactory);
-        this.adminService = new 
DefaultAdminService(this.operationClientAPIFactory);
 
         this.init();
     }

Reply via email to