This is an automated email from the ASF dual-hosted git repository.
dongeforever pushed a commit to branch 5.0.0-alpha-static-topic
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/5.0.0-alpha-static-topic by
this push:
new 25a588b Init the remapping command
25a588b is described below
commit 25a588b81fd58dccb1e5f81e8a65859570a76a89
Author: dongeforever <[email protected]>
AuthorDate: Thu Nov 18 12:00:15 2021 +0800
Init the remapping command
---
.../rocketmq/common/TopicQueueMappingOne.java | 54 ++++++
.../rocketmq/common/TopicQueueMappingUtils.java | 6 +-
...nd.java => RemappingStaticTopicSubCommand.java} | 192 ++++++++++++---------
.../command/topic/UpdateStaticTopicSubCommand.java | 38 ++--
4 files changed, 190 insertions(+), 100 deletions(-)
diff --git
a/common/src/main/java/org/apache/rocketmq/common/TopicQueueMappingOne.java
b/common/src/main/java/org/apache/rocketmq/common/TopicQueueMappingOne.java
new file mode 100644
index 0000000..150a208
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/TopicQueueMappingOne.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.common;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
+
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+public class TopicQueueMappingOne extends RemotingSerializable { // mapping info kept for a single global queue id
+
+ String topic; // topic name; redundant field
+ String bname; // identifies the hosted broker name
+ Integer globalId; // global queue id that the items below belong to
+ ImmutableList<LogicQueueMappingItem> items; // mapping items recorded for globalId
+
+ public TopicQueueMappingOne(String topic, String bname, Integer globalId,
ImmutableList<LogicQueueMappingItem> items) {
+ this.topic = topic;
+ this.bname = bname;
+ this.globalId = globalId;
+ this.items = items;
+ } // NOTE(review): no no-arg constructor or setters are declared — confirm this type is never decoded from a serialized form
+
+ public String getTopic() {
+ return topic;
+ }
+
+ public String getBname() {
+ return bname;
+ }
+
+ public Integer getGlobalId() {
+ return globalId;
+ }
+
+ public ImmutableList<LogicQueueMappingItem> getItems() {
+ return items;
+ }
+}
diff --git
a/common/src/main/java/org/apache/rocketmq/common/TopicQueueMappingUtils.java
b/common/src/main/java/org/apache/rocketmq/common/TopicQueueMappingUtils.java
index 686208a..ff89aaf 100644
---
a/common/src/main/java/org/apache/rocketmq/common/TopicQueueMappingUtils.java
+++
b/common/src/main/java/org/apache/rocketmq/common/TopicQueueMappingUtils.java
@@ -103,7 +103,7 @@ public class TopicQueueMappingUtils {
return new AbstractMap.SimpleImmutableEntry<Long, Integer>(epoch,
queueNum);
}
- public static Map<Integer, ImmutableList<LogicQueueMappingItem>>
buildMappingItems(List<TopicQueueMappingDetail> mappingDetailList, boolean
replace) {
+ public static Map<Integer, TopicQueueMappingOne>
buildMappingItems(List<TopicQueueMappingDetail> mappingDetailList, boolean
replace) {
Collections.sort(mappingDetailList, new
Comparator<TopicQueueMappingDetail>() {
@Override
public int compare(TopicQueueMappingDetail o1,
TopicQueueMappingDetail o2) {
@@ -111,7 +111,7 @@ public class TopicQueueMappingUtils {
}
});
- Map<Integer, ImmutableList<LogicQueueMappingItem>> globalIdMap = new
HashMap<Integer, ImmutableList<LogicQueueMappingItem>>();
+ Map<Integer, TopicQueueMappingOne> globalIdMap = new HashMap<Integer,
TopicQueueMappingOne>();
for (TopicQueueMappingDetail mappingDetail : mappingDetailList) {
for (Map.Entry<Integer, ImmutableList<LogicQueueMappingItem>>
entry : mappingDetail.getHostedQueues().entrySet()) {
Integer globalid = entry.getKey();
@@ -125,7 +125,7 @@ public class TopicQueueMappingUtils {
throw new RuntimeException(String.format("The queue id
is duplicated in broker %s %s", leaderBrokerName, mappingDetail.getBname()));
}
} else {
- globalIdMap.put(globalid, entry.getValue());
+ globalIdMap.put(globalid, new
TopicQueueMappingOne(mappingDetail.topic, mappingDetail.bname, globalid,
entry.getValue()));
}
}
}
diff --git
a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateStaticTopicSubCommand.java
b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/RemappingStaticTopicSubCommand.java
similarity index 52%
copy from
tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateStaticTopicSubCommand.java
copy to
tools/src/main/java/org/apache/rocketmq/tools/command/topic/RemappingStaticTopicSubCommand.java
index 29a7261..4cc5acf 100644
---
a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateStaticTopicSubCommand.java
+++
b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/RemappingStaticTopicSubCommand.java
@@ -17,7 +17,6 @@
package org.apache.rocketmq.tools.command.topic;
import com.google.common.collect.ImmutableList;
-import com.sun.xml.internal.ws.api.BindingIDFactory;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.OptionGroup;
@@ -26,6 +25,7 @@ import org.apache.rocketmq.common.LogicQueueMappingItem;
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.TopicConfigAndQueueMapping;
import org.apache.rocketmq.common.TopicQueueMappingDetail;
+import org.apache.rocketmq.common.TopicQueueMappingOne;
import org.apache.rocketmq.common.TopicQueueMappingUtils;
import org.apache.rocketmq.common.protocol.body.ClusterInfo;
import org.apache.rocketmq.common.protocol.route.QueueData;
@@ -38,15 +38,17 @@ import org.apache.rocketmq.tools.command.SubCommand;
import org.apache.rocketmq.tools.command.SubCommandException;
import java.util.AbstractMap;
+import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Queue;
import java.util.Set;
import java.util.stream.Collectors;
-public class UpdateStaticTopicSubCommand implements SubCommand {
+public class RemappingStaticTopicSubCommand implements SubCommand {
@Override
public String commandName() {
@@ -64,7 +66,10 @@ public class UpdateStaticTopicSubCommand implements
SubCommand {
Option opt = null;
- opt = new Option("c", "clusterName", true, "create topic to which
cluster");
+ opt = new Option("c", "clusters", true, "remapping static topic to
clusters, comma separated");
+ optionGroup.addOption(opt);
+
+ opt = new Option("b", "brokers", true, "remapping static topic to
brokers, comma separated");
optionGroup.addOption(opt);
optionGroup.setRequired(true);
@@ -73,11 +78,6 @@ public class UpdateStaticTopicSubCommand implements
SubCommand {
opt = new Option("t", "topic", true, "topic name");
opt.setRequired(true);
options.addOption(opt);
-
- opt = new Option("qn", "totalQueueNum", true, "total queue num");
- opt.setRequired(true);
- options.addOption(opt);
-
return options;
}
@@ -90,33 +90,46 @@ public class UpdateStaticTopicSubCommand implements
SubCommand {
ClientMetadata clientMetadata = new ClientMetadata();
Map<String, TopicConfigAndQueueMapping> existedTopicConfigMap = new
HashMap<>();
- Map<Integer, ImmutableList<LogicQueueMappingItem>> globalIdMap = new
HashMap<>();
+ Map<Integer, TopicQueueMappingOne> globalIdMap = new HashMap<>();
+ Set<String> brokers = new HashSet<>();
+ Map.Entry<Long, Integer> maxEpochAndNum = null;
try {
- if (!commandLine.hasOption('t')
- || !commandLine.hasOption('c')
- || !commandLine.hasOption("qn")) {
+ if ((!commandLine.hasOption("b") && !commandLine.hasOption('c'))
+ || !commandLine.hasOption('t')) {
ServerUtil.printCommandLineHelp("mqadmin " +
this.commandName(), options);
return;
}
String topic = commandLine.getOptionValue('t').trim();
- int queueNum =
Integer.parseInt(commandLine.getOptionValue("qn").trim());
- String clusters = commandLine.getOptionValue('c').trim();
+
ClusterInfo clusterInfo =
defaultMQAdminExt.examineBrokerClusterInfo();
if (clusterInfo == null
|| clusterInfo.getClusterAddrTable().isEmpty()) {
throw new RuntimeException("The Cluster info is empty");
- } else {
- clientMetadata.refreshClusterInfo(clusterInfo);
}
- Set<String> brokers = new HashSet<>();
- for (String cluster : clusters.split(",")) {
- cluster = cluster.trim();
- if (clusterInfo.getClusterAddrTable().get(cluster) != null) {
-
brokers.addAll(clusterInfo.getClusterAddrTable().get(cluster));
+ clientMetadata.refreshClusterInfo(clusterInfo);
+
+ if (commandLine.hasOption("b")) {
+ String brokerStrs = commandLine.getOptionValue("b").trim();
+ for (String broker: brokerStrs.split(",")) {
+ brokers.add(broker.trim());
+ }
+ } else if (commandLine.hasOption("c")) {
+ String clusters = commandLine.getOptionValue('c').trim();
+ for (String cluster : clusters.split(",")) {
+ cluster = cluster.trim();
+ if (clusterInfo.getClusterAddrTable().get(cluster) !=
null) {
+
brokers.addAll(clusterInfo.getClusterAddrTable().get(cluster));
+ }
}
}
if (brokers.isEmpty()) {
- throw new RuntimeException("Find none brokers for " +
clusters);
+ throw new RuntimeException("Find none brokers");
+ }
+ for (String broker : brokers) {
+ String addr = clientMetadata.findMasterBrokerAddr(broker);
+ if (addr == null) {
+ throw new RuntimeException("Can't find addr for broker " +
broker);
+ }
}
//get the existed topic config and mapping
@@ -135,93 +148,106 @@ public class UpdateStaticTopicSubCommand implements
SubCommand {
}
}
- Map.Entry<Long, Integer> maxEpochAndNum = new
AbstractMap.SimpleImmutableEntry<>(System.currentTimeMillis(), queueNum);
- if (!existedTopicConfigMap.isEmpty()) {
- //make sure it it not null
- existedTopicConfigMap.forEach((key, value) -> {
- if (value.getMappingDetail() != null) {
- throw new RuntimeException("Mapping info should be
null in broker " + key);
- }
- });
- //make sure the detail is not dirty
- existedTopicConfigMap.forEach((key, value) -> {
- if (!key.equals(value.getMappingDetail().getBname())) {
- throw new RuntimeException(String.format("The broker
name is not equal %s != %s ", key, value.getMappingDetail().getBname()));
- }
- if (value.getMappingDetail().isDirty()) {
- throw new RuntimeException("The mapping info is dirty
in broker " + value.getMappingDetail().getBname());
- }
- });
-
- List<TopicQueueMappingDetail> detailList =
existedTopicConfigMap.values().stream().map(TopicConfigAndQueueMapping::getMappingDetail).collect(Collectors.toList());
- //check the epoch and qnum
- maxEpochAndNum =
TopicQueueMappingUtils.findMaxEpochAndQueueNum(detailList);
- final Map.Entry<Long, Integer> tmpMaxEpochAndNum =
maxEpochAndNum;
- detailList.forEach( mappingDetail -> {
- if (tmpMaxEpochAndNum.getKey() !=
mappingDetail.getEpoch()) {
- throw new RuntimeException(String.format("epoch dose
not match %d != %d in %s", tmpMaxEpochAndNum.getKey(),
mappingDetail.getEpoch(), mappingDetail.getBname()));
- }
- if (tmpMaxEpochAndNum.getValue() !=
mappingDetail.getTotalQueues()) {
- throw new RuntimeException(String.format("total queue
number dose not match %d != %d in %s", tmpMaxEpochAndNum.getValue(),
mappingDetail.getTotalQueues(), mappingDetail.getBname()));
- }
- });
+ if (existedTopicConfigMap.isEmpty()) {
+ throw new RuntimeException("No topic route to do the
remapping");
+ }
- globalIdMap = TopicQueueMappingUtils.buildMappingItems(new
ArrayList<>(detailList), false);
+ //make sure it is not null
+ existedTopicConfigMap.forEach((key, value) -> {
+ if (value.getMappingDetail() != null) {
+ throw new RuntimeException("Mapping info should be null in
broker " + key);
+ }
+ });
+ //make sure the detail is not dirty
+ existedTopicConfigMap.forEach((key, value) -> {
+ if (!key.equals(value.getMappingDetail().getBname())) {
+ throw new RuntimeException(String.format("The broker name
is not equal %s != %s ", key, value.getMappingDetail().getBname()));
+ }
+ if (value.getMappingDetail().isDirty()) {
+ throw new RuntimeException("The mapping info is dirty in
broker " + value.getMappingDetail().getBname());
+ }
+ });
- if (maxEpochAndNum.getValue() != globalIdMap.size()) {
- throw new RuntimeException(String.format("The total queue
number in config dose not match the real hosted queues %d != %d",
maxEpochAndNum.getValue(), globalIdMap.size()));
+ List<TopicQueueMappingDetail> detailList =
existedTopicConfigMap.values().stream().map(TopicConfigAndQueueMapping::getMappingDetail).collect(Collectors.toList());
+ //check the epoch and qnum
+ maxEpochAndNum =
TopicQueueMappingUtils.findMaxEpochAndQueueNum(detailList);
+ final Map.Entry<Long, Integer> tmpMaxEpochAndNum = maxEpochAndNum;
+ detailList.forEach( mappingDetail -> {
+ if (tmpMaxEpochAndNum.getKey() != mappingDetail.getEpoch()) {
+ throw new RuntimeException(String.format("epoch dose not
match %d != %d in %s", tmpMaxEpochAndNum.getKey(), mappingDetail.getEpoch(),
mappingDetail.getBname()));
}
- for (int i = 0; i < maxEpochAndNum.getValue(); i++) {
- if (!globalIdMap.containsKey(i)) {
- throw new RuntimeException(String.format("The queue
number %s is not in globalIdMap", i));
- }
+ if (tmpMaxEpochAndNum.getValue() !=
mappingDetail.getTotalQueues()) {
+ throw new RuntimeException(String.format("total queue
number dose not match %d != %d in %s", tmpMaxEpochAndNum.getValue(),
mappingDetail.getTotalQueues(), mappingDetail.getBname()));
}
+ });
+
+ globalIdMap = TopicQueueMappingUtils.buildMappingItems(new
ArrayList<>(detailList), false);
+
+ if (maxEpochAndNum.getValue() != globalIdMap.size()) {
+ throw new RuntimeException(String.format("The total queue
number in config dose not match the real hosted queues %d != %d",
maxEpochAndNum.getValue(), globalIdMap.size()));
}
- if (queueNum < globalIdMap.size()) {
- throw new RuntimeException(String.format("Cannot decrease the
queue num for static topic %d < %d", queueNum, globalIdMap.size()));
- }
- //check the queue number
- if (queueNum == globalIdMap.size()) {
- throw new RuntimeException("The topic queue num is equal the
existed queue num, do nothing");
+ for (int i = 0; i < maxEpochAndNum.getValue(); i++) {
+ if (!globalIdMap.containsKey(i)) {
+ throw new RuntimeException(String.format("The queue number
%s is not in globalIdMap", i));
+ }
}
+
//the check is ok, now do the mapping allocation
- Map<String, Integer> brokerNumMap =
brokers.stream().collect(Collectors.toMap( x -> x, x -> 0));
- Map<Integer, String> idToBroker = new HashMap<>();
- globalIdMap.forEach((key, value) -> {
- String leaderbroker =
TopicQueueMappingUtils.getLeaderBroker(value);
- idToBroker.put(key, leaderbroker);
- if (!brokerNumMap.containsKey(leaderbroker)) {
- brokerNumMap.put(leaderbroker, 1);
+ int maxNum = maxEpochAndNum.getValue();
+ TopicQueueMappingUtils.MappingAllocator allocator =
TopicQueueMappingUtils.buildMappingAllocator(new HashMap<>(),
brokers.stream().collect(Collectors.toMap( x -> x, x -> 0)));
+ allocator.upToNum(maxNum);
+ Map<String, Integer> expectedBrokerNumMap =
allocator.getBrokerNumMap();
+ Queue<Integer> waitAssignQueues = new ArrayDeque<Integer>();
+ Map<Integer, String> expectedIdToBroker = new HashMap<>();
+ //the following logic will make sure that, for one broker, only
"take in" or "take out" queues
+ //It can't take in some queues but also take out some queues.
+ globalIdMap.forEach((queueId, mappingOne) -> {
+ String leaderBroker = mappingOne.getBname();
+ if (expectedBrokerNumMap.containsKey(leaderBroker)) {
+ if (expectedBrokerNumMap.get(leaderBroker) > 0) {
+ expectedIdToBroker.put(queueId, leaderBroker);
+ expectedBrokerNumMap.put(leaderBroker,
expectedBrokerNumMap.get(leaderBroker) - 1);
+ } else {
+ waitAssignQueues.add(queueId);
+ expectedBrokerNumMap.remove(leaderBroker);
+ }
} else {
- brokerNumMap.put(leaderbroker,
brokerNumMap.get(leaderbroker) + 1);
+ waitAssignQueues.add(queueId);
}
});
- TopicQueueMappingUtils.MappingAllocator allocator =
TopicQueueMappingUtils.buildMappingAllocator(idToBroker, brokerNumMap);
- allocator.upToNum(queueNum);
- Map<Integer, String> newIdToBroker = allocator.getIdToBroker();
+ expectedBrokerNumMap.forEach((broker, queueNum) -> {
+ for (int i = 0; i < queueNum; i++) {
+ expectedIdToBroker.put(waitAssignQueues.poll(), broker);
+ }
+ });
+
+ Set<Broker>
+
+ //Now construct the remapping info
//construct the topic configAndMapping
long epoch = Math.max(maxEpochAndNum.getKey() + 1000,
System.currentTimeMillis());
- for (Map.Entry<Integer, String> e : newIdToBroker.entrySet()) {
+ for (Map.Entry<Integer, String> e : expectedIdToBroker.entrySet())
{
Integer queueId = e.getKey();
- String value = e.getValue();
+ String broker = e.getValue();
if (globalIdMap.containsKey(queueId)) {
//ignore the exited
continue;
}
TopicConfigAndQueueMapping configMapping;
- if (!existedTopicConfigMap.containsKey(value)) {
+ if (!existedTopicConfigMap.containsKey(broker)) {
TopicConfig topicConfig = new TopicConfig(topic, 1, 1);
- TopicQueueMappingDetail mappingDetail = new
TopicQueueMappingDetail(topic, queueNum, value, epoch);
+ TopicQueueMappingDetail mappingDetail = new
TopicQueueMappingDetail(topic, 0, broker, epoch);
configMapping = new
TopicConfigAndQueueMapping(topicConfig, mappingDetail);
+ existedTopicConfigMap.put(broker, configMapping);
} else {
- configMapping = existedTopicConfigMap.get(value);
+ configMapping = existedTopicConfigMap.get(broker);
configMapping.setWriteQueueNums(configMapping.getWriteQueueNums() + 1);
configMapping.setWriteQueueNums(configMapping.getWriteQueueNums() + 1);
configMapping.getMappingDetail().setEpoch(epoch);
- configMapping.getMappingDetail().setTotalQueues(queueNum);
+ configMapping.getMappingDetail().setTotalQueues(0);
}
- LogicQueueMappingItem mappingItem = new
LogicQueueMappingItem(0, configMapping.getWriteQueueNums() - 1, value, 0, 0,
-1, -1, -1);
+ LogicQueueMappingItem mappingItem = new
LogicQueueMappingItem(0, configMapping.getWriteQueueNums() - 1, broker, 0, 0,
-1, -1, -1);
configMapping.getMappingDetail().putMappingInfo(queueId,
ImmutableList.of(mappingItem));
}
diff --git
a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateStaticTopicSubCommand.java
b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateStaticTopicSubCommand.java
index 29a7261..a1ff0b0 100644
---
a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateStaticTopicSubCommand.java
+++
b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateStaticTopicSubCommand.java
@@ -26,6 +26,7 @@ import org.apache.rocketmq.common.LogicQueueMappingItem;
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.TopicConfigAndQueueMapping;
import org.apache.rocketmq.common.TopicQueueMappingDetail;
+import org.apache.rocketmq.common.TopicQueueMappingOne;
import org.apache.rocketmq.common.TopicQueueMappingUtils;
import org.apache.rocketmq.common.protocol.body.ClusterInfo;
import org.apache.rocketmq.common.protocol.route.QueueData;
@@ -64,7 +65,7 @@ public class UpdateStaticTopicSubCommand implements
SubCommand {
Option opt = null;
- opt = new Option("c", "clusterName", true, "create topic to which
cluster");
+ opt = new Option("c", "clusters", true, "create topic to clusters,
comma separated");
optionGroup.addOption(opt);
optionGroup.setRequired(true);
@@ -90,7 +91,9 @@ public class UpdateStaticTopicSubCommand implements
SubCommand {
ClientMetadata clientMetadata = new ClientMetadata();
Map<String, TopicConfigAndQueueMapping> existedTopicConfigMap = new
HashMap<>();
- Map<Integer, ImmutableList<LogicQueueMappingItem>> globalIdMap = new
HashMap<>();
+ Map<Integer, TopicQueueMappingOne> globalIdMap = new HashMap<>();
+ Set<String> brokers = new HashSet<>();
+
try {
if (!commandLine.hasOption('t')
|| !commandLine.hasOption('c')
@@ -108,7 +111,6 @@ public class UpdateStaticTopicSubCommand implements
SubCommand {
} else {
clientMetadata.refreshClusterInfo(clusterInfo);
}
- Set<String> brokers = new HashSet<>();
for (String cluster : clusters.split(",")) {
cluster = cluster.trim();
if (clusterInfo.getClusterAddrTable().get(cluster) != null) {
@@ -118,6 +120,12 @@ public class UpdateStaticTopicSubCommand implements
SubCommand {
if (brokers.isEmpty()) {
throw new RuntimeException("Find none brokers for " +
clusters);
}
+ for (String broker : brokers) {
+ String addr = clientMetadata.findMasterBrokerAddr(broker);
+ if (addr == null) {
+ throw new RuntimeException("Can't find addr for broker " +
broker);
+ }
+ }
//get the existed topic config and mapping
TopicRouteData routeData =
defaultMQAdminExt.examineTopicRouteInfo(topic);
@@ -188,7 +196,7 @@ public class UpdateStaticTopicSubCommand implements
SubCommand {
Map<String, Integer> brokerNumMap =
brokers.stream().collect(Collectors.toMap( x -> x, x -> 0));
Map<Integer, String> idToBroker = new HashMap<>();
globalIdMap.forEach((key, value) -> {
- String leaderbroker =
TopicQueueMappingUtils.getLeaderBroker(value);
+ String leaderbroker = value.getBname();
idToBroker.put(key, leaderbroker);
if (!brokerNumMap.containsKey(leaderbroker)) {
brokerNumMap.put(leaderbroker, 1);
@@ -204,27 +212,29 @@ public class UpdateStaticTopicSubCommand implements
SubCommand {
long epoch = Math.max(maxEpochAndNum.getKey() + 1000,
System.currentTimeMillis());
for (Map.Entry<Integer, String> e : newIdToBroker.entrySet()) {
Integer queueId = e.getKey();
- String value = e.getValue();
+ String broker = e.getValue();
if (globalIdMap.containsKey(queueId)) {
//ignore the exited
continue;
}
TopicConfigAndQueueMapping configMapping;
- if (!existedTopicConfigMap.containsKey(value)) {
- TopicConfig topicConfig = new TopicConfig(topic, 1, 1);
- TopicQueueMappingDetail mappingDetail = new
TopicQueueMappingDetail(topic, queueNum, value, epoch);
- configMapping = new
TopicConfigAndQueueMapping(topicConfig, mappingDetail);
+ if (!existedTopicConfigMap.containsKey(broker)) {
+ configMapping = new TopicConfigAndQueueMapping(new
TopicConfig(topic), new TopicQueueMappingDetail(topic, 0, broker, -1));
+ configMapping.setWriteQueueNums(1);
+ configMapping.setReadQueueNums(1);
+ existedTopicConfigMap.put(broker, configMapping);
} else {
- configMapping = existedTopicConfigMap.get(value);
+ configMapping = existedTopicConfigMap.get(broker);
configMapping.setWriteQueueNums(configMapping.getWriteQueueNums() + 1);
configMapping.setWriteQueueNums(configMapping.getWriteQueueNums() + 1);
- configMapping.getMappingDetail().setEpoch(epoch);
- configMapping.getMappingDetail().setTotalQueues(queueNum);
}
- LogicQueueMappingItem mappingItem = new
LogicQueueMappingItem(0, configMapping.getWriteQueueNums() - 1, value, 0, 0,
-1, -1, -1);
+ LogicQueueMappingItem mappingItem = new
LogicQueueMappingItem(0, configMapping.getWriteQueueNums() - 1, broker, 0, 0,
-1, -1, -1);
configMapping.getMappingDetail().putMappingInfo(queueId,
ImmutableList.of(mappingItem));
}
-
+ existedTopicConfigMap.values().forEach( configMapping -> {
+ configMapping.getMappingDetail().setEpoch(epoch);
+ configMapping.getMappingDetail().setTotalQueues(queueNum);
+ });
//If some succeed, and others fail, it will cause inconsistent data
for (Map.Entry<String, TopicConfigAndQueueMapping> entry :
existedTopicConfigMap.entrySet()) {
String broker = entry.getKey();