This is an automated email from the ASF dual-hosted git repository.

jinrongtong pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 10d291846 [ISSUE #4832] Remove innerProducer and innerConsumer in 
EscapeBridge (#4834)
10d291846 is described below

commit 10d291846e8035941f6c69a3d8c8f2db83acaa2b
Author: caigy <[email protected]>
AuthorDate: Wed Aug 24 15:18:51 2022 +0800

    [ISSUE #4832] Remove innerProducer and innerConsumer in EscapeBridge (#4834)
    
    * remove innerProducer and innerConsumer in EscapeBridge
     #4832
    
    * format code to pass code-style check
    
    * Move topic route management logic to TopicRouteInfoManager
    
    * Use ThreadPoolExecutor instead of creating a new ForkJoinPool
    
    * remove retry logic
    
    * remove unnecessary name server list
    
    * remove unnecessary invokeId
    
    * remove AssignmentManager and make maintenance of topic subscribe data 
lazy-initialized
    
    * simplify code
    
    * remove unnecessary code
---
 .../apache/rocketmq/broker/BrokerController.java   |  45 ++--
 .../rocketmq/broker/failover/EscapeBridge.java     | 221 ++++++++++------
 .../broker/loadbalance/AssignmentManager.java      | 141 -----------
 .../apache/rocketmq/broker/out/BrokerOuterAPI.java | 157 +++++++++++-
 .../broker/processor/QueryAssignmentProcessor.java |   8 +-
 .../broker/topic/TopicRouteInfoManager.java        | 280 +++++++++++++++++++++
 .../rocketmq/broker/failover/EscapeBridgeTest.java |   3 -
 .../processor/QueryAssignmentProcessorTest.java    |   8 +-
 .../rocketmq/container/InnerBrokerController.java  |  11 -
 .../test/container/PopSlaveActingMasterIT.java     |  14 +-
 10 files changed, 597 insertions(+), 291 deletions(-)

diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java 
b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
index 84e1689ce..681ca6ef0 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
@@ -59,7 +59,6 @@ import 
org.apache.rocketmq.broker.filtersrv.FilterServerManager;
 import org.apache.rocketmq.broker.controller.ReplicasManager;
 import org.apache.rocketmq.broker.latency.BrokerFastFailure;
 import org.apache.rocketmq.broker.latency.BrokerFixedThreadPoolExecutor;
-import org.apache.rocketmq.broker.loadbalance.AssignmentManager;
 import org.apache.rocketmq.broker.longpolling.LmqPullRequestHoldService;
 import org.apache.rocketmq.broker.longpolling.NotifyMessageArrivingListener;
 import org.apache.rocketmq.broker.longpolling.PullRequestHoldService;
@@ -95,6 +94,7 @@ import org.apache.rocketmq.broker.topic.LmqTopicConfigManager;
 import org.apache.rocketmq.broker.topic.TopicConfigManager;
 import org.apache.rocketmq.broker.topic.TopicQueueMappingCleanService;
 import org.apache.rocketmq.broker.topic.TopicQueueMappingManager;
+import org.apache.rocketmq.broker.topic.TopicRouteInfoManager;
 import 
org.apache.rocketmq.broker.transaction.AbstractTransactionalMessageCheckListener;
 import org.apache.rocketmq.broker.transaction.TransactionalMessageCheckService;
 import org.apache.rocketmq.broker.transaction.TransactionalMessageService;
@@ -171,7 +171,6 @@ public class BrokerController {
     protected final ConsumerOrderInfoManager consumerOrderInfoManager;
     protected final ProducerManager producerManager;
     protected final ScheduleMessageService scheduleMessageService;
-    protected final AssignmentManager assignmentManager;
     protected final ClientHousekeepingService clientHousekeepingService;
     protected final PullMessageProcessor pullMessageProcessor;
     protected final PeekMessageProcessor peekMessageProcessor;
@@ -191,6 +190,7 @@ public class BrokerController {
     protected final ConsumerIdsChangeListener consumerIdsChangeListener;
     protected final EndTransactionProcessor endTransactionProcessor;
     private final RebalanceLockManager rebalanceLockManager = new 
RebalanceLockManager();
+    private final TopicRouteInfoManager topicRouteInfoManager;
     protected BrokerOuterAPI brokerOuterAPI;
     protected ScheduledExecutorService scheduledExecutorService;
     protected ScheduledExecutorService syncBrokerMemberGroupExecutorService;
@@ -322,7 +322,6 @@ public class BrokerController {
 
         this.filterServerManager = new FilterServerManager(this);
 
-        this.assignmentManager = new AssignmentManager(this);
         this.queryAssignmentProcessor = new QueryAssignmentProcessor(this);
         this.clientManageProcessor = new ClientManageProcessor(this);
         this.slaveSynchronize = new SlaveSynchronize(this);
@@ -386,6 +385,8 @@ public class BrokerController {
 
         this.escapeBridge = new EscapeBridge(this);
 
+        this.topicRouteInfoManager = new TopicRouteInfoManager(this);
+
         if (this.brokerConfig.isEnableSlaveActingMaster() && 
!this.brokerConfig.isSkipPreOnline()) {
             this.brokerPreOnlineService = new BrokerPreOnlineService(this);
         }
@@ -1238,10 +1239,6 @@ public class BrokerController {
             this.ackMessageProcessor.shutdownPopReviveService();
         }
 
-        if (this.assignmentManager != null) {
-            this.assignmentManager.shutdown();
-        }
-
         if (this.notificationProcessor != null) {
             this.notificationProcessor.shutdown();
         }
@@ -1353,6 +1350,10 @@ public class BrokerController {
             escapeBridge.shutdown();
         }
 
+        if (this.topicRouteInfoManager != null) {
+            this.topicRouteInfoManager.shutdown();
+        }
+
         if (this.brokerPreOnlineService != null && 
!this.brokerPreOnlineService.isStopped()) {
             this.brokerPreOnlineService.shutdown();
         }
@@ -1448,10 +1449,6 @@ public class BrokerController {
             this.ackMessageProcessor.startPopReviveService();
         }
 
-        if (this.assignmentManager != null) {
-            this.assignmentManager.start();
-        }
-
         if (this.topicQueueMappingCleanService != null) {
             this.topicQueueMappingCleanService.start();
         }
@@ -1484,6 +1481,10 @@ public class BrokerController {
             this.escapeBridge.start();
         }
 
+        if (this.topicRouteInfoManager != null) {
+            this.topicRouteInfoManager.start();
+        }
+
         if (this.brokerPreOnlineService != null) {
             this.brokerPreOnlineService.start();
         }
@@ -1779,16 +1780,6 @@ public class BrokerController {
         return needRegister;
     }
 
-    public String getNameServerList() {
-        if (this.brokerConfig.getNamesrvAddr() != null) {
-            
this.brokerOuterAPI.updateNameServerAddressList(this.brokerConfig.getNamesrvAddr());
-            return this.brokerConfig.getNamesrvAddr();
-        } else if (this.brokerConfig.isFetchNamesrvAddrByAddressServer()) {
-            return this.brokerOuterAPI.fetchNameServerAddr();
-        }
-        return null;
-    }
-
     public void startService(long minBrokerId, String minBrokerAddr) {
         BrokerController.LOG.info("{} start service, min broker id is {}, min 
broker addr: {}",
             this.brokerConfig.getCanonicalName(), minBrokerId, minBrokerAddr);
@@ -2153,14 +2144,6 @@ public class BrokerController {
         return sendMessageExecutor;
     }
 
-    public AssignmentManager getAssignmentManager() {
-        return assignmentManager;
-    }
-
-    public ClientManageProcessor getClientManageProcessor() {
-        return clientManageProcessor;
-    }
-
     public SendMessageProcessor getSendMessageProcessor() {
         return sendMessageProcessor;
     }
@@ -2253,4 +2236,8 @@ public class BrokerController {
         return timerCheckpoint;
     }
 
+    public TopicRouteInfoManager getTopicRouteInfoManager() {
+        return this.topicRouteInfoManager;
+    }
+
 }
diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/failover/EscapeBridge.java 
b/broker/src/main/java/org/apache/rocketmq/broker/failover/EscapeBridge.java
index 22a72a888..60860db74 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/failover/EscapeBridge.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/failover/EscapeBridge.java
@@ -20,38 +20,47 @@ package org.apache.rocketmq.broker.failover;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.rocketmq.broker.BrokerController;
-import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
 import org.apache.rocketmq.client.consumer.PullResult;
 import org.apache.rocketmq.client.consumer.PullStatus;
-import org.apache.rocketmq.client.exception.MQClientException;
-import org.apache.rocketmq.client.producer.DefaultMQProducer;
-import org.apache.rocketmq.client.producer.MessageQueueSelector;
-import org.apache.rocketmq.client.producer.SendCallback;
+import org.apache.rocketmq.client.exception.MQBrokerException;
+import org.apache.rocketmq.client.impl.producer.TopicPublishInfo;
 import org.apache.rocketmq.client.producer.SendResult;
+import org.apache.rocketmq.client.producer.SendStatus;
+import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.ThreadFactoryImpl;
 import org.apache.rocketmq.common.constant.LoggerName;
-import org.apache.rocketmq.common.message.Message;
+import org.apache.rocketmq.common.message.MessageConst;
 import org.apache.rocketmq.common.message.MessageDecoder;
 import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.common.message.MessageExtBrokerInner;
 import org.apache.rocketmq.common.message.MessageQueue;
 import org.apache.rocketmq.logging.InternalLogger;
 import org.apache.rocketmq.logging.InternalLoggerFactory;
+import org.apache.rocketmq.remoting.exception.RemotingException;
 import org.apache.rocketmq.store.GetMessageResult;
-import org.apache.rocketmq.common.message.MessageExtBrokerInner;
 import org.apache.rocketmq.store.MessageStore;
 import org.apache.rocketmq.store.PutMessageResult;
 import org.apache.rocketmq.store.PutMessageStatus;
 
 public class EscapeBridge {
     protected static final InternalLogger LOG = 
InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
+    private static final long SEND_TIMEOUT = 3000L;
+    private static final long DEFAULT_PULL_TIMEOUT_MILLIS = 1000 * 10L;
     private final String innerProducerGroupName;
     private final String innerConsumerGroupName;
 
     private final BrokerController brokerController;
 
-    private DefaultMQProducer innerProducer;
-    private DefaultMQPullConsumer innerConsumer;
+    private ExecutorService defaultAsyncSenderExecutor;
 
     public EscapeBridge(BrokerController brokerController) {
         this.brokerController = brokerController;
@@ -61,46 +70,22 @@ public class EscapeBridge {
 
     public void start() throws Exception {
         if (brokerController.getBrokerConfig().isEnableSlaveActingMaster() && 
brokerController.getBrokerConfig().isEnableRemoteEscape()) {
-            String nameserver = this.brokerController.getNameServerList();
-            if (nameserver != null && !nameserver.isEmpty()) {
-                startInnerProducer(nameserver);
-                startInnerConsumer(nameserver);
-                LOG.info("start inner producer and consumer success.");
-            } else {
-                throw new RuntimeException("nameserver address is null or 
empty");
-            }
+            final BlockingQueue<Runnable> asyncSenderThreadPoolQueue = new 
LinkedBlockingQueue<>(50000);
+            this.defaultAsyncSenderExecutor = new ThreadPoolExecutor(
+                Runtime.getRuntime().availableProcessors(),
+                Runtime.getRuntime().availableProcessors(),
+                1000 * 60,
+                TimeUnit.MILLISECONDS,
+                asyncSenderThreadPoolQueue,
+                new ThreadFactoryImpl("AsyncEscapeBridgeExecutor_", 
this.brokerController.getBrokerIdentity())
+            );
+            LOG.info("init executor for escaping messages asynchronously 
success.");
         }
     }
 
     public void shutdown() {
-        if (this.innerProducer != null) {
-            this.innerProducer.shutdown();
-        }
-
-        if (this.innerConsumer != null) {
-            this.innerConsumer.shutdown();
-        }
-    }
-
-    private void startInnerProducer(String nameServer) throws 
MQClientException {
-        try {
-            innerProducer = new DefaultMQProducer(innerProducerGroupName);
-            innerProducer.setNamesrvAddr(nameServer);
-            innerProducer.start();
-        } catch (MQClientException e) {
-            LOG.error("start inner producer failed, nameserver address: {}", 
nameServer, e);
-            throw e;
-        }
-    }
-
-    private void startInnerConsumer(String nameServer) throws 
MQClientException {
-        try {
-            innerConsumer = new DefaultMQPullConsumer(innerConsumerGroupName);
-            innerConsumer.setNamesrvAddr(nameServer);
-            innerConsumer.start();
-        } catch (MQClientException e) {
-            LOG.error("start inner consumer failed, nameserver address: {}", 
nameServer, e);
-            throw e;
+        if (null != this.defaultAsyncSenderExecutor) {
+            this.defaultAsyncSenderExecutor.shutdown();
         }
     }
 
@@ -109,12 +94,11 @@ public class EscapeBridge {
         if (masterBroker != null) {
             return masterBroker.getMessageStore().putMessage(messageExt);
         } else if 
(this.brokerController.getBrokerConfig().isEnableSlaveActingMaster()
-            && this.brokerController.getBrokerConfig().isEnableRemoteEscape()
-            && innerProducer != null) {
-            // Remote Acting lead to born timestamp, msgId changed, it needs 
to polish.
+            && this.brokerController.getBrokerConfig().isEnableRemoteEscape()) 
{
+
             try {
                 messageExt.setWaitStoreMsgOK(false);
-                SendResult sendResult = innerProducer.send(messageExt);
+                final SendResult sendResult = 
putMessageToRemoteBroker(messageExt);
                 return transformSendResult2PutResult(sendResult);
             } catch (Exception e) {
                 LOG.error("sendMessageInFailover to remote failed", e);
@@ -127,29 +111,73 @@ public class EscapeBridge {
         }
     }
 
+    private SendResult putMessageToRemoteBroker(MessageExtBrokerInner 
messageExt) {
+        final TopicPublishInfo topicPublishInfo = 
this.brokerController.getTopicRouteInfoManager().tryToFindTopicPublishInfo(messageExt.getTopic());
+        if (null == topicPublishInfo || !topicPublishInfo.ok()) {
+            LOG.warn("putMessageToRemoteBroker: no route info of topic {} when 
escaping message, msgId={}",
+                messageExt.getTopic(), messageExt.getMsgId());
+            return null;
+        }
+
+        final MessageQueue mqSelected = 
topicPublishInfo.selectOneMessageQueue();
+        messageExt.setQueueId(mqSelected.getQueueId());
+
+        final String brokerNameToSend = mqSelected.getBrokerName();
+        final String brokerAddrToSend = 
this.brokerController.getTopicRouteInfoManager().findBrokerAddressInPublish(brokerNameToSend);
+
+        final long beginTimestamp = System.currentTimeMillis();
+        try {
+            final SendResult sendResult = 
this.brokerController.getBrokerOuterAPI().sendMessageToSpecificBroker(
+                brokerAddrToSend, brokerNameToSend,
+                messageExt, this.getProducerGroup(messageExt), SEND_TIMEOUT);
+            if (null != sendResult && 
SendStatus.SEND_OK.equals(sendResult.getSendStatus())) {
+                return sendResult;
+            } else {
+                LOG.error("Escaping failed! cost {}ms, Topic: {}, MsgId: {}, 
Broker: {}",
+                    System.currentTimeMillis() - beginTimestamp, 
messageExt.getTopic(),
+                    messageExt.getMsgId(), brokerNameToSend);
+            }
+        } catch (RemotingException | MQBrokerException e) {
+            LOG.error(String.format("putMessageToRemoteBroker exception, 
MsgId: %s, RT: %sms, Broker: %s",
+                messageExt.getMsgId(), System.currentTimeMillis() - 
beginTimestamp, mqSelected), e);
+        } catch (InterruptedException e) {
+            LOG.error(String.format("putMessageToRemoteBroker interrupted, 
MsgId: %s, RT: %sms, Broker: %s",
+                messageExt.getMsgId(), System.currentTimeMillis() - 
beginTimestamp, mqSelected), e);
+            Thread.currentThread().interrupt();
+        }
+
+        return null;
+    }
+
     public CompletableFuture<PutMessageResult> 
asyncPutMessage(MessageExtBrokerInner messageExt) {
         BrokerController masterBroker = 
this.brokerController.peekMasterBroker();
-        CompletableFuture<PutMessageResult> completableFuture = new 
CompletableFuture<>();
         if (masterBroker != null) {
             return masterBroker.getMessageStore().asyncPutMessage(messageExt);
         } else if 
(this.brokerController.getBrokerConfig().isEnableSlaveActingMaster()
-            && this.brokerController.getBrokerConfig().isEnableRemoteEscape()
-            && innerProducer != null) {
-            // Remote Acting lead to born timestamp, msgId changed, it needs 
to polish.
+            && this.brokerController.getBrokerConfig().isEnableRemoteEscape()) 
{
             try {
                 messageExt.setWaitStoreMsgOK(false);
-                innerProducer.send(messageExt, new SendCallback() {
-                    @Override
-                    public void onSuccess(SendResult sendResult) {
-                        
completableFuture.complete(transformSendResult2PutResult(sendResult));
-                    }
 
-                    @Override
-                    public void onException(Throwable e) {
-                        completableFuture.complete(new 
PutMessageResult(PutMessageStatus.PUT_TO_REMOTE_BROKER_FAIL, null, true));
-                    }
-                });
-                return completableFuture;
+                final TopicPublishInfo topicPublishInfo = 
this.brokerController.getTopicRouteInfoManager().tryToFindTopicPublishInfo(messageExt.getTopic());
+                final String producerGroup = getProducerGroup(messageExt);
+
+                final MessageQueue mqSelected = 
topicPublishInfo.selectOneMessageQueue();
+                messageExt.setQueueId(mqSelected.getQueueId());
+
+                final String brokerNameToSend = mqSelected.getBrokerName();
+                final String brokerAddrToSend = 
this.brokerController.getTopicRouteInfoManager().findBrokerAddressInPublish(brokerNameToSend);
+                final CompletableFuture<SendResult> future = 
this.brokerController.getBrokerOuterAPI().sendMessageToSpecificBrokerAsync(brokerAddrToSend,
+                    brokerNameToSend, messageExt,
+                    producerGroup, SEND_TIMEOUT);
+
+                return future.exceptionally(throwable -> null)
+                    .thenApplyAsync(sendResult -> {
+                        return transformSendResult2PutResult(sendResult);
+                    }, this.defaultAsyncSenderExecutor)
+                    .exceptionally(throwable -> {
+                        return transformSendResult2PutResult(null);
+                    });
+
             } catch (Exception e) {
                 LOG.error("sendMessageInFailover to remote failed", e);
                 return CompletableFuture.completedFuture(new 
PutMessageResult(PutMessageStatus.PUT_TO_REMOTE_BROKER_FAIL, null, true));
@@ -161,24 +189,47 @@ public class EscapeBridge {
         }
     }
 
+
+    private String getProducerGroup(MessageExtBrokerInner messageExt) {
+        if (null == messageExt) {
+            return this.innerProducerGroupName;
+        }
+        String producerGroup = 
messageExt.getProperty(MessageConst.PROPERTY_PRODUCER_GROUP);
+        if (StringUtils.isEmpty(producerGroup)) {
+            producerGroup = this.innerProducerGroupName;
+        }
+        return producerGroup;
+    }
+
+
     public PutMessageResult putMessageToSpecificQueue(MessageExtBrokerInner 
messageExt) {
         BrokerController masterBroker = 
this.brokerController.peekMasterBroker();
         if (masterBroker != null) {
             return masterBroker.getMessageStore().putMessage(messageExt);
         } else if 
(this.brokerController.getBrokerConfig().isEnableSlaveActingMaster()
-            && this.brokerController.getBrokerConfig().isEnableRemoteEscape()
-            && this.innerProducer != null) {
+            && this.brokerController.getBrokerConfig().isEnableRemoteEscape()) 
{
             try {
                 messageExt.setWaitStoreMsgOK(false);
-                // Remote Acting lead to born timestamp, msgId changed, it 
needs to polish.
-                SendResult sendResult = innerProducer.send(messageExt, new 
MessageQueueSelector() {
-                    @Override
-                    public MessageQueue select(List<MessageQueue> mqs, Message 
msg, Object arg) {
-                        String id = (String) arg;
-                        int index = Math.abs(id.hashCode()) % mqs.size();
-                        return mqs.get(index);
-                    }
-                }, messageExt.getTopic() + messageExt.getStoreHost());
+
+                final TopicPublishInfo topicPublishInfo = 
this.brokerController.getTopicRouteInfoManager().tryToFindTopicPublishInfo(messageExt.getTopic());
+                List<MessageQueue> mqs = 
topicPublishInfo.getMessageQueueList();
+
+                if (null == mqs || mqs.isEmpty()) {
+                    return new 
PutMessageResult(PutMessageStatus.PUT_TO_REMOTE_BROKER_FAIL, null, true);
+                }
+
+                String id = messageExt.getTopic() + messageExt.getStoreHost();
+                final int index = Math.floorMod(id.hashCode(), mqs.size());
+
+                MessageQueue mq = mqs.get(index);
+                messageExt.setQueueId(mq.getQueueId());
+
+                String brokerNameToSend = mq.getBrokerName();
+                String brokerAddrToSend = 
this.brokerController.getTopicRouteInfoManager().findBrokerAddressInPublish(brokerNameToSend);
+                final SendResult sendResult = 
this.brokerController.getBrokerOuterAPI().sendMessageToSpecificBroker(
+                    brokerAddrToSend, brokerNameToSend,
+                    messageExt, this.getProducerGroup(messageExt), 
SEND_TIMEOUT);
+
                 return transformSendResult2PutResult(sendResult);
             } catch (Exception e) {
                 LOG.error("sendMessageInFailover to remote failed", e);
@@ -224,10 +275,8 @@ public class EscapeBridge {
             } else {
                 return list.get(0);
             }
-        } else if (innerConsumer != null) {
-            return getMessageFromRemote(topic, offset, queueId, brokerName);
         } else {
-            return null;
+            return getMessageFromRemote(topic, offset, queueId, brokerName);
         }
     }
 
@@ -261,7 +310,20 @@ public class EscapeBridge {
 
     protected MessageExt getMessageFromRemote(String topic, long offset, int 
queueId, String brokerName) {
         try {
-            PullResult pullResult = innerConsumer.pull(new MessageQueue(topic, 
brokerName, queueId), "*", offset, 1);
+            String brokerAddr = 
this.brokerController.getTopicRouteInfoManager().findBrokerAddressInSubscribe(brokerName,
 MixAll.MASTER_ID, false);
+            if (null == brokerAddr) {
+                
this.brokerController.getTopicRouteInfoManager().updateTopicRouteInfoFromNameServer(topic,
 true, false);
+                brokerAddr = 
this.brokerController.getTopicRouteInfoManager().findBrokerAddressInSubscribe(brokerName,
 MixAll.MASTER_ID, false);
+
+                if (null == brokerAddr) {
+                    LOG.warn("can't find broker address for topic {}", topic);
+                    return null;
+                }
+            }
+
+            PullResult pullResult = 
this.brokerController.getBrokerOuterAPI().pullMessageFromSpecificBroker(brokerName,
+                brokerAddr, this.innerConsumerGroupName, topic, queueId, 
offset, 1, DEFAULT_PULL_TIMEOUT_MILLIS);
+
             if (pullResult.getPullStatus().equals(PullStatus.FOUND)) {
                 return pullResult.getMsgFoundList().get(0);
             }
@@ -271,4 +333,5 @@ public class EscapeBridge {
 
         return null;
     }
+
 }
diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/loadbalance/AssignmentManager.java
 
b/broker/src/main/java/org/apache/rocketmq/broker/loadbalance/AssignmentManager.java
deleted file mode 100644
index 606de86d1..000000000
--- 
a/broker/src/main/java/org/apache/rocketmq/broker/loadbalance/AssignmentManager.java
+++ /dev/null
@@ -1,141 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.rocketmq.broker.loadbalance;
-
-import com.google.common.collect.Lists;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-import org.apache.rocketmq.broker.BrokerController;
-import org.apache.rocketmq.client.exception.MQBrokerException;
-import org.apache.rocketmq.client.impl.factory.MQClientInstance;
-import org.apache.rocketmq.common.MixAll;
-import org.apache.rocketmq.common.ThreadFactoryImpl;
-import org.apache.rocketmq.common.constant.LoggerName;
-import org.apache.rocketmq.common.message.MessageQueue;
-import org.apache.rocketmq.common.protocol.ResponseCode;
-import org.apache.rocketmq.common.protocol.route.TopicRouteData;
-import org.apache.rocketmq.common.topic.TopicValidator;
-import org.apache.rocketmq.logging.InternalLogger;
-import org.apache.rocketmq.logging.InternalLoggerFactory;
-
-public class AssignmentManager {
-    private static final InternalLogger log = 
InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
-
-    private transient BrokerController brokerController;
-
-    private final ConcurrentHashMap<String, Set<MessageQueue>> 
topicSubscribeInfoTable = new ConcurrentHashMap<String, Set<MessageQueue>>();
-
-    private ScheduledExecutorService scheduledExecutorService;
-
-    private static final List<String> IGNORE_ROUTE_TOPICS = Lists.newArrayList(
-        MixAll.CID_RMQ_SYS_PREFIX,
-        MixAll.DEFAULT_CONSUMER_GROUP,
-        MixAll.TOOLS_CONSUMER_GROUP,
-        MixAll.FILTERSRV_CONSUMER_GROUP,
-        MixAll.MONITOR_CONSUMER_GROUP,
-        MixAll.ONS_HTTP_PROXY_GROUP,
-        MixAll.CID_ONSAPI_PERMISSION_GROUP,
-        MixAll.CID_ONSAPI_OWNER_GROUP,
-        MixAll.CID_ONSAPI_PULL_GROUP
-    );
-
-    private final List<String> ignoreRouteTopics = 
Lists.newArrayList(IGNORE_ROUTE_TOPICS);
-
-    public AssignmentManager(BrokerController brokerController) {
-        this.brokerController = brokerController;
-        
ignoreRouteTopics.add(brokerController.getBrokerConfig().getBrokerClusterName());
-        
ignoreRouteTopics.add(brokerController.getBrokerConfig().getBrokerName());
-        scheduledExecutorService = Executors
-            .newSingleThreadScheduledExecutor(new 
ThreadFactoryImpl("LoadBalanceManagerScheduledThread", 
brokerController.getBrokerIdentity()));
-    }
-
-    public void start() {
-        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
-
-            @Override
-            public void run() {
-                try {
-                    updateTopicRouteInfoFromNameServer();
-                } catch (Exception e) {
-                    log.error("ScheduledTask: failed to pull TopicRouteData 
from NameServer", e);
-                }
-            }
-        }, 1000, 
this.brokerController.getBrokerConfig().getLoadBalancePollNameServerInterval(), 
TimeUnit.MILLISECONDS);
-    }
-
-    public void shutdown() {
-        this.scheduledExecutorService.shutdown();
-    }
-
-    public void updateTopicRouteInfoFromNameServer() {
-        Set<String> topicList = new 
HashSet<>(brokerController.getTopicConfigManager().getTopicConfigTable().keySet());
-
-        LOOP:
-        for (String topic : topicList) {
-            for (String keyword : ignoreRouteTopics) {
-                if (topic.contains(keyword) || 
TopicValidator.isSystemTopic(topic)) {
-                    continue LOOP;
-                }
-            }
-
-            this.updateTopicRouteInfoFromNameServer(topic);
-        }
-    }
-
-    public boolean updateTopicRouteInfoFromNameServer(final String topic) {
-        try {
-            TopicRouteData topicRouteData = 
this.brokerController.getBrokerOuterAPI().getTopicRouteInfoFromNameServer(topic,
 1000 * 3);
-            if (topicRouteData != null) {
-                topicRouteData.setTopicQueueMappingByBroker(null);
-                Set<MessageQueue> newSubscribeInfo = 
MQClientInstance.topicRouteData2TopicSubscribeInfo(topic, topicRouteData);
-                Set<MessageQueue> oldSubscribeInfo = 
topicSubscribeInfoTable.get(topic);
-                boolean changed = !newSubscribeInfo.equals(oldSubscribeInfo);
-
-                if (changed) {
-                    log.info("the topic[{}] subscribe message queue changed, 
old[{}] ,new[{}]", topic, oldSubscribeInfo, newSubscribeInfo);
-                    topicSubscribeInfoTable.put(topic, newSubscribeInfo);
-                    return true;
-                }
-            } else {
-                log.warn("updateTopicRouteInfoFromNameServer, 
getTopicRouteInfoFromNameServer return null, Topic: {}", topic);
-            }
-        } catch (Exception e) {
-            if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
-                log.warn("updateTopicRouteInfoFromNameServer Exception", e);
-                if (e instanceof MQBrokerException && 
ResponseCode.TOPIC_NOT_EXIST == ((MQBrokerException) e).getResponseCode()) {
-                    // clean no used topic
-                    cleanNoneRouteTopic(topic);
-                }
-            }
-        }
-        return false;
-    }
-
-    private void cleanNoneRouteTopic(String topic) {
-        // clean no used topic
-        topicSubscribeInfoTable.remove(topic);
-    }
-
-    public Set<MessageQueue> getTopicSubscribeInfo(String topic) {
-        return topicSubscribeInfoTable.get(topic);
-    }
-}
diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java 
b/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java
index 202fcbfdb..20605a7da 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java
@@ -23,13 +23,17 @@ import java.util.Arrays;
 import java.util.List;
 import java.util.Set;
 import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.commons.lang3.StringUtils;
 import org.apache.rocketmq.broker.latency.BrokerFixedThreadPoolExecutor;
+import org.apache.rocketmq.client.consumer.PullResult;
+import org.apache.rocketmq.client.consumer.PullStatus;
 import org.apache.rocketmq.client.exception.MQBrokerException;
+import org.apache.rocketmq.client.impl.consumer.PullResultExt;
 import org.apache.rocketmq.client.producer.SendResult;
 import org.apache.rocketmq.client.producer.SendStatus;
 import org.apache.rocketmq.common.AbstractBrokerRunnable;
@@ -43,7 +47,9 @@ import org.apache.rocketmq.common.ThreadFactoryImpl;
 import org.apache.rocketmq.common.UnlockCallback;
 import org.apache.rocketmq.common.UtilAll;
 import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.common.filter.ExpressionType;
 import org.apache.rocketmq.common.message.Message;
+import org.apache.rocketmq.common.message.MessageAccessor;
 import org.apache.rocketmq.common.message.MessageBatch;
 import org.apache.rocketmq.common.message.MessageClientIDSetter;
 import org.apache.rocketmq.common.message.MessageConst;
@@ -76,6 +82,8 @@ import 
org.apache.rocketmq.common.protocol.header.GetMaxOffsetRequestHeader;
 import org.apache.rocketmq.common.protocol.header.GetMaxOffsetResponseHeader;
 import org.apache.rocketmq.common.protocol.header.GetMinOffsetRequestHeader;
 import org.apache.rocketmq.common.protocol.header.GetMinOffsetResponseHeader;
+import org.apache.rocketmq.common.protocol.header.PullMessageRequestHeader;
+import org.apache.rocketmq.common.protocol.header.PullMessageResponseHeader;
 import org.apache.rocketmq.common.protocol.header.SendMessageRequestHeader;
 import org.apache.rocketmq.common.protocol.header.SendMessageRequestHeaderV2;
 import org.apache.rocketmq.common.protocol.header.SendMessageResponseHeader;
@@ -95,11 +103,13 @@ import 
org.apache.rocketmq.common.protocol.header.namesrv.controller.RegisterBro
 import 
org.apache.rocketmq.common.protocol.header.namesrv.controller.GetMetaDataResponseHeader;
 import 
org.apache.rocketmq.common.protocol.header.namesrv.controller.GetReplicaInfoRequestHeader;
 import 
org.apache.rocketmq.common.protocol.header.namesrv.controller.GetReplicaInfoResponseHeader;
+import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
 import org.apache.rocketmq.common.protocol.route.BrokerData;
 import org.apache.rocketmq.common.protocol.route.TopicRouteData;
 import org.apache.rocketmq.common.rpc.ClientMetadata;
 import org.apache.rocketmq.common.rpc.RpcClient;
 import org.apache.rocketmq.common.rpc.RpcClientImpl;
+import org.apache.rocketmq.common.sysflag.PullSysFlag;
 import org.apache.rocketmq.common.topic.TopicValidator;
 import org.apache.rocketmq.remoting.InvokeCallback;
 import org.apache.rocketmq.remoting.RPCHook;
@@ -866,6 +876,51 @@ public class BrokerOuterAPI {
                                                   final MessageExt msg, String 
group,
                                                   long timeoutMillis) throws 
RemotingException, MQBrokerException, InterruptedException {
 
+        RemotingCommand request = buildSendMessageRequest(msg, group);
+        RemotingCommand response = this.remotingClient.invokeSync(brokerAddr, 
request, timeoutMillis);
+        return this.processSendResponse(brokerName, msg, response);
+    }
+
+    public CompletableFuture<SendResult> 
sendMessageToSpecificBrokerAsync(String brokerAddr, final String brokerName,
+                                                                          
final MessageExt msg, String group,
+                                                                          long 
timeoutMillis) {
+        RemotingCommand request = buildSendMessageRequest(msg, group);
+
+        CompletableFuture<SendResult> cf = new CompletableFuture<>();
+        final String msgId = msg.getMsgId();
+        try {
+            this.remotingClient.invokeAsync(brokerAddr, request, 
timeoutMillis, responseFuture -> {
+                RemotingCommand response = responseFuture.getResponseCommand();
+                if (null != response) {
+                    SendResult sendResult = null;
+                    try {
+                        sendResult = this.processSendResponse(brokerName, msg, 
response);
+                        cf.complete(sendResult);
+                    } catch (MQBrokerException | RemotingCommandException e) {
+                        LOGGER.error("processSendResponse in 
sendMessageToSpecificBrokerAsync failed, msgId=" + msgId, e);
+                        cf.completeExceptionally(e);
+                    }
+                } else {
+                    cf.complete(null);
+                }
+
+            });
+        } catch (Throwable t) {
+            LOGGER.error("invokeAsync failed in 
sendMessageToSpecificBrokerAsync, msgId=" + msgId, t);
+            cf.completeExceptionally(t);
+        }
+        return cf;
+    }
+
+    private static RemotingCommand buildSendMessageRequest(MessageExt msg, 
String group) {
+        SendMessageRequestHeaderV2 requestHeaderV2 = 
buildSendMessageRequestHeaderV2(msg, group);
+        RemotingCommand request = 
RemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE_V2, 
requestHeaderV2);
+
+        request.setBody(msg.getBody());
+        return request;
+    }
+
+    private static SendMessageRequestHeaderV2 
buildSendMessageRequestHeaderV2(MessageExt msg, String group) {
         SendMessageRequestHeader requestHeader = new 
SendMessageRequestHeader();
         requestHeader.setProducerGroup(group);
         requestHeader.setTopic(msg.getTopic());
@@ -880,13 +935,7 @@ public class BrokerOuterAPI {
         requestHeader.setBatch(false);
 
         SendMessageRequestHeaderV2 requestHeaderV2 = 
SendMessageRequestHeaderV2.createSendMessageRequestHeaderV2(requestHeader);
-        RemotingCommand request = 
RemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE_V2, 
requestHeaderV2);
-
-        request.setBody(msg.getBody());
-
-        RemotingCommand response = this.remotingClient.invokeSync(brokerAddr, 
request, timeoutMillis);
-
-        return this.processSendResponse(brokerName, msg, response);
+        return requestHeaderV2;
     }
 
     private SendResult processSendResponse(
@@ -1166,4 +1215,98 @@ public class BrokerOuterAPI {
         });
     }
 
+    public PullResult pullMessageFromSpecificBroker(String brokerName, String 
brokerAddr,
+                                                    String consumerGroup, 
String topic, int queueId, long offset,
+                                                    int maxNums,
+                                                    long timeoutMillis) throws 
MQBrokerException, RemotingException, InterruptedException {
+
+        PullMessageRequestHeader requestHeader = new 
PullMessageRequestHeader();
+        requestHeader.setConsumerGroup(consumerGroup);
+        requestHeader.setTopic(topic);
+        requestHeader.setQueueId(queueId);
+        requestHeader.setQueueOffset(offset);
+        requestHeader.setMaxMsgNums(maxNums);
+        requestHeader.setSysFlag(PullSysFlag.buildSysFlag(false, false, true, 
false));
+        requestHeader.setCommitOffset(0L);
+        requestHeader.setSuspendTimeoutMillis(10_0000L);
+        requestHeader.setSubscription(SubscriptionData.SUB_ALL);
+        requestHeader.setSubVersion(System.currentTimeMillis());
+        requestHeader.setMaxMsgBytes(Integer.MAX_VALUE);
+        requestHeader.setExpressionType(ExpressionType.TAG);
+        requestHeader.setBname(brokerName);
+
+        RemotingCommand request = 
RemotingCommand.createRequestCommand(RequestCode.PULL_MESSAGE, requestHeader);
+        RemotingCommand response = this.remotingClient.invokeSync(brokerAddr, 
request, timeoutMillis);
+        PullResultExt pullResultExt = this.processPullResponse(response, 
brokerAddr);
+        this.processPullResult(pullResultExt, brokerName, queueId);
+        return pullResultExt;
+    }
+
+    private PullResultExt processPullResponse(
+            final RemotingCommand response,
+            final String addr) throws MQBrokerException, 
RemotingCommandException {
+        PullStatus pullStatus = PullStatus.NO_NEW_MSG;
+        switch (response.getCode()) {
+            case ResponseCode.SUCCESS:
+                pullStatus = PullStatus.FOUND;
+                break;
+            case ResponseCode.PULL_NOT_FOUND:
+                pullStatus = PullStatus.NO_NEW_MSG;
+                break;
+            case ResponseCode.PULL_RETRY_IMMEDIATELY:
+                pullStatus = PullStatus.NO_MATCHED_MSG;
+                break;
+            case ResponseCode.PULL_OFFSET_MOVED:
+                pullStatus = PullStatus.OFFSET_ILLEGAL;
+                break;
+
+            default:
+                throw new MQBrokerException(response.getCode(), 
response.getRemark(), addr);
+        }
+
+        PullMessageResponseHeader responseHeader =
+                (PullMessageResponseHeader) 
response.decodeCommandCustomHeader(PullMessageResponseHeader.class);
+
+        return new PullResultExt(pullStatus, 
responseHeader.getNextBeginOffset(), responseHeader.getMinOffset(),
+                responseHeader.getMaxOffset(), null, 
responseHeader.getSuggestWhichBrokerId(), response.getBody(), 
responseHeader.getOffsetDelta());
+
+    }
+
+    private PullResult processPullResult(final PullResultExt pullResult, 
String brokerName, int queueId) {
+
+        if (PullStatus.FOUND == pullResult.getPullStatus()) {
+            ByteBuffer byteBuffer = 
ByteBuffer.wrap(pullResult.getMessageBinary());
+            List<MessageExt> msgList = MessageDecoder.decodesBatch(
+                    byteBuffer,
+                    true,
+                    true,
+                    true
+            );
+
+            // Currently batch messages are not supported
+            for (MessageExt msg : msgList) {
+                String traFlag = 
msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
+                if (Boolean.parseBoolean(traFlag)) {
+                    
msg.setTransactionId(msg.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX));
+                }
+                MessageAccessor.putProperty(msg, 
MessageConst.PROPERTY_MIN_OFFSET,
+                        Long.toString(pullResult.getMinOffset()));
+                MessageAccessor.putProperty(msg, 
MessageConst.PROPERTY_MAX_OFFSET,
+                        Long.toString(pullResult.getMaxOffset()));
+                msg.setBrokerName(brokerName);
+                msg.setQueueId(queueId);
+                if (pullResult.getOffsetDelta() != null) {
+                    msg.setQueueOffset(pullResult.getOffsetDelta() + 
msg.getQueueOffset());
+                }
+            }
+
+            pullResult.setMsgFoundList(msgList);
+        }
+
+        pullResult.setMessageBinary(null);
+
+        return pullResult;
+    }
+
+
 }
diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/processor/QueryAssignmentProcessor.java
 
b/broker/src/main/java/org/apache/rocketmq/broker/processor/QueryAssignmentProcessor.java
index 7173606f3..01fcc773f 100644
--- 
a/broker/src/main/java/org/apache/rocketmq/broker/processor/QueryAssignmentProcessor.java
+++ 
b/broker/src/main/java/org/apache/rocketmq/broker/processor/QueryAssignmentProcessor.java
@@ -27,8 +27,8 @@ import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.rocketmq.broker.BrokerController;
 import org.apache.rocketmq.broker.client.ConsumerGroupInfo;
-import org.apache.rocketmq.broker.loadbalance.AssignmentManager;
 import org.apache.rocketmq.broker.loadbalance.MessageRequestModeManager;
+import org.apache.rocketmq.broker.topic.TopicRouteInfoManager;
 import org.apache.rocketmq.client.consumer.AllocateMessageQueueStrategy;
 import 
org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely;
 import 
org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragelyByCircle;
@@ -163,18 +163,18 @@ public class QueryAssignmentProcessor implements 
NettyRequestProcessor {
         final MessageModel messageModel, final String strategyName,
         SetMessageRequestModeRequestBody setMessageRequestModeRequestBody, 
final ChannelHandlerContext ctx) {
         Set<MessageQueue> assignedQueueSet = null;
-        AssignmentManager assignmentManager = 
brokerController.getAssignmentManager();
+        final TopicRouteInfoManager topicRouteInfoManager = 
this.brokerController.getTopicRouteInfoManager();
 
         switch (messageModel) {
             case BROADCASTING: {
-                assignedQueueSet = 
assignmentManager.getTopicSubscribeInfo(topic);
+                assignedQueueSet = 
topicRouteInfoManager.getTopicSubscribeInfo(topic);
                 if (assignedQueueSet == null) {
                     log.warn("QueryLoad: no assignment for group[{}], the 
topic[{}] does not exist.", consumerGroup, topic);
                 }
                 break;
             }
             case CLUSTERING: {
-                Set<MessageQueue> mqSet = 
assignmentManager.getTopicSubscribeInfo(topic);
+                Set<MessageQueue> mqSet = 
topicRouteInfoManager.getTopicSubscribeInfo(topic);
                 if (null == mqSet) {
                     if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                         log.warn("QueryLoad: no assignment for group[{}], the 
topic[{}] does not exist.", consumerGroup, topic);
diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicRouteInfoManager.java
 
b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicRouteInfoManager.java
new file mode 100644
index 000000000..a98a228ec
--- /dev/null
+++ 
b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicRouteInfoManager.java
@@ -0,0 +1,280 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.broker.topic;
+
+import com.google.common.collect.Sets;
+import java.util.Map;
+import org.apache.rocketmq.broker.BrokerController;
+import org.apache.rocketmq.client.exception.MQBrokerException;
+import org.apache.rocketmq.client.impl.factory.MQClientInstance;
+import org.apache.rocketmq.client.impl.producer.TopicPublishInfo;
+import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.common.protocol.NamespaceUtil;
+import org.apache.rocketmq.common.protocol.ResponseCode;
+import org.apache.rocketmq.common.protocol.route.BrokerData;
+import org.apache.rocketmq.common.protocol.route.TopicRouteData;
+import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.logging.InternalLoggerFactory;
+import org.apache.rocketmq.remoting.exception.RemotingException;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+public class TopicRouteInfoManager {
+
+    private static final long GET_TOPIC_ROUTE_TIMEOUT = 3000L;
+    private static final long LOCK_TIMEOUT_MILLIS = 3000L;
+    private static final InternalLogger log = 
InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
+
+    private final Lock lockNamesrv = new ReentrantLock();
+    private final ConcurrentMap<String/* Topic */, TopicRouteData> 
topicRouteTable = new ConcurrentHashMap<>();
+    private final ConcurrentMap<String/* Broker Name */, HashMap<Long/* 
brokerId */, String/* address */>> brokerAddrTable =
+        new ConcurrentHashMap<>();
+    private final ConcurrentMap<String/* topic */, TopicPublishInfo> 
topicPublishInfoTable = new ConcurrentHashMap<>();
+
+    private final ConcurrentHashMap<String, Set<MessageQueue>> 
topicSubscribeInfoTable = new ConcurrentHashMap<>();
+
+    private ScheduledExecutorService scheduledExecutorService;
+    private BrokerController brokerController;
+
+    public TopicRouteInfoManager(BrokerController brokerController) {
+        this.brokerController = brokerController;
+    }
+
+    public void start() {
+        this.scheduledExecutorService = 
Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
+            @Override
+            public Thread newThread(Runnable r) {
+                return new Thread(r, "TopicRouteInfoManagerScheduledThread");
+            }
+        });
+
+        this.scheduledExecutorService.scheduleAtFixedRate(() -> {
+            try {
+                updateTopicRouteInfoFromNameServer();
+            } catch (Exception e) {
+                log.error("ScheduledTask: failed to pull TopicRouteData from 
NameServer", e);
+            }
+        }, 1000, 
this.brokerController.getBrokerConfig().getLoadBalancePollNameServerInterval(), 
TimeUnit.MILLISECONDS);
+    }
+
+    private void updateTopicRouteInfoFromNameServer() {
+        final Set<String> topicSetForPopAssignment = 
this.topicSubscribeInfoTable.keySet();
+        final Set<String> topicSetForEscapeBridge = 
this.topicRouteTable.keySet();
+        final Set<String> topicsAll = Sets.union(topicSetForPopAssignment, 
topicSetForEscapeBridge);
+
+        for (String topic : topicsAll) {
+            boolean isNeedUpdatePublishInfo = 
topicSetForEscapeBridge.contains(topic);
+            boolean isNeedUpdateSubscribeInfo = 
topicSetForPopAssignment.contains(topic);
+            updateTopicRouteInfoFromNameServer(topic, isNeedUpdatePublishInfo, 
isNeedUpdateSubscribeInfo);
+        }
+    }
+
+    public void updateTopicRouteInfoFromNameServer(String topic, boolean 
isNeedUpdatePublishInfo,
+        boolean isNeedUpdateSubscribeInfo) {
+        try {
+            if (this.lockNamesrv.tryLock(LOCK_TIMEOUT_MILLIS, 
TimeUnit.MILLISECONDS)) {
+                try {
+                    final TopicRouteData topicRouteData = 
this.brokerController.getBrokerOuterAPI()
+                        .getTopicRouteInfoFromNameServer(topic, 
GET_TOPIC_ROUTE_TIMEOUT);
+                    if (null == topicRouteData) {
+                        log.warn("TopicRouteInfoManager: 
updateTopicRouteInfoFromNameServer, getTopicRouteInfoFromNameServer return 
null, Topic: {}.", topic);
+                        return;
+                    }
+
+                    if (isNeedUpdateSubscribeInfo) {
+                        this.updateSubscribeInfoTable(topicRouteData, topic);
+                    }
+
+                    if (isNeedUpdatePublishInfo) {
+                        this.updateTopicRouteTable(topic, topicRouteData);
+                    }
+                } catch (RemotingException e) {
+                    log.error("updateTopicRouteInfoFromNameServer Exception", 
e);
+                } catch (MQBrokerException e) {
+                    log.error("updateTopicRouteInfoFromNameServer Exception", 
e);
+                    if (!NamespaceUtil.isRetryTopic(topic)
+                        && ResponseCode.TOPIC_NOT_EXIST == 
e.getResponseCode()) {
+                        // clean no used topic
+                        cleanNoneRouteTopic(topic);
+                    }
+                } finally {
+                    this.lockNamesrv.unlock();
+                }
+            }
+        } catch (InterruptedException e) {
+            log.warn("updateTopicRouteInfoFromNameServer Exception", e);
+        }
+    }
+
+    private boolean updateTopicRouteTable(String topic, TopicRouteData 
topicRouteData) {
+        TopicRouteData old = this.topicRouteTable.get(topic);
+        boolean changed = this.topicRouteDataIsChange(old, topicRouteData);
+        if (!changed) {
+            if (!this.isNeedUpdateTopicRouteInfo(topic)) {
+                return false;
+            }
+        } else {
+            log.info("the topic[{}] route info changed, old[{}] ,new[{}]", 
topic, old, topicRouteData);
+        }
+
+        for (BrokerData bd : topicRouteData.getBrokerDatas()) {
+            this.brokerAddrTable.put(bd.getBrokerName(), bd.getBrokerAddrs());
+        }
+
+        TopicPublishInfo publishInfo = 
MQClientInstance.topicRouteData2TopicPublishInfo(topic, topicRouteData);
+        publishInfo.setHaveTopicRouterInfo(true);
+        this.updateTopicPublishInfo(topic, publishInfo);
+
+        TopicRouteData cloneTopicRouteData = new 
TopicRouteData(topicRouteData);
+        log.info("topicRouteTable.put. Topic = {}, TopicRouteData[{}]", topic, 
cloneTopicRouteData);
+        this.topicRouteTable.put(topic, cloneTopicRouteData);
+
+        return true;
+    }
+
+    private boolean updateSubscribeInfoTable(TopicRouteData topicRouteData, 
String topic) {
+        final TopicRouteData tmp = new TopicRouteData(topicRouteData);
+        tmp.setTopicQueueMappingByBroker(null);
+        Set<MessageQueue> newSubscribeInfo = 
MQClientInstance.topicRouteData2TopicSubscribeInfo(topic, tmp);
+        Set<MessageQueue> oldSubscribeInfo = 
topicSubscribeInfoTable.get(topic);
+
+        if (Objects.equals(newSubscribeInfo, oldSubscribeInfo)) {
+            return false;
+        }
+
+        log.info("the topic[{}] subscribe message queue changed, old[{}] 
,new[{}]", topic, oldSubscribeInfo, newSubscribeInfo);
+        topicSubscribeInfoTable.put(topic, newSubscribeInfo);
+        return true;
+
+    }
+
+    private boolean isNeedUpdateTopicRouteInfo(final String topic) {
+        final TopicPublishInfo prev = this.topicPublishInfoTable.get(topic);
+        return null == prev || !prev.ok();
+    }
+
+    private boolean topicRouteDataIsChange(TopicRouteData olddata, 
TopicRouteData nowdata) {
+        if (olddata == null || nowdata == null)
+            return true;
+        TopicRouteData old = new TopicRouteData(olddata);
+        TopicRouteData now = new TopicRouteData(nowdata);
+        Collections.sort(old.getQueueDatas());
+        Collections.sort(old.getBrokerDatas());
+        Collections.sort(now.getQueueDatas());
+        Collections.sort(now.getBrokerDatas());
+        return !old.equals(now);
+
+    }
+
+    private void cleanNoneRouteTopic(String topic) {
+        // clean no used topic
+        topicSubscribeInfoTable.remove(topic);
+    }
+
+    private void updateTopicPublishInfo(final String topic, final 
TopicPublishInfo info) {
+        if (info != null && topic != null) {
+            TopicPublishInfo prev = this.topicPublishInfoTable.put(topic, 
info);
+            if (prev != null) {
+                log.info("updateTopicPublishInfo prev is not null, " + prev);
+            }
+        }
+    }
+
+    public void shutdown() {
+        if (null != this.scheduledExecutorService) {
+            this.scheduledExecutorService.shutdown();
+        }
+    }
+
+    public TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {
+        TopicPublishInfo topicPublishInfo = 
this.topicPublishInfoTable.get(topic);
+        if (null == topicPublishInfo || !topicPublishInfo.ok()) {
+            this.updateTopicRouteInfoFromNameServer(topic, true, false);
+            topicPublishInfo = this.topicPublishInfoTable.get(topic);
+        }
+        return topicPublishInfo;
+    }
+
+    public String findBrokerAddressInPublish(String brokerName) {
+        if (brokerName == null) {
+            return null;
+        }
+        Map<Long/* brokerId */, String/* address */> map = 
this.brokerAddrTable.get(brokerName);
+        if (map != null && !map.isEmpty()) {
+            return map.get(MixAll.MASTER_ID);
+        }
+
+        return null;
+    }
+
+    public String findBrokerAddressInSubscribe(
+        final String brokerName,
+        final long brokerId,
+        final boolean onlyThisBroker
+    ) {
+        if (brokerName == null) {
+            return null;
+        }
+        String brokerAddr = null;
+        boolean found = false;
+
+        Map<Long/* brokerId */, String/* address */> map = 
this.brokerAddrTable.get(brokerName);
+        if (map != null && !map.isEmpty()) {
+            brokerAddr = map.get(brokerId);
+            boolean slave = brokerId != MixAll.MASTER_ID;
+            found = brokerAddr != null;
+
+            if (!found && slave) {
+                brokerAddr = map.get(brokerId + 1);
+                found = brokerAddr != null;
+            }
+
+            if (!found && !onlyThisBroker) {
+                Map.Entry<Long, String> entry = 
map.entrySet().iterator().next();
+                brokerAddr = entry.getValue();
+                found = true;
+            }
+        }
+
+        return brokerAddr;
+
+    }
+
+    public Set<MessageQueue> getTopicSubscribeInfo(String topic) {
+        Set<MessageQueue> queues = topicSubscribeInfoTable.get(topic);
+        if (null == queues || queues.isEmpty()) {
+            this.updateTopicRouteInfoFromNameServer(topic, false, true);
+            queues = this.topicSubscribeInfoTable.get(topic);
+        }
+        return queues;
+    }
+
+
+}
diff --git 
a/broker/src/test/java/org/apache/rocketmq/broker/failover/EscapeBridgeTest.java
 
b/broker/src/test/java/org/apache/rocketmq/broker/failover/EscapeBridgeTest.java
index 685e85db1..a51e54209 100644
--- 
a/broker/src/test/java/org/apache/rocketmq/broker/failover/EscapeBridgeTest.java
+++ 
b/broker/src/test/java/org/apache/rocketmq/broker/failover/EscapeBridgeTest.java
@@ -64,8 +64,6 @@ public class EscapeBridgeTest {
 
     private static final String BROKER_NAME = "broker_a";
 
-    private static final String NAMESERVER_ADDR = "127.0.0.1:9876";
-
     private static final String TEST_TOPIC = "TEST_TOPIC";
 
     private static final int DEFAULT_QUEUE_ID = 0;
@@ -81,7 +79,6 @@ public class EscapeBridgeTest {
         escapeBridge = new EscapeBridge(brokerController);
         messageExtBrokerInner = new MessageExtBrokerInner();
         
when(brokerController.getMessageStore()).thenReturn(defaultMessageStore);
-        when(brokerController.getNameServerList()).thenReturn(NAMESERVER_ADDR);
         brokerConfig.setEnableSlaveActingMaster(true);
         brokerConfig.setEnableRemoteEscape(true);
         escapeBridge.start();
diff --git 
a/broker/src/test/java/org/apache/rocketmq/broker/processor/QueryAssignmentProcessorTest.java
 
b/broker/src/test/java/org/apache/rocketmq/broker/processor/QueryAssignmentProcessorTest.java
index 347a24019..0367e3a1e 100644
--- 
a/broker/src/test/java/org/apache/rocketmq/broker/processor/QueryAssignmentProcessorTest.java
+++ 
b/broker/src/test/java/org/apache/rocketmq/broker/processor/QueryAssignmentProcessorTest.java
@@ -23,7 +23,7 @@ import java.util.ArrayList;
 import java.util.List;
 import org.apache.rocketmq.broker.BrokerController;
 import org.apache.rocketmq.broker.client.ClientChannelInfo;
-import org.apache.rocketmq.broker.loadbalance.AssignmentManager;
+import org.apache.rocketmq.broker.topic.TopicRouteInfoManager;
 import org.apache.rocketmq.client.consumer.AllocateMessageQueueStrategy;
 import 
org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely;
 import 
org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragelyByCircle;
@@ -66,7 +66,7 @@ public class QueryAssignmentProcessorTest {
     private BrokerController brokerController = new BrokerController(new 
BrokerConfig(), new NettyServerConfig(), new NettyClientConfig(), new 
MessageStoreConfig());
 
     @Mock
-    private AssignmentManager assignmentManager;
+    private TopicRouteInfoManager topicRouteInfoManager;
     @Mock
     private ChannelHandlerContext handlerContext;
     @Mock
@@ -84,8 +84,8 @@ public class QueryAssignmentProcessorTest {
     public void init() throws IllegalAccessException, NoSuchFieldException {
         clientInfo = new ClientChannelInfo(channel, "127.0.0.1", 
LanguageCode.JAVA, 0);
         brokerController.setMessageStore(messageStore);
-        
doReturn(assignmentManager).when(brokerController).getAssignmentManager();
-        
when(assignmentManager.getTopicSubscribeInfo(topic)).thenReturn(ImmutableSet.of(new
 MessageQueue(topic, "broker-1", 0), new MessageQueue(topic, "broker-2", 1)));
+        
doReturn(topicRouteInfoManager).when(brokerController).getTopicRouteInfoManager();
+        
when(topicRouteInfoManager.getTopicSubscribeInfo(topic)).thenReturn(ImmutableSet.of(new
 MessageQueue(topic, "broker-1", 0), new MessageQueue(topic, "broker-2", 1)));
         queryAssignmentProcessor = new 
QueryAssignmentProcessor(brokerController);
         
brokerController.getTopicConfigManager().getTopicConfigTable().put(topic, new 
TopicConfig());
         ConsumerData consumerData = createConsumerData(group, topic);
diff --git 
a/container/src/main/java/org/apache/rocketmq/container/InnerBrokerController.java
 
b/container/src/main/java/org/apache/rocketmq/container/InnerBrokerController.java
index dd6eeeb61..47edd56e5 100644
--- 
a/container/src/main/java/org/apache/rocketmq/container/InnerBrokerController.java
+++ 
b/container/src/main/java/org/apache/rocketmq/container/InnerBrokerController.java
@@ -133,17 +133,6 @@ public class InnerBrokerController extends 
BrokerController {
         return this.brokerConfig.getBrokerIP1() + ":" + 
this.brokerConfig.getListenPort();
     }
 
-    @Override
-    public String getNameServerList() {
-        if (this.brokerContainer.getBrokerContainerConfig().getNamesrvAddr() 
!= null) {
-            
this.brokerContainer.getBrokerOuterAPI().updateNameServerAddressList(brokerContainer.getBrokerContainerConfig().getNamesrvAddr());
-            return 
this.brokerContainer.getBrokerContainerConfig().getNamesrvAddr();
-        } else if 
(this.brokerContainer.getBrokerContainerConfig().isFetchNamesrvAddrByAddressServer())
 {
-            return 
this.brokerContainer.getBrokerOuterAPI().fetchNameServerAddr();
-        }
-        return null;
-    }
-
     @Override
     public String getHAServerAddr() {
         return this.brokerConfig.getBrokerIP2() + ":" + 
this.messageStoreConfig.getHaListenPort();
diff --git 
a/test/src/test/java/org/apache/rocketmq/test/container/PopSlaveActingMasterIT.java
 
b/test/src/test/java/org/apache/rocketmq/test/container/PopSlaveActingMasterIT.java
index 17d5e0cc3..9c2b35a2b 100644
--- 
a/test/src/test/java/org/apache/rocketmq/test/container/PopSlaveActingMasterIT.java
+++ 
b/test/src/test/java/org/apache/rocketmq/test/container/PopSlaveActingMasterIT.java
@@ -168,18 +168,6 @@ public class PopSlaveActingMasterIT extends 
ContainerIntegrationTestBase {
 
     @Test
     public void testLocalActing_notAckSlave() throws Exception {
-//        master1With3Replicas.getBrokerConfig().setReviveMaxSlow(0L);
-//        master1With3Replicas.getBrokerConfig().setReviveInterval(0L);
-//        
//master1With3Replicas.getMessageStoreConfig().setMessageDelayLevel("1s 1s 1s 
1s 1s 1s 1s 1s 1s 1s 1s 1s 1s 1s 1s 1s 1s 1s");
-//
-//        master2With3Replicas.getBrokerConfig().setReviveMaxSlow(0L);
-//        master2With3Replicas.getBrokerConfig().setReviveInterval(0L);
-//        
//master2With3Replicas.getMessageStoreConfig().setMessageDelayLevel("1s 1s 1s 
1s 1s 1s 1s 1s 1s 1s 1s 1s 1s 1s 1s 1s 1s 1s");
-//
-//        master3With3Replicas.getBrokerConfig().setReviveMaxSlow(0L);
-//        master3With3Replicas.getBrokerConfig().setReviveInterval(0L);
-//        
//master3With3Replicas.getMessageStoreConfig().setMessageDelayLevel("1s 1s 1s 
1s 1s 1s 1s 1s 1s 1s 1s 1s 1s 1s 1s 1s 1s 1s");
-
         String topic = PopSlaveActingMasterIT.class.getSimpleName() + 
random.nextInt(65535);
         createTopic(topic);
         String retryTopic = KeyBuilder.buildPopRetryTopic(topic, 
CONSUME_GROUP);
@@ -585,4 +573,4 @@ public class PopSlaveActingMasterIT extends 
ContainerIntegrationTestBase {
 
     }
 
-}
\ No newline at end of file
+}

Reply via email to