This is an automated email from the ASF dual-hosted git repository.

lizhanhui pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git

commit b978ff1e392e1eb8f34cf6df6ca46a4fc5fde5e7
Author: 傅冲 <[email protected]>
AuthorDate: Fri May 25 13:37:10 2018 +0800

    #issues-311 refactor new topic register logic
    
    (cherry picked from commit 8ce8a54)
---
 .../apache/rocketmq/broker/BrokerController.java   | 69 +++++++++++++++-------
 .../broker/processor/AdminBrokerProcessor.java     | 10 ++--
 .../org/apache/rocketmq/common/BrokerConfig.java   |  2 -
 3 files changed, 51 insertions(+), 30 deletions(-)

diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java 
b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
index 293e51e..ed85a67 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
@@ -24,6 +24,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.LinkedBlockingQueue;
@@ -62,6 +63,7 @@ import 
org.apache.rocketmq.broker.subscription.SubscriptionGroupManager;
 import org.apache.rocketmq.broker.topic.TopicConfigManager;
 import org.apache.rocketmq.common.BrokerConfig;
 import org.apache.rocketmq.common.Configuration;
+import org.apache.rocketmq.common.DataVersion;
 import org.apache.rocketmq.common.ThreadFactoryImpl;
 import org.apache.rocketmq.common.TopicConfig;
 import org.apache.rocketmq.common.UtilAll;
@@ -768,6 +770,24 @@ public class BrokerController {
         }
     }
 
