This is an automated email from the ASF dual-hosted git repository.

dongeforever pushed a commit to branch 5.0.0-alpha-static-topic
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/5.0.0-alpha-static-topic by this push:
     new d18d787  Add the check logic for admin process update-and-create static topic
d18d787 is described below

commit d18d787f92eb44020a482205e27bae4b9c725957
Author: dongeforever <[email protected]>
AuthorDate: Fri Nov 19 20:33:22 2021 +0800

    Add the check logic for admin process update-and-create static topic
---
 .../broker/processor/AdminBrokerProcessor.java     | 19 +++++++---
 .../broker/topic/TopicQueueMappingManager.java     | 44 +++++++++++++++++++++-
 .../apache/rocketmq/client/impl/MQAdminImpl.java   |  4 +-
 .../rocketmq/client/impl/MQClientAPIImpl.java      |  4 +-
 .../protocol/header/CreateTopicRequestHeader.java  | 12 ++++++
 .../common/statictopic/TopicQueueMappingUtils.java |  2 +-
 .../rocketmq/tools/admin/DefaultMQAdminExt.java    |  4 +-
 .../tools/admin/DefaultMQAdminExtImpl.java         |  4 +-
 .../apache/rocketmq/tools/admin/MQAdminExt.java    |  2 +-
 .../topic/RemappingStaticTopicSubCommand.java      | 12 +++---
 .../command/topic/UpdateStaticTopicSubCommand.java |  2 +-
 11 files changed, 85 insertions(+), 24 deletions(-)

diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
 
b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
index 2f0aafb..bab484d 100644
--- 
a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
+++ 
b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
@@ -323,6 +323,10 @@ public class AdminBrokerProcessor extends 
AsyncNettyRequestProcessor implements
         if (TopicValidator.isSystemTopic(topic, response)) {
             return response;
         }
+        boolean force = false;
+        if (requestHeader.getForce() != null && requestHeader.getForce()) {
+            force = true;
+        }
 
         TopicConfig topicConfig = new TopicConfig(topic);
         topicConfig.setReadQueueNums(requestHeader.getReadQueueNums());
@@ -331,13 +335,18 @@ public class AdminBrokerProcessor extends 
AsyncNettyRequestProcessor implements
         topicConfig.setPerm(requestHeader.getPerm());
         topicConfig.setTopicSysFlag(requestHeader.getTopicSysFlag() == null ? 
0 : requestHeader.getTopicSysFlag());
 
-        
this.brokerController.getTopicConfigManager().updateTopicConfig(topicConfig);
-
-        
this.brokerController.getTopicQueueMappingManager().updateTopicQueueMapping(topicQueueMappingBody.getMappingDetail());
+        try {
+            
this.brokerController.getTopicConfigManager().updateTopicConfig(topicConfig);
 
-        this.brokerController.registerIncrementBrokerData(topicConfig, 
this.brokerController.getTopicConfigManager().getDataVersion());
+            
this.brokerController.getTopicQueueMappingManager().updateTopicQueueMapping(topicQueueMappingBody.getMappingDetail(),
 force);
 
-        response.setCode(ResponseCode.SUCCESS);
+            this.brokerController.registerIncrementBrokerData(topicConfig, 
this.brokerController.getTopicConfigManager().getDataVersion());
+            response.setCode(ResponseCode.SUCCESS);
+        } catch (Exception e) {
+            log.error("Update static failed for [{}]", request, e);
+            response.setCode(ResponseCode.SYSTEM_ERROR);
+            response.setRemark(e.getMessage());
+        }
         return response;
     }
 
diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManager.java
 
b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManager.java
index 13f0857..e2b46fe 100644
--- 
a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManager.java
+++ 
b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManager.java
@@ -28,6 +28,7 @@ import org.apache.rocketmq.common.constant.LoggerName;
 import org.apache.rocketmq.common.protocol.ResponseCode;
 import 
org.apache.rocketmq.common.protocol.body.TopicQueueMappingSerializeWrapper;
 import org.apache.rocketmq.common.statictopic.TopicQueueMappingDetail;
