This is an automated email from the ASF dual-hosted git repository.
dongeforever pushed a commit to branch 5.0.0-alpha-static-topic
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/5.0.0-alpha-static-topic by
this push:
new 82364fc Polish the tools
82364fc is described below
commit 82364fc28592980c8215edfaf4e20e254c39a861
Author: dongeforever <[email protected]>
AuthorDate: Thu Nov 18 15:52:54 2021 +0800
Polish the tools
---
.../common/statictopic/TopicQueueMappingUtils.java | 16 +++++++-
.../MigrateTopicLogicalQueueCommand.java | 2 +-
.../topic/RemappingStaticTopicSubCommand.java | 24 ++++-------
.../command/topic/UpdateStaticTopicSubCommand.java | 47 ++++++++++------------
4 files changed, 45 insertions(+), 44 deletions(-)
diff --git
a/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingUtils.java
b/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingUtils.java
index 27dcc0e..dfb6bbf 100644
---
a/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingUtils.java
+++
b/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingUtils.java
@@ -103,7 +103,7 @@ public class TopicQueueMappingUtils {
return new AbstractMap.SimpleImmutableEntry<Long, Integer>(epoch,
queueNum);
}
- public static Map<Integer, TopicQueueMappingOne>
buildMappingItems(List<TopicQueueMappingDetail> mappingDetailList, boolean
replace) {
+ public static Map<Integer, TopicQueueMappingOne>
buildMappingItems(List<TopicQueueMappingDetail> mappingDetailList, boolean
replace, boolean checkConsistence) {
Collections.sort(mappingDetailList, new
Comparator<TopicQueueMappingDetail>() {
@Override
public int compare(TopicQueueMappingDetail o1,
TopicQueueMappingDetail o2) {
@@ -111,8 +111,12 @@ public class TopicQueueMappingUtils {
}
});
+ int maxNum = 0;
Map<Integer, TopicQueueMappingOne> globalIdMap = new HashMap<Integer,
TopicQueueMappingOne>();
for (TopicQueueMappingDetail mappingDetail : mappingDetailList) {
+ if (mappingDetail.totalQueues > maxNum) {
+ maxNum = mappingDetail.totalQueues;
+ }
for (Map.Entry<Integer, ImmutableList<LogicQueueMappingItem>>
entry : mappingDetail.getHostedQueues().entrySet()) {
Integer globalid = entry.getKey();
String leaderBrokerName = getLeaderBroker(entry.getValue());
@@ -129,6 +133,16 @@ public class TopicQueueMappingUtils {
}
}
}
+ if (checkConsistence) {
+ if (maxNum != globalIdMap.size()) {
+ throw new RuntimeException(String.format("The total queue
number in config dose not match the real hosted queues %d != %d", maxNum,
globalIdMap.size()));
+ }
+ for (int i = 0; i < maxNum; i++) {
+ if (!globalIdMap.containsKey(i)) {
+ throw new RuntimeException(String.format("The queue number
%s is not in globalIdMap", i));
+ }
+ }
+ }
return globalIdMap;
}
diff --git
a/tools/src/main/java/org/apache/rocketmq/tools/command/logicalqueue/MigrateTopicLogicalQueueCommand.java
b/tools/src/main/java/org/apache/rocketmq/tools/command/logicalqueue/MigrateTopicLogicalQueueCommand.java
index 5da8b0d..a3f212c 100644
---
a/tools/src/main/java/org/apache/rocketmq/tools/command/logicalqueue/MigrateTopicLogicalQueueCommand.java
+++
b/tools/src/main/java/org/apache/rocketmq/tools/command/logicalqueue/MigrateTopicLogicalQueueCommand.java
@@ -87,7 +87,7 @@ public class MigrateTopicLogicalQueueCommand implements
SubCommand {
String toBrokerName,
Long forceDelta) throws RemotingException, MQBrokerException,
InterruptedException, SubCommandException, MQClientException {
TopicRouteData topicRouteInfo =
mqAdminExt.examineTopicRouteInfo(topic);
- LogicalQueuesInfo logicalQueuesInfo =
topicRouteInfo.getLogicalQueuesInfo();
+ LogicalQueuesInfo logicalQueuesInfo = null;
/*topicRouteInfo.getLogicalQueuesInfo();*/
if (logicalQueuesInfo == null) {
throw new SubCommandException("topic not enabled logical queue");
}
diff --git
a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/RemappingStaticTopicSubCommand.java
b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/RemappingStaticTopicSubCommand.java
index edae005..472ce0b 100644
---
a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/RemappingStaticTopicSubCommand.java
+++
b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/RemappingStaticTopicSubCommand.java
@@ -172,26 +172,15 @@ public class RemappingStaticTopicSubCommand implements
SubCommand {
List<TopicQueueMappingDetail> detailList =
existedTopicConfigMap.values().stream().map(TopicConfigAndQueueMapping::getMappingDetail).collect(Collectors.toList());
//check the epoch and qnum
maxEpochAndNum =
TopicQueueMappingUtils.findMaxEpochAndQueueNum(detailList);
- final Map.Entry<Long, Integer> tmpMaxEpochAndNum = maxEpochAndNum;
- detailList.forEach( mappingDetail -> {
- if (tmpMaxEpochAndNum.getKey() != mappingDetail.getEpoch()) {
- throw new RuntimeException(String.format("epoch dose not
match %d != %d in %s", tmpMaxEpochAndNum.getKey(), mappingDetail.getEpoch(),
mappingDetail.getBname()));
+ for (TopicQueueMappingDetail mappingDetail : detailList) {
+ if (maxEpochAndNum.getKey() != mappingDetail.getEpoch()) {
+ throw new RuntimeException(String.format("epoch dose not
match %d != %d in %s", maxEpochAndNum.getKey(), mappingDetail.getEpoch(),
mappingDetail.getBname()));
}
- if (tmpMaxEpochAndNum.getValue() !=
mappingDetail.getTotalQueues()) {
- throw new RuntimeException(String.format("total queue
number dose not match %d != %d in %s", tmpMaxEpochAndNum.getValue(),
mappingDetail.getTotalQueues(), mappingDetail.getBname()));
- }
- });
-
- globalIdMap = TopicQueueMappingUtils.buildMappingItems(new
ArrayList<>(detailList), false);
-
- if (maxEpochAndNum.getValue() != globalIdMap.size()) {
- throw new RuntimeException(String.format("The total queue
number in config dose not match the real hosted queues %d != %d",
maxEpochAndNum.getValue(), globalIdMap.size()));
- }
- for (int i = 0; i < maxEpochAndNum.getValue(); i++) {
- if (!globalIdMap.containsKey(i)) {
- throw new RuntimeException(String.format("The queue number
%s is not in globalIdMap", i));
+ if (maxEpochAndNum.getValue() !=
mappingDetail.getTotalQueues()) {
+ throw new RuntimeException(String.format("total queue
number dose not match %d != %d in %s", maxEpochAndNum.getValue(),
mappingDetail.getTotalQueues(), mappingDetail.getBname()));
}
}
+ globalIdMap = TopicQueueMappingUtils.buildMappingItems(new
ArrayList<>(detailList), false, true);
//the check is ok, now do the mapping allocation
int maxNum = maxEpochAndNum.getValue();
@@ -262,6 +251,7 @@ public class RemappingStaticTopicSubCommand implements
SubCommand {
configMapping.getMappingDetail().setEpoch(epoch);
configMapping.getMappingDetail().setTotalQueues(maxNum);
});
+ TopicQueueMappingUtils.buildMappingItems(new
ArrayList<>(existedTopicConfigMap.values().stream().map(TopicConfigAndQueueMapping::getMappingDetail).collect(Collectors.toList())),
false, true);
// now do the remapping
//Step1: let the new leader can be write without the logicOffset
for (String broker: brokersToMapIn) {
diff --git
a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateStaticTopicSubCommand.java
b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateStaticTopicSubCommand.java
index d25fb70..65c7377 100644
---
a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateStaticTopicSubCommand.java
+++
b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateStaticTopicSubCommand.java
@@ -64,7 +64,10 @@ public class UpdateStaticTopicSubCommand implements
SubCommand {
Option opt = null;
- opt = new Option("c", "clusters", true, "create topic to clusters,
comma separated");
+ opt = new Option("c", "clusters", true, "remapping static topic to
clusters, comma separated");
+ optionGroup.addOption(opt);
+
+ opt = new Option("b", "brokers", true, "remapping static topic to
brokers, comma separated");
optionGroup.addOption(opt);
optionGroup.setRequired(true);
@@ -94,22 +97,23 @@ public class UpdateStaticTopicSubCommand implements
SubCommand {
Set<String> brokers = new HashSet<>();
try {
- if (!commandLine.hasOption('t')
- || !commandLine.hasOption('c')
+ if ((!commandLine.hasOption("b") && !commandLine.hasOption('c'))
+ || !commandLine.hasOption('t')
|| !commandLine.hasOption("qn")) {
ServerUtil.printCommandLineHelp("mqadmin " +
this.commandName(), options);
return;
}
- String topic = commandLine.getOptionValue('t').trim();
- int queueNum =
Integer.parseInt(commandLine.getOptionValue("qn").trim());
- String clusters = commandLine.getOptionValue('c').trim();
+
ClusterInfo clusterInfo =
defaultMQAdminExt.examineBrokerClusterInfo();
if (clusterInfo == null
|| clusterInfo.getClusterAddrTable().isEmpty()) {
throw new RuntimeException("The Cluster info is empty");
- } else {
- clientMetadata.refreshClusterInfo(clusterInfo);
}
+ clientMetadata.refreshClusterInfo(clusterInfo);
+
+ String topic = commandLine.getOptionValue('t').trim();
+ int queueNum =
Integer.parseInt(commandLine.getOptionValue("qn").trim());
+ String clusters = commandLine.getOptionValue('c').trim();
for (String cluster : clusters.split(",")) {
cluster = cluster.trim();
if (clusterInfo.getClusterAddrTable().get(cluster) != null) {
@@ -163,26 +167,15 @@ public class UpdateStaticTopicSubCommand implements
SubCommand {
List<TopicQueueMappingDetail> detailList =
existedTopicConfigMap.values().stream().map(TopicConfigAndQueueMapping::getMappingDetail).collect(Collectors.toList());
//check the epoch and qnum
maxEpochAndNum =
TopicQueueMappingUtils.findMaxEpochAndQueueNum(detailList);
- final Map.Entry<Long, Integer> tmpMaxEpochAndNum =
maxEpochAndNum;
- detailList.forEach( mappingDetail -> {
- if (tmpMaxEpochAndNum.getKey() !=
mappingDetail.getEpoch()) {
- throw new RuntimeException(String.format("epoch dose
not match %d != %d in %s", tmpMaxEpochAndNum.getKey(),
mappingDetail.getEpoch(), mappingDetail.getBname()));
- }
- if (tmpMaxEpochAndNum.getValue() !=
mappingDetail.getTotalQueues()) {
- throw new RuntimeException(String.format("total queue
number dose not match %d != %d in %s", tmpMaxEpochAndNum.getValue(),
mappingDetail.getTotalQueues(), mappingDetail.getBname()));
+ for (TopicQueueMappingDetail mappingDetail : detailList) {
+ if (maxEpochAndNum.getKey() != mappingDetail.getEpoch()) {
+ throw new RuntimeException(String.format("epoch dose
not match %d != %d in %s", maxEpochAndNum.getKey(), mappingDetail.getEpoch(),
mappingDetail.getBname()));
}
- });
-
- globalIdMap = TopicQueueMappingUtils.buildMappingItems(new
ArrayList<>(detailList), false);
-
- if (maxEpochAndNum.getValue() != globalIdMap.size()) {
- throw new RuntimeException(String.format("The total queue
number in config dose not match the real hosted queues %d != %d",
maxEpochAndNum.getValue(), globalIdMap.size()));
- }
- for (int i = 0; i < maxEpochAndNum.getValue(); i++) {
- if (!globalIdMap.containsKey(i)) {
- throw new RuntimeException(String.format("The queue
number %s is not in globalIdMap", i));
+ if (maxEpochAndNum.getValue() !=
mappingDetail.getTotalQueues()) {
+ throw new RuntimeException(String.format("total queue
number dose not match %d != %d in %s", maxEpochAndNum.getValue(),
mappingDetail.getTotalQueues(), mappingDetail.getBname()));
}
}
+ globalIdMap = TopicQueueMappingUtils.buildMappingItems(new
ArrayList<>(detailList), false, true);
}
if (queueNum < globalIdMap.size()) {
throw new RuntimeException(String.format("Cannot decrease the
queue num for static topic %d < %d", queueNum, globalIdMap.size()));
@@ -230,10 +223,14 @@ public class UpdateStaticTopicSubCommand implements
SubCommand {
LogicQueueMappingItem mappingItem = new
LogicQueueMappingItem(0, configMapping.getWriteQueueNums() - 1, broker, 0, 0,
-1, -1, -1);
configMapping.getMappingDetail().putMappingInfo(queueId,
ImmutableList.of(mappingItem));
}
+
+ //double check the topic config map
existedTopicConfigMap.values().forEach( configMapping -> {
configMapping.getMappingDetail().setEpoch(epoch);
configMapping.getMappingDetail().setTotalQueues(queueNum);
});
+ TopicQueueMappingUtils.buildMappingItems(new
ArrayList<>(existedTopicConfigMap.values().stream().map(TopicConfigAndQueueMapping::getMappingDetail).collect(Collectors.toList())),
false, true);
+
//If some succeed, and others fail, it will cause inconsistent data
for (Map.Entry<String, TopicConfigAndQueueMapping> entry :
existedTopicConfigMap.entrySet()) {
String broker = entry.getKey();