lizhanhui closed pull request #326: [Issues # 325] Improve broker register 
topicrouter info performance ,reduce memory usage
URL: https://github.com/apache/rocketmq/pull/326
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java 
b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
index a4968cba5..110d8ad23 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
@@ -24,6 +24,7 @@
 import java.util.Map;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.LinkedBlockingQueue;
@@ -62,6 +63,7 @@
 import org.apache.rocketmq.broker.topic.TopicConfigManager;
 import org.apache.rocketmq.common.BrokerConfig;
 import org.apache.rocketmq.common.Configuration;
+import org.apache.rocketmq.common.DataVersion;
 import org.apache.rocketmq.common.ThreadFactoryImpl;
 import org.apache.rocketmq.common.TopicConfig;
 import org.apache.rocketmq.common.UtilAll;
@@ -752,6 +754,24 @@ public void run() {
         }
     }
 
+    public synchronized void registerIncrementBrokerData(TopicConfig 
topicConfig, DataVersion dataVersion) {
+        TopicConfig registerTopicConfig = topicConfig;
+        if (!PermName.isWriteable(this.getBrokerConfig().getBrokerPermission())
+            || 
!PermName.isReadable(this.getBrokerConfig().getBrokerPermission())) {
+            registerTopicConfig =
+                new TopicConfig(topicConfig.getTopicName(), 
topicConfig.getReadQueueNums(), topicConfig.getWriteQueueNums(),
+                    this.brokerConfig.getBrokerPermission());
+        }
+
+        ConcurrentMap<String, TopicConfig> topicConfigTable = new 
ConcurrentHashMap<String, TopicConfig>();
+        topicConfigTable.put(topicConfig.getTopicName(), registerTopicConfig);
+        TopicConfigSerializeWrapper topicConfigSerializeWrapper = new 
TopicConfigSerializeWrapper();
+        topicConfigSerializeWrapper.setDataVersion(dataVersion);
+        topicConfigSerializeWrapper.setTopicConfigTable(topicConfigTable);
+
+        doRegisterBrokerAll(true, false, topicConfigSerializeWrapper);
+    }
+
     public synchronized void registerBrokerAll(final boolean checkOrderConfig, 
boolean oneway, boolean forceRegister) {
         TopicConfigSerializeWrapper topicConfigWrapper = 
this.getTopicConfigManager().buildTopicConfigSerializeWrapper();
 
@@ -772,30 +792,35 @@ public synchronized void registerBrokerAll(final boolean 
checkOrderConfig, boole
             this.brokerConfig.getBrokerName(),
             this.brokerConfig.getBrokerId(),
             this.brokerConfig.getRegisterBrokerTimeoutMills())) {
-            List<RegisterBrokerResult> registerBrokerResultList = 
this.brokerOuterAPI.registerBrokerAll(
-                this.brokerConfig.getBrokerClusterName(),
-                this.getBrokerAddr(),
-                this.brokerConfig.getBrokerName(),
-                this.brokerConfig.getBrokerId(),
-                this.getHAServerAddr(),
-                topicConfigWrapper,
-                this.filterServerManager.buildNewFilterServerList(),
-                oneway,
-                this.brokerConfig.getRegisterBrokerTimeoutMills(),
-                this.brokerConfig.isCompressedRegister());
-
-            if (registerBrokerResultList.size() > 0) {
-                RegisterBrokerResult registerBrokerResult = 
registerBrokerResultList.get(0);
-                if (registerBrokerResult != null) {
-                    if (this.updateMasterHAServerAddrPeriodically && 
registerBrokerResult.getHaServerAddr() != null) {
-                        
this.messageStore.updateHaMasterAddress(registerBrokerResult.getHaServerAddr());
-                    }
+            doRegisterBrokerAll(checkOrderConfig, oneway, topicConfigWrapper);
+        }
+    }
+
+    private void doRegisterBrokerAll(boolean checkOrderConfig, boolean oneway,
+        TopicConfigSerializeWrapper topicConfigWrapper) {
+        List<RegisterBrokerResult> registerBrokerResultList = 
this.brokerOuterAPI.registerBrokerAll(
+            this.brokerConfig.getBrokerClusterName(),
+            this.getBrokerAddr(),
+            this.brokerConfig.getBrokerName(),
+            this.brokerConfig.getBrokerId(),
+            this.getHAServerAddr(),
+            topicConfigWrapper,
+            this.filterServerManager.buildNewFilterServerList(),
+            oneway,
+            this.brokerConfig.getRegisterBrokerTimeoutMills(),
+            this.brokerConfig.isCompressedRegister());
+
+        if (registerBrokerResultList.size() > 0) {
+            RegisterBrokerResult registerBrokerResult = 
registerBrokerResultList.get(0);
+            if (registerBrokerResult != null) {
+                if (this.updateMasterHAServerAddrPeriodically && 
registerBrokerResult.getHaServerAddr() != null) {
+                    
this.messageStore.updateHaMasterAddress(registerBrokerResult.getHaServerAddr());
+                }
 
-                    
this.slaveSynchronize.setMasterAddr(registerBrokerResult.getMasterAddr());
+                
this.slaveSynchronize.setMasterAddr(registerBrokerResult.getMasterAddr());
 
-                    if (checkOrderConfig) {
-                        
this.getTopicConfigManager().updateOrderTopicConfig(registerBrokerResult.getKvTable());
-                    }
+                if (checkOrderConfig) {
+                    
this.getTopicConfigManager().updateOrderTopicConfig(registerBrokerResult.getKvTable());
                 }
             }
         }
diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java 
b/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java
index 2825a34cd..4dee01cbf 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java
@@ -29,6 +29,7 @@
 import org.apache.rocketmq.common.DataVersion;
 import org.apache.rocketmq.common.MixAll;
 import org.apache.rocketmq.common.ThreadFactoryImpl;
+import org.apache.rocketmq.common.UtilAll;
 import org.apache.rocketmq.common.constant.LoggerName;
 import org.apache.rocketmq.logging.InternalLogger;
 import org.apache.rocketmq.logging.InternalLoggerFactory;
@@ -125,14 +126,28 @@ public void updateNameServerAddressList(final String 
addrs) {
         final List<RegisterBrokerResult> registerBrokerResultList = 
Lists.newArrayList();
         List<String> nameServerAddressList = 
this.remotingClient.getNameServerAddressList();
         if (nameServerAddressList != null && nameServerAddressList.size() > 0) 
{
+
+            final RegisterBrokerRequestHeader requestHeader = new 
RegisterBrokerRequestHeader();
+            requestHeader.setBrokerAddr(brokerAddr);
+            requestHeader.setBrokerId(brokerId);
+            requestHeader.setBrokerName(brokerName);
+            requestHeader.setClusterName(clusterName);
+            requestHeader.setHaServerAddr(haServerAddr);
+            requestHeader.setCompressed(compressed);
+
+            RegisterBrokerBody requestBody = new RegisterBrokerBody();
+            requestBody.setTopicConfigSerializeWrapper(topicConfigWrapper);
+            requestBody.setFilterServerList(filterServerList);
+            final byte[] body = requestBody.encode(compressed);
+            final int bodyCrc32 = UtilAll.crc32(body);
+            requestHeader.setBodyCrc32(bodyCrc32);
             final CountDownLatch countDownLatch = new 
CountDownLatch(nameServerAddressList.size());
             for (final String namesrvAddr : nameServerAddressList) {
                 brokerOuterExecutor.execute(new Runnable() {
                     @Override
                     public void run() {
                         try {
-                            RegisterBrokerResult result = 
registerBroker(namesrvAddr, clusterName, brokerAddr, brokerName, brokerId,
-                                haServerAddr, topicConfigWrapper, 
filterServerList, oneway, timeoutMills, compressed);
+                            RegisterBrokerResult result = 
registerBroker(namesrvAddr,oneway, timeoutMills,requestHeader,body);
                             if (result != null) {
                                 registerBrokerResultList.add(result);
                             }
@@ -158,31 +173,14 @@ public void run() {
 
     private RegisterBrokerResult registerBroker(
         final String namesrvAddr,
-        final String clusterName,
-        final String brokerAddr,
-        final String brokerName,
-        final long brokerId,
-        final String haServerAddr,
-        final TopicConfigSerializeWrapper topicConfigWrapper,
-        final List<String> filterServerList,
         final boolean oneway,
         final int timeoutMills,
-        final boolean compressed
+        final RegisterBrokerRequestHeader requestHeader,
+        final byte[] body
     ) throws RemotingCommandException, MQBrokerException, 
RemotingConnectException, RemotingSendRequestException, 
RemotingTimeoutException,
         InterruptedException {
-        RegisterBrokerRequestHeader requestHeader = new 
RegisterBrokerRequestHeader();
-        requestHeader.setBrokerAddr(brokerAddr);
-        requestHeader.setBrokerId(brokerId);
-        requestHeader.setBrokerName(brokerName);
-        requestHeader.setClusterName(clusterName);
-        requestHeader.setHaServerAddr(haServerAddr);
-        requestHeader.setCompressed(compressed);
         RemotingCommand request = 
RemotingCommand.createRequestCommand(RequestCode.REGISTER_BROKER, 
requestHeader);
-
-        RegisterBrokerBody requestBody = new RegisterBrokerBody();
-        requestBody.setTopicConfigSerializeWrapper(topicConfigWrapper);
-        requestBody.setFilterServerList(filterServerList);
-        request.setBody(requestBody.encode(requestHeader.isCompressed()));
+        request.setBody(body);
 
         if (oneway) {
             try {
diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
 
b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
index c0a4b2021..1a704a8c6 100644
--- 
a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
+++ 
b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
@@ -212,7 +212,7 @@ public boolean rejectRequest() {
         return false;
     }
 
-    private RemotingCommand updateAndCreateTopic(ChannelHandlerContext ctx,
+    private synchronized RemotingCommand 
updateAndCreateTopic(ChannelHandlerContext ctx,
         RemotingCommand request) throws RemotingCommandException {
         final RemotingCommand response = 
RemotingCommand.createResponseCommand(null);
         final CreateTopicRequestHeader requestHeader =
@@ -246,14 +246,12 @@ private RemotingCommand 
updateAndCreateTopic(ChannelHandlerContext ctx,
 
         
this.brokerController.getTopicConfigManager().updateTopicConfig(topicConfig);
 
-        if (brokerController.getBrokerConfig().getRegisterNameServerPeriod() 
== 0) {
-            this.brokerController.registerBrokerAll(false, true, true);
-        }
+        
this.brokerController.registerIncrementBrokerData(topicConfig,this.brokerController.getTopicConfigManager().getDataVersion());
 
         return null;
     }
 
-    private RemotingCommand deleteTopic(ChannelHandlerContext ctx,
+    private synchronized RemotingCommand deleteTopic(ChannelHandlerContext ctx,
         RemotingCommand request) throws RemotingCommandException {
         final RemotingCommand response = 
RemotingCommand.createResponseCommand(null);
         DeleteTopicRequestHeader requestHeader =
@@ -299,7 +297,7 @@ private RemotingCommand 
getAllTopicConfig(ChannelHandlerContext ctx, RemotingCom
         return response;
     }
 
-    private RemotingCommand updateBrokerConfig(ChannelHandlerContext ctx, 
RemotingCommand request) {
+    private synchronized RemotingCommand 
updateBrokerConfig(ChannelHandlerContext ctx, RemotingCommand request) {
         final RemotingCommand response = 
RemotingCommand.createResponseCommand(null);
 
         log.info("updateBrokerConfig called by {}", 
RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
diff --git a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java 
b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
index 7caf83037..203431aee 100644
--- a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
+++ b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
@@ -141,8 +141,6 @@
      * This configurable item defines interval of topics registration of 
broker to name server. Allowing values are
      * between 10, 000 and 60, 000 milliseconds.
      *
-     * If set to 0, newly created topics will be immediately reported to name 
servers and interval of periodical
-     * registration defaults to 10, 000 in milliseconds.
      */
     private int registerNameServerPeriod = 1000 * 30;
 
diff --git 
a/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/RegisterBrokerRequestHeader.java
 
b/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/RegisterBrokerRequestHeader.java
index 7ed7a403d..19175b04b 100644
--- 
a/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/RegisterBrokerRequestHeader.java
+++ 
b/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/RegisterBrokerRequestHeader.java
@@ -38,6 +38,8 @@
 
     private boolean compressed;
 
+    private Integer bodyCrc32 = 0;
+
     public void checkFields() throws RemotingCommandException {
     }
 
@@ -88,4 +90,12 @@ public boolean isCompressed() {
     public void setCompressed(boolean compressed) {
         this.compressed = compressed;
     }
+
+    public Integer getBodyCrc32() {
+        return bodyCrc32;
+    }
+
+    public void setBodyCrc32(Integer bodyCrc32) {
+        this.bodyCrc32 = bodyCrc32;
+    }
 }
diff --git 
a/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/DefaultRequestProcessor.java
 
b/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/DefaultRequestProcessor.java
index 236e6a12c..467078c44 100644
--- 
a/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/DefaultRequestProcessor.java
+++ 
b/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/DefaultRequestProcessor.java
@@ -24,6 +24,7 @@
 import org.apache.rocketmq.common.MQVersion;
 import org.apache.rocketmq.common.MQVersion.Version;
 import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.UtilAll;
 import org.apache.rocketmq.common.constant.LoggerName;
 import org.apache.rocketmq.common.help.FAQUrl;
 import org.apache.rocketmq.logging.InternalLogger;
@@ -196,6 +197,12 @@ public RemotingCommand 
registerBrokerWithFilterServer(ChannelHandlerContext ctx,
         final RegisterBrokerRequestHeader requestHeader =
             (RegisterBrokerRequestHeader) 
request.decodeCommandCustomHeader(RegisterBrokerRequestHeader.class);
 
+        if (!checksum(ctx, request, requestHeader)) {
+            response.setCode(ResponseCode.SYSTEM_ERROR);
+            response.setRemark("crc32 not match");
+            return response;
+        }
+
         RegisterBrokerBody registerBrokerBody = new RegisterBrokerBody();
 
         if (request.getBody() != null) {
@@ -230,6 +237,19 @@ public RemotingCommand 
registerBrokerWithFilterServer(ChannelHandlerContext ctx,
         return response;
     }
 
+    private boolean checksum(ChannelHandlerContext ctx, RemotingCommand 
request,
+        RegisterBrokerRequestHeader requestHeader) {
+        if (requestHeader.getBodyCrc32() != 0) {
+            final int crc32 = UtilAll.crc32(request.getBody());
+            if (crc32 != requestHeader.getBodyCrc32()) {
+                log.warn(String.format("receive registerBroker request,crc32 
not match,from %s",
+                    RemotingHelper.parseChannelRemoteAddr(ctx.channel())));
+                return false;
+            }
+        }
+        return true;
+    }
+
     public RemotingCommand queryBrokerTopicConfig(ChannelHandlerContext ctx,
         RemotingCommand request) throws RemotingCommandException {
         final RemotingCommand response = 
RemotingCommand.createResponseCommand(QueryDataVersionResponseHeader.class);
@@ -261,6 +281,12 @@ public RemotingCommand 
registerBroker(ChannelHandlerContext ctx,
         final RegisterBrokerRequestHeader requestHeader =
             (RegisterBrokerRequestHeader) 
request.decodeCommandCustomHeader(RegisterBrokerRequestHeader.class);
 
+        if (!checksum(ctx, request, requestHeader)) {
+            response.setCode(ResponseCode.SYSTEM_ERROR);
+            response.setRemark("crc32 not match");
+            return response;
+        }
+
         TopicConfigSerializeWrapper topicConfigWrapper;
         if (request.getBody() != null) {
             topicConfigWrapper = 
TopicConfigSerializeWrapper.decode(request.getBody(), 
TopicConfigSerializeWrapper.class);


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to