This is an automated email from the ASF dual-hosted git repository.
dongeforever pushed a commit to branch 5.0.0-alpha-static-topic
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/5.0.0-alpha-static-topic by
this push:
new a5af2cf Finish the UpdateStaticTopicSubCommand
a5af2cf is described below
commit a5af2cf519e7c2c2a306cc326926100fc75f57d3
Author: dongeforever <[email protected]>
AuthorDate: Wed Nov 17 23:05:03 2021 +0800
Finish the UpdateStaticTopicSubCommand
---
.../org/apache/rocketmq/common/TopicConfig.java | 6 +
.../common/TopicConfigAndQueueMapping.java | 11 +-
.../rocketmq/common/TopicQueueMappingInfo.java | 11 +-
.../rocketmq/common/TopicQueueMappingUtils.java | 59 +++++--
.../command/topic/UpdateStaticTopicSubCommand.java | 192 +++++++++++++--------
5 files changed, 181 insertions(+), 98 deletions(-)
diff --git a/common/src/main/java/org/apache/rocketmq/common/TopicConfig.java
b/common/src/main/java/org/apache/rocketmq/common/TopicConfig.java
index c082ba6..ec4d54b 100644
--- a/common/src/main/java/org/apache/rocketmq/common/TopicConfig.java
+++ b/common/src/main/java/org/apache/rocketmq/common/TopicConfig.java
@@ -47,6 +47,12 @@ public class TopicConfig {
this.order = other.order;
}
+ public TopicConfig(String topicName, int readQueueNums, int
writeQueueNums) {
+ this.topicName = topicName;
+ this.readQueueNums = readQueueNums;
+ this.writeQueueNums = writeQueueNums;
+ }
+
public TopicConfig(String topicName, int readQueueNums, int
writeQueueNums, int perm) {
this.topicName = topicName;
this.readQueueNums = readQueueNums;
diff --git
a/common/src/main/java/org/apache/rocketmq/common/TopicConfigAndQueueMapping.java
b/common/src/main/java/org/apache/rocketmq/common/TopicConfigAndQueueMapping.java
index 3bc7f24..e2eece6 100644
---
a/common/src/main/java/org/apache/rocketmq/common/TopicConfigAndQueueMapping.java
+++
b/common/src/main/java/org/apache/rocketmq/common/TopicConfigAndQueueMapping.java
@@ -17,18 +17,17 @@
package org.apache.rocketmq.common;
public class TopicConfigAndQueueMapping extends TopicConfig {
- private TopicQueueMappingDetail topicQueueMappingDetail;
+ private TopicQueueMappingDetail mappingDetail;
public TopicConfigAndQueueMapping() {
}
- public TopicConfigAndQueueMapping(TopicConfig topicConfig,
TopicQueueMappingDetail topicQueueMappingDetail) {
+ public TopicConfigAndQueueMapping(TopicConfig topicConfig,
TopicQueueMappingDetail mappingDetail) {
super(topicConfig);
- this.topicQueueMappingDetail = topicQueueMappingDetail;
+ this.mappingDetail = mappingDetail;
}
-
- public TopicQueueMappingDetail getTopicQueueMappingInfo() {
- return topicQueueMappingDetail;
+ public TopicQueueMappingDetail getMappingDetail() {
+ return mappingDetail;
}
}
diff --git
a/common/src/main/java/org/apache/rocketmq/common/TopicQueueMappingInfo.java
b/common/src/main/java/org/apache/rocketmq/common/TopicQueueMappingInfo.java
index b2e8591..b4bf776 100644
--- a/common/src/main/java/org/apache/rocketmq/common/TopicQueueMappingInfo.java
+++ b/common/src/main/java/org/apache/rocketmq/common/TopicQueueMappingInfo.java
@@ -55,9 +55,6 @@ public class TopicQueueMappingInfo extends
RemotingSerializable {
return totalQueues;
}
- public void setTotalQueues(int totalQueues) {
- this.totalQueues = totalQueues;
- }
public String getBname() {
return bname;
@@ -71,6 +68,14 @@ public class TopicQueueMappingInfo extends
RemotingSerializable {
return epoch;
}
+ public void setEpoch(int epoch) {
+ this.epoch = epoch;
+ }
+
+ public void setTotalQueues(int totalQueues) {
+ this.totalQueues = totalQueues;
+ }
+
public ConcurrentMap<Integer, Integer> getCurrIdMap() {
return currIdMap;
}
diff --git
a/common/src/main/java/org/apache/rocketmq/common/TopicQueueMappingUtils.java
b/common/src/main/java/org/apache/rocketmq/common/TopicQueueMappingUtils.java
index 93345df..87c54a3 100644
---
a/common/src/main/java/org/apache/rocketmq/common/TopicQueueMappingUtils.java
+++
b/common/src/main/java/org/apache/rocketmq/common/TopicQueueMappingUtils.java
@@ -29,16 +29,18 @@ import java.util.Random;
public class TopicQueueMappingUtils {
- public static class MappingState {
+ public static class MappingAllocator {
Map<String, Integer> brokerNumMap = new HashMap<String, Integer>();
+ Map<Integer, String> idToBroker = new HashMap<Integer, String>();
int currentIndex = 0;
Random random = new Random();
List<String> leastBrokers = new ArrayList<String>();
- private MappingState(Map<String, Integer> brokerNumMap) {
+ private MappingAllocator(Map<Integer, String> idToBroker, Map<String,
Integer> brokerNumMap) {
+ this.idToBroker.putAll(idToBroker);
this.brokerNumMap.putAll(brokerNumMap);
}
- public void freshState() {
+ private void freshState() {
int minNum = -1;
for (Map.Entry<String, Integer> entry : brokerNumMap.entrySet()) {
if (entry.getValue() > minNum) {
@@ -50,20 +52,41 @@ public class TopicQueueMappingUtils {
}
currentIndex = random.nextInt(leastBrokers.size());
}
-
- public String nextBroker() {
+ private String nextBroker() {
if (leastBrokers.isEmpty()) {
freshState();
}
- int tmpIndex = (++currentIndex) % leastBrokers.size();
- String broker = leastBrokers.remove(tmpIndex);
- currentIndex--;
- return broker;
+ int tmpIndex = currentIndex % leastBrokers.size();
+ return leastBrokers.remove(tmpIndex);
+ }
+
+ public Map<String, Integer> getBrokerNumMap() {
+ return brokerNumMap;
+ }
+
+ public void upToNum(int maxQueueNum) {
+ int currSize = idToBroker.size();
+ if (maxQueueNum <= currSize) {
+ return;
+ }
+ for (int i = currSize; i < maxQueueNum; i++) {
+ String nextBroker = nextBroker();
+ if (brokerNumMap.containsKey(nextBroker)) {
+ brokerNumMap.put(nextBroker, brokerNumMap.get(nextBroker)
+ 1);
+ } else {
+ brokerNumMap.put(nextBroker, 1);
+ }
+ idToBroker.put(i, nextBroker);
+ }
+ }
+
+ public Map<Integer, String> getIdToBroker() {
+ return idToBroker;
}
}
- public static MappingState buildMappingState(Map<String, Integer>
brokerNumMap) {
- return new MappingState(brokerNumMap);
+ public static MappingAllocator buildMappingAllocator(Map<Integer, String>
idToBroker, Map<String, Integer> brokerNumMap) {
+ return new MappingAllocator(idToBroker, brokerNumMap);
}
public static Map.Entry<Integer, Integer>
findMaxEpochAndQueueNum(List<TopicQueueMappingDetail> mappingDetailList) {
@@ -92,14 +115,14 @@ public class TopicQueueMappingUtils {
for (TopicQueueMappingDetail mappingDetail : mappingDetailList) {
for (Map.Entry<Integer, ImmutableList<LogicQueueMappingItem>>
entry : mappingDetail.getHostedQueues().entrySet()) {
Integer globalid = entry.getKey();
- String leaerBrokerName =
entry.getValue().iterator().next().getBname();
- if (!leaerBrokerName.equals(mappingDetail.getBname())) {
+ String leaderBrokerName = getLeaderBroker(entry.getValue());
+ if (!leaderBrokerName.equals(mappingDetail.getBname())) {
//not the leader
continue;
}
if (globalIdMap.containsKey(globalid)) {
if (!replace) {
- throw new RuntimeException(String.format("The queue id
is duplicated in broker %s %s", leaerBrokerName, mappingDetail.getBname()));
+ throw new RuntimeException(String.format("The queue id
is duplicated in broker %s %s", leaderBrokerName, mappingDetail.getBname()));
}
} else {
globalIdMap.put(globalid, entry.getValue());
@@ -108,4 +131,12 @@ public class TopicQueueMappingUtils {
}
return globalIdMap;
}
+
+ public static String getLeaderBroker(ImmutableList<LogicQueueMappingItem>
items) {
+ return getLeaderItem(items).getBname();
+ }
+ public static LogicQueueMappingItem
getLeaderItem(ImmutableList<LogicQueueMappingItem> items) {
+ assert items.size() > 0;
+ return items.get(items.size() - 1);
+ }
}
diff --git
a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateStaticTopicSubCommand.java
b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateStaticTopicSubCommand.java
index 253d19c..a1d3f6f 100644
---
a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateStaticTopicSubCommand.java
+++
b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateStaticTopicSubCommand.java
@@ -22,6 +22,7 @@ import org.apache.commons.cli.Option;
import org.apache.commons.cli.OptionGroup;
import org.apache.commons.cli.Options;
import org.apache.rocketmq.common.LogicQueueMappingItem;
+import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.TopicConfigAndQueueMapping;
import org.apache.rocketmq.common.TopicQueueMappingDetail;
import org.apache.rocketmq.common.TopicQueueMappingUtils;
@@ -35,11 +36,13 @@ import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
import org.apache.rocketmq.tools.command.SubCommand;
import org.apache.rocketmq.tools.command.SubCommandException;
+import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.HashMap;
-import java.util.Iterator;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.stream.Collectors;
public class UpdateStaticTopicSubCommand implements SubCommand {
@@ -77,18 +80,6 @@ public class UpdateStaticTopicSubCommand implements
SubCommand {
return options;
}
- private void validateIfNull(Map.Entry<String, TopicConfigAndQueueMapping>
entry, boolean shouldNull) {
- if (shouldNull) {
- if (entry.getValue().getTopicQueueMappingInfo() != null) {
- throw new RuntimeException("Mapping info should be null in
broker " + entry.getKey());
- }
- } else {
- if (entry.getValue().getTopicQueueMappingInfo() == null) {
- throw new RuntimeException("Mapping info should not be null in
broker " + entry.getKey());
- }
- }
- }
-
@Override
public void execute(final CommandLine commandLine, final Options options,
@@ -100,73 +91,88 @@ public class UpdateStaticTopicSubCommand implements
SubCommand {
Map<String, TopicConfigAndQueueMapping> existedTopicConfigMap = new
HashMap<>();
Map<Integer, ImmutableList<LogicQueueMappingItem>> globalIdMap = new
HashMap<>();
try {
-
+ if (!commandLine.hasOption('t')
+ || !commandLine.hasOption('c')
+ || !commandLine.hasOption("qn")) {
+ ServerUtil.printCommandLineHelp("mqadmin " +
this.commandName(), options);
+ return;
+ }
String topic = commandLine.getOptionValue('t').trim();
int queueNum =
Integer.parseInt(commandLine.getOptionValue("qn").trim());
- String cluster = commandLine.getOptionValue('c').trim();
+ String clusters = commandLine.getOptionValue('c').trim();
ClusterInfo clusterInfo =
defaultMQAdminExt.examineBrokerClusterInfo();
if (clusterInfo == null
- || clusterInfo.getClusterAddrTable().isEmpty()
- || clusterInfo.getClusterAddrTable().get(cluster) == null
- ||
clusterInfo.getClusterAddrTable().get(cluster).isEmpty()) {
- throw new RuntimeException("The Cluster info is null for " +
cluster);
+ || clusterInfo.getClusterAddrTable().isEmpty()) {
+ throw new RuntimeException("The Cluster info is empty");
+ } else {
+ clientMetadata.refreshClusterInfo(clusterInfo);
}
- clientMetadata.refreshClusterInfo(clusterInfo);
- //first get the existed topic config and mapping
- {
- TopicRouteData routeData =
defaultMQAdminExt.examineTopicRouteInfo(topic);
- clientMetadata.freshTopicRoute(topic, routeData);
- if (routeData != null
+ Set<String> brokers = new HashSet<>();
+ for (String cluster : clusters.split(",")) {
+ cluster = cluster.trim();
+ if (clusterInfo.getClusterAddrTable().get(cluster) != null) {
+
brokers.addAll(clusterInfo.getClusterAddrTable().get(cluster));
+ }
+ }
+ if (brokers.isEmpty()) {
+ throw new RuntimeException("Find none brokers for " +
clusters);
+ }
+
+ //get the existed topic config and mapping
+ TopicRouteData routeData =
defaultMQAdminExt.examineTopicRouteInfo(topic);
+ clientMetadata.freshTopicRoute(topic, routeData);
+ if (routeData != null
&& !routeData.getQueueDatas().isEmpty()) {
- for (QueueData queueData: routeData.getQueueDatas()) {
- String bname = queueData.getBrokerName();
- String addr =
clientMetadata.findMasterBrokerAddr(bname);
- TopicConfigAndQueueMapping mapping =
(TopicConfigAndQueueMapping) defaultMQAdminExt.examineTopicConfig(addr, topic);
- //allow the mapping info is null
- if (mapping != null) {
- existedTopicConfigMap.put(bname, mapping);
- }
+ for (QueueData queueData: routeData.getQueueDatas()) {
+ String bname = queueData.getBrokerName();
+ String addr = clientMetadata.findMasterBrokerAddr(bname);
+ TopicConfigAndQueueMapping mapping =
(TopicConfigAndQueueMapping) defaultMQAdminExt.examineTopicConfig(addr, topic);
+ //allow the config to be null
+ if (mapping != null) {
+ existedTopicConfigMap.put(bname, mapping);
}
}
}
- // the
- {
- if (!existedTopicConfigMap.isEmpty()) {
- //make sure it it not null
- existedTopicConfigMap.entrySet().forEach(entry -> {
- validateIfNull(entry, false);
- });
- //make sure the detail is not dirty
- existedTopicConfigMap.entrySet().forEach(entry -> {
- if
(!entry.getKey().equals(entry.getValue().getTopicQueueMappingInfo().getBname()))
{
- throw new RuntimeException(String.format("The
broker name is not equal %s != %s ", entry.getKey(),
entry.getValue().getTopicQueueMappingInfo().getBname()));
- }
- if
(entry.getValue().getTopicQueueMappingInfo().isDirty()) {
- throw new RuntimeException("The mapping info is
dirty in broker " + entry.getValue().getTopicQueueMappingInfo().getBname());
- }
- });
-
- List<TopicQueueMappingDetail> detailList =
existedTopicConfigMap.values().stream().map(TopicConfigAndQueueMapping::getTopicQueueMappingInfo).collect(Collectors.toList());
- //check the epoch and qnum
- Map.Entry<Integer, Integer> maxEpochAndNum =
TopicQueueMappingUtils.findMaxEpochAndQueueNum(detailList);
- detailList.forEach( mappingDetail -> {
- if (maxEpochAndNum.getKey() !=
mappingDetail.getEpoch()) {
- throw new RuntimeException(String.format("epoch
dose not match %d != %d in %s", maxEpochAndNum.getKey(),
mappingDetail.getEpoch(), mappingDetail.getBname()));
- }
- if (maxEpochAndNum.getValue() !=
mappingDetail.getTotalQueues()) {
- throw new RuntimeException(String.format("total
queue number dose not match %d != %d in %s", maxEpochAndNum.getValue(),
mappingDetail.getTotalQueues(), mappingDetail.getBname()));
- }
- });
-
- globalIdMap = TopicQueueMappingUtils.buildMappingItems(new
ArrayList<>(detailList), false);
-
- if (maxEpochAndNum.getValue() != globalIdMap.size()) {
- throw new RuntimeException(String.format("The total
queue number in config dose not match the real hosted queues %d != %d",
maxEpochAndNum.getValue(), globalIdMap.size()));
+
+ Map.Entry<Integer, Integer> maxEpochAndNum = new
AbstractMap.SimpleImmutableEntry<>(-1, queueNum);
+ if (!existedTopicConfigMap.isEmpty()) {
+ //make sure it is not null
+ existedTopicConfigMap.forEach((key, value) -> {
+ if (value.getMappingDetail() != null) {
+ throw new RuntimeException("Mapping info should be
null in broker " + key);
+ }
+ });
+ //make sure the detail is not dirty
+ existedTopicConfigMap.forEach((key, value) -> {
+ if (!key.equals(value.getMappingDetail().getBname())) {
+ throw new RuntimeException(String.format("The broker
name is not equal %s != %s ", key, value.getMappingDetail().getBname()));
}
- for (int i = 0; i < maxEpochAndNum.getValue(); i++) {
- if (!globalIdMap.containsKey(i)) {
- throw new RuntimeException(String.format("The
queue number %s is not in globalIdMap", i));
- }
+ if (value.getMappingDetail().isDirty()) {
+ throw new RuntimeException("The mapping info is dirty
in broker " + value.getMappingDetail().getBname());
+ }
+ });
+
+ List<TopicQueueMappingDetail> detailList =
existedTopicConfigMap.values().stream().map(TopicConfigAndQueueMapping::getMappingDetail).collect(Collectors.toList());
+ //check the epoch and qnum
+ maxEpochAndNum =
TopicQueueMappingUtils.findMaxEpochAndQueueNum(detailList);
+ final Map.Entry<Integer, Integer> tmpMaxEpochAndNum =
maxEpochAndNum;
+ detailList.forEach( mappingDetail -> {
+ if (tmpMaxEpochAndNum.getKey() !=
mappingDetail.getEpoch()) {
+ throw new RuntimeException(String.format("epoch dose
not match %d != %d in %s", tmpMaxEpochAndNum.getKey(),
mappingDetail.getEpoch(), mappingDetail.getBname()));
+ }
+ if (tmpMaxEpochAndNum.getValue() !=
mappingDetail.getTotalQueues()) {
+ throw new RuntimeException(String.format("total queue
number dose not match %d != %d in %s", tmpMaxEpochAndNum.getValue(),
mappingDetail.getTotalQueues(), mappingDetail.getBname()));
+ }
+ });
+
+ globalIdMap = TopicQueueMappingUtils.buildMappingItems(new
ArrayList<>(detailList), false);
+
+ if (maxEpochAndNum.getValue() != globalIdMap.size()) {
+ throw new RuntimeException(String.format("The total queue
number in config dose not match the real hosted queues %d != %d",
maxEpochAndNum.getValue(), globalIdMap.size()));
+ }
+ for (int i = 0; i < maxEpochAndNum.getValue(); i++) {
+ if (!globalIdMap.containsKey(i)) {
+ throw new RuntimeException(String.format("The queue
number %s is not in globalIdMap", i));
}
}
}
@@ -177,11 +183,47 @@ public class UpdateStaticTopicSubCommand implements
SubCommand {
if (queueNum == globalIdMap.size()) {
throw new RuntimeException("The topic queue num is equal the
existed queue num, do nothing");
}
- //the check is ok, now do the real
-
-
-
- ServerUtil.printCommandLineHelp("mqadmin " + this.commandName(),
options);
+ //the check is ok, now do the mapping allocation
+ Map<String, Integer> brokerNumMap =
brokers.stream().collect(Collectors.toMap( x -> x, x -> 0));
+ Map<Integer, String> idToBroker = new HashMap<>();
+ globalIdMap.forEach((key, value) -> {
+ String leaderbroker =
TopicQueueMappingUtils.getLeaderBroker(value);
+ idToBroker.put(key, leaderbroker);
+ if (!brokerNumMap.containsKey(leaderbroker)) {
+ brokerNumMap.put(leaderbroker, 1);
+ } else {
+ brokerNumMap.put(leaderbroker,
brokerNumMap.get(leaderbroker) + 1);
+ }
+ });
+ TopicQueueMappingUtils.MappingAllocator allocator =
TopicQueueMappingUtils.buildMappingAllocator(idToBroker, brokerNumMap);
+ allocator.upToNum(queueNum);
+ Map<Integer, String> newIdToBroker = allocator.getIdToBroker();
+
+ //construct the topic configAndMapping
+ int epoch = maxEpochAndNum.getKey() + 1;
+ newIdToBroker.forEach( (queueId, broker) -> {
+ TopicConfigAndQueueMapping configMapping;
+ if (!existedTopicConfigMap.containsKey(broker)) {
+ TopicConfig topicConfig = new TopicConfig(topic, 1, 1);
+ TopicQueueMappingDetail mappingDetail = new
TopicQueueMappingDetail(topic, queueNum, broker, epoch);
+ configMapping = new
TopicConfigAndQueueMapping(topicConfig, mappingDetail);
+ } else {
+ configMapping = existedTopicConfigMap.get(broker);
+
configMapping.setWriteQueueNums(configMapping.getWriteQueueNums() + 1);
+
configMapping.setWriteQueueNums(configMapping.getWriteQueueNums() + 1);
+ configMapping.getMappingDetail().setEpoch(epoch);
+ configMapping.getMappingDetail().setTotalQueues(queueNum);
+ }
+ LogicQueueMappingItem mappingItem = new
LogicQueueMappingItem(0, configMapping.getWriteQueueNums() - 1, broker, 0, 0,
-1, -1, -1);
+ configMapping.getMappingDetail().putMappingInfo(queueId,
ImmutableList.of(mappingItem));
+ });
+
+ for (Map.Entry<String, TopicConfigAndQueueMapping> entry :
existedTopicConfigMap.entrySet()) {
+ String broker = entry.getKey();
+ String addr = clientMetadata.findMasterBrokerAddr(broker);
+ TopicConfigAndQueueMapping configMapping = entry.getValue();
+ defaultMQAdminExt.createStaticTopic(addr,
defaultMQAdminExt.getCreateTopicKey(), configMapping,
configMapping.getMappingDetail());
+ }
} catch (Exception e) {
throw new SubCommandException(this.getClass().getSimpleName() + "
command failed", e);
} finally {