[ 
https://issues.apache.org/jira/browse/ROCKETMQ-319?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16389120#comment-16389120
 ] 

ASF GitHub Bot commented on ROCKETMQ-319:
-----------------------------------------

zhouxinyu closed pull request #205: [ROCKETMQ-319] Improve broker register 
performance and reduce memory usage  
URL: https://github.com/apache/rocketmq/pull/205
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), GitHub no longer shows its
diff once it has been merged, so the full diff is reproduced below:

diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java 
b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
index 8891bd322..d08688bcf 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
@@ -729,14 +729,14 @@ public void start() throws Exception {
             this.filterServerManager.start();
         }
 
-        this.registerBrokerAll(true, false);
+        this.registerBrokerAll(true, false, true);
 
         this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
 
             @Override
             public void run() {
                 try {
-                    BrokerController.this.registerBrokerAll(true, false);
+                    BrokerController.this.registerBrokerAll(true, false, 
brokerConfig.isForceRegister());
                 } catch (Throwable e) {
                     log.error("registerBrokerAll Exception", e);
                 }
@@ -752,7 +752,7 @@ public void run() {
         }
     }
 
-    public synchronized void registerBrokerAll(final boolean checkOrderConfig, 
boolean oneway) {
+    public synchronized void registerBrokerAll(final boolean checkOrderConfig, 
boolean oneway, boolean forceRegister) {
         TopicConfigSerializeWrapper topicConfigWrapper = 
this.getTopicConfigManager().buildTopicConfigSerializeWrapper();
 
         if (!PermName.isWriteable(this.getBrokerConfig().getBrokerPermission())
@@ -767,28 +767,56 @@ public synchronized void registerBrokerAll(final boolean 
checkOrderConfig, boole
             topicConfigWrapper.setTopicConfigTable(topicConfigTable);
         }
 
-        RegisterBrokerResult registerBrokerResult = 
this.brokerOuterAPI.registerBrokerAll(
-            this.brokerConfig.getBrokerClusterName(),
+        if (forceRegister || 
needRegister(this.brokerConfig.getBrokerClusterName(),
             this.getBrokerAddr(),
             this.brokerConfig.getBrokerName(),
             this.brokerConfig.getBrokerId(),
-            this.getHAServerAddr(),
-            topicConfigWrapper,
-            this.filterServerManager.buildNewFilterServerList(),
-            oneway,
-            this.brokerConfig.getRegisterBrokerTimeoutMills());
-
-        if (registerBrokerResult != null) {
-            if (this.updateMasterHAServerAddrPeriodically && 
registerBrokerResult.getHaServerAddr() != null) {
-                
this.messageStore.updateHaMasterAddress(registerBrokerResult.getHaServerAddr());
+            this.brokerConfig.getRegisterBrokerTimeoutMills())) {
+            List<RegisterBrokerResult> registerBrokerResultList = 
this.brokerOuterAPI.registerBrokerAll(
+                this.brokerConfig.getBrokerClusterName(),
+                this.getBrokerAddr(),
+                this.brokerConfig.getBrokerName(),
+                this.brokerConfig.getBrokerId(),
+                this.getHAServerAddr(),
+                topicConfigWrapper,
+                this.filterServerManager.buildNewFilterServerList(),
+                oneway,
+                this.brokerConfig.getRegisterBrokerTimeoutMills(),
+                this.brokerConfig.isCompressedRegister());
+
+            if (registerBrokerResultList.size() > 0) {
+                RegisterBrokerResult registerBrokerResult = 
registerBrokerResultList.get(0);
+                if (registerBrokerResult != null) {
+                    if (this.updateMasterHAServerAddrPeriodically && 
registerBrokerResult.getHaServerAddr() != null) {
+                        
this.messageStore.updateHaMasterAddress(registerBrokerResult.getHaServerAddr());
+                    }
+
+                    
this.slaveSynchronize.setMasterAddr(registerBrokerResult.getMasterAddr());
+
+                    if (checkOrderConfig) {
+                        
this.getTopicConfigManager().updateOrderTopicConfig(registerBrokerResult.getKvTable());
+                    }
+                }
             }
+        }
+    }
 
-            
this.slaveSynchronize.setMasterAddr(registerBrokerResult.getMasterAddr());
+    private boolean needRegister(final String clusterName,
+        final String brokerAddr,
+        final String brokerName,
+        final long brokerId,
+        final int timeoutMills) {
 
-            if (checkOrderConfig) {
-                
this.getTopicConfigManager().updateOrderTopicConfig(registerBrokerResult.getKvTable());
+        TopicConfigSerializeWrapper topicConfigWrapper = 
this.getTopicConfigManager().buildTopicConfigSerializeWrapper();
+        List<Boolean> changeList = brokerOuterAPI.needRegister(clusterName, 
brokerAddr, brokerName, brokerId, topicConfigWrapper, timeoutMills);
+        boolean needRegister = false;
+        for (Boolean changed : changeList) {
+            if (changed) {
+                needRegister = true;
+                break;
             }
         }
+        return needRegister;
     }
 
     public TopicConfigManager getTopicConfigManager() {
diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java 
b/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java
index cba70a0ee..bfb90e93e 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java
@@ -16,11 +16,19 @@
  */
 package org.apache.rocketmq.broker.out;
 
+import com.google.common.collect.Lists;
 import java.io.UnsupportedEncodingException;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import org.apache.rocketmq.broker.latency.BrokerFixedThreadPoolExecutor;
 import org.apache.rocketmq.client.exception.MQBrokerException;
+import org.apache.rocketmq.common.DataVersion;
 import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.ThreadFactoryImpl;
 import org.apache.rocketmq.common.constant.LoggerName;
 import org.apache.rocketmq.common.namesrv.RegisterBrokerResult;
 import org.apache.rocketmq.common.namesrv.TopAddressing;
@@ -31,6 +39,8 @@
 import org.apache.rocketmq.common.protocol.body.RegisterBrokerBody;
 import org.apache.rocketmq.common.protocol.body.SubscriptionGroupWrapper;
 import org.apache.rocketmq.common.protocol.body.TopicConfigSerializeWrapper;
+import 
org.apache.rocketmq.common.protocol.header.namesrv.QueryDataVersionRequestHeader;
+import 
org.apache.rocketmq.common.protocol.header.namesrv.QueryDataVersionResponseHeader;
 import 
org.apache.rocketmq.common.protocol.header.namesrv.RegisterBrokerRequestHeader;
 import 
org.apache.rocketmq.common.protocol.header.namesrv.RegisterBrokerResponseHeader;
 import 
org.apache.rocketmq.common.protocol.header.namesrv.UnRegisterBrokerRequestHeader;
@@ -52,6 +62,8 @@
     private final RemotingClient remotingClient;
     private final TopAddressing topAddressing = new 
TopAddressing(MixAll.getWSAddr());
     private String nameSrvAddr = null;
+    private BrokerFixedThreadPoolExecutor brokerOuterExecutor = new 
BrokerFixedThreadPoolExecutor(4, 10, 1, TimeUnit.MINUTES,
+        new ArrayBlockingQueue<Runnable>(32), new 
ThreadFactoryImpl("brokerOutApi_thread_", true));
 
     public BrokerOuterAPI(final NettyClientConfig nettyClientConfig) {
         this(nettyClientConfig, null);
@@ -97,7 +109,7 @@ public void updateNameServerAddressList(final String addrs) {
         this.remotingClient.updateNameServerAddressList(lst);
     }
 
-    public RegisterBrokerResult registerBrokerAll(
+    public List<RegisterBrokerResult> registerBrokerAll(
         final String clusterName,
         final String brokerAddr,
         final String brokerName,
@@ -106,27 +118,41 @@ public RegisterBrokerResult registerBrokerAll(
         final TopicConfigSerializeWrapper topicConfigWrapper,
         final List<String> filterServerList,
         final boolean oneway,
-        final int timeoutMills) {
-        RegisterBrokerResult registerBrokerResult = null;
+        final int timeoutMills,
+        final boolean compressed) {
 
+        final List<RegisterBrokerResult> registerBrokerResultList = 
Lists.newArrayList();
         List<String> nameServerAddressList = 
this.remotingClient.getNameServerAddressList();
-        if (nameServerAddressList != null) {
-            for (String namesrvAddr : nameServerAddressList) {
-                try {
-                    RegisterBrokerResult result = 
this.registerBroker(namesrvAddr, clusterName, brokerAddr, brokerName, brokerId,
-                        haServerAddr, topicConfigWrapper, filterServerList, 
oneway, timeoutMills);
-                    if (result != null) {
-                        registerBrokerResult = result;
+        if (nameServerAddressList != null && nameServerAddressList.size() > 0) 
{
+            final CountDownLatch countDownLatch = new 
CountDownLatch(nameServerAddressList.size());
+            for (final String namesrvAddr : nameServerAddressList) {
+                brokerOuterExecutor.execute(new Runnable() {
+                    @Override
+                    public void run() {
+                        try {
+                            RegisterBrokerResult result = 
registerBroker(namesrvAddr, clusterName, brokerAddr, brokerName, brokerId,
+                                haServerAddr, topicConfigWrapper, 
filterServerList, oneway, timeoutMills, compressed);
+                            if (result != null) {
+                                registerBrokerResultList.add(result);
+                            }
+
+                            log.info("register broker to name server {} OK", 
namesrvAddr);
+                        } catch (Exception e) {
+                            log.warn("registerBroker Exception, {}", 
namesrvAddr, e);
+                        } finally {
+                            countDownLatch.countDown();
+                        }
                     }
+                });
+            }
 
-                    log.info("register broker to name server {} OK", 
namesrvAddr);
-                } catch (Exception e) {
-                    log.warn("registerBroker Exception, {}", namesrvAddr, e);
-                }
+            try {
+                countDownLatch.await(timeoutMills, TimeUnit.MILLISECONDS);
+            } catch (InterruptedException e) {
             }
         }
 
-        return registerBrokerResult;
+        return registerBrokerResultList;
     }
 
     private RegisterBrokerResult registerBroker(
@@ -139,7 +165,8 @@ private RegisterBrokerResult registerBroker(
         final TopicConfigSerializeWrapper topicConfigWrapper,
         final List<String> filterServerList,
         final boolean oneway,
-        final int timeoutMills
+        final int timeoutMills,
+        final boolean compressed
     ) throws RemotingCommandException, MQBrokerException, 
RemotingConnectException, RemotingSendRequestException, 
RemotingTimeoutException,
         InterruptedException {
         RegisterBrokerRequestHeader requestHeader = new 
RegisterBrokerRequestHeader();
@@ -148,12 +175,13 @@ private RegisterBrokerResult registerBroker(
         requestHeader.setBrokerName(brokerName);
         requestHeader.setClusterName(clusterName);
         requestHeader.setHaServerAddr(haServerAddr);
+        requestHeader.setCompressed(compressed);
         RemotingCommand request = 
RemotingCommand.createRequestCommand(RequestCode.REGISTER_BROKER, 
requestHeader);
 
         RegisterBrokerBody requestBody = new RegisterBrokerBody();
         requestBody.setTopicConfigSerializeWrapper(topicConfigWrapper);
         requestBody.setFilterServerList(filterServerList);
-        request.setBody(requestBody.encode());
+        request.setBody(requestBody.encode(requestHeader.isCompressed()));
 
         if (oneway) {
             try {
@@ -231,6 +259,71 @@ public void unregisterBroker(
         throw new MQBrokerException(response.getCode(), response.getRemark());
     }
 
+    public List<Boolean> needRegister(
+        final String clusterName,
+        final String brokerAddr,
+        final String brokerName,
+        final long brokerId,
+        final TopicConfigSerializeWrapper topicConfigWrapper,
+        final int timeoutMills) {
+        final List<Boolean> changedList = new CopyOnWriteArrayList<>();
+        List<String> nameServerAddressList = 
this.remotingClient.getNameServerAddressList();
+        if (nameServerAddressList != null && nameServerAddressList.size() > 0) 
{
+            final CountDownLatch countDownLatch = new 
CountDownLatch(nameServerAddressList.size());
+            for (final String namesrvAddr : nameServerAddressList) {
+                brokerOuterExecutor.execute(new Runnable() {
+                    @Override
+                    public void run() {
+                        try {
+                            QueryDataVersionRequestHeader requestHeader = new 
QueryDataVersionRequestHeader();
+                            requestHeader.setBrokerAddr(brokerAddr);
+                            requestHeader.setBrokerId(brokerId);
+                            requestHeader.setBrokerName(brokerName);
+                            requestHeader.setClusterName(clusterName);
+                            RemotingCommand request = 
RemotingCommand.createRequestCommand(RequestCode.QUERY_DATA_VERSION, 
requestHeader);
+                            
request.setBody(topicConfigWrapper.getDataVersion().encode());
+                            RemotingCommand response = 
remotingClient.invokeSync(namesrvAddr, request, timeoutMills);
+                            DataVersion nameServerDataVersion = null;
+                            Boolean changed = false;
+                            switch (response.getCode()) {
+                                case ResponseCode.SUCCESS: {
+                                    QueryDataVersionResponseHeader 
queryDataVersionResponseHeader =
+                                        (QueryDataVersionResponseHeader) 
response.decodeCommandCustomHeader(QueryDataVersionResponseHeader.class);
+                                    changed = 
queryDataVersionResponseHeader.getChanged();
+                                    byte[] body = response.getBody();
+                                    if (body != null) {
+                                        nameServerDataVersion = 
DataVersion.decode(body, DataVersion.class);
+                                        if 
(!topicConfigWrapper.getDataVersion().equals(nameServerDataVersion)) {
+                                            changed = true;
+                                        }
+                                    }
+                                    if (changed == null || changed) {
+                                        changedList.add(Boolean.TRUE);
+                                    }
+                                }
+                                default:
+                                    break;
+                            }
+                            log.warn("Query data version from name server {} 
OK,changed {}, broker {},name server {}", namesrvAddr, changed, 
topicConfigWrapper.getDataVersion(), nameServerDataVersion == null ? "" : 
nameServerDataVersion);
+                        } catch (Exception e) {
+                            changedList.add(Boolean.TRUE);
+                            log.error("Query data version from name server {}  
Exception, {}", namesrvAddr, e);
+                        } finally {
+                            countDownLatch.countDown();
+                        }
+                    }
+                });
+
+            }
+            try {
+                countDownLatch.await(timeoutMills, TimeUnit.MILLISECONDS);
+            } catch (InterruptedException e) {
+                log.error("query dataversion from nameserver countDownLatch 
await Exception", e);
+            }
+        }
+        return changedList;
+    }
+
     public TopicConfigSerializeWrapper getAllTopicConfig(
         final String addr) throws RemotingConnectException, 
RemotingSendRequestException,
         RemotingTimeoutException, InterruptedException, MQBrokerException {
diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
 
b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
index d69a78700..7b21ab47f 100644
--- 
a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
+++ 
b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
@@ -245,7 +245,7 @@ private RemotingCommand 
updateAndCreateTopic(ChannelHandlerContext ctx,
         topicConfig.setTopicSysFlag(requestHeader.getTopicSysFlag() == null ? 
0 : requestHeader.getTopicSysFlag());
 
         
this.brokerController.getTopicConfigManager().updateTopicConfig(topicConfig);
-        this.brokerController.registerBrokerAll(false, true);
+        this.brokerController.registerBrokerAll(false, true, true);
 
         return null;
     }
@@ -310,8 +310,8 @@ private RemotingCommand 
updateBrokerConfig(ChannelHandlerContext ctx, RemotingCo
                     log.info("updateBrokerConfig, new config: [{}] client: {} 
", properties, ctx.channel().remoteAddress());
                     
this.brokerController.getConfiguration().update(properties);
                     if (properties.containsKey("brokerPermission")) {
-                        this.brokerController.registerBrokerAll(false, false);
                         
this.brokerController.getTopicConfigManager().getDataVersion().nextVersion();
+                        this.brokerController.registerBrokerAll(false, false, 
true);
                     }
                 } else {
                     log.error("string2Properties error");
diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java 
b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java
index cd30a089b..46d161e92 100644
--- 
a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java
+++ 
b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java
@@ -210,7 +210,7 @@ public TopicConfig createTopicInSendMessageMethod(final 
String topic, final Stri
         }
 
         if (createNew) {
-            this.brokerController.registerBrokerAll(false, true);
+            this.brokerController.registerBrokerAll(false, true,true);
         }
 
         return topicConfig;
@@ -254,7 +254,7 @@ public TopicConfig createTopicInSendMessageBackMethod(
         }
 
         if (createNew) {
-            this.brokerController.registerBrokerAll(false, true);
+            this.brokerController.registerBrokerAll(false, true,true);
         }
 
         return topicConfig;
@@ -279,7 +279,7 @@ public void updateTopicUnitFlag(final String topic, final 
boolean unit) {
             this.dataVersion.nextVersion();
 
             this.persist();
-            this.brokerController.registerBrokerAll(false, true);
+            this.brokerController.registerBrokerAll(false, true,true);
         }
     }
 
@@ -299,7 +299,7 @@ public void updateTopicUnitSubFlag(final String topic, 
final boolean hasUnitSub)
             this.dataVersion.nextVersion();
 
             this.persist();
-            this.brokerController.registerBrokerAll(false, true);
+            this.brokerController.registerBrokerAll(false, true,true);
         }
     }
 
diff --git 
a/broker/src/test/java/org/apache/rocketmq/broker/BrokerOuterAPITest.java 
b/broker/src/test/java/org/apache/rocketmq/broker/BrokerOuterAPITest.java
new file mode 100644
index 000000000..69e0dd382
--- /dev/null
+++ b/broker/src/test/java/org/apache/rocketmq/broker/BrokerOuterAPITest.java
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.broker;
+
+import com.google.common.base.Predicate;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import io.netty.channel.ChannelHandlerContext;
+import java.lang.reflect.Field;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import org.apache.rocketmq.broker.out.BrokerOuterAPI;
+import org.apache.rocketmq.common.BrokerConfig;
+import org.apache.rocketmq.common.namesrv.RegisterBrokerResult;
+import org.apache.rocketmq.common.protocol.ResponseCode;
+import org.apache.rocketmq.common.protocol.body.TopicConfigSerializeWrapper;
+import 
org.apache.rocketmq.common.protocol.header.namesrv.QueryDataVersionResponseHeader;
+import 
org.apache.rocketmq.common.protocol.header.namesrv.RegisterBrokerResponseHeader;
+import org.apache.rocketmq.remoting.netty.NettyClientConfig;
+import org.apache.rocketmq.remoting.netty.NettyRemotingClient;
+import org.apache.rocketmq.remoting.netty.NettyServerConfig;
+import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+import org.apache.rocketmq.store.MessageStore;
+import org.apache.rocketmq.store.config.MessageStoreConfig;
+import static org.junit.Assert.assertEquals;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.anyString;
+import org.mockito.Mock;
+import static org.mockito.Mockito.when;
+import org.mockito.Spy;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.junit.MockitoJUnitRunner;
+import org.mockito.stubbing.Answer;
+
+@RunWith(MockitoJUnitRunner.class)
+public class BrokerOuterAPITest {
+    @Mock
+    private ChannelHandlerContext handlerContext;
+    @Spy
+    private BrokerController brokerController = new BrokerController(new 
BrokerConfig(), new NettyServerConfig(), new NettyClientConfig(), new 
MessageStoreConfig());
+    @Mock
+    private MessageStore messageStore;
+    private String clusterName = "clusterName";
+    private String brokerName = "brokerName";
+    private String brokerAddr = "brokerAddr";
+    private long brokerId = 0L;
+    private String nameserver1 = "127.0.0.1";
+    private String nameserver2 = "127.0.0.2";
+    private String nameserver3 = "127.0.0.3";
+    private int timeOut = 3000;
+
+    @Mock
+    private NettyRemotingClient nettyRemotingClient;
+
+    private BrokerOuterAPI brokerOuterAPI;
+
+    public void init() throws Exception {
+        brokerOuterAPI = new BrokerOuterAPI(new NettyClientConfig(), null);
+        Field field = BrokerOuterAPI.class.getDeclaredField("remotingClient");
+        field.setAccessible(true);
+        field.set(brokerOuterAPI, nettyRemotingClient);
+    }
+
+    @Test
+    public void test_needRegister_normal() throws Exception {
+        init();
+        brokerOuterAPI.start();
+        final RemotingCommand response = buildResponse(Boolean.TRUE);
+
+        TopicConfigSerializeWrapper topicConfigSerializeWrapper = new 
TopicConfigSerializeWrapper();
+
+        
when(nettyRemotingClient.getNameServerAddressList()).thenReturn(Lists.asList(nameserver1,
 nameserver2, new String[] {nameserver3}));
+        when(nettyRemotingClient.invokeSync(anyString(), 
any(RemotingCommand.class), anyLong())).thenReturn(response);
+        List<Boolean> booleanList = brokerOuterAPI.needRegister(clusterName, 
brokerAddr, brokerName, brokerId, topicConfigSerializeWrapper, timeOut);
+        assertEquals(3, booleanList.size());
+        assertEquals(false, booleanList.contains(Boolean.FALSE));
+    }
+
+    @Test
+    public void test_needRegister_timeout() throws Exception {
+        init();
+        brokerOuterAPI.start();
+
+        TopicConfigSerializeWrapper topicConfigSerializeWrapper = new 
TopicConfigSerializeWrapper();
+
+        
when(nettyRemotingClient.getNameServerAddressList()).thenReturn(Lists.asList(nameserver1,
 nameserver2, new String[] {nameserver3}));
+
+        when(nettyRemotingClient.invokeSync(anyString(), 
any(RemotingCommand.class), anyLong())).thenAnswer(new 
Answer<RemotingCommand>() {
+            @Override
+            public RemotingCommand answer(InvocationOnMock invocation) throws 
Throwable {
+                if (invocation.getArgument(0) == nameserver1) {
+                    return buildResponse(Boolean.TRUE);
+                } else if (invocation.getArgument(0) == nameserver2) {
+                    return buildResponse(Boolean.FALSE);
+                } else if (invocation.getArgument(0) == nameserver3) {
+                    TimeUnit.MILLISECONDS.sleep(timeOut + 20);
+                    return buildResponse(Boolean.TRUE);
+                }
+                return buildResponse(Boolean.TRUE);
+            }
+        });
+        List<Boolean> booleanList = brokerOuterAPI.needRegister(clusterName, 
brokerAddr, brokerName, brokerId, topicConfigSerializeWrapper, timeOut);
+        assertEquals(2, booleanList.size());
+        boolean success = Iterables.any(booleanList,
+            new Predicate<Boolean>() {
+                public boolean apply(Boolean input) {
+                    return input ? true : false;
+                }
+            });
+
+        assertEquals(true, success);
+
+    }
+
+    @Test
+    public void test_register_normal() throws Exception {
+        init();
+        brokerOuterAPI.start();
+
+        final RemotingCommand response = 
RemotingCommand.createResponseCommand(RegisterBrokerResponseHeader.class);
+        final RegisterBrokerResponseHeader responseHeader = 
(RegisterBrokerResponseHeader) response.readCustomHeader();
+        response.setCode(ResponseCode.SUCCESS);
+        response.setRemark(null);
+
+        TopicConfigSerializeWrapper topicConfigSerializeWrapper = new 
TopicConfigSerializeWrapper();
+
+        
when(nettyRemotingClient.getNameServerAddressList()).thenReturn(Lists.asList(nameserver1,
 nameserver2, new String[] {nameserver3}));
+        when(nettyRemotingClient.invokeSync(anyString(), 
any(RemotingCommand.class), anyLong())).thenReturn(response);
+        List<RegisterBrokerResult> registerBrokerResultList = 
brokerOuterAPI.registerBrokerAll(clusterName, brokerAddr, brokerName, brokerId, 
"hasServerAddr", topicConfigSerializeWrapper, Lists.<String>newArrayList(), 
false, timeOut, true);
+
+        assertEquals(3, registerBrokerResultList.size());
+    }
+
+    @Test
+    public void test_register_timeout() throws Exception {
+        init();
+        brokerOuterAPI.start();
+
+        final RemotingCommand response = 
RemotingCommand.createResponseCommand(RegisterBrokerResponseHeader.class);
+        response.setCode(ResponseCode.SUCCESS);
+        response.setRemark(null);
+
+        TopicConfigSerializeWrapper topicConfigSerializeWrapper = new 
TopicConfigSerializeWrapper();
+
+        
when(nettyRemotingClient.getNameServerAddressList()).thenReturn(Lists.asList(nameserver1,
 nameserver2, new String[] {nameserver3}));
+        when(nettyRemotingClient.invokeSync(anyString(), 
any(RemotingCommand.class), anyLong())).thenAnswer(new 
Answer<RemotingCommand>() {
+            @Override
+            public RemotingCommand answer(InvocationOnMock invocation) throws 
Throwable {
+                if (invocation.getArgument(0) == nameserver1) {
+                    return response;
+                } else if (invocation.getArgument(0) == nameserver2) {
+                    return response;
+                } else if (invocation.getArgument(0) == nameserver3) {
+                    TimeUnit.MILLISECONDS.sleep(timeOut + 20);
+                    return response;
+                }
+                return response;
+            }
+        });
+        List<RegisterBrokerResult> registerBrokerResultList = 
brokerOuterAPI.registerBrokerAll(clusterName, brokerAddr, brokerName, brokerId, 
"hasServerAddr", topicConfigSerializeWrapper, Lists.<String>newArrayList(), 
false, timeOut, true);
+
+        assertEquals(2, registerBrokerResultList.size());
+    }
+
+    private RemotingCommand buildResponse(Boolean changed) {
+        final RemotingCommand response = 
RemotingCommand.createResponseCommand(QueryDataVersionResponseHeader.class);
+        final QueryDataVersionResponseHeader responseHeader = 
(QueryDataVersionResponseHeader) response.readCustomHeader();
+        response.setCode(ResponseCode.SUCCESS);
+        response.setRemark(null);
+        responseHeader.setChanged(changed);
+        return response;
+    }
+}
diff --git a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java 
b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
index efb36b50a..a5e9d1209 100644
--- a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
+++ b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
@@ -16,6 +16,8 @@
  */
 package org.apache.rocketmq.common;
 
+import java.net.InetAddress;
+import java.net.UnknownHostException;
 import org.apache.rocketmq.common.annotation.ImportantField;
 import org.apache.rocketmq.common.constant.LoggerName;
 import org.apache.rocketmq.common.constant.PermName;
@@ -23,9 +25,6 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.net.InetAddress;
-import java.net.UnknownHostException;
-
 public class BrokerConfig {
     private static final Logger log = 
LoggerFactory.getLogger(LoggerName.COMMON_LOGGER_NAME);
 
@@ -133,6 +132,10 @@
     private boolean filterSupportRetry = false;
     private boolean enablePropertyFilter = false;
 
+    private boolean compressedRegister = false;
+
+    private boolean forceRegister = false;
+
     public boolean isTraceOn() {
         return traceOn;
     }
@@ -598,4 +601,20 @@ public boolean isEnablePropertyFilter() {
     public void setEnablePropertyFilter(boolean enablePropertyFilter) {
         this.enablePropertyFilter = enablePropertyFilter;
     }
+
+    public boolean isCompressedRegister() {
+        return compressedRegister;
+    }
+
+    public void setCompressedRegister(boolean compressedRegister) {
+        this.compressedRegister = compressedRegister;
+    }
+
+    public boolean isForceRegister() {
+        return forceRegister;
+    }
+
+    public void setForceRegister(boolean forceRegister) {
+        this.forceRegister = forceRegister;
+    }
 }
diff --git a/common/src/main/java/org/apache/rocketmq/common/DataVersion.java 
b/common/src/main/java/org/apache/rocketmq/common/DataVersion.java
index 71b00fdd7..e54000deb 100644
--- a/common/src/main/java/org/apache/rocketmq/common/DataVersion.java
+++ b/common/src/main/java/org/apache/rocketmq/common/DataVersion.java
@@ -78,4 +78,13 @@ public int hashCode() {
         }
         return result;
     }
+
+    @Override
+    public String toString() {
+        final StringBuilder sb = new StringBuilder("DataVersion[");
+        sb.append("timestamp=").append(timestamp);
+        sb.append(", counter=").append(counter);
+        sb.append(']');
+        return sb.toString();
+    }
 }
diff --git 
a/common/src/main/java/org/apache/rocketmq/common/ThreadFactoryImpl.java 
b/common/src/main/java/org/apache/rocketmq/common/ThreadFactoryImpl.java
index 3860ec3cc..564d60c54 100644
--- a/common/src/main/java/org/apache/rocketmq/common/ThreadFactoryImpl.java
+++ b/common/src/main/java/org/apache/rocketmq/common/ThreadFactoryImpl.java
@@ -23,14 +23,21 @@
 public class ThreadFactoryImpl implements ThreadFactory {
     private final AtomicLong threadIndex = new AtomicLong(0);
     private final String threadNamePrefix;
+    private final boolean daemon;
 
     public ThreadFactoryImpl(final String threadNamePrefix) {
+        this(threadNamePrefix, false);
+    }
+
+    public ThreadFactoryImpl(final String threadNamePrefix, boolean daemon) {
         this.threadNamePrefix = threadNamePrefix;
+        this.daemon = daemon;
     }
 
     @Override
     public Thread newThread(Runnable r) {
-        return new Thread(r, threadNamePrefix + 
this.threadIndex.incrementAndGet());
-
+        Thread thread = new Thread(r, threadNamePrefix + 
this.threadIndex.incrementAndGet());
+        thread.setDaemon(daemon);
+        return thread;
     }
 }
diff --git 
a/common/src/main/java/org/apache/rocketmq/common/protocol/RequestCode.java 
b/common/src/main/java/org/apache/rocketmq/common/protocol/RequestCode.java
index 5900c0b9d..8cf2d46ad 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/RequestCode.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/RequestCode.java
@@ -165,4 +165,6 @@
     public static final int SEND_BATCH_MESSAGE = 320;
 
     public static final int QUERY_CONSUME_QUEUE = 321;
+
+    public static final int QUERY_DATA_VERSION = 322;
 }
diff --git 
a/common/src/main/java/org/apache/rocketmq/common/protocol/body/RegisterBrokerBody.java
 
b/common/src/main/java/org/apache/rocketmq/common/protocol/body/RegisterBrokerBody.java
index c220927c2..2b49b6d14 100644
--- 
a/common/src/main/java/org/apache/rocketmq/common/protocol/body/RegisterBrokerBody.java
+++ 
b/common/src/main/java/org/apache/rocketmq/common/protocol/body/RegisterBrokerBody.java
@@ -17,14 +17,155 @@
 
 package org.apache.rocketmq.common.protocol.body;
 
+import com.alibaba.fastjson.JSON;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.zip.Deflater;
+import java.util.zip.DeflaterOutputStream;
+import java.util.zip.InflaterInputStream;
+import org.apache.rocketmq.common.DataVersion;
+import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.TopicConfig;
 import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class RegisterBrokerBody extends RemotingSerializable {
+
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(RegisterBrokerBody.class);
     private TopicConfigSerializeWrapper topicConfigSerializeWrapper = new 
TopicConfigSerializeWrapper();
     private List<String> filterServerList = new ArrayList<String>();
 
+    public byte[] encode(boolean compress) {
+
+        if (!compress) {
+            return super.encode();
+        }
+        long start = System.currentTimeMillis();
+        ByteArrayOutputStream byteArrayOutputStream = new 
ByteArrayOutputStream();
+        DeflaterOutputStream outputStream = new 
DeflaterOutputStream(byteArrayOutputStream, new 
Deflater(Deflater.BEST_COMPRESSION));
+        DataVersion dataVersion = topicConfigSerializeWrapper.getDataVersion();
+        ConcurrentMap<String, TopicConfig> topicConfigTable = 
cloneTopicConfigTable(topicConfigSerializeWrapper.getTopicConfigTable());
+        assert topicConfigTable != null;
+        try {
+            byte[] buffer = dataVersion.encode();
+
+            // write data version
+            outputStream.write(convertIntToByteArray(buffer.length));
+            outputStream.write(buffer);
+
+            int topicNumber = topicConfigTable.size();
+
+            // write number of topic configs
+            outputStream.write(convertIntToByteArray(topicNumber));
+
+            // write topic config entry one by one.
+            for (ConcurrentMap.Entry<String, TopicConfig> next : 
topicConfigTable.entrySet()) {
+                buffer = 
next.getValue().encode().getBytes(MixAll.DEFAULT_CHARSET);
+                outputStream.write(convertIntToByteArray(buffer.length));
+                outputStream.write(buffer);
+            }
+
+            buffer = 
JSON.toJSONString(filterServerList).getBytes(MixAll.DEFAULT_CHARSET);
+
+            // write filter server list json length
+            outputStream.write(convertIntToByteArray(buffer.length));
+
+            // write filter server list json
+            outputStream.write(buffer);
+
+            outputStream.finish();
+            long interval = System.currentTimeMillis() - start;
+            if (interval > 50) {
+                LOGGER.info("Compressing takes {}ms", interval);
+            }
+            return byteArrayOutputStream.toByteArray();
+        } catch (IOException e) {
+            LOGGER.error("Failed to compress RegisterBrokerBody object", e);
+        }
+
+        return null;
+    }
+
+    public static RegisterBrokerBody decode(byte[] data, boolean compressed) 
throws IOException {
+        if (!compressed) {
+            return RegisterBrokerBody.decode(data, RegisterBrokerBody.class);
+        }
+        long start = System.currentTimeMillis();
+        InflaterInputStream inflaterInputStream = new InflaterInputStream(new 
ByteArrayInputStream(data));
+        int dataVersionLength = readInt(inflaterInputStream);
+        byte[] dataVersionBytes = readBytes(inflaterInputStream, 
dataVersionLength);
+        DataVersion dataVersion = DataVersion.decode(dataVersionBytes, 
DataVersion.class);
+
+        RegisterBrokerBody registerBrokerBody = new RegisterBrokerBody();
+        
registerBrokerBody.getTopicConfigSerializeWrapper().setDataVersion(dataVersion);
+        ConcurrentMap<String, TopicConfig> topicConfigTable = 
registerBrokerBody.getTopicConfigSerializeWrapper().getTopicConfigTable();
+
+        int topicConfigNumber = readInt(inflaterInputStream);
+        LOGGER.debug("{} topic configs to extract", topicConfigNumber);
+
+        for (int i = 0; i < topicConfigNumber; i++) {
+            int topicConfigJsonLength = readInt(inflaterInputStream);
+
+            byte[] buffer = readBytes(inflaterInputStream, 
topicConfigJsonLength);
+            TopicConfig topicConfig = new TopicConfig();
+            String topicConfigJson = new String(buffer, 
MixAll.DEFAULT_CHARSET);
+            topicConfig.decode(topicConfigJson);
+            topicConfigTable.put(topicConfig.getTopicName(), topicConfig);
+        }
+
+        int filterServerListJsonLength = readInt(inflaterInputStream);
+
+        byte[] filterServerListBuffer = readBytes(inflaterInputStream, 
filterServerListJsonLength);
+        String filterServerListJson = new String(filterServerListBuffer, 
MixAll.DEFAULT_CHARSET);
+        List<String> filterServerList = new ArrayList<String>();
+        try {
+            filterServerList = JSON.parseArray(filterServerListJson, 
String.class);
+        } catch (Exception e) {
+            LOGGER.error("Decompressing occur Exception {}", 
filterServerListJson);
+        }
+
+        registerBrokerBody.setFilterServerList(filterServerList);
+        long interval = System.currentTimeMillis() - start;
+        if (interval > 50) {
+            LOGGER.info("Decompressing takes {}ms", interval);
+        }
+        return registerBrokerBody;
+    }
+
+    private static byte[] convertIntToByteArray(int n) {
+        ByteBuffer byteBuffer = ByteBuffer.allocate(4);
+        byteBuffer.putInt(n);
+        return byteBuffer.array();
+    }
+
+    private static byte[] readBytes(InflaterInputStream inflaterInputStream, 
int length) throws IOException {
+        byte[] buffer = new byte[length];
+        int bytesRead = 0;
+        while (bytesRead < length) {
+            int len = inflaterInputStream.read(buffer, bytesRead, length - 
bytesRead);
+            if (len == -1) {
+                throw new IOException("End of compressed data has reached");
+            } else {
+                bytesRead += len;
+            }
+        }
+        return buffer;
+    }
+
+    private static int readInt(InflaterInputStream inflaterInputStream) throws 
IOException {
+        byte[] buffer = readBytes(inflaterInputStream, 4);
+        ByteBuffer byteBuffer = ByteBuffer.wrap(buffer);
+        return byteBuffer.getInt();
+    }
+
     public TopicConfigSerializeWrapper getTopicConfigSerializeWrapper() {
         return topicConfigSerializeWrapper;
     }
@@ -40,4 +181,16 @@ public void 
setTopicConfigSerializeWrapper(TopicConfigSerializeWrapper topicConf
     public void setFilterServerList(List<String> filterServerList) {
         this.filterServerList = filterServerList;
     }
+
+    public static ConcurrentMap<String, TopicConfig> cloneTopicConfigTable(
+        ConcurrentMap<String, TopicConfig> topicConfigConcurrentMap) {
+        ConcurrentHashMap<String, TopicConfig> result = new 
ConcurrentHashMap<String, TopicConfig>();
+        if (topicConfigConcurrentMap != null) {
+            for (Map.Entry<String, TopicConfig> entry : 
topicConfigConcurrentMap.entrySet()) {
+                result.put(entry.getKey(), entry.getValue());
+            }
+        }
+        return result;
+
+    }
 }
diff --git 
a/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/QueryDataVersionRequestHeader.java
 
b/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/QueryDataVersionRequestHeader.java
new file mode 100644
index 000000000..ac6a617db
--- /dev/null
+++ 
b/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/QueryDataVersionRequestHeader.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.common.protocol.header.namesrv;
+
+import org.apache.rocketmq.remoting.CommandCustomHeader;
+import org.apache.rocketmq.remoting.annotation.CFNotNull;
+import org.apache.rocketmq.remoting.exception.RemotingCommandException;
+
+public class QueryDataVersionRequestHeader implements CommandCustomHeader {
+    @CFNotNull
+    private String brokerName;
+    @CFNotNull
+    private String brokerAddr;
+    @CFNotNull
+    private String clusterName;
+    @CFNotNull
+    private Long brokerId;
+
+    @Override
+    public void checkFields() throws RemotingCommandException {
+
+    }
+
+    public String getBrokerName() {
+        return brokerName;
+    }
+
+    public void setBrokerName(String brokerName) {
+        this.brokerName = brokerName;
+    }
+
+    public String getBrokerAddr() {
+        return brokerAddr;
+    }
+
+    public void setBrokerAddr(String brokerAddr) {
+        this.brokerAddr = brokerAddr;
+    }
+
+    public String getClusterName() {
+        return clusterName;
+    }
+
+    public void setClusterName(String clusterName) {
+        this.clusterName = clusterName;
+    }
+
+    public Long getBrokerId() {
+        return brokerId;
+    }
+
+    public void setBrokerId(Long brokerId) {
+        this.brokerId = brokerId;
+    }
+}
diff --git 
a/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/QueryDataVersionResponseHeader.java
 
b/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/QueryDataVersionResponseHeader.java
new file mode 100644
index 000000000..90741e5f5
--- /dev/null
+++ 
b/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/QueryDataVersionResponseHeader.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.common.protocol.header.namesrv;
+
+import org.apache.rocketmq.remoting.CommandCustomHeader;
+import org.apache.rocketmq.remoting.annotation.CFNotNull;
+import org.apache.rocketmq.remoting.exception.RemotingCommandException;
+
+public class QueryDataVersionResponseHeader implements CommandCustomHeader {
+    @CFNotNull
+    private Boolean changed;
+
+    @Override
+    public void checkFields() throws RemotingCommandException {
+
+    }
+
+    public Boolean getChanged() {
+        return changed;
+    }
+
+    public void setChanged(Boolean changed) {
+        this.changed = changed;
+    }
+
+    @Override
+    public String toString() {
+        final StringBuilder sb = new 
StringBuilder("QueryDataVersionResponseHeader{");
+        sb.append("changed=").append(changed);
+        sb.append('}');
+        return sb.toString();
+    }
+}
diff --git 
a/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/RegisterBrokerRequestHeader.java
 
b/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/RegisterBrokerRequestHeader.java
index 45d5b6e9e..7ed7a403d 100644
--- 
a/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/RegisterBrokerRequestHeader.java
+++ 
b/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/RegisterBrokerRequestHeader.java
@@ -36,7 +36,8 @@
     @CFNotNull
     private Long brokerId;
 
-    @Override
+    private boolean compressed;
+
     public void checkFields() throws RemotingCommandException {
     }
 
@@ -79,4 +80,12 @@ public Long getBrokerId() {
     public void setBrokerId(Long brokerId) {
         this.brokerId = brokerId;
     }
+
+    public boolean isCompressed() {
+        return compressed;
+    }
+
+    public void setCompressed(boolean compressed) {
+        this.compressed = compressed;
+    }
 }
diff --git 
a/common/src/test/java/org/apache/rocketmq/common/RegisterBrokerBodyTest.java 
b/common/src/test/java/org/apache/rocketmq/common/RegisterBrokerBodyTest.java
new file mode 100644
index 000000000..87a0fc008
--- /dev/null
+++ 
b/common/src/test/java/org/apache/rocketmq/common/RegisterBrokerBodyTest.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.common;
+
+import java.io.IOException;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import org.apache.rocketmq.common.protocol.body.RegisterBrokerBody;
+import org.apache.rocketmq.common.protocol.body.TopicConfigSerializeWrapper;
+import static org.junit.Assert.assertEquals;
+import org.junit.Test;
+
+public class RegisterBrokerBodyTest {
+    @Test
+    public void test_encode_decode() throws IOException {
+        RegisterBrokerBody registerBrokerBody = new RegisterBrokerBody();
+        TopicConfigSerializeWrapper topicConfigSerializeWrapper = new 
TopicConfigSerializeWrapper();
+        
registerBrokerBody.setTopicConfigSerializeWrapper(topicConfigSerializeWrapper);
+        
+        ConcurrentMap<String, TopicConfig> topicConfigTable = new 
ConcurrentHashMap<String, TopicConfig>();
+        for (int i = 0; i < 10000; i++) {
+            topicConfigTable.put(String.valueOf(i), new 
TopicConfig(String.valueOf(i)));
+        }
+
+        topicConfigSerializeWrapper.setTopicConfigTable(topicConfigTable);
+
+        byte[] compareEncode = registerBrokerBody.encode(true);
+        byte[] encode2 = registerBrokerBody.encode(false);
+        System.out.println(compareEncode.length);
+        System.out.println(encode2.length);
+        RegisterBrokerBody decodeRegisterBrokerBody = 
RegisterBrokerBody.decode(compareEncode, true);
+
+        
assertEquals(registerBrokerBody.getTopicConfigSerializeWrapper().getTopicConfigTable().size(),
 
decodeRegisterBrokerBody.getTopicConfigSerializeWrapper().getTopicConfigTable().size());
+
+    }
+}
diff --git 
a/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/DefaultRequestProcessor.java
 
b/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/DefaultRequestProcessor.java
index ed5b20b16..d0a62696a 100644
--- 
a/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/DefaultRequestProcessor.java
+++ 
b/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/DefaultRequestProcessor.java
@@ -20,6 +20,7 @@
 import java.io.UnsupportedEncodingException;
 import java.util.Properties;
 import java.util.concurrent.atomic.AtomicLong;
+import org.apache.rocketmq.common.DataVersion;
 import org.apache.rocketmq.common.MQVersion;
 import org.apache.rocketmq.common.MQVersion.Version;
 import org.apache.rocketmq.common.MixAll;
@@ -39,6 +40,8 @@
 import 
org.apache.rocketmq.common.protocol.header.namesrv.GetKVListByNamespaceRequestHeader;
 import 
org.apache.rocketmq.common.protocol.header.namesrv.GetRouteInfoRequestHeader;
 import 
org.apache.rocketmq.common.protocol.header.namesrv.PutKVConfigRequestHeader;
+import 
org.apache.rocketmq.common.protocol.header.namesrv.QueryDataVersionRequestHeader;
+import 
org.apache.rocketmq.common.protocol.header.namesrv.QueryDataVersionResponseHeader;
 import 
org.apache.rocketmq.common.protocol.header.namesrv.RegisterBrokerRequestHeader;
 import 
org.apache.rocketmq.common.protocol.header.namesrv.RegisterBrokerResponseHeader;
 import 
org.apache.rocketmq.common.protocol.header.namesrv.UnRegisterBrokerRequestHeader;
@@ -79,6 +82,8 @@ public RemotingCommand processRequest(ChannelHandlerContext 
ctx,
                 return this.getKVConfig(ctx, request);
             case RequestCode.DELETE_KV_CONFIG:
                 return this.deleteKVConfig(ctx, request);
+            case RequestCode.QUERY_DATA_VERSION:
+                return queryBrokerTopicConfig(ctx, request);
             case RequestCode.REGISTER_BROKER:
                 Version brokerVersion = 
MQVersion.value2Version(request.getVersion());
                 if (brokerVersion.ordinal() >= 
MQVersion.Version.V3_0_11.ordinal()) {
@@ -192,7 +197,11 @@ public RemotingCommand 
registerBrokerWithFilterServer(ChannelHandlerContext ctx,
         RegisterBrokerBody registerBrokerBody = new RegisterBrokerBody();
 
         if (request.getBody() != null) {
-            registerBrokerBody = RegisterBrokerBody.decode(request.getBody(), 
RegisterBrokerBody.class);
+            try {
+                registerBrokerBody = 
RegisterBrokerBody.decode(request.getBody(), requestHeader.isCompressed());
+            } catch (Exception e) {
+                throw new RemotingCommandException("Failed to decode 
RegisterBrokerBody", e);
+            }
         } else {
             
registerBrokerBody.getTopicConfigSerializeWrapper().getDataVersion().setCounter(new
 AtomicLong(0));
             
registerBrokerBody.getTopicConfigSerializeWrapper().getDataVersion().setTimestamp(0);
@@ -219,6 +228,30 @@ public RemotingCommand 
registerBrokerWithFilterServer(ChannelHandlerContext ctx,
         return response;
     }
 
+    public RemotingCommand queryBrokerTopicConfig(ChannelHandlerContext ctx,
+        RemotingCommand request) throws RemotingCommandException {
+        final RemotingCommand response = 
RemotingCommand.createResponseCommand(QueryDataVersionResponseHeader.class);
+        final QueryDataVersionResponseHeader responseHeader = 
(QueryDataVersionResponseHeader) response.readCustomHeader();
+        final QueryDataVersionRequestHeader requestHeader =
+            (QueryDataVersionRequestHeader) 
request.decodeCommandCustomHeader(QueryDataVersionRequestHeader.class);
+        DataVersion dataVersion = DataVersion.decode(request.getBody(), 
DataVersion.class);
+
+        Boolean changed = 
this.namesrvController.getRouteInfoManager().isBrokerTopicConfigChanged(requestHeader.getBrokerAddr(),
 dataVersion);
+        if (!changed) {
+            
this.namesrvController.getRouteInfoManager().updateBrokerInfoUpdateTimestamp(requestHeader.getBrokerAddr());
+        }
+
+        DataVersion nameSeverDataVersion = 
this.namesrvController.getRouteInfoManager().queryBrokerTopicConfig(requestHeader.getBrokerAddr());
+        response.setCode(ResponseCode.SUCCESS);
+        response.setRemark(null);
+
+        if (nameSeverDataVersion != null) {
+            response.setBody(nameSeverDataVersion.encode());
+        }
+        responseHeader.setChanged(changed);
+        return response;
+    }
+
     public RemotingCommand registerBroker(ChannelHandlerContext ctx,
         RemotingCommand request) throws RemotingCommandException {
         final RemotingCommand response = 
RemotingCommand.createResponseCommand(RegisterBrokerResponseHeader.class);
diff --git 
a/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java
 
b/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java
index 109d3e810..36647df2e 100644
--- 
a/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java
+++ 
b/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java
@@ -183,13 +183,24 @@ public RegisterBrokerResult registerBroker(
         return result;
     }
 
-    private boolean isBrokerTopicConfigChanged(final String brokerAddr, final 
DataVersion dataVersion) {
+    public boolean isBrokerTopicConfigChanged(final String brokerAddr, final 
DataVersion dataVersion) {
+        DataVersion prev = queryBrokerTopicConfig(brokerAddr);
+        return null == prev || !prev.equals(dataVersion);
+    }
+
+    public DataVersion queryBrokerTopicConfig(final String brokerAddr) {
         BrokerLiveInfo prev = this.brokerLiveTable.get(brokerAddr);
-        if (null == prev || !prev.getDataVersion().equals(dataVersion)) {
-            return true;
+        if (prev != null) {
+            return prev.getDataVersion();
         }
+        return null;
+    }
 
-        return false;
+    public void updateBrokerInfoUpdateTimestamp(final String brokerAddr) {
+        BrokerLiveInfo prev = this.brokerLiveTable.get(brokerAddr);
+        if (prev != null) {
+            prev.setLastUpdateTimestamp(System.currentTimeMillis());
+        }
     }
 
     private void createAndUpdateQueueData(final String brokerName, final 
TopicConfig topicConfig) {


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> improve broker register performance and reduce memory usage
> -----------------------------------------------------------
>
>                 Key: ROCKETMQ-319
>                 URL: https://issues.apache.org/jira/browse/ROCKETMQ-319
>             Project: Apache RocketMQ
>          Issue Type: Bug
>          Components: rocketmq-broker, rocketmq-namesrv
>    Affects Versions: 4.0.0-incubating, 4.1.0-incubating
>            Reporter: yubaofu
>            Assignee: yukon
>            Priority: Major
>
> When an MQ broker cluster manages a very large number of topics (e.g. 
> 10,000+), registering the broker's meta info with the name server causes 
> GC problems on the broker, so we fixed it.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to