This is an automated email from the ASF dual-hosted git repository. lizhanhui pushed a commit to branch develop in repository https://gitbox.apache.org/repos/asf/rocketmq.git
commit b978ff1e392e1eb8f34cf6df6ca46a4fc5fde5e7 Author: 傅冲 <[email protected]> AuthorDate: Fri May 25 13:37:10 2018 +0800 #issues-311 refactor new topic register logic (cherry picked from commit 8ce8a54) --- .../apache/rocketmq/broker/BrokerController.java | 69 +++++++++++++++------- .../broker/processor/AdminBrokerProcessor.java | 10 ++-- .../org/apache/rocketmq/common/BrokerConfig.java | 2 - 3 files changed, 51 insertions(+), 30 deletions(-) diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java index 293e51e..ed85a67 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java @@ -24,6 +24,7 @@ import java.util.List; import java.util.Map; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; @@ -62,6 +63,7 @@ import org.apache.rocketmq.broker.subscription.SubscriptionGroupManager; import org.apache.rocketmq.broker.topic.TopicConfigManager; import org.apache.rocketmq.common.BrokerConfig; import org.apache.rocketmq.common.Configuration; +import org.apache.rocketmq.common.DataVersion; import org.apache.rocketmq.common.ThreadFactoryImpl; import org.apache.rocketmq.common.TopicConfig; import org.apache.rocketmq.common.UtilAll; @@ -768,6 +770,24 @@ public class BrokerController { } } + public synchronized void registerIncrementBrokerData(TopicConfig topicConfig, DataVersion dataVersion) { + TopicConfig registerTopicConfig = topicConfig; + if (!PermName.isWriteable(this.getBrokerConfig().getBrokerPermission()) + || !PermName.isReadable(this.getBrokerConfig().getBrokerPermission())) { + registerTopicConfig = + new TopicConfig(topicConfig.getTopicName(), topicConfig.getReadQueueNums(), topicConfig.getWriteQueueNums(), + this.brokerConfig.getBrokerPermission()); + } + + ConcurrentMap<String, TopicConfig> topicConfigTable = new ConcurrentHashMap<String, TopicConfig>(); + topicConfigTable.put(topicConfig.getTopicName(), registerTopicConfig); + TopicConfigSerializeWrapper topicConfigSerializeWrapper = new TopicConfigSerializeWrapper(); + topicConfigSerializeWrapper.setDataVersion(dataVersion); + topicConfigSerializeWrapper.setTopicConfigTable(topicConfigTable); + + doRegisterBrokerAll(true, false, topicConfigSerializeWrapper); + } + public synchronized void registerBrokerAll(final boolean checkOrderConfig, boolean oneway, boolean forceRegister) { TopicConfigSerializeWrapper topicConfigWrapper = this.getTopicConfigManager().buildTopicConfigSerializeWrapper(); @@ -788,30 +808,35 @@ public class BrokerController { this.brokerConfig.getBrokerName(), this.brokerConfig.getBrokerId(), this.brokerConfig.getRegisterBrokerTimeoutMills())) { - List<RegisterBrokerResult> registerBrokerResultList = this.brokerOuterAPI.registerBrokerAll( - this.brokerConfig.getBrokerClusterName(), - this.getBrokerAddr(), - this.brokerConfig.getBrokerName(), - this.brokerConfig.getBrokerId(), - this.getHAServerAddr(), - topicConfigWrapper, - this.filterServerManager.buildNewFilterServerList(), - oneway, - this.brokerConfig.getRegisterBrokerTimeoutMills(), - this.brokerConfig.isCompressedRegister()); - - if (registerBrokerResultList.size() > 0) { - RegisterBrokerResult registerBrokerResult = registerBrokerResultList.get(0); - if (registerBrokerResult != null) { - if (this.updateMasterHAServerAddrPeriodically && registerBrokerResult.getHaServerAddr() != null) { - this.messageStore.updateHaMasterAddress(registerBrokerResult.getHaServerAddr()); - } + doRegisterBrokerAll(checkOrderConfig, oneway, topicConfigWrapper); + } + } + + private void doRegisterBrokerAll(boolean checkOrderConfig, boolean oneway, + TopicConfigSerializeWrapper topicConfigWrapper) { + List<RegisterBrokerResult> registerBrokerResultList = this.brokerOuterAPI.registerBrokerAll( + this.brokerConfig.getBrokerClusterName(), + this.getBrokerAddr(), + this.brokerConfig.getBrokerName(), + this.brokerConfig.getBrokerId(), + this.getHAServerAddr(), + topicConfigWrapper, + this.filterServerManager.buildNewFilterServerList(), + oneway, + this.brokerConfig.getRegisterBrokerTimeoutMills(), + this.brokerConfig.isCompressedRegister()); + + if (registerBrokerResultList.size() > 0) { + RegisterBrokerResult registerBrokerResult = registerBrokerResultList.get(0); + if (registerBrokerResult != null) { + if (this.updateMasterHAServerAddrPeriodically && registerBrokerResult.getHaServerAddr() != null) { + this.messageStore.updateHaMasterAddress(registerBrokerResult.getHaServerAddr()); + } - this.slaveSynchronize.setMasterAddr(registerBrokerResult.getMasterAddr()); + this.slaveSynchronize.setMasterAddr(registerBrokerResult.getMasterAddr()); - if (checkOrderConfig) { - this.getTopicConfigManager().updateOrderTopicConfig(registerBrokerResult.getKvTable()); - } + if (checkOrderConfig) { + this.getTopicConfigManager().updateOrderTopicConfig(registerBrokerResult.getKvTable()); } } } diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java index c0a4b20..1a704a8 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java @@ -212,7 +212,7 @@ public class AdminBrokerProcessor implements NettyRequestProcessor { return false; } - private RemotingCommand updateAndCreateTopic(ChannelHandlerContext ctx, + private synchronized RemotingCommand updateAndCreateTopic(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { final RemotingCommand response = RemotingCommand.createResponseCommand(null); final CreateTopicRequestHeader requestHeader = @@ -246,14 +246,12 @@ public class AdminBrokerProcessor implements NettyRequestProcessor { this.brokerController.getTopicConfigManager().updateTopicConfig(topicConfig); - if (brokerController.getBrokerConfig().getRegisterNameServerPeriod() == 0) { - this.brokerController.registerBrokerAll(false, true, true); - } + this.brokerController.registerIncrementBrokerData(topicConfig,this.brokerController.getTopicConfigManager().getDataVersion()); return null; } - private RemotingCommand deleteTopic(ChannelHandlerContext ctx, + private synchronized RemotingCommand deleteTopic(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { final RemotingCommand response = RemotingCommand.createResponseCommand(null); DeleteTopicRequestHeader requestHeader = @@ -299,7 +297,7 @@ public class AdminBrokerProcessor implements NettyRequestProcessor { return response; } - private RemotingCommand updateBrokerConfig(ChannelHandlerContext ctx, RemotingCommand request) { + private synchronized RemotingCommand updateBrokerConfig(ChannelHandlerContext ctx, RemotingCommand request) { final RemotingCommand response = RemotingCommand.createResponseCommand(null); log.info("updateBrokerConfig called by {}", RemotingHelper.parseChannelRemoteAddr(ctx.channel())); diff --git a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java index 7caf830..203431a 100644 --- a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java +++ b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java @@ -141,8 +141,6 @@ public class BrokerConfig { * This configurable item defines interval of topics registration of broker to name server. Allowing values are * between 10, 000 and 60, 000 milliseconds. * - * If set to 0, newly created topics will be immediately reported to name servers and interval of periodical - * registration defaults to 10, 000 in milliseconds. */ private int registerNameServerPeriod = 1000 * 30;
