caigy commented on code in PR #4834:
URL: https://github.com/apache/rocketmq/pull/4834#discussion_r951358638
##########
broker/src/main/java/org/apache/rocketmq/broker/loadbalance/AssignmentManager.java:
##########
@@ -64,78 +47,36 @@ public AssignmentManager(BrokerController brokerController)
{
this.brokerController = brokerController;
ignoreRouteTopics.add(brokerController.getBrokerConfig().getBrokerClusterName());
ignoreRouteTopics.add(brokerController.getBrokerConfig().getBrokerName());
- scheduledExecutorService = Executors
- .newSingleThreadScheduledExecutor(new
ThreadFactoryImpl("LoadBalanceManagerScheduledThread",
brokerController.getBrokerIdentity()));
}
public void start() {
- this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
-
- @Override
- public void run() {
- try {
- updateTopicRouteInfoFromNameServer();
- } catch (Exception e) {
- log.error("ScheduledTask: failed to pull TopicRouteData
from NameServer", e);
- }
- }
- }, 1000,
this.brokerController.getBrokerConfig().getLoadBalancePollNameServerInterval(),
TimeUnit.MILLISECONDS);
}
public void shutdown() {
- this.scheduledExecutorService.shutdown();
}
- public void updateTopicRouteInfoFromNameServer() {
- Set<String> topicList = new
HashSet<>(brokerController.getTopicConfigManager().getTopicConfigTable().keySet());
-
- LOOP:
- for (String topic : topicList) {
- for (String keyword : ignoreRouteTopics) {
- if (topic.contains(keyword) ||
TopicValidator.isSystemTopic(topic)) {
- continue LOOP;
- }
- }
- this.updateTopicRouteInfoFromNameServer(topic);
- }
+ public Set<MessageQueue> getTopicSubscribeInfo(String topic) {
+ return
this.brokerController.getTopicRouteInfoManager().getTopicSubscribeInfo(topic);
}
- public boolean updateTopicRouteInfoFromNameServer(final String topic) {
- try {
- TopicRouteData topicRouteData =
this.brokerController.getBrokerOuterAPI().getTopicRouteInfoFromNameServer(topic,
1000 * 3);
- if (topicRouteData != null) {
- topicRouteData.setTopicQueueMappingByBroker(null);
- Set<MessageQueue> newSubscribeInfo =
MQClientInstance.topicRouteData2TopicSubscribeInfo(topic, topicRouteData);
- Set<MessageQueue> oldSubscribeInfo =
topicSubscribeInfoTable.get(topic);
- boolean changed = !newSubscribeInfo.equals(oldSubscribeInfo);
+ public Set<String> getTopicSetForAssignment() {
+ final Set<String> topicSet = new
HashSet<>(brokerController.getTopicConfigManager().getTopicConfigTable().keySet());
- if (changed) {
- log.info("the topic[{}] subscribe message queue changed,
old[{}] ,new[{}]", topic, oldSubscribeInfo, newSubscribeInfo);
- topicSubscribeInfoTable.put(topic, newSubscribeInfo);
- return true;
- }
+ for (Iterator<String> it = topicSet.iterator(); it.hasNext(); ) {
+ String topic = it.next();
+ if (TopicValidator.isSystemTopic(topic)) {
+ it.remove();
} else {
- log.warn("updateTopicRouteInfoFromNameServer,
getTopicRouteInfoFromNameServer return null, Topic: {}", topic);
- }
- } catch (Exception e) {
- if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
- log.warn("updateTopicRouteInfoFromNameServer Exception", e);
- if (e instanceof MQBrokerException &&
ResponseCode.TOPIC_NOT_EXIST == ((MQBrokerException) e).getResponseCode()) {
Review Comment:
Good idea. I'll remove `AssignmentManager` and make related function lazy
initialized.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]