This is an automated email from the ASF dual-hosted git repository.

dongeforever pushed a commit to branch 5.0.0-alpha-static-topic
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/5.0.0-alpha-static-topic by this push:
     new 4018435  Polish the static topic command
4018435 is described below

commit 401843579e7d70ef900a02808bebb5c29277b2cd
Author: dongeforever <[email protected]>
AuthorDate: Sat Nov 20 12:51:20 2021 +0800

    Polish the static topic command
---
 .../broker/processor/AdminBrokerProcessor.java         |  8 ++++----
 .../broker/processor/PullMessageProcessor.java         |  6 +++---
 .../broker/processor/SendMessageProcessor.java         |  2 +-
 .../common/statictopic/LogicQueueMappingItem.java      |  9 ++++++++-
 .../common/statictopic/TopicQueueMappingUtils.java     | 10 ++++++++++
 .../command/topic/RemappingStaticTopicSubCommand.java  | 18 ++++++++++++++----
 .../command/topic/UpdateStaticTopicSubCommand.java     | 18 +++++++++++++-----
 7 files changed, 53 insertions(+), 18 deletions(-)

diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
index 9048584..3eceb49 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
@@ -653,7 +653,7 @@ public class AdminBrokerProcessor extends 
AsyncNettyRequestProcessor implements
                     assert i ==  mappingItems.size() - 1;
                     offset = 
this.brokerController.getMessageStore().getOffsetInQueueByTime(mappingContext.getTopic(),
 item.getQueueId(), timestamp);
                     if (offset > 0) {
-                        offset = item.computeStaticQueueOffset(offset);
+                        offset = item.computeStaticQueueOffsetUpToEnd(offset);
                     }
                 } else {
                     requestHeader.setPhysical(true);
@@ -670,7 +670,7 @@ public class AdminBrokerProcessor extends 
AsyncNettyRequestProcessor implements
                             || (item.checkIfEndOffsetDecided() && 
offsetResponseHeader.getOffset() >= item.getEndOffset())) {
                         continue;
                     } else {
-                        offset = 
item.computeStaticQueueOffset(offsetResponseHeader.getOffset());
+                        offset = 
item.computeStaticQueueOffsetUpToEnd(offsetResponseHeader.getOffset());
                     }
 
                 }
@@ -722,7 +722,7 @@ public class AdminBrokerProcessor extends 
AsyncNettyRequestProcessor implements
         }
         long offset = 
this.brokerController.getMessageStore().getMaxOffsetInQueue(mappingContext.getTopic(),
 mappingItem.getQueueId());
 
-        offset = mappingItem.computeStaticQueueOffset(offset);
+        offset = mappingItem.computeStaticQueueOffsetUpToEnd(offset);
 
         final RemotingCommand response = 
RemotingCommand.createResponseCommand(GetMaxOffsetResponseHeader.class);
         final GetMaxOffsetResponseHeader responseHeader = 
(GetMaxOffsetResponseHeader) response.readCustomHeader();
@@ -774,7 +774,7 @@ public class AdminBrokerProcessor extends 
AsyncNettyRequestProcessor implements
                 throw rpcResponse.getException();
             }
             GetMinOffsetResponseHeader offsetResponseHeader = 
(GetMinOffsetResponseHeader) rpcResponse.getHeader();
-            long offset = 
mappingItem.computeStaticQueueOffset(offsetResponseHeader.getOffset());
+            long offset = 
mappingItem.computeStaticQueueOffsetUpToEnd(offsetResponseHeader.getOffset());
 
             final RemotingCommand response = 
RemotingCommand.createResponseCommand(GetMinOffsetResponseHeader.class);
             final GetMinOffsetResponseHeader responseHeader = 
(GetMinOffsetResponseHeader) response.readCustomHeader();
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java
index df4e4e2..24d3f07 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java
@@ -180,16 +180,16 @@ public class PullMessageProcessor extends 
AsyncNettyRequestProcessor implements
                         && nextBeginOffset >= mappingItem.getEndOffset()) {
                     nextBeginOffset = mappingItem.getEndOffset();
                 }
