This is an automated email from the ASF dual-hosted git repository.
dongeforever pushed a commit to branch 5.0.0-alpha-static-topic
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/5.0.0-alpha-static-topic by
this push:
new 295a6bd Polish the update utils
295a6bd is described below
commit 295a6bde63210d73570dcdfc71ccd100a3c87444
Author: dongeforever <[email protected]>
AuthorDate: Wed Nov 17 21:49:27 2021 +0800
Polish the update utils
---
.../rocketmq/common/TopicQueueMappingUtils.java | 111 +++++++++++++++++++++
.../command/topic/UpdateStaticTopicSubCommand.java | 78 +++++++--------
2 files changed, 149 insertions(+), 40 deletions(-)
diff --git
a/common/src/main/java/org/apache/rocketmq/common/TopicQueueMappingUtils.java
b/common/src/main/java/org/apache/rocketmq/common/TopicQueueMappingUtils.java
new file mode 100644
index 0000000..93345df
--- /dev/null
+++
b/common/src/main/java/org/apache/rocketmq/common/TopicQueueMappingUtils.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.common;
+
+import com.google.common.collect.ImmutableList;
+
+import java.util.AbstractMap;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+
+public class TopicQueueMappingUtils {
+
+ public static class MappingState {
+ Map<String, Integer> brokerNumMap = new HashMap<String, Integer>();
+ int currentIndex = 0;
+ Random random = new Random();
+ List<String> leastBrokers = new ArrayList<String>();
+ private MappingState(Map<String, Integer> brokerNumMap) {
+ this.brokerNumMap.putAll(brokerNumMap);
+ }
+
+ public void freshState() {
+ int minNum = -1;
+ for (Map.Entry<String, Integer> entry : brokerNumMap.entrySet()) {
+ if (entry.getValue() > minNum) {
+ leastBrokers.clear();
+ leastBrokers.add(entry.getKey());
+ } else if (entry.getValue() == minNum) {
+ leastBrokers.add(entry.getKey());
+ }
+ }
+ currentIndex = random.nextInt(leastBrokers.size());
+ }
+
+ public String nextBroker() {
+ if (leastBrokers.isEmpty()) {
+ freshState();
+ }
+ int tmpIndex = (++currentIndex) % leastBrokers.size();
+ String broker = leastBrokers.remove(tmpIndex);
+ currentIndex--;
+ return broker;
+ }
+ }
+
+ public static MappingState buildMappingState(Map<String, Integer>
brokerNumMap) {
+ return new MappingState(brokerNumMap);
+ }
+
+ public static Map.Entry<Integer, Integer>
findMaxEpochAndQueueNum(List<TopicQueueMappingDetail> mappingDetailList) {
+ int epoch = -1;
+ int queueNum = 0;
+ for (TopicQueueMappingDetail mappingDetail : mappingDetailList) {
+ if (mappingDetail.getEpoch() > epoch) {
+ epoch = mappingDetail.getEpoch();
+ }
+ if (mappingDetail.getTotalQueues() > queueNum) {
+ queueNum = mappingDetail.getTotalQueues();
+ }
+ }
+ return new AbstractMap.SimpleImmutableEntry<Integer, Integer>(epoch,
queueNum);
+ }
+
+ public static Map<Integer, ImmutableList<LogicQueueMappingItem>>
buildMappingItems(List<TopicQueueMappingDetail> mappingDetailList, boolean
replace) {
+ Collections.sort(mappingDetailList, new
Comparator<TopicQueueMappingDetail>() {
+ @Override
+ public int compare(TopicQueueMappingDetail o1,
TopicQueueMappingDetail o2) {
+ return o2.getEpoch() - o1.getEpoch();
+ }
+ });
+
+ Map<Integer, ImmutableList<LogicQueueMappingItem>> globalIdMap = new
HashMap<Integer, ImmutableList<LogicQueueMappingItem>>();
+ for (TopicQueueMappingDetail mappingDetail : mappingDetailList) {
+ for (Map.Entry<Integer, ImmutableList<LogicQueueMappingItem>>
entry : mappingDetail.getHostedQueues().entrySet()) {
+ Integer globalid = entry.getKey();
+ String leaerBrokerName =
entry.getValue().iterator().next().getBname();
+ if (!leaerBrokerName.equals(mappingDetail.getBname())) {
+ //not the leader
+ continue;
+ }
+ if (globalIdMap.containsKey(globalid)) {
+ if (!replace) {
+ throw new RuntimeException(String.format("The queue id
is duplicated in broker %s %s", leaerBrokerName, mappingDetail.getBname()));
+ }
+ } else {
+ globalIdMap.put(globalid, entry.getValue());
+ }
+ }
+ }
+ return globalIdMap;
+ }
+}
diff --git
a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateStaticTopicSubCommand.java
b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateStaticTopicSubCommand.java
index 05f2807..253d19c 100644
---
a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateStaticTopicSubCommand.java
+++
b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateStaticTopicSubCommand.java
@@ -24,6 +24,7 @@ import org.apache.commons.cli.Options;
import org.apache.rocketmq.common.LogicQueueMappingItem;
import org.apache.rocketmq.common.TopicConfigAndQueueMapping;
import org.apache.rocketmq.common.TopicQueueMappingDetail;
+import org.apache.rocketmq.common.TopicQueueMappingUtils;
import org.apache.rocketmq.common.protocol.body.ClusterInfo;
import org.apache.rocketmq.common.protocol.route.QueueData;
import org.apache.rocketmq.common.protocol.route.TopicRouteData;
@@ -34,9 +35,12 @@ import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
import org.apache.rocketmq.tools.command.SubCommand;
import org.apache.rocketmq.tools.command.SubCommandException;
+import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
+import java.util.List;
import java.util.Map;
+import java.util.stream.Collectors;
public class UpdateStaticTopicSubCommand implements SubCommand {
@@ -73,7 +77,7 @@ public class UpdateStaticTopicSubCommand implements
SubCommand {
return options;
}
- private void validate(Map.Entry<String, TopicConfigAndQueueMapping> entry,
boolean shouldNull) {
+ private void validateIfNull(Map.Entry<String, TopicConfigAndQueueMapping>
entry, boolean shouldNull) {
if (shouldNull) {
if (entry.getValue().getTopicQueueMappingInfo() != null) {
throw new RuntimeException("Mapping info should be null in
broker " + entry.getKey());
@@ -82,30 +86,9 @@ public class UpdateStaticTopicSubCommand implements
SubCommand {
if (entry.getValue().getTopicQueueMappingInfo() == null) {
throw new RuntimeException("Mapping info should not be null in
broker " + entry.getKey());
}
- if
(!entry.getKey().equals(entry.getValue().getTopicQueueMappingInfo().getBname()))
{
- throw new RuntimeException(String.format("The broker name is
not equal %s != %s ", entry.getKey(),
entry.getValue().getTopicQueueMappingInfo().getBname()));
- }
}
}
- public void validateQueueMappingInfo(Map<Integer,
ImmutableList<LogicQueueMappingItem>> globalIdMap, TopicQueueMappingDetail
mappingDetail) {
- if (mappingDetail.isDirty()) {
- throw new RuntimeException("The mapping info is dirty in broker "
+ mappingDetail.getBname());
- }
- for (Map.Entry<Integer, ImmutableList<LogicQueueMappingItem>> entry :
mappingDetail.getHostedQueues().entrySet()) {
- Integer globalid = entry.getKey();
- String leaerBrokerName =
entry.getValue().iterator().next().getBname();
- if (!leaerBrokerName.equals(mappingDetail.getBname())) {
- //not the leader
- continue;
- }
- if (globalIdMap.containsKey(globalid)) {
- throw new RuntimeException(String.format("The queue id is
duplicated in broker %s %s", leaerBrokerName, mappingDetail.getBname()));
- } else {
- globalIdMap.put(globalid, entry.getValue());
- }
- }
- }
@Override
public void execute(final CommandLine commandLine, final Options options,
@@ -129,7 +112,6 @@ public class UpdateStaticTopicSubCommand implements
SubCommand {
throw new RuntimeException("The Cluster info is null for " +
cluster);
}
clientMetadata.refreshClusterInfo(clusterInfo);
-
//first get the existed topic config and mapping
{
TopicRouteData routeData =
defaultMQAdminExt.examineTopicRouteInfo(topic);
@@ -146,29 +128,45 @@ public class UpdateStaticTopicSubCommand implements
SubCommand {
}
}
}
-
}
+ // the
{
if (!existedTopicConfigMap.isEmpty()) {
- Iterator<Map.Entry<String, TopicConfigAndQueueMapping>> it
= existedTopicConfigMap.entrySet().iterator();
- Map.Entry<String, TopicConfigAndQueueMapping> first =
it.next();
- validate(first, false);
- validateQueueMappingInfo(globalIdMap,
first.getValue().getTopicQueueMappingInfo());
- TopicQueueMappingDetail firstMapping =
first.getValue().getTopicQueueMappingInfo();
- while (it.hasNext()) {
- Map.Entry<String, TopicConfigAndQueueMapping> next =
it.next();
- validate(next, false);
- validateQueueMappingInfo(globalIdMap,
next.getValue().getTopicQueueMappingInfo());
- TopicQueueMappingDetail nextMapping =
next.getValue().getTopicQueueMappingInfo();
- if (firstMapping.getEpoch() !=
nextMapping.getEpoch()) {
- throw new RuntimeException(String.format("epoch
dose not match %d != %d in %s %s", firstMapping.getEpoch(),
nextMapping.getEpoch(), firstMapping.getBname(), nextMapping.getBname()));
+ //make sure it it not null
+ existedTopicConfigMap.entrySet().forEach(entry -> {
+ validateIfNull(entry, false);
+ });
+ //make sure the detail is not dirty
+ existedTopicConfigMap.entrySet().forEach(entry -> {
+ if
(!entry.getKey().equals(entry.getValue().getTopicQueueMappingInfo().getBname()))
{
+ throw new RuntimeException(String.format("The
broker name is not equal %s != %s ", entry.getKey(),
entry.getValue().getTopicQueueMappingInfo().getBname()));
+ }
+ if
(entry.getValue().getTopicQueueMappingInfo().isDirty()) {
+ throw new RuntimeException("The mapping info is
dirty in broker " + entry.getValue().getTopicQueueMappingInfo().getBname());
+ }
+ });
+
+ List<TopicQueueMappingDetail> detailList =
existedTopicConfigMap.values().stream().map(TopicConfigAndQueueMapping::getTopicQueueMappingInfo).collect(Collectors.toList());
+ //check the epoch and qnum
+ Map.Entry<Integer, Integer> maxEpochAndNum =
TopicQueueMappingUtils.findMaxEpochAndQueueNum(detailList);
+ detailList.forEach( mappingDetail -> {
+ if (maxEpochAndNum.getKey() !=
mappingDetail.getEpoch()) {
+ throw new RuntimeException(String.format("epoch
dose not match %d != %d in %s", maxEpochAndNum.getKey(),
mappingDetail.getEpoch(), mappingDetail.getBname()));
}
- if (firstMapping.getTotalQueues() !=
nextMapping.getTotalQueues()) {
- throw new RuntimeException(String.format("total
queue number dose not match %d != %d in %s %s", firstMapping.getTotalQueues(),
nextMapping.getTotalQueues(), firstMapping.getBname(), nextMapping.getBname()));
+ if (maxEpochAndNum.getValue() !=
mappingDetail.getTotalQueues()) {
+ throw new RuntimeException(String.format("total
queue number dose not match %d != %d in %s", maxEpochAndNum.getValue(),
mappingDetail.getTotalQueues(), mappingDetail.getBname()));
}
+ });
+
+ globalIdMap = TopicQueueMappingUtils.buildMappingItems(new
ArrayList<>(detailList), false);
+
+ if (maxEpochAndNum.getValue() != globalIdMap.size()) {
+ throw new RuntimeException(String.format("The total
queue number in config dose not match the real hosted queues %d != %d",
maxEpochAndNum.getValue(), globalIdMap.size()));
}
- if (firstMapping.getTotalQueues() != globalIdMap.size()) {
- throw new RuntimeException(String.format("The total
queue number in config dose not match the real hosted queues %d != %d",
firstMapping.getTotalQueues(), globalIdMap.size()));
+ for (int i = 0; i < maxEpochAndNum.getValue(); i++) {
+ if (!globalIdMap.containsKey(i)) {
+ throw new RuntimeException(String.format("The
queue number %s is not in globalIdMap", i));
+ }
}
}
}