This is an automated email from the ASF dual-hosted git repository. lizhanhui pushed a commit to branch develop in repository https://gitbox.apache.org/repos/asf/rocketmq.git
commit 130a56eda03a0fc7efb4408818c0799422bd6308 Author: 傅冲 <[email protected]> AuthorDate: Wed Jun 13 11:13:00 2018 +0800 #325 register nameserver add crc32 check --- .../java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java | 3 +++ .../protocol/header/namesrv/RegisterBrokerRequestHeader.java | 10 ++++++++++ .../rocketmq/namesrv/processor/DefaultRequestProcessor.java | 11 +++++++++++ 3 files changed, 24 insertions(+) diff --git a/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java b/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java index c6421c7..4dee01c 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java @@ -29,6 +29,7 @@ import org.apache.rocketmq.client.exception.MQBrokerException; import org.apache.rocketmq.common.DataVersion; import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.ThreadFactoryImpl; +import org.apache.rocketmq.common.UtilAll; import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.logging.InternalLoggerFactory; @@ -138,6 +139,8 @@ public class BrokerOuterAPI { requestBody.setTopicConfigSerializeWrapper(topicConfigWrapper); requestBody.setFilterServerList(filterServerList); final byte[] body = requestBody.encode(compressed); + final int bodyCrc32 = UtilAll.crc32(body); + requestHeader.setBodyCrc32(bodyCrc32); final CountDownLatch countDownLatch = new CountDownLatch(nameServerAddressList.size()); for (final String namesrvAddr : nameServerAddressList) { brokerOuterExecutor.execute(new Runnable() { diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/RegisterBrokerRequestHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/RegisterBrokerRequestHeader.java index 7ed7a40..19175b0 100644 --- a/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/RegisterBrokerRequestHeader.java +++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/RegisterBrokerRequestHeader.java @@ -38,6 +38,8 @@ public class RegisterBrokerRequestHeader implements CommandCustomHeader { private boolean compressed; + private Integer bodyCrc32 = 0; + public void checkFields() throws RemotingCommandException { } @@ -88,4 +90,12 @@ public class RegisterBrokerRequestHeader implements CommandCustomHeader { public void setCompressed(boolean compressed) { this.compressed = compressed; } + + public Integer getBodyCrc32() { + return bodyCrc32; + } + + public void setBodyCrc32(Integer bodyCrc32) { + this.bodyCrc32 = bodyCrc32; + } } diff --git a/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/DefaultRequestProcessor.java b/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/DefaultRequestProcessor.java index 236e6a1..4a222ab 100644 --- a/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/DefaultRequestProcessor.java +++ b/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/DefaultRequestProcessor.java @@ -24,6 +24,7 @@ import org.apache.rocketmq.common.DataVersion; import org.apache.rocketmq.common.MQVersion; import org.apache.rocketmq.common.MQVersion.Version; import org.apache.rocketmq.common.MixAll; +import org.apache.rocketmq.common.UtilAll; import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.common.help.FAQUrl; import org.apache.rocketmq.logging.InternalLogger; @@ -261,6 +262,16 @@ public class DefaultRequestProcessor implements NettyRequestProcessor { final RegisterBrokerRequestHeader requestHeader = (RegisterBrokerRequestHeader) request.decodeCommandCustomHeader(RegisterBrokerRequestHeader.class); + if (requestHeader.getBodyCrc32() != 0) { + final int crc32 = UtilAll.crc32(request.getBody()); + if (crc32 != requestHeader.getBodyCrc32()) { + log.warn("receive registerBroker request,crc32 not match,from %s", + RemotingHelper.parseChannelRemoteAddr(ctx.channel())); + response.setCode(ResponseCode.SYSTEM_ERROR); + response.setRemark("crc32 not match"); + } + } + TopicConfigSerializeWrapper topicConfigWrapper; if (request.getBody() != null) { topicConfigWrapper = TopicConfigSerializeWrapper.decode(request.getBody(), TopicConfigSerializeWrapper.class);
