This is an automated email from the ASF dual-hosted git repository.
jinrongtong pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 10d291846 [ISSUE #4832] Remove innerProducer and innerConsumer in
EscapeBridge (#4834)
10d291846 is described below
commit 10d291846e8035941f6c69a3d8c8f2db83acaa2b
Author: caigy <[email protected]>
AuthorDate: Wed Aug 24 15:18:51 2022 +0800
[ISSUE #4832] Remove innerProducer and innerConsumer in EscapeBridge (#4834)
* remove innerProducer and innerConsumer in EscapeBridge
#4832
* format code to pass code-style check
* Move topic route management logic to TopicRouteInfoManager
* Use ThreadPoolExecutor instead of creating a new ForkJoinPool
* remove retry logic
* remove unnecessary name server list
* remove unnecessary invokeId
* remove AssignmentManager and make maintenance of topic subscribe data
lazy-initialized
* simplify code
* remove unnecessary code
---
.../apache/rocketmq/broker/BrokerController.java | 45 ++--
.../rocketmq/broker/failover/EscapeBridge.java | 221 ++++++++++------
.../broker/loadbalance/AssignmentManager.java | 141 -----------
.../apache/rocketmq/broker/out/BrokerOuterAPI.java | 157 +++++++++++-
.../broker/processor/QueryAssignmentProcessor.java | 8 +-
.../broker/topic/TopicRouteInfoManager.java | 280 +++++++++++++++++++++
.../rocketmq/broker/failover/EscapeBridgeTest.java | 3 -
.../processor/QueryAssignmentProcessorTest.java | 8 +-
.../rocketmq/container/InnerBrokerController.java | 11 -
.../test/container/PopSlaveActingMasterIT.java | 14 +-
10 files changed, 597 insertions(+), 291 deletions(-)
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
index 84e1689ce..681ca6ef0 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
@@ -59,7 +59,6 @@ import
org.apache.rocketmq.broker.filtersrv.FilterServerManager;
import org.apache.rocketmq.broker.controller.ReplicasManager;
import org.apache.rocketmq.broker.latency.BrokerFastFailure;
import org.apache.rocketmq.broker.latency.BrokerFixedThreadPoolExecutor;
-import org.apache.rocketmq.broker.loadbalance.AssignmentManager;
import org.apache.rocketmq.broker.longpolling.LmqPullRequestHoldService;
import org.apache.rocketmq.broker.longpolling.NotifyMessageArrivingListener;
import org.apache.rocketmq.broker.longpolling.PullRequestHoldService;
@@ -95,6 +94,7 @@ import org.apache.rocketmq.broker.topic.LmqTopicConfigManager;
import org.apache.rocketmq.broker.topic.TopicConfigManager;
import org.apache.rocketmq.broker.topic.TopicQueueMappingCleanService;
import org.apache.rocketmq.broker.topic.TopicQueueMappingManager;
+import org.apache.rocketmq.broker.topic.TopicRouteInfoManager;
import
org.apache.rocketmq.broker.transaction.AbstractTransactionalMessageCheckListener;
import org.apache.rocketmq.broker.transaction.TransactionalMessageCheckService;
import org.apache.rocketmq.broker.transaction.TransactionalMessageService;
@@ -171,7 +171,6 @@ public class BrokerController {
protected final ConsumerOrderInfoManager consumerOrderInfoManager;
protected final ProducerManager producerManager;
protected final ScheduleMessageService scheduleMessageService;
- protected final AssignmentManager assignmentManager;
protected final ClientHousekeepingService clientHousekeepingService;
protected final PullMessageProcessor pullMessageProcessor;
protected final PeekMessageProcessor peekMessageProcessor;
@@ -191,6 +190,7 @@ public class BrokerController {
protected final ConsumerIdsChangeListener consumerIdsChangeListener;
protected final EndTransactionProcessor endTransactionProcessor;
private final RebalanceLockManager rebalanceLockManager = new
RebalanceLockManager();
+ private final TopicRouteInfoManager topicRouteInfoManager;
protected BrokerOuterAPI brokerOuterAPI;
protected ScheduledExecutorService scheduledExecutorService;
protected ScheduledExecutorService syncBrokerMemberGroupExecutorService;
@@ -322,7 +322,6 @@ public class BrokerController {
this.filterServerManager = new FilterServerManager(this);
- this.assignmentManager = new AssignmentManager(this);
this.queryAssignmentProcessor = new QueryAssignmentProcessor(this);
this.clientManageProcessor = new ClientManageProcessor(this);
this.slaveSynchronize = new SlaveSynchronize(this);
@@ -386,6 +385,8 @@ public class BrokerController {
this.escapeBridge = new EscapeBridge(this);
+ this.topicRouteInfoManager = new TopicRouteInfoManager(this);
+
if (this.brokerConfig.isEnableSlaveActingMaster() &&
!this.brokerConfig.isSkipPreOnline()) {
this.brokerPreOnlineService = new BrokerPreOnlineService(this);
}
@@ -1238,10 +1239,6 @@ public class BrokerController {
this.ackMessageProcessor.shutdownPopReviveService();
}
- if (this.assignmentManager != null) {
- this.assignmentManager.shutdown();
- }
-
if (this.notificationProcessor != null) {
this.notificationProcessor.shutdown();
}
@@ -1353,6 +1350,10 @@ public class BrokerController {
escapeBridge.shutdown();
}
+ if (this.topicRouteInfoManager != null) {
+ this.topicRouteInfoManager.shutdown();
+ }
+
if (this.brokerPreOnlineService != null &&
!this.brokerPreOnlineService.isStopped()) {
this.brokerPreOnlineService.shutdown();
}
@@ -1448,10 +1449,6 @@ public class BrokerController {
this.ackMessageProcessor.startPopReviveService();
}
- if (this.assignmentManager != null) {
- this.assignmentManager.start();
- }
-
if (this.topicQueueMappingCleanService != null) {
this.topicQueueMappingCleanService.start();
}
@@ -1484,6 +1481,10 @@ public class BrokerController {
this.escapeBridge.start();
}
+ if (this.topicRouteInfoManager != null) {
+ this.topicRouteInfoManager.start();
+ }
+
if (this.brokerPreOnlineService != null) {
this.brokerPreOnlineService.start();
}
@@ -1779,16 +1780,6 @@ public class BrokerController {
return needRegister;
}
- public String getNameServerList() {
- if (this.brokerConfig.getNamesrvAddr() != null) {
-
this.brokerOuterAPI.updateNameServerAddressList(this.brokerConfig.getNamesrvAddr());
- return this.brokerConfig.getNamesrvAddr();
- } else if (this.brokerConfig.isFetchNamesrvAddrByAddressServer()) {
- return this.brokerOuterAPI.fetchNameServerAddr();
- }
- return null;
- }
-
public void startService(long minBrokerId, String minBrokerAddr) {
BrokerController.LOG.info("{} start service, min broker id is {}, min
broker addr: {}",
this.brokerConfig.getCanonicalName(), minBrokerId, minBrokerAddr);
@@ -2153,14 +2144,6 @@ public class BrokerController {
return sendMessageExecutor;
}
- public AssignmentManager getAssignmentManager() {
- return assignmentManager;
- }
-
- public ClientManageProcessor getClientManageProcessor() {
- return clientManageProcessor;
- }
-
public SendMessageProcessor getSendMessageProcessor() {
return sendMessageProcessor;
}
@@ -2253,4 +2236,8 @@ public class BrokerController {
return timerCheckpoint;
}
+ public TopicRouteInfoManager getTopicRouteInfoManager() {
+ return this.topicRouteInfoManager;
+ }
+
}
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/failover/EscapeBridge.java
b/broker/src/main/java/org/apache/rocketmq/broker/failover/EscapeBridge.java
index 22a72a888..60860db74 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/failover/EscapeBridge.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/failover/EscapeBridge.java
@@ -20,38 +20,47 @@ package org.apache.rocketmq.broker.failover;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
+import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.broker.BrokerController;
-import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
import org.apache.rocketmq.client.consumer.PullResult;
import org.apache.rocketmq.client.consumer.PullStatus;
-import org.apache.rocketmq.client.exception.MQClientException;
-import org.apache.rocketmq.client.producer.DefaultMQProducer;
-import org.apache.rocketmq.client.producer.MessageQueueSelector;
-import org.apache.rocketmq.client.producer.SendCallback;
+import org.apache.rocketmq.client.exception.MQBrokerException;
+import org.apache.rocketmq.client.impl.producer.TopicPublishInfo;
import org.apache.rocketmq.client.producer.SendResult;
+import org.apache.rocketmq.client.producer.SendStatus;
+import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.ThreadFactoryImpl;
import org.apache.rocketmq.common.constant.LoggerName;
-import org.apache.rocketmq.common.message.Message;
+import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.common.message.MessageDecoder;
import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.common.message.MessageExtBrokerInner;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
+import org.apache.rocketmq.remoting.exception.RemotingException;
import org.apache.rocketmq.store.GetMessageResult;
-import org.apache.rocketmq.common.message.MessageExtBrokerInner;
import org.apache.rocketmq.store.MessageStore;
import org.apache.rocketmq.store.PutMessageResult;
import org.apache.rocketmq.store.PutMessageStatus;
public class EscapeBridge {
protected static final InternalLogger LOG =
InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
+ private static final long SEND_TIMEOUT = 3000L;
+ private static final long DEFAULT_PULL_TIMEOUT_MILLIS = 1000 * 10L;
private final String innerProducerGroupName;
private final String innerConsumerGroupName;
private final BrokerController brokerController;
- private DefaultMQProducer innerProducer;
- private DefaultMQPullConsumer innerConsumer;
+ private ExecutorService defaultAsyncSenderExecutor;
public EscapeBridge(BrokerController brokerController) {
this.brokerController = brokerController;
@@ -61,46 +70,22 @@ public class EscapeBridge {
public void start() throws Exception {
if (brokerController.getBrokerConfig().isEnableSlaveActingMaster() &&
brokerController.getBrokerConfig().isEnableRemoteEscape()) {
- String nameserver = this.brokerController.getNameServerList();
- if (nameserver != null && !nameserver.isEmpty()) {
- startInnerProducer(nameserver);
- startInnerConsumer(nameserver);
- LOG.info("start inner producer and consumer success.");
- } else {
- throw new RuntimeException("nameserver address is null or
empty");
- }
+ final BlockingQueue<Runnable> asyncSenderThreadPoolQueue = new
LinkedBlockingQueue<>(50000);
+ this.defaultAsyncSenderExecutor = new ThreadPoolExecutor(
+ Runtime.getRuntime().availableProcessors(),
+ Runtime.getRuntime().availableProcessors(),
+ 1000 * 60,
+ TimeUnit.MILLISECONDS,
+ asyncSenderThreadPoolQueue,
+ new ThreadFactoryImpl("AsyncEscapeBridgeExecutor_",
this.brokerController.getBrokerIdentity())
+ );
+ LOG.info("init executor for escaping messages asynchronously
success.");
}
}
public void shutdown() {
- if (this.innerProducer != null) {
- this.innerProducer.shutdown();
- }
-
- if (this.innerConsumer != null) {
- this.innerConsumer.shutdown();
- }
- }
-
- private void startInnerProducer(String nameServer) throws
MQClientException {
- try {
- innerProducer = new DefaultMQProducer(innerProducerGroupName);
- innerProducer.setNamesrvAddr(nameServer);
- innerProducer.start();
- } catch (MQClientException e) {
- LOG.error("start inner producer failed, nameserver address: {}",
nameServer, e);
- throw e;
- }
- }
-
- private void startInnerConsumer(String nameServer) throws
MQClientException {
- try {
- innerConsumer = new DefaultMQPullConsumer(innerConsumerGroupName);
- innerConsumer.setNamesrvAddr(nameServer);
- innerConsumer.start();
- } catch (MQClientException e) {
- LOG.error("start inner consumer failed, nameserver address: {}",
nameServer, e);
- throw e;
+ if (null != this.defaultAsyncSenderExecutor) {
+ this.defaultAsyncSenderExecutor.shutdown();
}
}
@@ -109,12 +94,11 @@ public class EscapeBridge {
if (masterBroker != null) {
return masterBroker.getMessageStore().putMessage(messageExt);
} else if
(this.brokerController.getBrokerConfig().isEnableSlaveActingMaster()
- && this.brokerController.getBrokerConfig().isEnableRemoteEscape()
- && innerProducer != null) {
- // Remote Acting lead to born timestamp, msgId changed, it needs
to polish.
+ && this.brokerController.getBrokerConfig().isEnableRemoteEscape())
{
+
try {
messageExt.setWaitStoreMsgOK(false);
- SendResult sendResult = innerProducer.send(messageExt);
+ final SendResult sendResult =
putMessageToRemoteBroker(messageExt);
return transformSendResult2PutResult(sendResult);
} catch (Exception e) {
LOG.error("sendMessageInFailover to remote failed", e);
@@ -127,29 +111,73 @@ public class EscapeBridge {
}
}
+ private SendResult putMessageToRemoteBroker(MessageExtBrokerInner
messageExt) {
+ final TopicPublishInfo topicPublishInfo =
this.brokerController.getTopicRouteInfoManager().tryToFindTopicPublishInfo(messageExt.getTopic());
+ if (null == topicPublishInfo || !topicPublishInfo.ok()) {
+ LOG.warn("putMessageToRemoteBroker: no route info of topic {} when
escaping message, msgId={}",
+ messageExt.getTopic(), messageExt.getMsgId());
+ return null;
+ }
+
+ final MessageQueue mqSelected =
topicPublishInfo.selectOneMessageQueue();
+ messageExt.setQueueId(mqSelected.getQueueId());
+
+ final String brokerNameToSend = mqSelected.getBrokerName();
+ final String brokerAddrToSend =
this.brokerController.getTopicRouteInfoManager().findBrokerAddressInPublish(brokerNameToSend);
+
+ final long beginTimestamp = System.currentTimeMillis();
+ try {
+ final SendResult sendResult =
this.brokerController.getBrokerOuterAPI().sendMessageToSpecificBroker(
+ brokerAddrToSend, brokerNameToSend,
+ messageExt, this.getProducerGroup(messageExt), SEND_TIMEOUT);
+ if (null != sendResult &&
SendStatus.SEND_OK.equals(sendResult.getSendStatus())) {
+ return sendResult;
+ } else {
+ LOG.error("Escaping failed! cost {}ms, Topic: {}, MsgId: {},
Broker: {}",
+ System.currentTimeMillis() - beginTimestamp,
messageExt.getTopic(),
+ messageExt.getMsgId(), brokerNameToSend);
+ }
+ } catch (RemotingException | MQBrokerException e) {
+ LOG.error(String.format("putMessageToRemoteBroker exception,
MsgId: %s, RT: %sms, Broker: %s",
+ messageExt.getMsgId(), System.currentTimeMillis() -
beginTimestamp, mqSelected), e);
+ } catch (InterruptedException e) {
+ LOG.error(String.format("putMessageToRemoteBroker interrupted,
MsgId: %s, RT: %sms, Broker: %s",
+ messageExt.getMsgId(), System.currentTimeMillis() -
beginTimestamp, mqSelected), e);
+ Thread.currentThread().interrupt();
+ }
+
+ return null;
+ }
+
public CompletableFuture<PutMessageResult>
asyncPutMessage(MessageExtBrokerInner messageExt) {
BrokerController masterBroker =
this.brokerController.peekMasterBroker();
- CompletableFuture<PutMessageResult> completableFuture = new
CompletableFuture<>();
if (masterBroker != null) {
return masterBroker.getMessageStore().asyncPutMessage(messageExt);
} else if
(this.brokerController.getBrokerConfig().isEnableSlaveActingMaster()
- && this.brokerController.getBrokerConfig().isEnableRemoteEscape()
- && innerProducer != null) {
- // Remote Acting lead to born timestamp, msgId changed, it needs
to polish.
+ && this.brokerController.getBrokerConfig().isEnableRemoteEscape())
{
try {
messageExt.setWaitStoreMsgOK(false);
- innerProducer.send(messageExt, new SendCallback() {
- @Override
- public void onSuccess(SendResult sendResult) {
-
completableFuture.complete(transformSendResult2PutResult(sendResult));
- }
- @Override
- public void onException(Throwable e) {
- completableFuture.complete(new
PutMessageResult(PutMessageStatus.PUT_TO_REMOTE_BROKER_FAIL, null, true));
- }
- });
- return completableFuture;
+ final TopicPublishInfo topicPublishInfo =
this.brokerController.getTopicRouteInfoManager().tryToFindTopicPublishInfo(messageExt.getTopic());
+ final String producerGroup = getProducerGroup(messageExt);
+
+ final MessageQueue mqSelected =
topicPublishInfo.selectOneMessageQueue();
+ messageExt.setQueueId(mqSelected.getQueueId());
+
+ final String brokerNameToSend = mqSelected.getBrokerName();
+ final String brokerAddrToSend =
this.brokerController.getTopicRouteInfoManager().findBrokerAddressInPublish(brokerNameToSend);
+ final CompletableFuture<SendResult> future =
this.brokerController.getBrokerOuterAPI().sendMessageToSpecificBrokerAsync(brokerAddrToSend,
+ brokerNameToSend, messageExt,
+ producerGroup, SEND_TIMEOUT);
+
+ return future.exceptionally(throwable -> null)
+ .thenApplyAsync(sendResult -> {
+ return transformSendResult2PutResult(sendResult);
+ }, this.defaultAsyncSenderExecutor)
+ .exceptionally(throwable -> {
+ return transformSendResult2PutResult(null);
+ });
+
} catch (Exception e) {
LOG.error("sendMessageInFailover to remote failed", e);
return CompletableFuture.completedFuture(new
PutMessageResult(PutMessageStatus.PUT_TO_REMOTE_BROKER_FAIL, null, true));
@@ -161,24 +189,47 @@ public class EscapeBridge {
}
}
+
+ private String getProducerGroup(MessageExtBrokerInner messageExt) {
+ if (null == messageExt) {
+ return this.innerProducerGroupName;
+ }
+ String producerGroup =
messageExt.getProperty(MessageConst.PROPERTY_PRODUCER_GROUP);
+ if (StringUtils.isEmpty(producerGroup)) {
+ producerGroup = this.innerProducerGroupName;
+ }
+ return producerGroup;
+ }
+
+
public PutMessageResult putMessageToSpecificQueue(MessageExtBrokerInner
messageExt) {
BrokerController masterBroker =
this.brokerController.peekMasterBroker();
if (masterBroker != null) {
return masterBroker.getMessageStore().putMessage(messageExt);
} else if
(this.brokerController.getBrokerConfig().isEnableSlaveActingMaster()
- && this.brokerController.getBrokerConfig().isEnableRemoteEscape()
- && this.innerProducer != null) {
+ && this.brokerController.getBrokerConfig().isEnableRemoteEscape())
{
try {
messageExt.setWaitStoreMsgOK(false);
- // Remote Acting lead to born timestamp, msgId changed, it
needs to polish.
- SendResult sendResult = innerProducer.send(messageExt, new
MessageQueueSelector() {
- @Override
- public MessageQueue select(List<MessageQueue> mqs, Message
msg, Object arg) {
- String id = (String) arg;
- int index = Math.abs(id.hashCode()) % mqs.size();
- return mqs.get(index);
- }
- }, messageExt.getTopic() + messageExt.getStoreHost());
+
+ final TopicPublishInfo topicPublishInfo =
this.brokerController.getTopicRouteInfoManager().tryToFindTopicPublishInfo(messageExt.getTopic());
+ List<MessageQueue> mqs =
topicPublishInfo.getMessageQueueList();
+
+ if (null == mqs || mqs.isEmpty()) {
+ return new
PutMessageResult(PutMessageStatus.PUT_TO_REMOTE_BROKER_FAIL, null, true);
+ }
+
+ String id = messageExt.getTopic() + messageExt.getStoreHost();
+ final int index = Math.floorMod(id.hashCode(), mqs.size());
+
+ MessageQueue mq = mqs.get(index);
+ messageExt.setQueueId(mq.getQueueId());
+
+ String brokerNameToSend = mq.getBrokerName();
+ String brokerAddrToSend =
this.brokerController.getTopicRouteInfoManager().findBrokerAddressInPublish(brokerNameToSend);
+ final SendResult sendResult =
this.brokerController.getBrokerOuterAPI().sendMessageToSpecificBroker(
+ brokerAddrToSend, brokerNameToSend,
+ messageExt, this.getProducerGroup(messageExt),
SEND_TIMEOUT);
+
return transformSendResult2PutResult(sendResult);
} catch (Exception e) {
LOG.error("sendMessageInFailover to remote failed", e);
@@ -224,10 +275,8 @@ public class EscapeBridge {
} else {
return list.get(0);
}
- } else if (innerConsumer != null) {
- return getMessageFromRemote(topic, offset, queueId, brokerName);
} else {
- return null;
+ return getMessageFromRemote(topic, offset, queueId, brokerName);
}
}
@@ -261,7 +310,20 @@ public class EscapeBridge {
protected MessageExt getMessageFromRemote(String topic, long offset, int
queueId, String brokerName) {
try {
- PullResult pullResult = innerConsumer.pull(new MessageQueue(topic,
brokerName, queueId), "*", offset, 1);
+ String brokerAddr =
this.brokerController.getTopicRouteInfoManager().findBrokerAddressInSubscribe(brokerName,
MixAll.MASTER_ID, false);
+ if (null == brokerAddr) {
+
this.brokerController.getTopicRouteInfoManager().updateTopicRouteInfoFromNameServer(topic,
true, false);
+ brokerAddr =
this.brokerController.getTopicRouteInfoManager().findBrokerAddressInSubscribe(brokerName,
MixAll.MASTER_ID, false);
+
+ if (null == brokerAddr) {
+ LOG.warn("can't find broker address for topic {}", topic);
+ return null;
+ }
+ }
+
+ PullResult pullResult =
this.brokerController.getBrokerOuterAPI().pullMessageFromSpecificBroker(brokerName,
+ brokerAddr, this.innerConsumerGroupName, topic, queueId,
offset, 1, DEFAULT_PULL_TIMEOUT_MILLIS);
+
if (pullResult.getPullStatus().equals(PullStatus.FOUND)) {
return pullResult.getMsgFoundList().get(0);
}
@@ -271,4 +333,5 @@ public class EscapeBridge {
return null;
}
+
}
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/loadbalance/AssignmentManager.java
b/broker/src/main/java/org/apache/rocketmq/broker/loadbalance/AssignmentManager.java
deleted file mode 100644
index 606de86d1..000000000
---
a/broker/src/main/java/org/apache/rocketmq/broker/loadbalance/AssignmentManager.java
+++ /dev/null
@@ -1,141 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.rocketmq.broker.loadbalance;
-
-import com.google.common.collect.Lists;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-import org.apache.rocketmq.broker.BrokerController;
-import org.apache.rocketmq.client.exception.MQBrokerException;
-import org.apache.rocketmq.client.impl.factory.MQClientInstance;
-import org.apache.rocketmq.common.MixAll;
-import org.apache.rocketmq.common.ThreadFactoryImpl;
-import org.apache.rocketmq.common.constant.LoggerName;
-import org.apache.rocketmq.common.message.MessageQueue;
-import org.apache.rocketmq.common.protocol.ResponseCode;
-import org.apache.rocketmq.common.protocol.route.TopicRouteData;
-import org.apache.rocketmq.common.topic.TopicValidator;
-import org.apache.rocketmq.logging.InternalLogger;
-import org.apache.rocketmq.logging.InternalLoggerFactory;
-
-public class AssignmentManager {
- private static final InternalLogger log =
InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
-
- private transient BrokerController brokerController;
-
- private final ConcurrentHashMap<String, Set<MessageQueue>>
topicSubscribeInfoTable = new ConcurrentHashMap<String, Set<MessageQueue>>();
-
- private ScheduledExecutorService scheduledExecutorService;
-
- private static final List<String> IGNORE_ROUTE_TOPICS = Lists.newArrayList(
- MixAll.CID_RMQ_SYS_PREFIX,
- MixAll.DEFAULT_CONSUMER_GROUP,
- MixAll.TOOLS_CONSUMER_GROUP,
- MixAll.FILTERSRV_CONSUMER_GROUP,
- MixAll.MONITOR_CONSUMER_GROUP,
- MixAll.ONS_HTTP_PROXY_GROUP,
- MixAll.CID_ONSAPI_PERMISSION_GROUP,
- MixAll.CID_ONSAPI_OWNER_GROUP,
- MixAll.CID_ONSAPI_PULL_GROUP
- );
-
- private final List<String> ignoreRouteTopics =
Lists.newArrayList(IGNORE_ROUTE_TOPICS);
-
- public AssignmentManager(BrokerController brokerController) {
- this.brokerController = brokerController;
-
ignoreRouteTopics.add(brokerController.getBrokerConfig().getBrokerClusterName());
-
ignoreRouteTopics.add(brokerController.getBrokerConfig().getBrokerName());
- scheduledExecutorService = Executors
- .newSingleThreadScheduledExecutor(new
ThreadFactoryImpl("LoadBalanceManagerScheduledThread",
brokerController.getBrokerIdentity()));
- }
-
- public void start() {
- this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
-
- @Override
- public void run() {
- try {
- updateTopicRouteInfoFromNameServer();
- } catch (Exception e) {
- log.error("ScheduledTask: failed to pull TopicRouteData
from NameServer", e);
- }
- }
- }, 1000,
this.brokerController.getBrokerConfig().getLoadBalancePollNameServerInterval(),
TimeUnit.MILLISECONDS);
- }
-
- public void shutdown() {
- this.scheduledExecutorService.shutdown();
- }
-
- public void updateTopicRouteInfoFromNameServer() {
- Set<String> topicList = new
HashSet<>(brokerController.getTopicConfigManager().getTopicConfigTable().keySet());
-
- LOOP:
- for (String topic : topicList) {
- for (String keyword : ignoreRouteTopics) {
- if (topic.contains(keyword) ||
TopicValidator.isSystemTopic(topic)) {
- continue LOOP;
- }
- }
-
- this.updateTopicRouteInfoFromNameServer(topic);
- }
- }
-
- public boolean updateTopicRouteInfoFromNameServer(final String topic) {
- try {
- TopicRouteData topicRouteData =
this.brokerController.getBrokerOuterAPI().getTopicRouteInfoFromNameServer(topic,
1000 * 3);
- if (topicRouteData != null) {
- topicRouteData.setTopicQueueMappingByBroker(null);
- Set<MessageQueue> newSubscribeInfo =
MQClientInstance.topicRouteData2TopicSubscribeInfo(topic, topicRouteData);
- Set<MessageQueue> oldSubscribeInfo =
topicSubscribeInfoTable.get(topic);
- boolean changed = !newSubscribeInfo.equals(oldSubscribeInfo);
-
- if (changed) {
- log.info("the topic[{}] subscribe message queue changed,
old[{}] ,new[{}]", topic, oldSubscribeInfo, newSubscribeInfo);
- topicSubscribeInfoTable.put(topic, newSubscribeInfo);
- return true;
- }
- } else {
- log.warn("updateTopicRouteInfoFromNameServer,
getTopicRouteInfoFromNameServer return null, Topic: {}", topic);
- }
- } catch (Exception e) {
- if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
- log.warn("updateTopicRouteInfoFromNameServer Exception", e);
- if (e instanceof MQBrokerException &&
ResponseCode.TOPIC_NOT_EXIST == ((MQBrokerException) e).getResponseCode()) {
- // clean no used topic
- cleanNoneRouteTopic(topic);
- }
- }
- }
- return false;
- }
-
- private void cleanNoneRouteTopic(String topic) {
- // clean no used topic
- topicSubscribeInfoTable.remove(topic);
- }
-
- public Set<MessageQueue> getTopicSubscribeInfo(String topic) {
- return topicSubscribeInfoTable.get(topic);
- }
-}
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java
b/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java
index 202fcbfdb..20605a7da 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java
@@ -23,13 +23,17 @@ import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.broker.latency.BrokerFixedThreadPoolExecutor;
+import org.apache.rocketmq.client.consumer.PullResult;
+import org.apache.rocketmq.client.consumer.PullStatus;
import org.apache.rocketmq.client.exception.MQBrokerException;
+import org.apache.rocketmq.client.impl.consumer.PullResultExt;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.common.AbstractBrokerRunnable;
@@ -43,7 +47,9 @@ import org.apache.rocketmq.common.ThreadFactoryImpl;
import org.apache.rocketmq.common.UnlockCallback;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.common.filter.ExpressionType;
import org.apache.rocketmq.common.message.Message;
+import org.apache.rocketmq.common.message.MessageAccessor;
import org.apache.rocketmq.common.message.MessageBatch;
import org.apache.rocketmq.common.message.MessageClientIDSetter;
import org.apache.rocketmq.common.message.MessageConst;
@@ -76,6 +82,8 @@ import
org.apache.rocketmq.common.protocol.header.GetMaxOffsetRequestHeader;
import org.apache.rocketmq.common.protocol.header.GetMaxOffsetResponseHeader;
import org.apache.rocketmq.common.protocol.header.GetMinOffsetRequestHeader;
import org.apache.rocketmq.common.protocol.header.GetMinOffsetResponseHeader;
+import org.apache.rocketmq.common.protocol.header.PullMessageRequestHeader;
+import org.apache.rocketmq.common.protocol.header.PullMessageResponseHeader;
import org.apache.rocketmq.common.protocol.header.SendMessageRequestHeader;
import org.apache.rocketmq.common.protocol.header.SendMessageRequestHeaderV2;
import org.apache.rocketmq.common.protocol.header.SendMessageResponseHeader;
@@ -95,11 +103,13 @@ import
org.apache.rocketmq.common.protocol.header.namesrv.controller.RegisterBro
import
org.apache.rocketmq.common.protocol.header.namesrv.controller.GetMetaDataResponseHeader;
import
org.apache.rocketmq.common.protocol.header.namesrv.controller.GetReplicaInfoRequestHeader;
import
org.apache.rocketmq.common.protocol.header.namesrv.controller.GetReplicaInfoResponseHeader;
+import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
import org.apache.rocketmq.common.protocol.route.BrokerData;
import org.apache.rocketmq.common.protocol.route.TopicRouteData;
import org.apache.rocketmq.common.rpc.ClientMetadata;
import org.apache.rocketmq.common.rpc.RpcClient;
import org.apache.rocketmq.common.rpc.RpcClientImpl;
+import org.apache.rocketmq.common.sysflag.PullSysFlag;
import org.apache.rocketmq.common.topic.TopicValidator;
import org.apache.rocketmq.remoting.InvokeCallback;
import org.apache.rocketmq.remoting.RPCHook;
@@ -866,6 +876,51 @@ public class BrokerOuterAPI {
final MessageExt msg, String
group,
long timeoutMillis) throws
RemotingException, MQBrokerException, InterruptedException {
+ RemotingCommand request = buildSendMessageRequest(msg, group);
+ RemotingCommand response = this.remotingClient.invokeSync(brokerAddr,
request, timeoutMillis);
+ return this.processSendResponse(brokerName, msg, response);
+ }
+
+ public CompletableFuture<SendResult>
sendMessageToSpecificBrokerAsync(String brokerAddr, final String brokerName,
+
final MessageExt msg, String group,
+ long
timeoutMillis) {
+ RemotingCommand request = buildSendMessageRequest(msg, group);
+
+ CompletableFuture<SendResult> cf = new CompletableFuture<>();
+ final String msgId = msg.getMsgId();
+ try {
+ this.remotingClient.invokeAsync(brokerAddr, request,
timeoutMillis, responseFuture -> {
+ RemotingCommand response = responseFuture.getResponseCommand();
+ if (null != response) {
+ SendResult sendResult = null;
+ try {
+ sendResult = this.processSendResponse(brokerName, msg,
response);
+ cf.complete(sendResult);
+ } catch (MQBrokerException | RemotingCommandException e) {
+ LOGGER.error("processSendResponse in
sendMessageToSpecificBrokerAsync failed, msgId=" + msgId, e);
+ cf.completeExceptionally(e);
+ }
+ } else {
+ cf.complete(null);
+ }
+
+ });
+ } catch (Throwable t) {
+ LOGGER.error("invokeAsync failed in
sendMessageToSpecificBrokerAsync, msgId=" + msgId, t);
+ cf.completeExceptionally(t);
+ }
+ return cf;
+ }
+
+ private static RemotingCommand buildSendMessageRequest(MessageExt msg,
String group) {
+ SendMessageRequestHeaderV2 requestHeaderV2 =
buildSendMessageRequestHeaderV2(msg, group);
+ RemotingCommand request =
RemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE_V2,
requestHeaderV2);
+
+ request.setBody(msg.getBody());
+ return request;
+ }
+
+ private static SendMessageRequestHeaderV2
buildSendMessageRequestHeaderV2(MessageExt msg, String group) {
SendMessageRequestHeader requestHeader = new
SendMessageRequestHeader();
requestHeader.setProducerGroup(group);
requestHeader.setTopic(msg.getTopic());
@@ -880,13 +935,7 @@ public class BrokerOuterAPI {
requestHeader.setBatch(false);
SendMessageRequestHeaderV2 requestHeaderV2 =
SendMessageRequestHeaderV2.createSendMessageRequestHeaderV2(requestHeader);
- RemotingCommand request =
RemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE_V2,
requestHeaderV2);
-
- request.setBody(msg.getBody());
-
- RemotingCommand response = this.remotingClient.invokeSync(brokerAddr,
request, timeoutMillis);
-
- return this.processSendResponse(brokerName, msg, response);
+ return requestHeaderV2;
}
private SendResult processSendResponse(
@@ -1166,4 +1215,98 @@ public class BrokerOuterAPI {
});
}
+ public PullResult pullMessageFromSpecificBroker(String brokerName, String
brokerAddr,
+ String consumerGroup,
String topic, int queueId, long offset,
+ int maxNums,
+ long timeoutMillis) throws
MQBrokerException, RemotingException, InterruptedException {
+
+ PullMessageRequestHeader requestHeader = new
PullMessageRequestHeader();
+ requestHeader.setConsumerGroup(consumerGroup);
+ requestHeader.setTopic(topic);
+ requestHeader.setQueueId(queueId);
+ requestHeader.setQueueOffset(offset);
+ requestHeader.setMaxMsgNums(maxNums);
+ requestHeader.setSysFlag(PullSysFlag.buildSysFlag(false, false, true,
false));
+ requestHeader.setCommitOffset(0L);
+ requestHeader.setSuspendTimeoutMillis(10_0000L);
+ requestHeader.setSubscription(SubscriptionData.SUB_ALL);
+ requestHeader.setSubVersion(System.currentTimeMillis());
+ requestHeader.setMaxMsgBytes(Integer.MAX_VALUE);
+ requestHeader.setExpressionType(ExpressionType.TAG);
+ requestHeader.setBname(brokerName);
+
+ RemotingCommand request =
RemotingCommand.createRequestCommand(RequestCode.PULL_MESSAGE, requestHeader);
+ RemotingCommand response = this.remotingClient.invokeSync(brokerAddr,
request, timeoutMillis);
+ PullResultExt pullResultExt = this.processPullResponse(response,
brokerAddr);
+ this.processPullResult(pullResultExt, brokerName, queueId);
+ return pullResultExt;
+ }
+
+ private PullResultExt processPullResponse(
+ final RemotingCommand response,
+ final String addr) throws MQBrokerException,
RemotingCommandException {
+ PullStatus pullStatus = PullStatus.NO_NEW_MSG;
+ switch (response.getCode()) {
+ case ResponseCode.SUCCESS:
+ pullStatus = PullStatus.FOUND;
+ break;
+ case ResponseCode.PULL_NOT_FOUND:
+ pullStatus = PullStatus.NO_NEW_MSG;
+ break;
+ case ResponseCode.PULL_RETRY_IMMEDIATELY:
+ pullStatus = PullStatus.NO_MATCHED_MSG;
+ break;
+ case ResponseCode.PULL_OFFSET_MOVED:
+ pullStatus = PullStatus.OFFSET_ILLEGAL;
+ break;
+
+ default:
+ throw new MQBrokerException(response.getCode(),
response.getRemark(), addr);
+ }
+
+ PullMessageResponseHeader responseHeader =
+ (PullMessageResponseHeader)
response.decodeCommandCustomHeader(PullMessageResponseHeader.class);
+
+ return new PullResultExt(pullStatus,
responseHeader.getNextBeginOffset(), responseHeader.getMinOffset(),
+ responseHeader.getMaxOffset(), null,
responseHeader.getSuggestWhichBrokerId(), response.getBody(),
responseHeader.getOffsetDelta());
+
+ }
+
+ private PullResult processPullResult(final PullResultExt pullResult,
String brokerName, int queueId) {
+
+ if (PullStatus.FOUND == pullResult.getPullStatus()) {
+ ByteBuffer byteBuffer =
ByteBuffer.wrap(pullResult.getMessageBinary());
+ List<MessageExt> msgList = MessageDecoder.decodesBatch(
+ byteBuffer,
+ true,
+ true,
+ true
+ );
+
+ // Currently batch messages are not supported
+ for (MessageExt msg : msgList) {
+ String traFlag =
msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
+ if (Boolean.parseBoolean(traFlag)) {
+
msg.setTransactionId(msg.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX));
+ }
+ MessageAccessor.putProperty(msg,
MessageConst.PROPERTY_MIN_OFFSET,
+ Long.toString(pullResult.getMinOffset()));
+ MessageAccessor.putProperty(msg,
MessageConst.PROPERTY_MAX_OFFSET,
+ Long.toString(pullResult.getMaxOffset()));
+ msg.setBrokerName(brokerName);
+ msg.setQueueId(queueId);
+ if (pullResult.getOffsetDelta() != null) {
+ msg.setQueueOffset(pullResult.getOffsetDelta() +
msg.getQueueOffset());
+ }
+ }
+
+ pullResult.setMsgFoundList(msgList);
+ }
+
+ pullResult.setMessageBinary(null);
+
+ return pullResult;
+ }
+
+
}
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/processor/QueryAssignmentProcessor.java
b/broker/src/main/java/org/apache/rocketmq/broker/processor/QueryAssignmentProcessor.java
index 7173606f3..01fcc773f 100644
---
a/broker/src/main/java/org/apache/rocketmq/broker/processor/QueryAssignmentProcessor.java
+++
b/broker/src/main/java/org/apache/rocketmq/broker/processor/QueryAssignmentProcessor.java
@@ -27,8 +27,8 @@ import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.client.ConsumerGroupInfo;
-import org.apache.rocketmq.broker.loadbalance.AssignmentManager;
import org.apache.rocketmq.broker.loadbalance.MessageRequestModeManager;
+import org.apache.rocketmq.broker.topic.TopicRouteInfoManager;
import org.apache.rocketmq.client.consumer.AllocateMessageQueueStrategy;
import
org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely;
import
org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragelyByCircle;
@@ -163,18 +163,18 @@ public class QueryAssignmentProcessor implements
NettyRequestProcessor {
final MessageModel messageModel, final String strategyName,
SetMessageRequestModeRequestBody setMessageRequestModeRequestBody,
final ChannelHandlerContext ctx) {
Set<MessageQueue> assignedQueueSet = null;
- AssignmentManager assignmentManager =
brokerController.getAssignmentManager();
+ final TopicRouteInfoManager topicRouteInfoManager =
this.brokerController.getTopicRouteInfoManager();
switch (messageModel) {
case BROADCASTING: {
- assignedQueueSet =
assignmentManager.getTopicSubscribeInfo(topic);
+ assignedQueueSet =
topicRouteInfoManager.getTopicSubscribeInfo(topic);
if (assignedQueueSet == null) {
log.warn("QueryLoad: no assignment for group[{}], the
topic[{}] does not exist.", consumerGroup, topic);
}
break;
}
case CLUSTERING: {
- Set<MessageQueue> mqSet =
assignmentManager.getTopicSubscribeInfo(topic);
+ Set<MessageQueue> mqSet =
topicRouteInfoManager.getTopicSubscribeInfo(topic);
if (null == mqSet) {
if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
log.warn("QueryLoad: no assignment for group[{}], the
topic[{}] does not exist.", consumerGroup, topic);
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicRouteInfoManager.java
b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicRouteInfoManager.java
new file mode 100644
index 000000000..a98a228ec
--- /dev/null
+++
b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicRouteInfoManager.java
@@ -0,0 +1,280 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.broker.topic;
+
+import com.google.common.collect.Sets;
+import java.util.Map;
+import org.apache.rocketmq.broker.BrokerController;
+import org.apache.rocketmq.client.exception.MQBrokerException;
+import org.apache.rocketmq.client.impl.factory.MQClientInstance;
+import org.apache.rocketmq.client.impl.producer.TopicPublishInfo;
+import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.common.protocol.NamespaceUtil;
+import org.apache.rocketmq.common.protocol.ResponseCode;
+import org.apache.rocketmq.common.protocol.route.BrokerData;
+import org.apache.rocketmq.common.protocol.route.TopicRouteData;
+import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.logging.InternalLoggerFactory;
+import org.apache.rocketmq.remoting.exception.RemotingException;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+public class TopicRouteInfoManager {
+
+ private static final long GET_TOPIC_ROUTE_TIMEOUT = 3000L;
+ private static final long LOCK_TIMEOUT_MILLIS = 3000L;
+ private static final InternalLogger log =
InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
+
+ private final Lock lockNamesrv = new ReentrantLock();
+ private final ConcurrentMap<String/* Topic */, TopicRouteData>
topicRouteTable = new ConcurrentHashMap<>();
+ private final ConcurrentMap<String/* Broker Name */, HashMap<Long/*
brokerId */, String/* address */>> brokerAddrTable =
+ new ConcurrentHashMap<>();
+ private final ConcurrentMap<String/* topic */, TopicPublishInfo>
topicPublishInfoTable = new ConcurrentHashMap<>();
+
+ private final ConcurrentHashMap<String, Set<MessageQueue>>
topicSubscribeInfoTable = new ConcurrentHashMap<>();
+
+ private ScheduledExecutorService scheduledExecutorService;
+ private BrokerController brokerController;
+
+ public TopicRouteInfoManager(BrokerController brokerController) {
+ this.brokerController = brokerController;
+ }
+
+ public void start() {
+ this.scheduledExecutorService =
Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
+ @Override
+ public Thread newThread(Runnable r) {
+ return new Thread(r, "TopicRouteInfoManagerScheduledThread");
+ }
+ });
+
+ this.scheduledExecutorService.scheduleAtFixedRate(() -> {
+ try {
+ updateTopicRouteInfoFromNameServer();
+ } catch (Exception e) {
+ log.error("ScheduledTask: failed to pull TopicRouteData from
NameServer", e);
+ }
+ }, 1000,
this.brokerController.getBrokerConfig().getLoadBalancePollNameServerInterval(),
TimeUnit.MILLISECONDS);
+ }
+
+ private void updateTopicRouteInfoFromNameServer() {
+ final Set<String> topicSetForPopAssignment =
this.topicSubscribeInfoTable.keySet();
+ final Set<String> topicSetForEscapeBridge =
this.topicRouteTable.keySet();
+ final Set<String> topicsAll = Sets.union(topicSetForPopAssignment,
topicSetForEscapeBridge);
+
+ for (String topic : topicsAll) {
+ boolean isNeedUpdatePublishInfo =
topicSetForEscapeBridge.contains(topic);
+ boolean isNeedUpdateSubscribeInfo =
topicSetForPopAssignment.contains(topic);
+ updateTopicRouteInfoFromNameServer(topic, isNeedUpdatePublishInfo,
isNeedUpdateSubscribeInfo);
+ }
+ }
+
+ public void updateTopicRouteInfoFromNameServer(String topic, boolean
isNeedUpdatePublishInfo,
+ boolean isNeedUpdateSubscribeInfo) {
+ try {
+ if (this.lockNamesrv.tryLock(LOCK_TIMEOUT_MILLIS,
TimeUnit.MILLISECONDS)) {
+ try {
+ final TopicRouteData topicRouteData =
this.brokerController.getBrokerOuterAPI()
+ .getTopicRouteInfoFromNameServer(topic,
GET_TOPIC_ROUTE_TIMEOUT);
+ if (null == topicRouteData) {
+ log.warn("TopicRouteInfoManager:
updateTopicRouteInfoFromNameServer, getTopicRouteInfoFromNameServer return
null, Topic: {}.", topic);
+ return;
+ }
+
+ if (isNeedUpdateSubscribeInfo) {
+ this.updateSubscribeInfoTable(topicRouteData, topic);
+ }
+
+ if (isNeedUpdatePublishInfo) {
+ this.updateTopicRouteTable(topic, topicRouteData);
+ }
+ } catch (RemotingException e) {
+ log.error("updateTopicRouteInfoFromNameServer Exception",
e);
+ } catch (MQBrokerException e) {
+ log.error("updateTopicRouteInfoFromNameServer Exception",
e);
+ if (!NamespaceUtil.isRetryTopic(topic)
+ && ResponseCode.TOPIC_NOT_EXIST ==
e.getResponseCode()) {
+ // clean no used topic
+ cleanNoneRouteTopic(topic);
+ }
+ } finally {
+ this.lockNamesrv.unlock();
+ }
+ }
+ } catch (InterruptedException e) {
+ log.warn("updateTopicRouteInfoFromNameServer Exception", e);
+ }
+ }
+
+ private boolean updateTopicRouteTable(String topic, TopicRouteData
topicRouteData) {
+ TopicRouteData old = this.topicRouteTable.get(topic);
+ boolean changed = this.topicRouteDataIsChange(old, topicRouteData);
+ if (!changed) {
+ if (!this.isNeedUpdateTopicRouteInfo(topic)) {
+ return false;
+ }
+ } else {
+ log.info("the topic[{}] route info changed, old[{}] ,new[{}]",
topic, old, topicRouteData);
+ }
+
+ for (BrokerData bd : topicRouteData.getBrokerDatas()) {
+ this.brokerAddrTable.put(bd.getBrokerName(), bd.getBrokerAddrs());
+ }
+
+ TopicPublishInfo publishInfo =
MQClientInstance.topicRouteData2TopicPublishInfo(topic, topicRouteData);
+ publishInfo.setHaveTopicRouterInfo(true);
+ this.updateTopicPublishInfo(topic, publishInfo);
+
+ TopicRouteData cloneTopicRouteData = new
TopicRouteData(topicRouteData);
+ log.info("topicRouteTable.put. Topic = {}, TopicRouteData[{}]", topic,
cloneTopicRouteData);
+ this.topicRouteTable.put(topic, cloneTopicRouteData);
+
+ return true;
+ }
+
+ private boolean updateSubscribeInfoTable(TopicRouteData topicRouteData,
String topic) {
+ final TopicRouteData tmp = new TopicRouteData(topicRouteData);
+ tmp.setTopicQueueMappingByBroker(null);
+ Set<MessageQueue> newSubscribeInfo =
MQClientInstance.topicRouteData2TopicSubscribeInfo(topic, tmp);
+ Set<MessageQueue> oldSubscribeInfo =
topicSubscribeInfoTable.get(topic);
+
+ if (Objects.equals(newSubscribeInfo, oldSubscribeInfo)) {
+ return false;
+ }
+
+ log.info("the topic[{}] subscribe message queue changed, old[{}]
,new[{}]", topic, oldSubscribeInfo, newSubscribeInfo);
+ topicSubscribeInfoTable.put(topic, newSubscribeInfo);
+ return true;
+
+ }
+
+ private boolean isNeedUpdateTopicRouteInfo(final String topic) {
+ final TopicPublishInfo prev = this.topicPublishInfoTable.get(topic);
+ return null == prev || !prev.ok();
+ }
+
+ private boolean topicRouteDataIsChange(TopicRouteData olddata,
TopicRouteData nowdata) {
+ if (olddata == null || nowdata == null)
+ return true;
+ TopicRouteData old = new TopicRouteData(olddata);
+ TopicRouteData now = new TopicRouteData(nowdata);
+ Collections.sort(old.getQueueDatas());
+ Collections.sort(old.getBrokerDatas());
+ Collections.sort(now.getQueueDatas());
+ Collections.sort(now.getBrokerDatas());
+ return !old.equals(now);
+
+ }
+
+ private void cleanNoneRouteTopic(String topic) {
+ // clean no used topic
+ topicSubscribeInfoTable.remove(topic);
+ }
+
+ private void updateTopicPublishInfo(final String topic, final
TopicPublishInfo info) {
+ if (info != null && topic != null) {
+ TopicPublishInfo prev = this.topicPublishInfoTable.put(topic,
info);
+ if (prev != null) {
+ log.info("updateTopicPublishInfo prev is not null, " + prev);
+ }
+ }
+ }
+
+ public void shutdown() {
+ if (null != this.scheduledExecutorService) {
+ this.scheduledExecutorService.shutdown();
+ }
+ }
+
+ public TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {
+ TopicPublishInfo topicPublishInfo =
this.topicPublishInfoTable.get(topic);
+ if (null == topicPublishInfo || !topicPublishInfo.ok()) {
+ this.updateTopicRouteInfoFromNameServer(topic, true, false);
+ topicPublishInfo = this.topicPublishInfoTable.get(topic);
+ }
+ return topicPublishInfo;
+ }
+
+ public String findBrokerAddressInPublish(String brokerName) {
+ if (brokerName == null) {
+ return null;
+ }
+ Map<Long/* brokerId */, String/* address */> map =
this.brokerAddrTable.get(brokerName);
+ if (map != null && !map.isEmpty()) {
+ return map.get(MixAll.MASTER_ID);
+ }
+
+ return null;
+ }
+
+ public String findBrokerAddressInSubscribe(
+ final String brokerName,
+ final long brokerId,
+ final boolean onlyThisBroker
+ ) {
+ if (brokerName == null) {
+ return null;
+ }
+ String brokerAddr = null;
+ boolean found = false;
+
+ Map<Long/* brokerId */, String/* address */> map =
this.brokerAddrTable.get(brokerName);
+ if (map != null && !map.isEmpty()) {
+ brokerAddr = map.get(brokerId);
+ boolean slave = brokerId != MixAll.MASTER_ID;
+ found = brokerAddr != null;
+
+ if (!found && slave) {
+ brokerAddr = map.get(brokerId + 1);
+ found = brokerAddr != null;
+ }
+
+ if (!found && !onlyThisBroker) {
+ Map.Entry<Long, String> entry =
map.entrySet().iterator().next();
+ brokerAddr = entry.getValue();
+ found = true;
+ }
+ }
+
+ return brokerAddr;
+
+ }
+
+ public Set<MessageQueue> getTopicSubscribeInfo(String topic) {
+ Set<MessageQueue> queues = topicSubscribeInfoTable.get(topic);
+ if (null == queues || queues.isEmpty()) {
+ this.updateTopicRouteInfoFromNameServer(topic, false, true);
+ queues = this.topicSubscribeInfoTable.get(topic);
+ }
+ return queues;
+ }
+
+
+}
diff --git
a/broker/src/test/java/org/apache/rocketmq/broker/failover/EscapeBridgeTest.java
b/broker/src/test/java/org/apache/rocketmq/broker/failover/EscapeBridgeTest.java
index 685e85db1..a51e54209 100644
---
a/broker/src/test/java/org/apache/rocketmq/broker/failover/EscapeBridgeTest.java
+++
b/broker/src/test/java/org/apache/rocketmq/broker/failover/EscapeBridgeTest.java
@@ -64,8 +64,6 @@ public class EscapeBridgeTest {
private static final String BROKER_NAME = "broker_a";
- private static final String NAMESERVER_ADDR = "127.0.0.1:9876";
-
private static final String TEST_TOPIC = "TEST_TOPIC";
private static final int DEFAULT_QUEUE_ID = 0;
@@ -81,7 +79,6 @@ public class EscapeBridgeTest {
escapeBridge = new EscapeBridge(brokerController);
messageExtBrokerInner = new MessageExtBrokerInner();
when(brokerController.getMessageStore()).thenReturn(defaultMessageStore);
- when(brokerController.getNameServerList()).thenReturn(NAMESERVER_ADDR);
brokerConfig.setEnableSlaveActingMaster(true);
brokerConfig.setEnableRemoteEscape(true);
escapeBridge.start();
diff --git
a/broker/src/test/java/org/apache/rocketmq/broker/processor/QueryAssignmentProcessorTest.java
b/broker/src/test/java/org/apache/rocketmq/broker/processor/QueryAssignmentProcessorTest.java
index 347a24019..0367e3a1e 100644
---
a/broker/src/test/java/org/apache/rocketmq/broker/processor/QueryAssignmentProcessorTest.java
+++
b/broker/src/test/java/org/apache/rocketmq/broker/processor/QueryAssignmentProcessorTest.java
@@ -23,7 +23,7 @@ import java.util.ArrayList;
import java.util.List;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.client.ClientChannelInfo;
-import org.apache.rocketmq.broker.loadbalance.AssignmentManager;
+import org.apache.rocketmq.broker.topic.TopicRouteInfoManager;
import org.apache.rocketmq.client.consumer.AllocateMessageQueueStrategy;
import
org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely;
import
org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragelyByCircle;
@@ -66,7 +66,7 @@ public class QueryAssignmentProcessorTest {
private BrokerController brokerController = new BrokerController(new
BrokerConfig(), new NettyServerConfig(), new NettyClientConfig(), new
MessageStoreConfig());
@Mock
- private AssignmentManager assignmentManager;
+ private TopicRouteInfoManager topicRouteInfoManager;
@Mock
private ChannelHandlerContext handlerContext;
@Mock
@@ -84,8 +84,8 @@ public class QueryAssignmentProcessorTest {
public void init() throws IllegalAccessException, NoSuchFieldException {
clientInfo = new ClientChannelInfo(channel, "127.0.0.1",
LanguageCode.JAVA, 0);
brokerController.setMessageStore(messageStore);
-
doReturn(assignmentManager).when(brokerController).getAssignmentManager();
-
when(assignmentManager.getTopicSubscribeInfo(topic)).thenReturn(ImmutableSet.of(new
MessageQueue(topic, "broker-1", 0), new MessageQueue(topic, "broker-2", 1)));
+
doReturn(topicRouteInfoManager).when(brokerController).getTopicRouteInfoManager();
+
when(topicRouteInfoManager.getTopicSubscribeInfo(topic)).thenReturn(ImmutableSet.of(new
MessageQueue(topic, "broker-1", 0), new MessageQueue(topic, "broker-2", 1)));
queryAssignmentProcessor = new
QueryAssignmentProcessor(brokerController);
brokerController.getTopicConfigManager().getTopicConfigTable().put(topic, new
TopicConfig());
ConsumerData consumerData = createConsumerData(group, topic);
diff --git
a/container/src/main/java/org/apache/rocketmq/container/InnerBrokerController.java
b/container/src/main/java/org/apache/rocketmq/container/InnerBrokerController.java
index dd6eeeb61..47edd56e5 100644
---
a/container/src/main/java/org/apache/rocketmq/container/InnerBrokerController.java
+++
b/container/src/main/java/org/apache/rocketmq/container/InnerBrokerController.java
@@ -133,17 +133,6 @@ public class InnerBrokerController extends
BrokerController {
return this.brokerConfig.getBrokerIP1() + ":" +
this.brokerConfig.getListenPort();
}
- @Override
- public String getNameServerList() {
- if (this.brokerContainer.getBrokerContainerConfig().getNamesrvAddr()
!= null) {
-
this.brokerContainer.getBrokerOuterAPI().updateNameServerAddressList(brokerContainer.getBrokerContainerConfig().getNamesrvAddr());
- return
this.brokerContainer.getBrokerContainerConfig().getNamesrvAddr();
- } else if
(this.brokerContainer.getBrokerContainerConfig().isFetchNamesrvAddrByAddressServer())
{
- return
this.brokerContainer.getBrokerOuterAPI().fetchNameServerAddr();
- }
- return null;
- }
-
@Override
public String getHAServerAddr() {
return this.brokerConfig.getBrokerIP2() + ":" +
this.messageStoreConfig.getHaListenPort();
diff --git
a/test/src/test/java/org/apache/rocketmq/test/container/PopSlaveActingMasterIT.java
b/test/src/test/java/org/apache/rocketmq/test/container/PopSlaveActingMasterIT.java
index 17d5e0cc3..9c2b35a2b 100644
---
a/test/src/test/java/org/apache/rocketmq/test/container/PopSlaveActingMasterIT.java
+++
b/test/src/test/java/org/apache/rocketmq/test/container/PopSlaveActingMasterIT.java
@@ -168,18 +168,6 @@ public class PopSlaveActingMasterIT extends
ContainerIntegrationTestBase {
@Test
public void testLocalActing_notAckSlave() throws Exception {
-// master1With3Replicas.getBrokerConfig().setReviveMaxSlow(0L);
-// master1With3Replicas.getBrokerConfig().setReviveInterval(0L);
-//
//master1With3Replicas.getMessageStoreConfig().setMessageDelayLevel("1s 1s 1s
1s 1s 1s 1s 1s 1s 1s 1s 1s 1s 1s 1s 1s 1s 1s");
-//
-// master2With3Replicas.getBrokerConfig().setReviveMaxSlow(0L);
-// master2With3Replicas.getBrokerConfig().setReviveInterval(0L);
-//
//master2With3Replicas.getMessageStoreConfig().setMessageDelayLevel("1s 1s 1s
1s 1s 1s 1s 1s 1s 1s 1s 1s 1s 1s 1s 1s 1s 1s");
-//
-// master3With3Replicas.getBrokerConfig().setReviveMaxSlow(0L);
-// master3With3Replicas.getBrokerConfig().setReviveInterval(0L);
-//
//master3With3Replicas.getMessageStoreConfig().setMessageDelayLevel("1s 1s 1s
1s 1s 1s 1s 1s 1s 1s 1s 1s 1s 1s 1s 1s 1s 1s");
-
String topic = PopSlaveActingMasterIT.class.getSimpleName() +
random.nextInt(65535);
createTopic(topic);
String retryTopic = KeyBuilder.buildPopRetryTopic(topic,
CONSUME_GROUP);
@@ -585,4 +573,4 @@ public class PopSlaveActingMasterIT extends
ContainerIntegrationTestBase {
}
-}
\ No newline at end of file
+}