This is an automated email from the ASF dual-hosted git repository.
dongeforever pushed a commit to branch 5.0.0-alpha-static-topic
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/5.0.0-alpha-static-topic by
this push:
new e9cafe9 Polish the remapping logic
e9cafe9 is described below
commit e9cafe9fef748b92ea4d020e16bb95192ffaadff
Author: dongeforever <[email protected]>
AuthorDate: Thu Nov 18 14:36:47 2021 +0800
Polish the remapping logic
---
.../topic/RemappingStaticTopicSubCommand.java | 63 +++++++++++++---------
.../command/topic/UpdateStaticTopicSubCommand.java | 2 +-
2 files changed, 38 insertions(+), 27 deletions(-)
diff --git
a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/RemappingStaticTopicSubCommand.java
b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/RemappingStaticTopicSubCommand.java
index 4cc5acf..34aec17 100644
---
a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/RemappingStaticTopicSubCommand.java
+++
b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/RemappingStaticTopicSubCommand.java
@@ -194,6 +194,7 @@ public class RemappingStaticTopicSubCommand implements
SubCommand {
//the check is ok, now do the mapping allocation
int maxNum = maxEpochAndNum.getValue();
+ long maxEpoch = maxEpochAndNum.getKey();
TopicQueueMappingUtils.MappingAllocator allocator =
TopicQueueMappingUtils.buildMappingAllocator(new HashMap<>(),
brokers.stream().collect(Collectors.toMap( x -> x, x -> 0)));
allocator.upToNum(maxNum);
Map<String, Integer> expectedBrokerNumMap =
allocator.getBrokerNumMap();
@@ -217,40 +218,50 @@ public class RemappingStaticTopicSubCommand implements
SubCommand {
});
expectedBrokerNumMap.forEach((broker, queueNum) -> {
for (int i = 0; i < queueNum; i++) {
- expectedIdToBroker.put(waitAssignQueues.poll(), broker);
+ Integer queueId = waitAssignQueues.poll();
+ assert queueId != null;
+ expectedIdToBroker.put(queueId, broker);
}
});
- Set<Broker>
-
//Now construct the remapping info
-
- //construct the topic configAndMapping
- long epoch = Math.max(maxEpochAndNum.getKey() + 1000,
System.currentTimeMillis());
- for (Map.Entry<Integer, String> e : expectedIdToBroker.entrySet())
{
- Integer queueId = e.getKey();
- String broker = e.getValue();
- if (globalIdMap.containsKey(queueId)) {
- //ignore the exited
+ Set<String> brokersToMapOut = new HashSet<>();
+ Set<String> brokersToMapIn = new HashSet<>();
+ for (Map.Entry<Integer, String> mapEntry :
expectedIdToBroker.entrySet()) {
+ Integer queueId = mapEntry.getKey();
+ String broker = mapEntry.getValue();
+ TopicQueueMappingOne topicQueueMappingOne =
globalIdMap.get(queueId);
+ assert topicQueueMappingOne != null;
+ if (topicQueueMappingOne.getBname().equals(broker)) {
continue;
}
- TopicConfigAndQueueMapping configMapping;
- if (!existedTopicConfigMap.containsKey(broker)) {
- TopicConfig topicConfig = new TopicConfig(topic, 1, 1);
- TopicQueueMappingDetail mappingDetail = new
TopicQueueMappingDetail(topic, 0, broker, epoch);
- configMapping = new
TopicConfigAndQueueMapping(topicConfig, mappingDetail);
- existedTopicConfigMap.put(broker, configMapping);
- } else {
- configMapping = existedTopicConfigMap.get(broker);
-
configMapping.setWriteQueueNums(configMapping.getWriteQueueNums() + 1);
-
configMapping.setWriteQueueNums(configMapping.getWriteQueueNums() + 1);
- configMapping.getMappingDetail().setEpoch(epoch);
- configMapping.getMappingDetail().setTotalQueues(0);
- }
- LogicQueueMappingItem mappingItem = new
LogicQueueMappingItem(0, configMapping.getWriteQueueNums() - 1, broker, 0, 0,
-1, -1, -1);
- configMapping.getMappingDetail().putMappingInfo(queueId,
ImmutableList.of(mappingItem));
+ //remapping
+ String mapInBroker = broker;
+ String mapOutBroker = topicQueueMappingOne.getBname();
+ brokersToMapIn.add(mapInBroker);
+ brokersToMapOut.add(mapOutBroker);
+ TopicConfigAndQueueMapping mapInConfig =
existedTopicConfigMap.get(mapInBroker);
+ TopicConfigAndQueueMapping mapOutConfig =
existedTopicConfigMap.get(mapOutBroker);
+
+ mapInConfig.setWriteQueueNums(mapInConfig.getWriteQueueNums()
+ 1);
+ mapInConfig.setWriteQueueNums(mapInConfig.getWriteQueueNums()
+ 1);
+
+ List<LogicQueueMappingItem> items = new
ArrayList<>(topicQueueMappingOne.getItems());
+ LogicQueueMappingItem last = items.get(items.size() - 1);
+ items.add(new LogicQueueMappingItem(last.getGen() + 1,
mapInConfig.getWriteQueueNums() - 1, mapInBroker, -1, 0, -1, -1, -1));
+
+ ImmutableList<LogicQueueMappingItem> resultItems =
ImmutableList.copyOf(items);
+ mapInConfig.getMappingDetail().putMappingInfo(queueId,
resultItems);
+ mapOutConfig.getMappingDetail().putMappingInfo(queueId,
resultItems);
}
+ long epoch = Math.max(maxEpochAndNum.getKey() + 1000,
System.currentTimeMillis());
+ existedTopicConfigMap.values().forEach( configMapping -> {
+ configMapping.getMappingDetail().setEpoch(epoch);
+ configMapping.getMappingDetail().setTotalQueues(maxNum);
+ });
+ //decide the new offset
+
//If some succeed, and others fail, it will cause inconsistent data
for (Map.Entry<String, TopicConfigAndQueueMapping> entry :
existedTopicConfigMap.entrySet()) {
String broker = entry.getKey();
diff --git
a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateStaticTopicSubCommand.java
b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateStaticTopicSubCommand.java
index a1ff0b0..278ea9f 100644
---
a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateStaticTopicSubCommand.java
+++
b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateStaticTopicSubCommand.java
@@ -226,7 +226,7 @@ public class UpdateStaticTopicSubCommand implements
SubCommand {
} else {
configMapping = existedTopicConfigMap.get(broker);
configMapping.setWriteQueueNums(configMapping.getWriteQueueNums() + 1);
-
configMapping.setWriteQueueNums(configMapping.getWriteQueueNums() + 1);
+
configMapping.setReadQueueNums(configMapping.getReadQueueNums() + 1);
}
LogicQueueMappingItem mappingItem = new
LogicQueueMappingItem(0, configMapping.getWriteQueueNums() - 1, broker, 0, 0,
-1, -1, -1);
configMapping.getMappingDetail().putMappingInfo(queueId,
ImmutableList.of(mappingItem));