This is an automated email from the ASF dual-hosted git repository.
dongeforever pushed a commit to branch 5.0.0-alpha-static-topic
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/5.0.0-alpha-static-topic by
this push:
new 4018435 Polish the static topic command
4018435 is described below
commit 401843579e7d70ef900a02808bebb5c29277b2cd
Author: dongeforever <[email protected]>
AuthorDate: Sat Nov 20 12:51:20 2021 +0800
Polish the static topic command
---
.../broker/processor/AdminBrokerProcessor.java | 8 ++++----
.../broker/processor/PullMessageProcessor.java | 6 +++---
.../broker/processor/SendMessageProcessor.java | 2 +-
.../common/statictopic/LogicQueueMappingItem.java | 9 ++++++++-
.../common/statictopic/TopicQueueMappingUtils.java | 10 ++++++++++
.../command/topic/RemappingStaticTopicSubCommand.java | 18 ++++++++++++++----
.../command/topic/UpdateStaticTopicSubCommand.java | 18 +++++++++++++-----
7 files changed, 53 insertions(+), 18 deletions(-)
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
index 9048584..3eceb49 100644
---
a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
+++
b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
@@ -653,7 +653,7 @@ public class AdminBrokerProcessor extends
AsyncNettyRequestProcessor implements
assert i == mappingItems.size() - 1;
offset =
this.brokerController.getMessageStore().getOffsetInQueueByTime(mappingContext.getTopic(),
item.getQueueId(), timestamp);
if (offset > 0) {
- offset = item.computeStaticQueueOffset(offset);
+ offset = item.computeStaticQueueOffsetUpToEnd(offset);
}
} else {
requestHeader.setPhysical(true);
@@ -670,7 +670,7 @@ public class AdminBrokerProcessor extends
AsyncNettyRequestProcessor implements
|| (item.checkIfEndOffsetDecided() &&
offsetResponseHeader.getOffset() >= item.getEndOffset())) {
continue;
} else {
- offset =
item.computeStaticQueueOffset(offsetResponseHeader.getOffset());
+ offset =
item.computeStaticQueueOffsetUpToEnd(offsetResponseHeader.getOffset());
}
}
@@ -722,7 +722,7 @@ public class AdminBrokerProcessor extends
AsyncNettyRequestProcessor implements
}
long offset =
this.brokerController.getMessageStore().getMaxOffsetInQueue(mappingContext.getTopic(),
mappingItem.getQueueId());
- offset = mappingItem.computeStaticQueueOffset(offset);
+ offset = mappingItem.computeStaticQueueOffsetUpToEnd(offset);
final RemotingCommand response =
RemotingCommand.createResponseCommand(GetMaxOffsetResponseHeader.class);
final GetMaxOffsetResponseHeader responseHeader =
(GetMaxOffsetResponseHeader) response.readCustomHeader();
@@ -774,7 +774,7 @@ public class AdminBrokerProcessor extends
AsyncNettyRequestProcessor implements
throw rpcResponse.getException();
}
GetMinOffsetResponseHeader offsetResponseHeader =
(GetMinOffsetResponseHeader) rpcResponse.getHeader();
- long offset =
mappingItem.computeStaticQueueOffset(offsetResponseHeader.getOffset());
+ long offset =
mappingItem.computeStaticQueueOffsetUpToEnd(offsetResponseHeader.getOffset());
final RemotingCommand response =
RemotingCommand.createResponseCommand(GetMinOffsetResponseHeader.class);
final GetMinOffsetResponseHeader responseHeader =
(GetMinOffsetResponseHeader) response.readCustomHeader();
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java
b/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java
index df4e4e2..24d3f07 100644
---
a/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java
+++
b/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java
@@ -180,16 +180,16 @@ public class PullMessageProcessor extends
AsyncNettyRequestProcessor implements
&& nextBeginOffset >= mappingItem.getEndOffset()) {
nextBeginOffset = mappingItem.getEndOffset();
}
-
responseHeader.setNextBeginOffset(mappingItem.computeStaticQueueOffset(nextBeginOffset));
+
responseHeader.setNextBeginOffset(mappingItem.computeStaticQueueOffsetUpToEnd(nextBeginOffset));
}
//handle min offset
-
responseHeader.setMinOffset(mappingItem.computeStaticQueueOffset(Math.max(mappingItem.getStartOffset(),
responseHeader.getMinOffset())));
+
responseHeader.setMinOffset(mappingItem.computeStaticQueueOffsetUpToEnd(Math.max(mappingItem.getStartOffset(),
responseHeader.getMinOffset())));
//handle max offset
{
if (mappingItem.checkIfEndOffsetDecided()) {
responseHeader.setMaxOffset(Math.max(mappingItem.computeMaxStaticQueueOffset(),
mappingDetail.computeMaxOffsetFromMapping(mappingContext.getGlobalId())));
} else {
-
responseHeader.setMaxOffset(mappingItem.computeStaticQueueOffset(responseHeader.getMaxOffset()));
+
responseHeader.setMaxOffset(mappingItem.computeStaticQueueOffsetUpToEnd(responseHeader.getMaxOffset()));
}
}
//set the offsetDelta
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
index 2fe0f6f..d254b8a 100644
---
a/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
+++
b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
@@ -135,7 +135,7 @@ public class SendMessageProcessor extends
AbstractSendMessageProcessor implement
return buildErrorResponse(ResponseCode.NOT_LEADER_FOR_QUEUE,
String.format("%s-%d does not exit in request process of current broker %s",
mappingContext.getTopic(), mappingContext.getGlobalId(),
mappingDetail.getBname()));
}
//no need to care the broker name
- long staticLogicOffset =
mappingItem.computeStaticQueueOffset(responseHeader.getQueueOffset());
+ long staticLogicOffset =
mappingItem.computeStaticQueueOffsetUpToEnd(responseHeader.getQueueOffset());
if (staticLogicOffset < 0) {
return buildErrorResponse(ResponseCode.NOT_LEADER_FOR_QUEUE,
String.format("%s-%d convert offset error in current broker %s",
mappingContext.getTopic(), mappingContext.getGlobalId(),
mappingDetail.getBname()));
}
diff --git
a/common/src/main/java/org/apache/rocketmq/common/statictopic/LogicQueueMappingItem.java
b/common/src/main/java/org/apache/rocketmq/common/statictopic/LogicQueueMappingItem.java
index 01686fd..c855dfd 100644
---
a/common/src/main/java/org/apache/rocketmq/common/statictopic/LogicQueueMappingItem.java
+++
b/common/src/main/java/org/apache/rocketmq/common/statictopic/LogicQueueMappingItem.java
@@ -22,7 +22,7 @@ public class LogicQueueMappingItem {
this.timeOfEnd = timeOfEnd;
}
- public long computeStaticQueueOffset(long physicalQueueOffset) {
+ public long computeStaticQueueOffsetUpToEnd(long physicalQueueOffset) {
if (physicalQueueOffset < startOffset) {
return logicOffset;
}
@@ -33,6 +33,13 @@ public class LogicQueueMappingItem {
return logicOffset + (physicalQueueOffset - startOffset);
}
+ public long computeStaticQueueOffset(long physicalQueueOffset) {
+ if (physicalQueueOffset < startOffset) {
+ return logicOffset;
+ }
+ return logicOffset + (physicalQueueOffset - startOffset);
+ }
+
public long computePhysicalQueueOffset(long staticQueueOffset) {
return (staticQueueOffset - logicOffset) + startOffset;
}
diff --git
a/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingUtils.java
b/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingUtils.java
index eb82cad..7ac7ce8 100644
---
a/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingUtils.java
+++
b/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingUtils.java
@@ -296,6 +296,16 @@ public class TopicQueueMappingUtils {
}
}
+ public static long blockSeqRoundUp(long offset, long blockSeqSize) {
+ long num = offset/blockSeqSize;
+ long left = offset % blockSeqSize;
+ if (left < blockSeqSize/2) {
+ return (num + 1) * blockSeqSize;
+ } else {
+ return (num + 2) * blockSeqSize;
+ }
+ }
+
}
diff --git
a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/RemappingStaticTopicSubCommand.java
b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/RemappingStaticTopicSubCommand.java
index c46c28a..aa6d134 100644
---
a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/RemappingStaticTopicSubCommand.java
+++
b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/RemappingStaticTopicSubCommand.java
@@ -81,9 +81,14 @@ public class RemappingStaticTopicSubCommand implements
SubCommand {
opt.setRequired(true);
options.addOption(opt);
- opt = new Option("f", "mapFile", true, "The map file name");
+ opt = new Option("mf", "mapFile", true, "The mapping data file name ");
opt.setRequired(false);
options.addOption(opt);
+
+ opt = new Option("fr", "forceReplace", true, "Force replace the old
mapping");
+ opt.setRequired(false);
+ options.addOption(opt);
+
return options;
}
@@ -111,7 +116,12 @@ public class RemappingStaticTopicSubCommand implements
SubCommand {
throw new RuntimeException("The Cluster info is empty");
}
clientMetadata.refreshClusterInfo(clusterInfo);
- doRemapping(topic, wrapper.getBrokerToMapIn(),
wrapper.getBrokerToMapOut(), wrapper.getBrokerConfigMap(), clientMetadata,
defaultMQAdminExt, false);
+
+ boolean force = false;
+ if (commandLine.hasOption("fr") &&
Boolean.parseBoolean(commandLine.getOptionValue("fr").trim())) {
+ force = true;
+ }
+ doRemapping(topic, wrapper.getBrokerToMapIn(),
wrapper.getBrokerToMapOut(), wrapper.getBrokerConfigMap(), clientMetadata,
defaultMQAdminExt, force);
return;
}catch (Exception e) {
throw new SubCommandException(this.getClass().getSimpleName() + "
command failed", e);
@@ -161,7 +171,7 @@ public class RemappingStaticTopicSubCommand implements
SubCommand {
if (topicOffset.getMaxOffset() < oldLeader.getStartOffset()) {
throw new RuntimeException("The max offset is smaller then
the start offset " + oldLeader + " " + topicOffset.getMaxOffset());
}
-
newLeader.setLogicOffset(oldLeader.computeStaticQueueOffset(topicOffset.getMaxOffset()
+ 10000));
+
newLeader.setLogicOffset(TopicQueueMappingUtils.blockSeqRoundUp(oldLeader.computeStaticQueueOffset(topicOffset.getMaxOffset()),
10000));
TopicConfigAndQueueMapping mapInConfig =
brokerConfigMap.get(newLeader.getBname());
//fresh the new leader
mapInConfig.getMappingDetail().putMappingInfo(globalId, items);
@@ -171,7 +181,7 @@ public class RemappingStaticTopicSubCommand implements
SubCommand {
for (String broker: brokersToMapIn) {
String addr = clientMetadata.findMasterBrokerAddr(broker);
TopicConfigAndQueueMapping configMapping =
brokerConfigMap.get(broker);
- defaultMQAdminExt.createStaticTopic(addr,
defaultMQAdminExt.getCreateTopicKey(), configMapping,
configMapping.getMappingDetail(), false);
+ defaultMQAdminExt.createStaticTopic(addr,
defaultMQAdminExt.getCreateTopicKey(), configMapping,
configMapping.getMappingDetail(), force);
}
}
diff --git
a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateStaticTopicSubCommand.java
b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateStaticTopicSubCommand.java
index 7c6abdc..cdb4fc0 100644
---
a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateStaticTopicSubCommand.java
+++
b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateStaticTopicSubCommand.java
@@ -82,7 +82,11 @@ public class UpdateStaticTopicSubCommand implements
SubCommand {
opt.setRequired(true);
options.addOption(opt);
- opt = new Option("f", "mapFile", true, "The map file name");
+ opt = new Option("mf", "mapFile", true, "The mapping data file name");
+ opt.setRequired(false);
+ options.addOption(opt);
+
+ opt = new Option("fr", "forceReplace", true, "Force replace the old
mapping");
opt.setRequired(false);
options.addOption(opt);
@@ -104,6 +108,10 @@ public class UpdateStaticTopicSubCommand implements
SubCommand {
TopicRemappingDetailWrapper wrapper =
TopicRemappingDetailWrapper.decode(mapData.getBytes(),
TopicRemappingDetailWrapper.class);
//double check the config
TopicQueueMappingUtils.checkConsistenceOfTopicConfigAndQueueMapping(topic,
wrapper.getBrokerConfigMap());
+ boolean force = false;
+ if (commandLine.hasOption("fr") &&
Boolean.parseBoolean(commandLine.getOptionValue("fr").trim())) {
+ force = true;
+ }
TopicQueueMappingUtils.checkAndBuildMappingItems(new
ArrayList<>(TopicQueueMappingUtils.getMappingDetailFromConfig(wrapper.getBrokerConfigMap().values())),
false, true);
ClusterInfo clusterInfo =
defaultMQAdminExt.examineBrokerClusterInfo();
@@ -112,7 +120,7 @@ public class UpdateStaticTopicSubCommand implements
SubCommand {
throw new RuntimeException("The Cluster info is empty");
}
clientMetadata.refreshClusterInfo(clusterInfo);
- doUpdate(wrapper.getBrokerConfigMap(), clientMetadata,
defaultMQAdminExt);
+ doUpdate(wrapper.getBrokerConfigMap(), clientMetadata,
defaultMQAdminExt, force);
return;
}catch (Exception e) {
throw new SubCommandException(this.getClass().getSimpleName() + "
command failed", e);
@@ -121,13 +129,13 @@ public class UpdateStaticTopicSubCommand implements
SubCommand {
}
}
- public void doUpdate(Map<String, TopicConfigAndQueueMapping>
brokerConfigMap, ClientMetadata clientMetadata, DefaultMQAdminExt
defaultMQAdminExt) throws Exception {
+ public void doUpdate(Map<String, TopicConfigAndQueueMapping>
brokerConfigMap, ClientMetadata clientMetadata, DefaultMQAdminExt
defaultMQAdminExt, boolean force) throws Exception {
//If some succeed, and others fail, it will cause inconsistent data
for (Map.Entry<String, TopicConfigAndQueueMapping> entry :
brokerConfigMap.entrySet()) {
String broker = entry.getKey();
String addr = clientMetadata.findMasterBrokerAddr(broker);
TopicConfigAndQueueMapping configMapping = entry.getValue();
- defaultMQAdminExt.createStaticTopic(addr,
defaultMQAdminExt.getCreateTopicKey(), configMapping,
configMapping.getMappingDetail(), false);
+ defaultMQAdminExt.createStaticTopic(addr,
defaultMQAdminExt.getCreateTopicKey(), configMapping,
configMapping.getMappingDetail(), force);
}
}
@@ -288,7 +296,7 @@ public class UpdateStaticTopicSubCommand implements
SubCommand {
System.out.println("The new mapping data is written to file "
+ newMappingDataFile);
}
- doUpdate(brokerConfigMap, clientMetadata, defaultMQAdminExt);
+ doUpdate(brokerConfigMap, clientMetadata, defaultMQAdminExt,
false);
} catch (Exception e) {
throw new SubCommandException(this.getClass().getSimpleName() + "
command failed", e);