-                
responseHeader.setNextBeginOffset(mappingItem.computeStaticQueueOffset(nextBeginOffset));
+                
responseHeader.setNextBeginOffset(mappingItem.computeStaticQueueOffsetUpToEnd(nextBeginOffset));
             }
             //handle min offset
-            
responseHeader.setMinOffset(mappingItem.computeStaticQueueOffset(Math.max(mappingItem.getStartOffset(),
 responseHeader.getMinOffset())));
+            
responseHeader.setMinOffset(mappingItem.computeStaticQueueOffsetUpToEnd(Math.max(mappingItem.getStartOffset(),
 responseHeader.getMinOffset())));
             //handle max offset
             {
                 if (mappingItem.checkIfEndOffsetDecided()) {
                     
responseHeader.setMaxOffset(Math.max(mappingItem.computeMaxStaticQueueOffset(), 
mappingDetail.computeMaxOffsetFromMapping(mappingContext.getGlobalId())));
                 } else {
-                    
responseHeader.setMaxOffset(mappingItem.computeStaticQueueOffset(responseHeader.getMaxOffset()));
+                    
responseHeader.setMaxOffset(mappingItem.computeStaticQueueOffsetUpToEnd(responseHeader.getMaxOffset()));
                 }
             }
             //set the offsetDelta
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
index 2fe0f6f..d254b8a 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
@@ -135,7 +135,7 @@ public class SendMessageProcessor extends 
AbstractSendMessageProcessor implement
                 return buildErrorResponse(ResponseCode.NOT_LEADER_FOR_QUEUE, String.format("%s-%d does not exit in request process of current broker %s", mappingContext.getTopic(), mappingContext.getGlobalId(), mappingDetail.getBname()));
             }
             //no need to care the broker name
-            long staticLogicOffset = 
mappingItem.computeStaticQueueOffset(responseHeader.getQueueOffset());
+            long staticLogicOffset = 
mappingItem.computeStaticQueueOffsetUpToEnd(responseHeader.getQueueOffset());
             if (staticLogicOffset < 0) {
                 return buildErrorResponse(ResponseCode.NOT_LEADER_FOR_QUEUE, String.format("%s-%d convert offset error in current broker %s", mappingContext.getTopic(), mappingContext.getGlobalId(), mappingDetail.getBname()));
             }
diff --git a/common/src/main/java/org/apache/rocketmq/common/statictopic/LogicQueueMappingItem.java b/common/src/main/java/org/apache/rocketmq/common/statictopic/LogicQueueMappingItem.java
index 01686fd..c855dfd 100644
--- a/common/src/main/java/org/apache/rocketmq/common/statictopic/LogicQueueMappingItem.java
+++ b/common/src/main/java/org/apache/rocketmq/common/statictopic/LogicQueueMappingItem.java
@@ -22,7 +22,7 @@ public class LogicQueueMappingItem {
         this.timeOfEnd = timeOfEnd;
     }
 
