vongosling closed pull request #140: [ROCKETMQ-253]Compress RegisterBrokerBody
URL: https://github.com/apache/rocketmq/pull/140
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java
b/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java
index 6c2a987d2..2879b1fc5 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java
@@ -20,6 +20,7 @@
import java.util.ArrayList;
import java.util.List;
import org.apache.rocketmq.client.exception.MQBrokerException;
+import org.apache.rocketmq.common.MQVersion;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.namesrv.RegisterBrokerResult;
@@ -148,12 +149,15 @@ private RegisterBrokerResult registerBroker(
requestHeader.setBrokerName(brokerName);
requestHeader.setClusterName(clusterName);
requestHeader.setHaServerAddr(haServerAddr);
+ requestHeader.setCompressed(true);
RemotingCommand request =
RemotingCommand.createRequestCommand(RequestCode.REGISTER_BROKER,
requestHeader);
-
+ if (request.getVersion() <= 0) {
+ request.setVersion(MQVersion.CURRENT_VERSION);
+ }
RegisterBrokerBody requestBody = new RegisterBrokerBody();
requestBody.setTopicConfigSerializeWrapper(topicConfigWrapper);
requestBody.setFilterServerList(filterServerList);
- request.setBody(requestBody.encode());
+ request.setBody(requestBody.encode(true));
if (oneway) {
try {
diff --git
a/common/src/main/java/org/apache/rocketmq/common/protocol/body/RegisterBrokerBody.java
b/common/src/main/java/org/apache/rocketmq/common/protocol/body/RegisterBrokerBody.java
index c220927c2..b39dad170 100644
---
a/common/src/main/java/org/apache/rocketmq/common/protocol/body/RegisterBrokerBody.java
+++
b/common/src/main/java/org/apache/rocketmq/common/protocol/body/RegisterBrokerBody.java
@@ -17,11 +17,28 @@
package org.apache.rocketmq.common.protocol.body;
+import com.alibaba.fastjson.JSON;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
+import java.util.concurrent.ConcurrentMap;
+import java.util.zip.Deflater;
+import java.util.zip.DeflaterOutputStream;
+import java.util.zip.InflaterInputStream;
+import org.apache.rocketmq.common.DataVersion;
+import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class RegisterBrokerBody extends RemotingSerializable {
+
+ private static final Logger LOGGER =
LoggerFactory.getLogger(RegisterBrokerBody.class);
+
private TopicConfigSerializeWrapper topicConfigSerializeWrapper = new
TopicConfigSerializeWrapper();
private List<String> filterServerList = new ArrayList<String>();
@@ -40,4 +57,122 @@ public void
setTopicConfigSerializeWrapper(TopicConfigSerializeWrapper topicConf
public void setFilterServerList(List<String> filterServerList) {
this.filterServerList = filterServerList;
}
+
+ public byte[] encode(boolean compress) {
+
+ if (!compress) {
+ return encode();
+ }
+ long start = System.currentTimeMillis();
+ ByteArrayOutputStream byteArrayOutputStream = new
ByteArrayOutputStream();
+ DeflaterOutputStream outputStream = new
DeflaterOutputStream(byteArrayOutputStream, new
Deflater(Deflater.BEST_COMPRESSION));
+ DataVersion dataVersion = topicConfigSerializeWrapper.getDataVersion();
+ ConcurrentMap<String, TopicConfig> topicConfigTable =
topicConfigSerializeWrapper.getTopicConfigTable();
+ assert topicConfigTable != null;
+ try {
+ byte[] buffer = dataVersion.encode();
+
+ // write data version
+ outputStream.write(convertIntToByteArray(buffer.length));
+ outputStream.write(buffer);
+
+ int topicNumber = topicConfigTable.size();
+
+ // write number of topic configs
+ outputStream.write(convertIntToByteArray(topicNumber));
+
+ // write topic config entry one by one.
+ for (ConcurrentMap.Entry<String, TopicConfig> next :
topicConfigTable.entrySet()) {
+ buffer =
next.getValue().encode().getBytes(MixAll.DEFAULT_CHARSET);
+ outputStream.write(convertIntToByteArray(buffer.length));
+ outputStream.write(buffer);
+ }
+
+ buffer =
JSON.toJSONString(filterServerList).getBytes(MixAll.DEFAULT_CHARSET);
+
+ // write filter server list json length
+ outputStream.write(convertIntToByteArray(buffer.length));
+
+ // write filter server list json
+ outputStream.write(buffer);
+
+ outputStream.finish();
+ long interval = System.currentTimeMillis() - start;
+ if (interval > 50) {
+ LOGGER.info("Compressing takes {}ms", interval);
+ }
+ return byteArrayOutputStream.toByteArray();
+ } catch (IOException e) {
+ LOGGER.error("Failed to compress RegisterBrokerBody object", e);
+ }
+
+ return null;
+ }
+
+ public static RegisterBrokerBody decode(byte[] data, boolean compressed)
throws IOException {
+ if (!compressed) {
+ return RegisterBrokerBody.decode(data, RegisterBrokerBody.class);
+ }
+ long start = System.currentTimeMillis();
+ InflaterInputStream inflaterInputStream = new InflaterInputStream(new
ByteArrayInputStream(data));
+ int dataVersionLength = readInt(inflaterInputStream);
+ byte[] dataVersionBytes = readBytes(inflaterInputStream,
dataVersionLength);
+ DataVersion dataVersion = DataVersion.decode(dataVersionBytes,
DataVersion.class);
+
+ RegisterBrokerBody registerBrokerBody = new RegisterBrokerBody();
+
registerBrokerBody.getTopicConfigSerializeWrapper().setDataVersion(dataVersion);
+ ConcurrentMap<String, TopicConfig> topicConfigTable =
registerBrokerBody.getTopicConfigSerializeWrapper().getTopicConfigTable();
+
+
+ int topicConfigNumber = readInt(inflaterInputStream);
+ LOGGER.debug("{} topic configs to extract", topicConfigNumber);
+
+ for (int i = 0; i < topicConfigNumber; i++) {
+ int topicConfigJsonLength = readInt(inflaterInputStream);
+
+ byte[] buffer = readBytes(inflaterInputStream,
topicConfigJsonLength);
+ TopicConfig topicConfig = new TopicConfig();
+ String topicConfigJson = new String(buffer,
MixAll.DEFAULT_CHARSET);
+ topicConfig.decode(topicConfigJson);
+ topicConfigTable.put(topicConfig.getTopicName(), topicConfig);
+ }
+
+ int filterServerListJsonLength = readInt(inflaterInputStream);
+
+ byte[] filterServerListBuffer = readBytes(inflaterInputStream,
filterServerListJsonLength);
+ String filterServerListJson = new String(filterServerListBuffer,
MixAll.DEFAULT_CHARSET);
+ List<String> filterServerList = JSON.parseArray(filterServerListJson,
String.class);
+ registerBrokerBody.setFilterServerList(filterServerList);
+ long interval = System.currentTimeMillis() - start;
+ if (interval > 50) {
+ LOGGER.info("Decompressing takes {}ms", interval);
+ }
+ return registerBrokerBody;
+ }
+
+ private static byte[] convertIntToByteArray(int n) {
+ ByteBuffer byteBuffer = ByteBuffer.allocate(4);
+ byteBuffer.putInt(n);
+ return byteBuffer.array();
+ }
+
+ private static byte[] readBytes(InflaterInputStream inflaterInputStream,
int length) throws IOException {
+ byte[] buffer = new byte[length];
+ int bytesRead = 0;
+ while (bytesRead < length) {
+ int len = inflaterInputStream.read(buffer, bytesRead, length -
bytesRead);
+ if (len == -1) {
+ throw new IOException("End of compressed data has reached");
+ } else {
+ bytesRead += len;
+ }
+ }
+ return buffer;
+ }
+
+ private static int readInt(InflaterInputStream inflaterInputStream) throws
IOException {
+ byte[] buffer = readBytes(inflaterInputStream, 4);
+ ByteBuffer byteBuffer = ByteBuffer.wrap(buffer);
+ return byteBuffer.getInt();
+ }
}
diff --git
a/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/RegisterBrokerRequestHeader.java
b/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/RegisterBrokerRequestHeader.java
index 45d5b6e9e..de4043212 100644
---
a/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/RegisterBrokerRequestHeader.java
+++
b/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/RegisterBrokerRequestHeader.java
@@ -36,6 +36,8 @@
@CFNotNull
private Long brokerId;
+ private boolean compressed;
+
@Override
public void checkFields() throws RemotingCommandException {
}
@@ -79,4 +81,12 @@ public Long getBrokerId() {
public void setBrokerId(Long brokerId) {
this.brokerId = brokerId;
}
+
+ public boolean isCompressed() {
+ return compressed;
+ }
+
+ public void setCompressed(boolean compressed) {
+ this.compressed = compressed;
+ }
}
diff --git
a/common/src/test/java/org/apache/rocketmq/common/protocol/body/RegisterBrokerBodyTest.java
b/common/src/test/java/org/apache/rocketmq/common/protocol/body/RegisterBrokerBodyTest.java
new file mode 100644
index 000000000..594819e2b
--- /dev/null
+++
b/common/src/test/java/org/apache/rocketmq/common/protocol/body/RegisterBrokerBodyTest.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.common.protocol.body;
+
+import java.text.DecimalFormat;
+import java.text.NumberFormat;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.ConcurrentMap;
+import org.apache.rocketmq.common.TopicConfig;
+import org.apache.rocketmq.remoting.netty.NettyClientConfig;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class RegisterBrokerBodyTest {
+
+ @Test
+ public void encode() throws Exception {
+ RegisterBrokerBody registerBrokerBody = new RegisterBrokerBody();
+ List<String> filterServerList = new ArrayList<String>();
+ for (int i = 0; i < 100; i++) {
+ filterServerList.add("localhost:10911");
+ }
+ registerBrokerBody.setFilterServerList(filterServerList);
+
+ TopicConfigSerializeWrapper wrapper = new
TopicConfigSerializeWrapper();
+
+ ConcurrentMap<String, TopicConfig> topicConfigs =
wrapper.getTopicConfigTable();
+
+ NumberFormat numberFormat = new DecimalFormat("0000000000");
+ List<String> topics = new ArrayList<String>();
+ for (int i = 0; i < 1024; i++) {
+ TopicConfig topicConfig = new TopicConfig("Topic" +
numberFormat.format(i));
+ topicConfigs.put(topicConfig.getTopicName(), topicConfig);
+ topics.add(topicConfig.getTopicName());
+ }
+ registerBrokerBody.setTopicConfigSerializeWrapper(wrapper);
+
+ byte[] compressed = registerBrokerBody.encode(true);
+
+ RegisterBrokerBody registerBrokerBodyBackUp =
RegisterBrokerBody.decode(compressed, true);
+ ConcurrentMap<String, TopicConfig> backupMap =
registerBrokerBodyBackUp.getTopicConfigSerializeWrapper().getTopicConfigTable();
+
+ List<String> backupTopics = new ArrayList<String>();
+ for (ConcurrentMap.Entry<String, TopicConfig> next :
backupMap.entrySet()) {
+ backupTopics.add(next.getKey());
+ }
+ Collections.sort(topics);
+ Collections.sort(backupTopics);
+ Assert.assertEquals(topics, backupTopics);
+ }
+}
\ No newline at end of file
diff --git
a/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/DefaultRequestProcessor.java
b/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/DefaultRequestProcessor.java
index 964768422..316fbe046 100644
---
a/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/DefaultRequestProcessor.java
+++
b/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/DefaultRequestProcessor.java
@@ -17,6 +17,7 @@
package org.apache.rocketmq.namesrv.processor;
import io.netty.channel.ChannelHandlerContext;
+import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicLong;
@@ -185,11 +186,16 @@ public RemotingCommand
registerBrokerWithFilterServer(ChannelHandlerContext ctx,
final RegisterBrokerRequestHeader requestHeader =
(RegisterBrokerRequestHeader)
request.decodeCommandCustomHeader(RegisterBrokerRequestHeader.class);
- RegisterBrokerBody registerBrokerBody = new RegisterBrokerBody();
+ RegisterBrokerBody registerBrokerBody;
if (request.getBody() != null) {
- registerBrokerBody = RegisterBrokerBody.decode(request.getBody(),
RegisterBrokerBody.class);
+ try {
+ registerBrokerBody =
RegisterBrokerBody.decode(request.getBody(), requestHeader.isCompressed());
+ } catch (IOException e) {
+ throw new RemotingCommandException("Failed to decode
RegisterBrokerBody", e);
+ }
} else {
+ registerBrokerBody = new RegisterBrokerBody();
registerBrokerBody.getTopicConfigSerializeWrapper().getDataVersion().setCounter(new
AtomicLong(0));
registerBrokerBody.getTopicConfigSerializeWrapper().getDataVersion().setTimestamp(0);
}
diff --git
a/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java
b/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java
index 7479fcc5f..62a82eef9 100644
---
a/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java
+++
b/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java
@@ -152,7 +152,7 @@ public RegisterBrokerResult registerBroker(
channel,
haServerAddr));
if (null == prevBrokerLiveInfo) {
- log.info("new broker registerd, {} HAServer: {}",
brokerAddr, haServerAddr);
+ log.info("new broker registered, {} HAServer: {}",
brokerAddr, haServerAddr);
}
if (filterServerList != null) {
diff --git
a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingSerializable.java
b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingSerializable.java
index f80ff14c1..13fdb1f07 100644
---
a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingSerializable.java
+++
b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingSerializable.java
@@ -24,10 +24,7 @@
public static byte[] encode(final Object obj) {
final String json = toJson(obj, false);
- if (json != null) {
- return json.getBytes(CHARSET_UTF8);
- }
- return null;
+ return json.getBytes(CHARSET_UTF8);
}
public static String toJson(final Object obj, boolean prettyFormat) {
diff --git
a/test/src/test/java/org/apache/rocketmq/test/namesrv/NamesrvStressTest.java
b/test/src/test/java/org/apache/rocketmq/test/namesrv/NamesrvStressTest.java
new file mode 100644
index 000000000..bea4dd352
--- /dev/null
+++ b/test/src/test/java/org/apache/rocketmq/test/namesrv/NamesrvStressTest.java
@@ -0,0 +1,45 @@
+package org.apache.rocketmq.test.namesrv;
+
+import com.google.common.util.concurrent.RateLimiter;
+import java.text.DecimalFormat;
+import java.text.NumberFormat;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ConcurrentMap;
+import org.apache.rocketmq.broker.out.BrokerOuterAPI;
+import org.apache.rocketmq.common.TopicConfig;
+import org.apache.rocketmq.common.protocol.body.TopicConfigSerializeWrapper;
+import org.apache.rocketmq.namesrv.NamesrvController;
+import org.apache.rocketmq.remoting.netty.NettyClientConfig;
+import org.apache.rocketmq.test.base.IntegrationTestBase;
+import org.junit.Test;
+
+public class NamesrvStressTest {
+
+ @Test
+ public void stressCompress() {
+ NamesrvController namesrvController =
IntegrationTestBase.createAndStartNamesrv();
+ String nsAddr = "127.0.0.1:" +
namesrvController.getNettyServerConfig().getListenPort();
+
+ NettyClientConfig nettyClientConfig = new NettyClientConfig();
+ BrokerOuterAPI brokerOuterAPI = new BrokerOuterAPI(nettyClientConfig);
+ brokerOuterAPI.updateNameServerAddressList(nsAddr);
+ brokerOuterAPI.start();
+ TopicConfigSerializeWrapper topicConfigWrapper = new
TopicConfigSerializeWrapper();
+ ConcurrentMap<String, TopicConfig> topicConfigs =
topicConfigWrapper.getTopicConfigTable();
+ NumberFormat numberFormat = new DecimalFormat("0000000000");
+ List<String> topics = new ArrayList<String>();
+ for (int i = 0; i < 10240; i++) {
+ TopicConfig topicConfig = new TopicConfig("Topic" +
numberFormat.format(i));
+ topicConfigs.put(topicConfig.getTopicName(), topicConfig);
+ topics.add(topicConfig.getTopicName());
+ }
+
+ RateLimiter rateLimiter = RateLimiter.create(256);
+ for (int i = 0; i < 32; i++) {
+ rateLimiter.acquire();
+ brokerOuterAPI.registerBrokerAll("TestCluster", "localhost:10911",
"broker-a",
+ i % 2 , "localhost:10912", topicConfigWrapper, null, false,
3000);
+ }
+ }
+}
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services