lizhanhui closed pull request #326: [Issues # 325] Improve broker register topicrouter info performance ,reduce memory usage URL: https://github.com/apache/rocketmq/pull/326
This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java index a4968cba5..110d8ad23 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java @@ -24,6 +24,7 @@ import java.util.Map; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; @@ -62,6 +63,7 @@ import org.apache.rocketmq.broker.topic.TopicConfigManager; import org.apache.rocketmq.common.BrokerConfig; import org.apache.rocketmq.common.Configuration; +import org.apache.rocketmq.common.DataVersion; import org.apache.rocketmq.common.ThreadFactoryImpl; import org.apache.rocketmq.common.TopicConfig; import org.apache.rocketmq.common.UtilAll; @@ -752,6 +754,24 @@ public void run() { } } + public synchronized void registerIncrementBrokerData(TopicConfig topicConfig, DataVersion dataVersion) { + TopicConfig registerTopicConfig = topicConfig; + if (!PermName.isWriteable(this.getBrokerConfig().getBrokerPermission()) + || !PermName.isReadable(this.getBrokerConfig().getBrokerPermission())) { + registerTopicConfig = + new TopicConfig(topicConfig.getTopicName(), topicConfig.getReadQueueNums(), topicConfig.getWriteQueueNums(), + this.brokerConfig.getBrokerPermission()); + } + + ConcurrentMap<String, TopicConfig> topicConfigTable = new ConcurrentHashMap<String, TopicConfig>(); + topicConfigTable.put(topicConfig.getTopicName(), registerTopicConfig); + TopicConfigSerializeWrapper topicConfigSerializeWrapper = new TopicConfigSerializeWrapper(); + topicConfigSerializeWrapper.setDataVersion(dataVersion); + topicConfigSerializeWrapper.setTopicConfigTable(topicConfigTable); + + doRegisterBrokerAll(true, false, topicConfigSerializeWrapper); + } + public synchronized void registerBrokerAll(final boolean checkOrderConfig, boolean oneway, boolean forceRegister) { TopicConfigSerializeWrapper topicConfigWrapper = this.getTopicConfigManager().buildTopicConfigSerializeWrapper(); @@ -772,30 +792,35 @@ public synchronized void registerBrokerAll(final boolean checkOrderConfig, boole this.brokerConfig.getBrokerName(), this.brokerConfig.getBrokerId(), this.brokerConfig.getRegisterBrokerTimeoutMills())) { - List<RegisterBrokerResult> registerBrokerResultList = this.brokerOuterAPI.registerBrokerAll( - this.brokerConfig.getBrokerClusterName(), - this.getBrokerAddr(), - this.brokerConfig.getBrokerName(), - this.brokerConfig.getBrokerId(), - this.getHAServerAddr(), - topicConfigWrapper, - this.filterServerManager.buildNewFilterServerList(), - oneway, - this.brokerConfig.getRegisterBrokerTimeoutMills(), - this.brokerConfig.isCompressedRegister()); - - if (registerBrokerResultList.size() > 0) { - RegisterBrokerResult registerBrokerResult = registerBrokerResultList.get(0); - if (registerBrokerResult != null) { - if (this.updateMasterHAServerAddrPeriodically && registerBrokerResult.getHaServerAddr() != null) { - this.messageStore.updateHaMasterAddress(registerBrokerResult.getHaServerAddr()); - } + doRegisterBrokerAll(checkOrderConfig, oneway, topicConfigWrapper); + } + } + + private void doRegisterBrokerAll(boolean checkOrderConfig, boolean oneway, + TopicConfigSerializeWrapper topicConfigWrapper) { + List<RegisterBrokerResult> registerBrokerResultList = this.brokerOuterAPI.registerBrokerAll( + this.brokerConfig.getBrokerClusterName(), + this.getBrokerAddr(), + this.brokerConfig.getBrokerName(), + this.brokerConfig.getBrokerId(), + this.getHAServerAddr(), + topicConfigWrapper, + this.filterServerManager.buildNewFilterServerList(), + oneway, + this.brokerConfig.getRegisterBrokerTimeoutMills(), + this.brokerConfig.isCompressedRegister()); + + if (registerBrokerResultList.size() > 0) { + RegisterBrokerResult registerBrokerResult = registerBrokerResultList.get(0); + if (registerBrokerResult != null) { + if (this.updateMasterHAServerAddrPeriodically && registerBrokerResult.getHaServerAddr() != null) { + this.messageStore.updateHaMasterAddress(registerBrokerResult.getHaServerAddr()); + } - this.slaveSynchronize.setMasterAddr(registerBrokerResult.getMasterAddr()); + this.slaveSynchronize.setMasterAddr(registerBrokerResult.getMasterAddr()); - if (checkOrderConfig) { - this.getTopicConfigManager().updateOrderTopicConfig(registerBrokerResult.getKvTable()); - } + if (checkOrderConfig) { + this.getTopicConfigManager().updateOrderTopicConfig(registerBrokerResult.getKvTable()); } } } diff --git a/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java b/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java index 2825a34cd..4dee01cbf 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java @@ -29,6 +29,7 @@ import org.apache.rocketmq.common.DataVersion; import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.ThreadFactoryImpl; +import org.apache.rocketmq.common.UtilAll; import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.logging.InternalLoggerFactory; @@ -125,14 +126,28 @@ public void updateNameServerAddressList(final String addrs) { final List<RegisterBrokerResult> registerBrokerResultList = Lists.newArrayList(); List<String> nameServerAddressList = this.remotingClient.getNameServerAddressList(); if (nameServerAddressList != null && nameServerAddressList.size() > 0) { + + final RegisterBrokerRequestHeader requestHeader = new RegisterBrokerRequestHeader(); + requestHeader.setBrokerAddr(brokerAddr); + requestHeader.setBrokerId(brokerId); + requestHeader.setBrokerName(brokerName); + requestHeader.setClusterName(clusterName); + requestHeader.setHaServerAddr(haServerAddr); + requestHeader.setCompressed(compressed); + + RegisterBrokerBody requestBody = new RegisterBrokerBody(); + requestBody.setTopicConfigSerializeWrapper(topicConfigWrapper); + requestBody.setFilterServerList(filterServerList); + final byte[] body = requestBody.encode(compressed); + final int bodyCrc32 = UtilAll.crc32(body); + requestHeader.setBodyCrc32(bodyCrc32); final CountDownLatch countDownLatch = new CountDownLatch(nameServerAddressList.size()); for (final String namesrvAddr : nameServerAddressList) { brokerOuterExecutor.execute(new Runnable() { @Override public void run() { try { - RegisterBrokerResult result = registerBroker(namesrvAddr, clusterName, brokerAddr, brokerName, brokerId, - haServerAddr, topicConfigWrapper, filterServerList, oneway, timeoutMills, compressed); + RegisterBrokerResult result = registerBroker(namesrvAddr,oneway, timeoutMills,requestHeader,body); if (result != null) { registerBrokerResultList.add(result); } @@ -158,31 +173,14 @@ public void run() { private RegisterBrokerResult registerBroker( final String namesrvAddr, - final String clusterName, - final String brokerAddr, - final String brokerName, - final long brokerId, - final String haServerAddr, - final TopicConfigSerializeWrapper topicConfigWrapper, - final List<String> filterServerList, final boolean oneway, final int timeoutMills, - final boolean compressed + final RegisterBrokerRequestHeader requestHeader, + final byte[] body ) throws RemotingCommandException, MQBrokerException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException { - RegisterBrokerRequestHeader requestHeader = new RegisterBrokerRequestHeader(); - requestHeader.setBrokerAddr(brokerAddr); - requestHeader.setBrokerId(brokerId); - requestHeader.setBrokerName(brokerName); - requestHeader.setClusterName(clusterName); - requestHeader.setHaServerAddr(haServerAddr); - requestHeader.setCompressed(compressed); RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.REGISTER_BROKER, requestHeader); - - RegisterBrokerBody requestBody = new RegisterBrokerBody(); - requestBody.setTopicConfigSerializeWrapper(topicConfigWrapper); - requestBody.setFilterServerList(filterServerList); - request.setBody(requestBody.encode(requestHeader.isCompressed())); + request.setBody(body); if (oneway) { try { diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java index c0a4b2021..1a704a8c6 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java @@ -212,7 +212,7 @@ public boolean rejectRequest() { return false; } - private RemotingCommand updateAndCreateTopic(ChannelHandlerContext ctx, + private synchronized RemotingCommand updateAndCreateTopic(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { final RemotingCommand response = RemotingCommand.createResponseCommand(null); final CreateTopicRequestHeader requestHeader = @@ -246,14 +246,12 @@ private RemotingCommand updateAndCreateTopic(ChannelHandlerContext ctx, this.brokerController.getTopicConfigManager().updateTopicConfig(topicConfig); - if (brokerController.getBrokerConfig().getRegisterNameServerPeriod() == 0) { - this.brokerController.registerBrokerAll(false, true, true); - } + this.brokerController.registerIncrementBrokerData(topicConfig,this.brokerController.getTopicConfigManager().getDataVersion()); return null; } - private RemotingCommand deleteTopic(ChannelHandlerContext ctx, + private synchronized RemotingCommand deleteTopic(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { final RemotingCommand response = RemotingCommand.createResponseCommand(null); DeleteTopicRequestHeader requestHeader = @@ -299,7 +297,7 @@ private RemotingCommand getAllTopicConfig(ChannelHandlerContext ctx, RemotingCom return response; } - private RemotingCommand updateBrokerConfig(ChannelHandlerContext ctx, RemotingCommand request) { + private synchronized RemotingCommand updateBrokerConfig(ChannelHandlerContext ctx, RemotingCommand request) { final RemotingCommand response = RemotingCommand.createResponseCommand(null); log.info("updateBrokerConfig called by {}", RemotingHelper.parseChannelRemoteAddr(ctx.channel())); diff --git a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java index 7caf83037..203431aee 100644 --- a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java +++ b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java @@ -141,8 +141,6 @@ * This configurable item defines interval of topics registration of broker to name server. Allowing values are * between 10, 000 and 60, 000 milliseconds. * - * If set to 0, newly created topics will be immediately reported to name servers and interval of periodical - * registration defaults to 10, 000 in milliseconds. */ private int registerNameServerPeriod = 1000 * 30; diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/RegisterBrokerRequestHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/RegisterBrokerRequestHeader.java index 7ed7a403d..19175b04b 100644 --- a/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/RegisterBrokerRequestHeader.java +++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/RegisterBrokerRequestHeader.java @@ -38,6 +38,8 @@ private boolean compressed; + private Integer bodyCrc32 = 0; + public void checkFields() throws RemotingCommandException { } @@ -88,4 +90,12 @@ public boolean isCompressed() { public void setCompressed(boolean compressed) { this.compressed = compressed; } + + public Integer getBodyCrc32() { + return bodyCrc32; + } + + public void setBodyCrc32(Integer bodyCrc32) { + this.bodyCrc32 = bodyCrc32; + } } diff --git a/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/DefaultRequestProcessor.java b/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/DefaultRequestProcessor.java index 236e6a12c..467078c44 100644 --- a/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/DefaultRequestProcessor.java +++ b/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/DefaultRequestProcessor.java @@ -24,6 +24,7 @@ import org.apache.rocketmq.common.MQVersion; import org.apache.rocketmq.common.MQVersion.Version; import org.apache.rocketmq.common.MixAll; +import org.apache.rocketmq.common.UtilAll; import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.common.help.FAQUrl; import org.apache.rocketmq.logging.InternalLogger; @@ -196,6 +197,12 @@ public RemotingCommand registerBrokerWithFilterServer(ChannelHandlerContext ctx, final RegisterBrokerRequestHeader requestHeader = (RegisterBrokerRequestHeader) request.decodeCommandCustomHeader(RegisterBrokerRequestHeader.class); + if (!checksum(ctx, request, requestHeader)) { + response.setCode(ResponseCode.SYSTEM_ERROR); + response.setRemark("crc32 not match"); + return response; + } + RegisterBrokerBody registerBrokerBody = new RegisterBrokerBody(); if (request.getBody() != null) { @@ -230,6 +237,19 @@ public RemotingCommand registerBrokerWithFilterServer(ChannelHandlerContext ctx, return response; } + private boolean checksum(ChannelHandlerContext ctx, RemotingCommand request, + RegisterBrokerRequestHeader requestHeader) { + if (requestHeader.getBodyCrc32() != 0) { + final int crc32 = UtilAll.crc32(request.getBody()); + if (crc32 != requestHeader.getBodyCrc32()) { + log.warn(String.format("receive registerBroker request,crc32 not match,from %s", + RemotingHelper.parseChannelRemoteAddr(ctx.channel()))); + return false; + } + } + return true; + } + public RemotingCommand queryBrokerTopicConfig(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { final RemotingCommand response = RemotingCommand.createResponseCommand(QueryDataVersionResponseHeader.class); @@ -261,6 +281,12 @@ public RemotingCommand registerBroker(ChannelHandlerContext ctx, final RegisterBrokerRequestHeader requestHeader = (RegisterBrokerRequestHeader) request.decodeCommandCustomHeader(RegisterBrokerRequestHeader.class); + if (!checksum(ctx, request, requestHeader)) { + response.setCode(ResponseCode.SYSTEM_ERROR); + response.setRemark("crc32 not match"); + return response; + } + TopicConfigSerializeWrapper topicConfigWrapper; if (request.getBody() != null) { topicConfigWrapper = TopicConfigSerializeWrapper.decode(request.getBody(), TopicConfigSerializeWrapper.class); ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services