-    public long computeStaticQueueOffset(long physicalQueueOffset) {
+    public long computeStaticQueueOffsetUpToEnd(long physicalQueueOffset) {
         if (physicalQueueOffset < startOffset) {
             return logicOffset;
         }
@@ -33,6 +33,13 @@ public class LogicQueueMappingItem {
         return  logicOffset + (physicalQueueOffset - startOffset);
     }
 
+    public long computeStaticQueueOffset(long physicalQueueOffset) {
+        if (physicalQueueOffset < startOffset) {
+            return logicOffset;
+        }
+        return  logicOffset + (physicalQueueOffset - startOffset);
+    }
+
     public long computePhysicalQueueOffset(long staticQueueOffset) {
         return  (staticQueueOffset - logicOffset) + startOffset;
     }
diff --git a/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingUtils.java b/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingUtils.java
index eb82cad..7ac7ce8 100644
--- a/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingUtils.java
+++ b/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingUtils.java
@@ -296,6 +296,16 @@ public class TopicQueueMappingUtils {
         }
     }
 
+    public static long blockSeqRoundUp(long offset, long blockSeqSize) {
+        long num = offset/blockSeqSize;
+        long left = offset % blockSeqSize;
+        if (left < blockSeqSize/2) {
+            return (num + 1) * blockSeqSize;
+        } else {
+            return (num + 2) * blockSeqSize;
+        }
+    }
+
 
 
 }
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/RemappingStaticTopicSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/RemappingStaticTopicSubCommand.java
index c46c28a..aa6d134 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/RemappingStaticTopicSubCommand.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/RemappingStaticTopicSubCommand.java
@@ -81,9 +81,14 @@ public class RemappingStaticTopicSubCommand implements 
SubCommand {
         opt.setRequired(true);
         options.addOption(opt);
 
-        opt = new Option("f", "mapFile", true, "The map file name");
+        opt = new Option("mf", "mapFile", true, "The mapping data file name ");
         opt.setRequired(false);
         options.addOption(opt);
+
+        opt = new Option("fr", "forceReplace", true, "Force replace the old mapping");
+        opt.setRequired(false);
+        options.addOption(opt);
+
         return options;
     }
 
@@ -111,7 +116,12 @@ public class RemappingStaticTopicSubCommand implements 
SubCommand {
                 throw new RuntimeException("The Cluster info is empty");
             }
             clientMetadata.refreshClusterInfo(clusterInfo);
