RongtongJin commented on code in PR #4834:
URL: https://github.com/apache/rocketmq/pull/4834#discussion_r950630567
##########
broker/src/main/java/org/apache/rocketmq/broker/failover/EscapeBridge.java:
##########
@@ -127,29 +112,74 @@ public PutMessageResult putMessage(MessageExtBrokerInner
messageExt) {
}
}
+ private SendResult putMessageToRemoteBroker(MessageExtBrokerInner
messageExt) {
+ final TopicPublishInfo topicPublishInfo =
this.brokerController.getTopicRouteInfoManager().tryToFindTopicPublishInfo(messageExt.getTopic());
+ if (null == topicPublishInfo || !topicPublishInfo.ok()) {
+ LOG.warn("putMessageToRemoteBroker: no route info of topic {} when
escaping message, msgId={}",
+ messageExt.getTopic(), messageExt.getMsgId());
+ return null;
+ }
+
+ final long invokeID = RandomUtils.nextLong(0, Long.MAX_VALUE);
Review Comment:
What is invokeID used for here? Can it be removed?
##########
broker/src/main/java/org/apache/rocketmq/broker/loadbalance/AssignmentManager.java:
##########
@@ -64,78 +47,36 @@ public AssignmentManager(BrokerController brokerController)
{
this.brokerController = brokerController;
ignoreRouteTopics.add(brokerController.getBrokerConfig().getBrokerClusterName());
ignoreRouteTopics.add(brokerController.getBrokerConfig().getBrokerName());
- scheduledExecutorService = Executors
- .newSingleThreadScheduledExecutor(new
ThreadFactoryImpl("LoadBalanceManagerScheduledThread",
brokerController.getBrokerIdentity()));
}
public void start() {
- this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
-
- @Override
- public void run() {
- try {
- updateTopicRouteInfoFromNameServer();
- } catch (Exception e) {
- log.error("ScheduledTask: failed to pull TopicRouteData
from NameServer", e);
- }
- }
- }, 1000,
this.brokerController.getBrokerConfig().getLoadBalancePollNameServerInterval(),
TimeUnit.MILLISECONDS);
}
public void shutdown() {
- this.scheduledExecutorService.shutdown();
}
- public void updateTopicRouteInfoFromNameServer() {
- Set<String> topicList = new
HashSet<>(brokerController.getTopicConfigManager().getTopicConfigTable().keySet());
-
- LOOP:
- for (String topic : topicList) {
- for (String keyword : ignoreRouteTopics) {
- if (topic.contains(keyword) ||
TopicValidator.isSystemTopic(topic)) {
- continue LOOP;
- }
- }
- this.updateTopicRouteInfoFromNameServer(topic);
- }
+ public Set<MessageQueue> getTopicSubscribeInfo(String topic) {
+ return
this.brokerController.getTopicRouteInfoManager().getTopicSubscribeInfo(topic);
}
- public boolean updateTopicRouteInfoFromNameServer(final String topic) {
- try {
- TopicRouteData topicRouteData =
this.brokerController.getBrokerOuterAPI().getTopicRouteInfoFromNameServer(topic,
1000 * 3);
- if (topicRouteData != null) {
- topicRouteData.setTopicQueueMappingByBroker(null);
- Set<MessageQueue> newSubscribeInfo =
MQClientInstance.topicRouteData2TopicSubscribeInfo(topic, topicRouteData);
- Set<MessageQueue> oldSubscribeInfo =
topicSubscribeInfoTable.get(topic);
- boolean changed = !newSubscribeInfo.equals(oldSubscribeInfo);
+ public Set<String> getTopicSetForAssignment() {
+ final Set<String> topicSet = new
HashSet<>(brokerController.getTopicConfigManager().getTopicConfigTable().keySet());
- if (changed) {
- log.info("the topic[{}] subscribe message queue changed,
old[{}] ,new[{}]", topic, oldSubscribeInfo, newSubscribeInfo);
- topicSubscribeInfoTable.put(topic, newSubscribeInfo);
- return true;
- }
+ for (Iterator<String> it = topicSet.iterator(); it.hasNext(); ) {
+ String topic = it.next();
+ if (TopicValidator.isSystemTopic(topic)) {
+ it.remove();
} else {
- log.warn("updateTopicRouteInfoFromNameServer,
getTopicRouteInfoFromNameServer return null, Topic: {}", topic);
- }
- } catch (Exception e) {
- if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
- log.warn("updateTopicRouteInfoFromNameServer Exception", e);
- if (e instanceof MQBrokerException &&
ResponseCode.TOPIC_NOT_EXIST == ((MQBrokerException) e).getResponseCode()) {
Review Comment:
How about having AssignmentManager initialize routes lazily, as EscapeBridge
does, instead of fetching routes for all topics?
##########
broker/src/main/java/org/apache/rocketmq/broker/topic/TopicRouteInfoManager.java:
##########
@@ -0,0 +1,276 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.broker.topic;
+
+import com.google.common.collect.Sets;
+import java.util.Map;
+import org.apache.rocketmq.broker.BrokerController;
+import org.apache.rocketmq.client.exception.MQBrokerException;
+import org.apache.rocketmq.client.impl.factory.MQClientInstance;
+import org.apache.rocketmq.client.impl.producer.TopicPublishInfo;
+import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.common.protocol.NamespaceUtil;
+import org.apache.rocketmq.common.protocol.ResponseCode;
+import org.apache.rocketmq.common.protocol.route.BrokerData;
+import org.apache.rocketmq.common.protocol.route.TopicRouteData;
+import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.logging.InternalLoggerFactory;
+import org.apache.rocketmq.remoting.exception.RemotingException;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+public class TopicRouteInfoManager {
+
+ private static final long SEND_TIMEOUT = 3000L;
Review Comment:
GET_TOPIC_ROUTE_TIMEOUT would be a more suitable name than SEND_TIMEOUT.
##########
broker/src/main/java/org/apache/rocketmq/broker/loadbalance/AssignmentManager.java:
##########
@@ -18,34 +18,17 @@
import com.google.common.collect.Lists;
import java.util.HashSet;
+import java.util.Iterator;
import java.util.List;
import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.broker.BrokerController;
-import org.apache.rocketmq.client.exception.MQBrokerException;
-import org.apache.rocketmq.client.impl.factory.MQClientInstance;
import org.apache.rocketmq.common.MixAll;
-import org.apache.rocketmq.common.ThreadFactoryImpl;
-import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.message.MessageQueue;
-import org.apache.rocketmq.common.protocol.ResponseCode;
-import org.apache.rocketmq.common.protocol.route.TopicRouteData;
import org.apache.rocketmq.common.topic.TopicValidator;
-import org.apache.rocketmq.logging.InternalLogger;
-import org.apache.rocketmq.logging.InternalLoggerFactory;
public class AssignmentManager {
Review Comment:
IMO, this class can be removed
##########
broker/src/main/java/org/apache/rocketmq/broker/topic/TopicRouteInfoManager.java:
##########
@@ -0,0 +1,276 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.broker.topic;
+
+import com.google.common.collect.Sets;
+import java.util.Map;
+import org.apache.rocketmq.broker.BrokerController;
+import org.apache.rocketmq.client.exception.MQBrokerException;
+import org.apache.rocketmq.client.impl.factory.MQClientInstance;
+import org.apache.rocketmq.client.impl.producer.TopicPublishInfo;
+import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.common.protocol.NamespaceUtil;
+import org.apache.rocketmq.common.protocol.ResponseCode;
+import org.apache.rocketmq.common.protocol.route.BrokerData;
+import org.apache.rocketmq.common.protocol.route.TopicRouteData;
+import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.logging.InternalLoggerFactory;
+import org.apache.rocketmq.remoting.exception.RemotingException;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+public class TopicRouteInfoManager {
+
+ private static final long SEND_TIMEOUT = 3000L;
+ private static final long LOCK_TIMEOUT_MILLIS = 3000L;
+ private static final InternalLogger log =
InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
+
+ private final Lock lockNamesrv = new ReentrantLock();
+ private final ConcurrentMap<String/* Topic */, TopicRouteData>
topicRouteTable = new ConcurrentHashMap<>();
+ private final ConcurrentMap<String/* Broker Name */, HashMap<Long/*
brokerId */, String/* address */>> brokerAddrTable =
+ new ConcurrentHashMap<>();
+ private final ConcurrentMap<String/* topic */, TopicPublishInfo>
topicPublishInfoTable = new ConcurrentHashMap<>();
+
+ private final ConcurrentHashMap<String, Set<MessageQueue>>
topicSubscribeInfoTable = new ConcurrentHashMap<>();
+
+ private ScheduledExecutorService scheduledExecutorService;
+ private BrokerController brokerController;
+
+ public TopicRouteInfoManager(BrokerController brokerController) {
+ this.brokerController = brokerController;
+ }
+
+ public void start() {
+ this.scheduledExecutorService =
Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
+ @Override
+ public Thread newThread(Runnable r) {
+ return new Thread(r, "TopicRouteInfoManagerScheduledThread");
+ }
+ });
+
+ this.scheduledExecutorService.scheduleAtFixedRate(() -> {
+ try {
+ updateTopicRouteInfoFromNameServer();
+ } catch (Exception e) {
+ log.error("ScheduledTask: failed to pull TopicRouteData from
NameServer", e);
+ }
+ }, 1000,
this.brokerController.getBrokerConfig().getLoadBalancePollNameServerInterval(),
TimeUnit.MILLISECONDS);
+ }
+
+ private void updateTopicRouteInfoFromNameServer() {
+ final Set<String> topicSetForAssignmentMgr =
this.brokerController.getAssignmentManager().getTopicSetForAssignment();
+ final Set<String> topicSetForEscapeBridge =
this.topicRouteTable.keySet();
+ final Set<String> topicsAll = Sets.union(topicSetForAssignmentMgr,
topicSetForEscapeBridge);
+
+ for (String topic : topicsAll) {
+ boolean isNeedUpdatePublishInfo =
topicSetForEscapeBridge.contains(topic);
+ boolean isNeedUpdateSubscribeInfo =
topicSetForAssignmentMgr.contains(topic);
Review Comment:
isNeedUpdatePublishInfo and isNeedUpdateSubscribeInfo must be true here, so
there is no need to check them.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
users@infra.apache.org