+import org.apache.rocketmq.common.statictopic.TopicQueueMappingUtils;
 import org.apache.rocketmq.logging.InternalLogger;
 import org.apache.rocketmq.logging.InternalLoggerFactory;
 import org.apache.rocketmq.common.rpc.TopicQueueRequestHeader;
@@ -56,8 +57,47 @@ public class TopicQueueMappingManager extends ConfigManager {
         this.brokerController = brokerController;
     }
 
-    public void updateTopicQueueMapping(TopicQueueMappingDetail 
topicQueueMappingDetail) {
-        topicQueueMappingTable.put(topicQueueMappingDetail.getTopic(), 
topicQueueMappingDetail);
+    public void updateTopicQueueMapping(TopicQueueMappingDetail newDetail, 
boolean force) {
+        lock.lock();
+        try {
+            if (newDetail == null) {
+                return;
+            }
+            newDetail.getHostedQueues().forEach((queueId, items) -> {
+                TopicQueueMappingUtils.checkLogicQueueMappingItemOffset(items);
+            });
+
+            TopicQueueMappingDetail oldDetail = 
topicQueueMappingTable.get(newDetail.getTopic());
+            if (oldDetail == null) {
+                topicQueueMappingTable.put(newDetail.getTopic(), newDetail);
+                return;
+            }
+            if (force) {
+                oldDetail.getHostedQueues().forEach( (queueId, items) -> {
+                    newDetail.getHostedQueues().putIfAbsent(queueId, items);
+                });
+                topicQueueMappingTable.put(newDetail.getTopic(), newDetail);
+                return;
+            }
+            //do more check
+            if (newDetail.getEpoch() <= oldDetail.getEpoch()) {
+                throw new RuntimeException(String.format("Can't accept data 
with small epoch %d < %d", newDetail.getEpoch(), oldDetail.getEpoch()));
+            }
+            for (Integer globalId : oldDetail.getHostedQueues().keySet()) {
+                ImmutableList<LogicQueueMappingItem> oldItems = 
oldDetail.getHostedQueues().get(globalId);
+                ImmutableList<LogicQueueMappingItem> newItems = 
newDetail.getHostedQueues().get(globalId);
+                if (newItems == null) {
+                    //keep the old
+                    newDetail.getHostedQueues().put(globalId, oldItems);
+                } else {
+                    
TopicQueueMappingUtils.makeSureLogicQueueMappingItemImmutable(oldItems, 
newItems);
+                }
+            }
+            topicQueueMappingTable.put(newDetail.getTopic(), newDetail);
+        } finally {
+            lock.unlock();
+        }
+
     }
 
     public TopicQueueMappingDetail getTopicQueueMapping(String topic) {
diff --git 
a/client/src/main/java/org/apache/rocketmq/client/impl/MQAdminImpl.java 
b/client/src/main/java/org/apache/rocketmq/client/impl/MQAdminImpl.java
index 4df7b9e..7d539cd 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/MQAdminImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQAdminImpl.java
@@ -80,11 +80,11 @@ public class MQAdminImpl {
         this.timeoutMillis = timeoutMillis;
     }
 
-    public void createStaticTopic(final String addr, final String 
defaultTopic, final TopicConfig topicConfig, final TopicQueueMappingDetail 
mappingDetail) throws MQClientException {
+    public void createStaticTopic(final String addr, final String 
defaultTopic, final TopicConfig topicConfig, final TopicQueueMappingDetail 
mappingDetail, boolean force) throws MQClientException {
         MQClientException exception = null;
         for (int i = 0; i < 3; i++) {
             try {
-                
this.mQClientFactory.getMQClientAPIImpl().createStaticTopic(addr, defaultTopic, 
topicConfig, mappingDetail, timeoutMillis);
+                
this.mQClientFactory.getMQClientAPIImpl().createStaticTopic(addr, defaultTopic, 
topicConfig, mappingDetail,  force, timeoutMillis);
                 break;
             } catch (Exception e) {
                 if (2 == i) {
diff --git 
a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java 
b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
index d93f690..050fc30 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
@@ -2726,7 +2726,7 @@ public class MQClientAPIImpl {
         throw new MQBrokerException(response.getCode(), response.getRemark());
     }
 
-    public void createStaticTopic(final String addr, final String 
defaultTopic, final TopicConfig topicConfig, final TopicQueueMappingDetail 
topicQueueMappingDetail,
+    public void createStaticTopic(final String addr, final String 
defaultTopic, final TopicConfig topicConfig, final TopicQueueMappingDetail 
topicQueueMappingDetail, boolean force,
                             final long timeoutMillis)
             throws RemotingException, MQBrokerException, InterruptedException, 
MQClientException {
         CreateTopicRequestHeader requestHeader = new 
CreateTopicRequestHeader();
@@ -2738,7 +2738,7 @@ public class MQClientAPIImpl {
         
requestHeader.setTopicFilterType(topicConfig.getTopicFilterType().name());
         requestHeader.setTopicSysFlag(topicConfig.getTopicSysFlag());
         requestHeader.setOrder(topicConfig.isOrder());
-
+        requestHeader.setForce(force);
         RemotingCommand request = 
RemotingCommand.createRequestCommand(RequestCode.UPDATE_AND_CREATE_STATIC_TOPIC,
 requestHeader);
         request.setBody(topicQueueMappingDetail.encode());
 
diff --git 
a/common/src/main/java/org/apache/rocketmq/common/protocol/header/CreateTopicRequestHeader.java
 
b/common/src/main/java/org/apache/rocketmq/common/protocol/header/CreateTopicRequestHeader.java
index 8894d0b..290ec4c 100644
--- 
a/common/src/main/java/org/apache/rocketmq/common/protocol/header/CreateTopicRequestHeader.java
+++ 
b/common/src/main/java/org/apache/rocketmq/common/protocol/header/CreateTopicRequestHeader.java
@@ -23,6 +23,7 @@ package org.apache.rocketmq.common.protocol.header;
 import org.apache.rocketmq.common.TopicFilterType;
 import org.apache.rocketmq.remoting.CommandCustomHeader;
 import org.apache.rocketmq.remoting.annotation.CFNotNull;
+import org.apache.rocketmq.remoting.annotation.CFNullable;
 import org.apache.rocketmq.remoting.exception.RemotingCommandException;
 
 public class CreateTopicRequestHeader implements CommandCustomHeader {
@@ -42,6 +43,9 @@ public class CreateTopicRequestHeader implements 
CommandCustomHeader {
     @CFNotNull
     private Boolean order = false;
 
+    @CFNullable
+    private Boolean force = false;
+
     @Override
     public void checkFields() throws RemotingCommandException {
         try {
@@ -118,4 +122,12 @@ public class CreateTopicRequestHeader implements 
CommandCustomHeader {
     public void setOrder(Boolean order) {
         this.order = order;
     }
+
+    public Boolean getForce() {
+        return force;
+    }
+
+    public void setForce(Boolean force) {
+        this.force = force;
+    }
 }
diff --git 
a/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingUtils.java
 
b/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingUtils.java
index 370f0ec..eb82cad 100644
--- 
a/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingUtils.java
+++ 
b/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingUtils.java
@@ -162,7 +162,7 @@ public class TopicQueueMappingUtils {
         return new AbstractMap.SimpleEntry<Long, Integer>(maxEpoch, maxNum);
     }
 
-    public static void 
checkLogicQueueMappingItemOffset(ImmutableList<LogicQueueMappingItem> oldItems, 
ImmutableList<LogicQueueMappingItem> newItems) {
+    public static void 
makeSureLogicQueueMappingItemImmutable(ImmutableList<LogicQueueMappingItem> 
oldItems, ImmutableList<LogicQueueMappingItem> newItems) {
         if (oldItems == null || oldItems.isEmpty()) {
             return;
         }
diff --git 
a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java 
b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java
index 6b0f5a6..7030056 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java
@@ -661,8 +661,8 @@ public class DefaultMQAdminExt extends ClientConfig 
implements MQAdminExt {
     }
 
     @Override
-    public void createStaticTopic(String addr, String defaultTopic, 
TopicConfig topicConfig, TopicQueueMappingDetail mappingDetail) throws 
RemotingException, MQBrokerException, InterruptedException, MQClientException {
-        this.defaultMQAdminExtImpl.createStaticTopic(addr, defaultTopic, 
topicConfig, mappingDetail);
+    public void createStaticTopic(String addr, String defaultTopic, 
TopicConfig topicConfig, TopicQueueMappingDetail mappingDetail, boolean force) 
throws RemotingException, MQBrokerException, InterruptedException, 
MQClientException {
+        this.defaultMQAdminExtImpl.createStaticTopic(addr, defaultTopic, 
topicConfig, mappingDetail, force);
     }
 
     @Override public void migrateTopicLogicalQueueNotify(String brokerAddr,
diff --git 
a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
 
b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
index 811ee26..06b456f 100644
--- 
a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
+++ 
b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
@@ -1096,8 +1096,8 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, 
MQAdminExtInner {
     }
 
     @Override
-    public void createStaticTopic(final String addr, final String 
defaultTopic, final TopicConfig topicConfig, final TopicQueueMappingDetail 
mappingDetail) throws MQClientException {
-        this.mqClientInstance.getMQAdminImpl().createStaticTopic(addr, 
defaultTopic, topicConfig, mappingDetail);
+    public void createStaticTopic(final String addr, final String 
defaultTopic, final TopicConfig topicConfig, final TopicQueueMappingDetail 
mappingDetail, final boolean force) throws MQClientException {
+        this.mqClientInstance.getMQAdminImpl().createStaticTopic(addr, 
defaultTopic, topicConfig, mappingDetail, force);
     }
 
     @Override
diff --git 
a/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java 
b/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java
index 5339709..888dad8 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java
@@ -337,7 +337,7 @@ public interface MQAdminExt extends MQAdmin {
         LogicalQueueRouteData toQueueRouteData) throws InterruptedException, 
MQBrokerException, RemotingTimeoutException, RemotingSendRequestException, 
RemotingConnectException;
 
 
-    void createStaticTopic(final String addr, final String defaultTopic, final 
TopicConfig topicConfig, final TopicQueueMappingDetail mappingDetail) throws 
RemotingException, MQBrokerException,
+    void createStaticTopic(final String addr, final String defaultTopic, final 
TopicConfig topicConfig, final TopicQueueMappingDetail mappingDetail, final 
boolean force) throws RemotingException, MQBrokerException,
             InterruptedException, MQClientException;
 
     void migrateTopicLogicalQueueNotify(String brokerAddr, 
LogicalQueueRouteData fromQueueRouteData,
diff --git 
a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/RemappingStaticTopicSubCommand.java
 
b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/RemappingStaticTopicSubCommand.java
index 3b1b27a..c46c28a 100644
--- 
a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/RemappingStaticTopicSubCommand.java
+++ 
b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/RemappingStaticTopicSubCommand.java
@@ -111,7 +111,7 @@ public class RemappingStaticTopicSubCommand implements 
SubCommand {
                 throw new RuntimeException("The Cluster info is empty");
             }
             clientMetadata.refreshClusterInfo(clusterInfo);
-            doRemapping(topic, wrapper.getBrokerToMapIn(), 
wrapper.getBrokerToMapOut(), wrapper.getBrokerConfigMap(), clientMetadata, 
defaultMQAdminExt);
+            doRemapping(topic, wrapper.getBrokerToMapIn(), 
wrapper.getBrokerToMapOut(), wrapper.getBrokerConfigMap(), clientMetadata, 
defaultMQAdminExt, false);
             return;
         }catch (Exception e) {
             throw new SubCommandException(this.getClass().getSimpleName() + " 
command failed", e);
@@ -123,19 +123,19 @@ public class RemappingStaticTopicSubCommand implements 
SubCommand {
 
 
     public void doRemapping(String topic, Set<String> brokersToMapIn, 
Set<String> brokersToMapOut, Map<String, TopicConfigAndQueueMapping> 
brokerConfigMap,
-                            ClientMetadata clientMetadata, DefaultMQAdminExt 
defaultMQAdminExt) throws Exception {
+                            ClientMetadata clientMetadata, DefaultMQAdminExt 
defaultMQAdminExt, boolean force) throws Exception {
         // now do the remapping
         //Step1: let the new leader can be write without the logicOffset
         for (String broker: brokersToMapIn) {
             String addr = clientMetadata.findMasterBrokerAddr(broker);
             TopicConfigAndQueueMapping configMapping = 
brokerConfigMap.get(broker);
-            defaultMQAdminExt.createStaticTopic(addr, 
defaultMQAdminExt.getCreateTopicKey(), configMapping, 
configMapping.getMappingDetail());
+            defaultMQAdminExt.createStaticTopic(addr, 
defaultMQAdminExt.getCreateTopicKey(), configMapping, 
configMapping.getMappingDetail(), force);
         }
         //Step2: forbid the write of old leader
         for (String broker: brokersToMapOut) {
             String addr = clientMetadata.findMasterBrokerAddr(broker);
             TopicConfigAndQueueMapping configMapping = 
brokerConfigMap.get(broker);
-            defaultMQAdminExt.createStaticTopic(addr, 
defaultMQAdminExt.getCreateTopicKey(), configMapping, 
configMapping.getMappingDetail());
+            defaultMQAdminExt.createStaticTopic(addr, 
defaultMQAdminExt.getCreateTopicKey(), configMapping, 
configMapping.getMappingDetail(), force);
         }
         //Step3: decide the logic offset
         for (String broker: brokersToMapOut) {
@@ -171,7 +171,7 @@ public class RemappingStaticTopicSubCommand implements 
SubCommand {
         for (String broker: brokersToMapIn) {
             String addr = clientMetadata.findMasterBrokerAddr(broker);
             TopicConfigAndQueueMapping configMapping = 
brokerConfigMap.get(broker);
-            defaultMQAdminExt.createStaticTopic(addr, 
defaultMQAdminExt.getCreateTopicKey(), configMapping, 
configMapping.getMappingDetail());
+            defaultMQAdminExt.createStaticTopic(addr, 
defaultMQAdminExt.getCreateTopicKey(), configMapping, 
configMapping.getMappingDetail(), false);
         }
     }
 
@@ -353,7 +353,7 @@ public class RemappingStaticTopicSubCommand implements 
SubCommand {
                 System.out.println("The old mapping data is written to file " 
+ newMappingDataFile);
             }
 
-            doRemapping(topic, brokersToMapIn, brokersToMapOut, 
brokerConfigMap, clientMetadata, defaultMQAdminExt);
+            doRemapping(topic, brokersToMapIn, brokersToMapOut, 
brokerConfigMap, clientMetadata, defaultMQAdminExt, false);
 
         } catch (Exception e) {
             throw new SubCommandException(this.getClass().getSimpleName() + " 
command failed", e);
diff --git 
a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateStaticTopicSubCommand.java
 
b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateStaticTopicSubCommand.java
index 6184290..7c6abdc 100644
--- 
a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateStaticTopicSubCommand.java
+++ 
b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateStaticTopicSubCommand.java
@@ -127,7 +127,7 @@ public class UpdateStaticTopicSubCommand implements 
SubCommand {
             String broker = entry.getKey();
             String addr = clientMetadata.findMasterBrokerAddr(broker);
             TopicConfigAndQueueMapping configMapping = entry.getValue();
-            defaultMQAdminExt.createStaticTopic(addr, 
defaultMQAdminExt.getCreateTopicKey(), configMapping, 
configMapping.getMappingDetail());
+            defaultMQAdminExt.createStaticTopic(addr, 
defaultMQAdminExt.getCreateTopicKey(), configMapping, 
configMapping.getMappingDetail(), false);
         }
     }
 

Reply via email to