-            doRemapping(topic, wrapper.getBrokerToMapIn(), 
wrapper.getBrokerToMapOut(), wrapper.getBrokerConfigMap(), clientMetadata, 
defaultMQAdminExt, false);
+
+            boolean force = false;
+            if (commandLine.hasOption("fr") && 
Boolean.parseBoolean(commandLine.getOptionValue("fr").trim())) {
+                force = true;
+            }
+            doRemapping(topic, wrapper.getBrokerToMapIn(), 
wrapper.getBrokerToMapOut(), wrapper.getBrokerConfigMap(), clientMetadata, 
defaultMQAdminExt, force);
             return;
         }catch (Exception e) {
             throw new SubCommandException(this.getClass().getSimpleName() + " command failed", e);
@@ -161,7 +171,7 @@ public class RemappingStaticTopicSubCommand implements 
SubCommand {
                 if (topicOffset.getMaxOffset() < oldLeader.getStartOffset()) {
                     throw new RuntimeException("The max offset is smaller then the start offset " + oldLeader + " " + topicOffset.getMaxOffset());
                 }
-                newLeader.setLogicOffset(oldLeader.computeStaticQueueOffset(topicOffset.getMaxOffset() + 10000));
+                newLeader.setLogicOffset(TopicQueueMappingUtils.blockSeqRoundUp(oldLeader.computeStaticQueueOffset(topicOffset.getMaxOffset()), 10000));
                 TopicConfigAndQueueMapping mapInConfig = 
brokerConfigMap.get(newLeader.getBname());
                 //fresh the new leader
                 mapInConfig.getMappingDetail().putMappingInfo(globalId, items);
@@ -171,7 +181,7 @@ public class RemappingStaticTopicSubCommand implements 
SubCommand {
         for (String broker: brokersToMapIn) {
             String addr = clientMetadata.findMasterBrokerAddr(broker);
             TopicConfigAndQueueMapping configMapping = 
brokerConfigMap.get(broker);
-            defaultMQAdminExt.createStaticTopic(addr, 
defaultMQAdminExt.getCreateTopicKey(), configMapping, 
configMapping.getMappingDetail(), false);
+            defaultMQAdminExt.createStaticTopic(addr, 
defaultMQAdminExt.getCreateTopicKey(), configMapping, 
configMapping.getMappingDetail(), force);
         }
     }
 
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateStaticTopicSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateStaticTopicSubCommand.java
index 7c6abdc..cdb4fc0 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateStaticTopicSubCommand.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateStaticTopicSubCommand.java
@@ -82,7 +82,11 @@ public class UpdateStaticTopicSubCommand implements 
SubCommand {
         opt.setRequired(true);
         options.addOption(opt);
 
-        opt = new Option("f", "mapFile", true, "The map file name");
+        opt = new Option("mf", "mapFile", true, "The mapping data file name");
+        opt.setRequired(false);
+        options.addOption(opt);
+
+        opt = new Option("fr", "forceReplace", true, "Force replace the old mapping");
         opt.setRequired(false);
         options.addOption(opt);
 
@@ -104,6 +108,10 @@ public class UpdateStaticTopicSubCommand implements 
SubCommand {
             TopicRemappingDetailWrapper wrapper = 
TopicRemappingDetailWrapper.decode(mapData.getBytes(), 
TopicRemappingDetailWrapper.class);
             //double check the config
             
TopicQueueMappingUtils.checkConsistenceOfTopicConfigAndQueueMapping(topic, 
wrapper.getBrokerConfigMap());
+            boolean force = false;
+            if (commandLine.hasOption("fr") && 
Boolean.parseBoolean(commandLine.getOptionValue("fr").trim())) {
+                force = true;
+            }
             TopicQueueMappingUtils.checkAndBuildMappingItems(new 
ArrayList<>(TopicQueueMappingUtils.getMappingDetailFromConfig(wrapper.getBrokerConfigMap().values())),
 false, true);
 
             ClusterInfo clusterInfo = 
defaultMQAdminExt.examineBrokerClusterInfo();
@@ -112,7 +120,7 @@ public class UpdateStaticTopicSubCommand implements 
SubCommand {
                 throw new RuntimeException("The Cluster info is empty");
             }
             clientMetadata.refreshClusterInfo(clusterInfo);
-            doUpdate(wrapper.getBrokerConfigMap(), clientMetadata, 
defaultMQAdminExt);
+            doUpdate(wrapper.getBrokerConfigMap(), clientMetadata, 
defaultMQAdminExt, force);
             return;
         }catch (Exception e) {
             throw new SubCommandException(this.getClass().getSimpleName() + " command failed", e);
@@ -121,13 +129,13 @@ public class UpdateStaticTopicSubCommand implements 
SubCommand {
         }
     }
 
-    public void doUpdate(Map<String, TopicConfigAndQueueMapping> 
brokerConfigMap, ClientMetadata clientMetadata, DefaultMQAdminExt 
defaultMQAdminExt) throws Exception {
+    public void doUpdate(Map<String, TopicConfigAndQueueMapping> 
brokerConfigMap, ClientMetadata clientMetadata, DefaultMQAdminExt 
defaultMQAdminExt, boolean force) throws Exception {
         //If some succeed, and others fail, it will cause inconsistent data
         for (Map.Entry<String, TopicConfigAndQueueMapping> entry : 
brokerConfigMap.entrySet()) {
             String broker = entry.getKey();
             String addr = clientMetadata.findMasterBrokerAddr(broker);
             TopicConfigAndQueueMapping configMapping = entry.getValue();
-            defaultMQAdminExt.createStaticTopic(addr, 
defaultMQAdminExt.getCreateTopicKey(), configMapping, 
configMapping.getMappingDetail(), false);
+            defaultMQAdminExt.createStaticTopic(addr, 
defaultMQAdminExt.getCreateTopicKey(), configMapping, 
configMapping.getMappingDetail(), force);
         }
     }
 
@@ -288,7 +296,7 @@ public class UpdateStaticTopicSubCommand implements 
SubCommand {
                 System.out.println("The new mapping data is written to file " + newMappingDataFile);
             }
 
-            doUpdate(brokerConfigMap, clientMetadata, defaultMQAdminExt);
+            doUpdate(brokerConfigMap, clientMetadata, defaultMQAdminExt, 
false);
 
         } catch (Exception e) {
             throw new SubCommandException(this.getClass().getSimpleName() + " command failed", e);

Reply via email to