vongosling closed pull request #140: [ROCKETMQ-253]Compress RegisterBrokerBody
URL: https://github.com/apache/rocketmq/pull/140
 
 
   

This is a PR merged from a forked repository.
Because GitHub hides the original diff of a foreign pull request (one
opened from a fork) once it has been merged, the diff is reproduced
below for the sake of provenance:

diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java 
b/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java
index 6c2a987d2..2879b1fc5 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java
@@ -20,6 +20,7 @@
 import java.util.ArrayList;
 import java.util.List;
 import org.apache.rocketmq.client.exception.MQBrokerException;
+import org.apache.rocketmq.common.MQVersion;
 import org.apache.rocketmq.common.MixAll;
 import org.apache.rocketmq.common.constant.LoggerName;
 import org.apache.rocketmq.common.namesrv.RegisterBrokerResult;
@@ -148,12 +149,15 @@ private RegisterBrokerResult registerBroker(
         requestHeader.setBrokerName(brokerName);
         requestHeader.setClusterName(clusterName);
         requestHeader.setHaServerAddr(haServerAddr);
+        requestHeader.setCompressed(true);
         RemotingCommand request = 
RemotingCommand.createRequestCommand(RequestCode.REGISTER_BROKER, 
requestHeader);
-
+        if (request.getVersion() <= 0) {
+            request.setVersion(MQVersion.CURRENT_VERSION);
+        }
         RegisterBrokerBody requestBody = new RegisterBrokerBody();
         requestBody.setTopicConfigSerializeWrapper(topicConfigWrapper);
         requestBody.setFilterServerList(filterServerList);
-        request.setBody(requestBody.encode());
+        request.setBody(requestBody.encode(true));
 
         if (oneway) {
             try {
diff --git 
a/common/src/main/java/org/apache/rocketmq/common/protocol/body/RegisterBrokerBody.java
 
b/common/src/main/java/org/apache/rocketmq/common/protocol/body/RegisterBrokerBody.java
index c220927c2..b39dad170 100644
--- 
a/common/src/main/java/org/apache/rocketmq/common/protocol/body/RegisterBrokerBody.java
+++ 
b/common/src/main/java/org/apache/rocketmq/common/protocol/body/RegisterBrokerBody.java
@@ -17,11 +17,28 @@
 
 package org.apache.rocketmq.common.protocol.body;
 
+import com.alibaba.fastjson.JSON;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.ConcurrentMap;
+import java.util.zip.Deflater;
+import java.util.zip.DeflaterOutputStream;
+import java.util.zip.InflaterInputStream;
+import org.apache.rocketmq.common.DataVersion;
+import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.TopicConfig;
 import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class RegisterBrokerBody extends RemotingSerializable {
+
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(RegisterBrokerBody.class);
+
     private TopicConfigSerializeWrapper topicConfigSerializeWrapper = new 
TopicConfigSerializeWrapper();
     private List<String> filterServerList = new ArrayList<String>();
 
@@ -40,4 +57,122 @@ public void 
setTopicConfigSerializeWrapper(TopicConfigSerializeWrapper topicConf
     public void setFilterServerList(List<String> filterServerList) {
         this.filterServerList = filterServerList;
     }
+
+    public byte[] encode(boolean compress) {
+
+        if (!compress) {
+            return encode();
+        }
+        long start = System.currentTimeMillis();
+        ByteArrayOutputStream byteArrayOutputStream = new 
ByteArrayOutputStream();
+        DeflaterOutputStream outputStream = new 
DeflaterOutputStream(byteArrayOutputStream, new 
Deflater(Deflater.BEST_COMPRESSION));
+        DataVersion dataVersion = topicConfigSerializeWrapper.getDataVersion();
+        ConcurrentMap<String, TopicConfig> topicConfigTable = 
topicConfigSerializeWrapper.getTopicConfigTable();
+        assert topicConfigTable != null;
+        try {
+            byte[] buffer = dataVersion.encode();
+
+            // write data version
+            outputStream.write(convertIntToByteArray(buffer.length));
+            outputStream.write(buffer);
+
+            int topicNumber = topicConfigTable.size();
+
+            // write number of topic configs
+            outputStream.write(convertIntToByteArray(topicNumber));
+
+            // write topic config entry one by one.
+            for (ConcurrentMap.Entry<String, TopicConfig> next : 
topicConfigTable.entrySet()) {
+                buffer = 
next.getValue().encode().getBytes(MixAll.DEFAULT_CHARSET);
+                outputStream.write(convertIntToByteArray(buffer.length));
+                outputStream.write(buffer);
+            }
+
+            buffer = 
JSON.toJSONString(filterServerList).getBytes(MixAll.DEFAULT_CHARSET);
+
+            // write filter server list json length
+            outputStream.write(convertIntToByteArray(buffer.length));
+
+            // write filter server list json
+            outputStream.write(buffer);
+
+            outputStream.finish();
+            long interval = System.currentTimeMillis() - start;
+            if (interval > 50) {
+                LOGGER.info("Compressing takes {}ms", interval);
+            }
+            return byteArrayOutputStream.toByteArray();
+        } catch (IOException e) {
+            LOGGER.error("Failed to compress RegisterBrokerBody object", e);
+        }
+
+        return null;
+    }
+
+    public static RegisterBrokerBody decode(byte[] data, boolean compressed) 
throws IOException {
+        if (!compressed) {
+            return RegisterBrokerBody.decode(data, RegisterBrokerBody.class);
+        }
+        long start = System.currentTimeMillis();
+        InflaterInputStream inflaterInputStream = new InflaterInputStream(new 
ByteArrayInputStream(data));
+        int dataVersionLength = readInt(inflaterInputStream);
+        byte[] dataVersionBytes = readBytes(inflaterInputStream, 
dataVersionLength);
+        DataVersion dataVersion = DataVersion.decode(dataVersionBytes, 
DataVersion.class);
+
+        RegisterBrokerBody registerBrokerBody = new RegisterBrokerBody();
+        
registerBrokerBody.getTopicConfigSerializeWrapper().setDataVersion(dataVersion);
+        ConcurrentMap<String, TopicConfig> topicConfigTable = 
registerBrokerBody.getTopicConfigSerializeWrapper().getTopicConfigTable();
+
+
+        int topicConfigNumber = readInt(inflaterInputStream);
+        LOGGER.debug("{} topic configs to extract", topicConfigNumber);
+
+        for (int i = 0; i < topicConfigNumber; i++) {
+            int topicConfigJsonLength = readInt(inflaterInputStream);
+
+            byte[] buffer = readBytes(inflaterInputStream, 
topicConfigJsonLength);
+            TopicConfig topicConfig = new TopicConfig();
+            String topicConfigJson = new String(buffer, 
MixAll.DEFAULT_CHARSET);
+            topicConfig.decode(topicConfigJson);
+            topicConfigTable.put(topicConfig.getTopicName(), topicConfig);
+        }
+
+        int filterServerListJsonLength = readInt(inflaterInputStream);
+
+        byte[] filterServerListBuffer = readBytes(inflaterInputStream, 
filterServerListJsonLength);
+        String filterServerListJson = new String(filterServerListBuffer, 
MixAll.DEFAULT_CHARSET);
+        List<String> filterServerList = JSON.parseArray(filterServerListJson, 
String.class);
+        registerBrokerBody.setFilterServerList(filterServerList);
+        long interval = System.currentTimeMillis() - start;
+        if (interval > 50) {
+            LOGGER.info("Decompressing takes {}ms", interval);
+        }
+        return registerBrokerBody;
+    }
+
+    private static byte[] convertIntToByteArray(int n) {
+        ByteBuffer byteBuffer = ByteBuffer.allocate(4);
+        byteBuffer.putInt(n);
+        return byteBuffer.array();
+    }
+
+    private static byte[] readBytes(InflaterInputStream inflaterInputStream, 
int length) throws IOException {
+        byte[] buffer = new byte[length];
+        int bytesRead = 0;
+        while (bytesRead < length) {
+            int len = inflaterInputStream.read(buffer, bytesRead, length - 
bytesRead);
+            if (len == -1) {
+                throw new IOException("End of compressed data has reached");
+            } else {
+                bytesRead += len;
+            }
+        }
+        return buffer;
+    }
+
+    private static int readInt(InflaterInputStream inflaterInputStream) throws 
IOException {
+        byte[] buffer = readBytes(inflaterInputStream, 4);
+        ByteBuffer byteBuffer = ByteBuffer.wrap(buffer);
+        return byteBuffer.getInt();
+    }
 }
diff --git 
a/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/RegisterBrokerRequestHeader.java
 
b/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/RegisterBrokerRequestHeader.java
index 45d5b6e9e..de4043212 100644
--- 
a/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/RegisterBrokerRequestHeader.java
+++ 
b/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/RegisterBrokerRequestHeader.java
@@ -36,6 +36,8 @@
     @CFNotNull
     private Long brokerId;
 
+    private boolean compressed;
+
     @Override
     public void checkFields() throws RemotingCommandException {
     }
@@ -79,4 +81,12 @@ public Long getBrokerId() {
     public void setBrokerId(Long brokerId) {
         this.brokerId = brokerId;
     }
+
+    public boolean isCompressed() {
+        return compressed;
+    }
+
+    public void setCompressed(boolean compressed) {
+        this.compressed = compressed;
+    }
 }
diff --git 
a/common/src/test/java/org/apache/rocketmq/common/protocol/body/RegisterBrokerBodyTest.java
 
b/common/src/test/java/org/apache/rocketmq/common/protocol/body/RegisterBrokerBodyTest.java
new file mode 100644
index 000000000..594819e2b
--- /dev/null
+++ 
b/common/src/test/java/org/apache/rocketmq/common/protocol/body/RegisterBrokerBodyTest.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.common.protocol.body;
+
+import java.text.DecimalFormat;
+import java.text.NumberFormat;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.ConcurrentMap;
+import org.apache.rocketmq.common.TopicConfig;
+import org.apache.rocketmq.remoting.netty.NettyClientConfig;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class RegisterBrokerBodyTest {
+
+    @Test
+    public void encode() throws Exception {
+        RegisterBrokerBody registerBrokerBody = new RegisterBrokerBody();
+        List<String> filterServerList = new ArrayList<String>();
+        for (int i = 0; i < 100; i++) {
+            filterServerList.add("localhost:10911");
+        }
+        registerBrokerBody.setFilterServerList(filterServerList);
+
+        TopicConfigSerializeWrapper wrapper = new 
TopicConfigSerializeWrapper();
+
+        ConcurrentMap<String, TopicConfig> topicConfigs = 
wrapper.getTopicConfigTable();
+
+        NumberFormat numberFormat = new DecimalFormat("0000000000");
+        List<String> topics = new ArrayList<String>();
+        for (int i = 0; i < 1024; i++) {
+            TopicConfig topicConfig = new TopicConfig("Topic" + 
numberFormat.format(i));
+            topicConfigs.put(topicConfig.getTopicName(), topicConfig);
+            topics.add(topicConfig.getTopicName());
+        }
+        registerBrokerBody.setTopicConfigSerializeWrapper(wrapper);
+
+        byte[] compressed = registerBrokerBody.encode(true);
+
+        RegisterBrokerBody registerBrokerBodyBackUp = 
RegisterBrokerBody.decode(compressed, true);
+        ConcurrentMap<String, TopicConfig> backupMap = 
registerBrokerBodyBackUp.getTopicConfigSerializeWrapper().getTopicConfigTable();
+
+        List<String> backupTopics = new ArrayList<String>();
+        for (ConcurrentMap.Entry<String, TopicConfig> next : 
backupMap.entrySet()) {
+            backupTopics.add(next.getKey());
+        }
+        Collections.sort(topics);
+        Collections.sort(backupTopics);
+        Assert.assertEquals(topics, backupTopics);
+    }
+}
\ No newline at end of file
diff --git 
a/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/DefaultRequestProcessor.java
 
b/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/DefaultRequestProcessor.java
index 964768422..316fbe046 100644
--- 
a/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/DefaultRequestProcessor.java
+++ 
b/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/DefaultRequestProcessor.java
@@ -17,6 +17,7 @@
 package org.apache.rocketmq.namesrv.processor;
 
 import io.netty.channel.ChannelHandlerContext;
+import java.io.IOException;
 import java.io.UnsupportedEncodingException;
 import java.util.Properties;
 import java.util.concurrent.atomic.AtomicLong;
@@ -185,11 +186,16 @@ public RemotingCommand 
registerBrokerWithFilterServer(ChannelHandlerContext ctx,
         final RegisterBrokerRequestHeader requestHeader =
             (RegisterBrokerRequestHeader) 
request.decodeCommandCustomHeader(RegisterBrokerRequestHeader.class);
 
-        RegisterBrokerBody registerBrokerBody = new RegisterBrokerBody();
+        RegisterBrokerBody registerBrokerBody;
 
         if (request.getBody() != null) {
-            registerBrokerBody = RegisterBrokerBody.decode(request.getBody(), 
RegisterBrokerBody.class);
+            try {
+                registerBrokerBody = 
RegisterBrokerBody.decode(request.getBody(), requestHeader.isCompressed());
+            } catch (IOException e) {
+                throw new RemotingCommandException("Failed to decode 
RegisterBrokerBody", e);
+            }
         } else {
+            registerBrokerBody = new RegisterBrokerBody();
             
registerBrokerBody.getTopicConfigSerializeWrapper().getDataVersion().setCounter(new
 AtomicLong(0));
             
registerBrokerBody.getTopicConfigSerializeWrapper().getDataVersion().setTimestamp(0);
         }
diff --git 
a/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java
 
b/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java
index 7479fcc5f..62a82eef9 100644
--- 
a/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java
+++ 
b/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java
@@ -152,7 +152,7 @@ public RegisterBrokerResult registerBroker(
                         channel,
                         haServerAddr));
                 if (null == prevBrokerLiveInfo) {
-                    log.info("new broker registerd, {} HAServer: {}", 
brokerAddr, haServerAddr);
+                    log.info("new broker registered, {} HAServer: {}", 
brokerAddr, haServerAddr);
                 }
 
                 if (filterServerList != null) {
diff --git 
a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingSerializable.java
 
b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingSerializable.java
index f80ff14c1..13fdb1f07 100644
--- 
a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingSerializable.java
+++ 
b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingSerializable.java
@@ -24,10 +24,7 @@
 
     public static byte[] encode(final Object obj) {
         final String json = toJson(obj, false);
-        if (json != null) {
-            return json.getBytes(CHARSET_UTF8);
-        }
-        return null;
+        return json.getBytes(CHARSET_UTF8);
     }
 
     public static String toJson(final Object obj, boolean prettyFormat) {
diff --git 
a/test/src/test/java/org/apache/rocketmq/test/namesrv/NamesrvStressTest.java 
b/test/src/test/java/org/apache/rocketmq/test/namesrv/NamesrvStressTest.java
new file mode 100644
index 000000000..bea4dd352
--- /dev/null
+++ b/test/src/test/java/org/apache/rocketmq/test/namesrv/NamesrvStressTest.java
@@ -0,0 +1,45 @@
+package org.apache.rocketmq.test.namesrv;
+
+import com.google.common.util.concurrent.RateLimiter;
+import java.text.DecimalFormat;
+import java.text.NumberFormat;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ConcurrentMap;
+import org.apache.rocketmq.broker.out.BrokerOuterAPI;
+import org.apache.rocketmq.common.TopicConfig;
+import org.apache.rocketmq.common.protocol.body.TopicConfigSerializeWrapper;
+import org.apache.rocketmq.namesrv.NamesrvController;
+import org.apache.rocketmq.remoting.netty.NettyClientConfig;
+import org.apache.rocketmq.test.base.IntegrationTestBase;
+import org.junit.Test;
+
+public class NamesrvStressTest {
+
+    @Test
+    public void stressCompress() {
+        NamesrvController namesrvController = 
IntegrationTestBase.createAndStartNamesrv();
+        String nsAddr = "127.0.0.1:" + 
namesrvController.getNettyServerConfig().getListenPort();
+
+        NettyClientConfig nettyClientConfig = new NettyClientConfig();
+        BrokerOuterAPI brokerOuterAPI = new BrokerOuterAPI(nettyClientConfig);
+        brokerOuterAPI.updateNameServerAddressList(nsAddr);
+        brokerOuterAPI.start();
+        TopicConfigSerializeWrapper topicConfigWrapper = new 
TopicConfigSerializeWrapper();
+        ConcurrentMap<String, TopicConfig> topicConfigs = 
topicConfigWrapper.getTopicConfigTable();
+        NumberFormat numberFormat = new DecimalFormat("0000000000");
+        List<String> topics = new ArrayList<String>();
+        for (int i = 0; i < 10240; i++) {
+            TopicConfig topicConfig = new TopicConfig("Topic" + 
numberFormat.format(i));
+            topicConfigs.put(topicConfig.getTopicName(), topicConfig);
+            topics.add(topicConfig.getTopicName());
+        }
+
+        RateLimiter rateLimiter = RateLimiter.create(256);
+        for (int i = 0; i < 32; i++) {
+            rateLimiter.acquire();
+            brokerOuterAPI.registerBrokerAll("TestCluster", "localhost:10911", 
"broker-a",
+                i % 2 , "localhost:10912", topicConfigWrapper, null, false, 
3000);
+        }
+    }
+}


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to