+    public synchronized void registerIncrementBrokerData(TopicConfig 
topicConfig, DataVersion dataVersion) {
+        TopicConfig registerTopicConfig = topicConfig;
+        if (!PermName.isWriteable(this.getBrokerConfig().getBrokerPermission())
+            || 
!PermName.isReadable(this.getBrokerConfig().getBrokerPermission())) {
+            registerTopicConfig =
+                new TopicConfig(topicConfig.getTopicName(), 
topicConfig.getReadQueueNums(), topicConfig.getWriteQueueNums(),
+                    this.brokerConfig.getBrokerPermission());
+        }
+
+        ConcurrentMap<String, TopicConfig> topicConfigTable = new 
ConcurrentHashMap<String, TopicConfig>();
+        topicConfigTable.put(topicConfig.getTopicName(), registerTopicConfig);
+        TopicConfigSerializeWrapper topicConfigSerializeWrapper = new 
TopicConfigSerializeWrapper();
+        topicConfigSerializeWrapper.setDataVersion(dataVersion);
+        topicConfigSerializeWrapper.setTopicConfigTable(topicConfigTable);
+
+        doRegisterBrokerAll(true, false, topicConfigSerializeWrapper);
+    }
+
     public synchronized void registerBrokerAll(final boolean checkOrderConfig, 
boolean oneway, boolean forceRegister) {
         TopicConfigSerializeWrapper topicConfigWrapper = 
this.getTopicConfigManager().buildTopicConfigSerializeWrapper();
 
@@ -788,30 +808,35 @@ public class BrokerController {
             this.brokerConfig.getBrokerName(),
             this.brokerConfig.getBrokerId(),
             this.brokerConfig.getRegisterBrokerTimeoutMills())) {
-            List<RegisterBrokerResult> registerBrokerResultList = 
this.brokerOuterAPI.registerBrokerAll(
-                this.brokerConfig.getBrokerClusterName(),
-                this.getBrokerAddr(),
-                this.brokerConfig.getBrokerName(),
-                this.brokerConfig.getBrokerId(),
-                this.getHAServerAddr(),
-                topicConfigWrapper,
-                this.filterServerManager.buildNewFilterServerList(),
-                oneway,
-                this.brokerConfig.getRegisterBrokerTimeoutMills(),
-                this.brokerConfig.isCompressedRegister());
-
-            if (registerBrokerResultList.size() > 0) {
-                RegisterBrokerResult registerBrokerResult = 
registerBrokerResultList.get(0);
-                if (registerBrokerResult != null) {
-                    if (this.updateMasterHAServerAddrPeriodically && 
registerBrokerResult.getHaServerAddr() != null) {
-                        
this.messageStore.updateHaMasterAddress(registerBrokerResult.getHaServerAddr());
-                    }
+            doRegisterBrokerAll(checkOrderConfig, oneway, topicConfigWrapper);
+        }
+    }
+
+    private void doRegisterBrokerAll(boolean checkOrderConfig, boolean oneway,
+        TopicConfigSerializeWrapper topicConfigWrapper) {
+        List<RegisterBrokerResult> registerBrokerResultList = 
this.brokerOuterAPI.registerBrokerAll(
+            this.brokerConfig.getBrokerClusterName(),
+            this.getBrokerAddr(),
+            this.brokerConfig.getBrokerName(),
+            this.brokerConfig.getBrokerId(),
+            this.getHAServerAddr(),
+            topicConfigWrapper,
+            this.filterServerManager.buildNewFilterServerList(),
+            oneway,
+            this.brokerConfig.getRegisterBrokerTimeoutMills(),
+            this.brokerConfig.isCompressedRegister());
+
+        if (registerBrokerResultList.size() > 0) {
+            RegisterBrokerResult registerBrokerResult = 
registerBrokerResultList.get(0);
+            if (registerBrokerResult != null) {
+                if (this.updateMasterHAServerAddrPeriodically && 
registerBrokerResult.getHaServerAddr() != null) {
+                    
this.messageStore.updateHaMasterAddress(registerBrokerResult.getHaServerAddr());
+                }
 
-                    
this.slaveSynchronize.setMasterAddr(registerBrokerResult.getMasterAddr());
+                
this.slaveSynchronize.setMasterAddr(registerBrokerResult.getMasterAddr());
 
-                    if (checkOrderConfig) {
-                        
this.getTopicConfigManager().updateOrderTopicConfig(registerBrokerResult.getKvTable());
-                    }
+                if (checkOrderConfig) {
+                    
this.getTopicConfigManager().updateOrderTopicConfig(registerBrokerResult.getKvTable());
                 }
             }
         }
diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
 
b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
index c0a4b20..1a704a8 100644
--- 
a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
+++ 
b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
@@ -212,7 +212,7 @@ public class AdminBrokerProcessor implements 
NettyRequestProcessor {
         return false;
     }
 
-    private RemotingCommand updateAndCreateTopic(ChannelHandlerContext ctx,
+    private synchronized RemotingCommand 
updateAndCreateTopic(ChannelHandlerContext ctx,
         RemotingCommand request) throws RemotingCommandException {
         final RemotingCommand response = 
RemotingCommand.createResponseCommand(null);
         final CreateTopicRequestHeader requestHeader =
@@ -246,14 +246,12 @@ public class AdminBrokerProcessor implements 
NettyRequestProcessor {
 
         
this.brokerController.getTopicConfigManager().updateTopicConfig(topicConfig);
 
-        if (brokerController.getBrokerConfig().getRegisterNameServerPeriod() 
== 0) {
-            this.brokerController.registerBrokerAll(false, true, true);
-        }
+        
this.brokerController.registerIncrementBrokerData(topicConfig,this.brokerController.getTopicConfigManager().getDataVersion());
 
         return null;
     }
 
-    private RemotingCommand deleteTopic(ChannelHandlerContext ctx,
+    private synchronized RemotingCommand deleteTopic(ChannelHandlerContext ctx,
         RemotingCommand request) throws RemotingCommandException {
         final RemotingCommand response = 
RemotingCommand.createResponseCommand(null);
         DeleteTopicRequestHeader requestHeader =
@@ -299,7 +297,7 @@ public class AdminBrokerProcessor implements 
NettyRequestProcessor {
         return response;
     }
 
-    private RemotingCommand updateBrokerConfig(ChannelHandlerContext ctx, 
RemotingCommand request) {
+    private synchronized RemotingCommand 
updateBrokerConfig(ChannelHandlerContext ctx, RemotingCommand request) {
         final RemotingCommand response = 
RemotingCommand.createResponseCommand(null);
 
         log.info("updateBrokerConfig called by {}", 
RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
diff --git a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java 
b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
index 7caf830..203431a 100644
--- a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
+++ b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
@@ -141,8 +141,6 @@ public class BrokerConfig {
      * This configurable item defines interval of topics registration of 
broker to name server. Allowing values are
      * between 10, 000 and 60, 000 milliseconds.
      *
-     * If set to 0, newly created topics will be immediately reported to name 
servers and interval of periodical
-     * registration defaults to 10, 000 in milliseconds.
      */
     private int registerNameServerPeriod = 1000 * 30;
 

Reply via email to