This is an automated email from the ASF dual-hosted git repository.
dongeforever pushed a commit to branch 5.0.0-alpha-static-topic
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/5.0.0-alpha-static-topic by
this push:
new 45946d4 Ignore the existing queueId in static topic creation
45946d4 is described below
commit 45946d475697f0b78e4d3b31105f75bb76e7ee19
Author: dongeforever <[email protected]>
AuthorDate: Thu Nov 18 10:15:44 2021 +0800
Ignore the existing queueId in static topic creation
---
.../command/topic/UpdateStaticTopicSubCommand.java | 19 +++++++++++++------
1 file changed, 13 insertions(+), 6 deletions(-)
diff --git
a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateStaticTopicSubCommand.java
b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateStaticTopicSubCommand.java
index 5cee8c8..29a7261 100644
---
a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateStaticTopicSubCommand.java
+++
b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateStaticTopicSubCommand.java
@@ -17,6 +17,7 @@
package org.apache.rocketmq.tools.command.topic;
import com.google.common.collect.ImmutableList;
+import com.sun.xml.internal.ws.api.BindingIDFactory;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.OptionGroup;
@@ -201,22 +202,28 @@ public class UpdateStaticTopicSubCommand implements
SubCommand {
//construct the topic configAndMapping
long epoch = Math.max(maxEpochAndNum.getKey() + 1000,
System.currentTimeMillis());
- newIdToBroker.forEach( (queueId, broker) -> {
+ for (Map.Entry<Integer, String> e : newIdToBroker.entrySet()) {
+ Integer queueId = e.getKey();
+ String value = e.getValue();
+ if (globalIdMap.containsKey(queueId)) {
+ //ignore the existing queueId
+ continue;
+ }
TopicConfigAndQueueMapping configMapping;
- if (!existedTopicConfigMap.containsKey(broker)) {
+ if (!existedTopicConfigMap.containsKey(value)) {
TopicConfig topicConfig = new TopicConfig(topic, 1, 1);
- TopicQueueMappingDetail mappingDetail = new
TopicQueueMappingDetail(topic, queueNum, broker, epoch);
+ TopicQueueMappingDetail mappingDetail = new
TopicQueueMappingDetail(topic, queueNum, value, epoch);
configMapping = new
TopicConfigAndQueueMapping(topicConfig, mappingDetail);
} else {
- configMapping = existedTopicConfigMap.get(broker);
+ configMapping = existedTopicConfigMap.get(value);
configMapping.setWriteQueueNums(configMapping.getWriteQueueNums() + 1);
configMapping.setWriteQueueNums(configMapping.getWriteQueueNums() + 1);
configMapping.getMappingDetail().setEpoch(epoch);
configMapping.getMappingDetail().setTotalQueues(queueNum);
}
- LogicQueueMappingItem mappingItem = new
LogicQueueMappingItem(0, configMapping.getWriteQueueNums() - 1, broker, 0, 0,
-1, -1, -1);
+ LogicQueueMappingItem mappingItem = new
LogicQueueMappingItem(0, configMapping.getWriteQueueNums() - 1, value, 0, 0,
-1, -1, -1);
configMapping.getMappingDetail().putMappingInfo(queueId,
ImmutableList.of(mappingItem));
- });
+ }
//If some succeed, and others fail, it will cause inconsistent data
for (Map.Entry<String, TopicConfigAndQueueMapping> entry :
existedTopicConfigMap.entrySet()) {