Repository: storm Updated Branches: refs/heads/master 8814bfaa1 -> 5d93a8644
Adding in maximum/minimum object size Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/e6465b5d Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/e6465b5d Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/e6465b5d Branch: refs/heads/master Commit: e6465b5dc5c7e31e0b92a18de578a13d05a5abf2 Parents: 559053d Author: Kyle Nusbaum <[email protected]> Authored: Mon Jul 17 14:40:15 2017 -0500 Committer: Kyle Nusbaum <[email protected]> Committed: Mon Jul 17 14:40:15 2017 -0500 ---------------------------------------------------------------------- conf/defaults.yaml | 1 + storm-client/src/jvm/org/apache/storm/Config.java | 7 +++++++ .../org/apache/storm/pacemaker/PacemakerClient.java | 3 ++- .../apache/storm/pacemaker/codec/ThriftDecoder.java | 15 +++++++++++++++ .../pacemaker/codec/ThriftNettyClientCodec.java | 6 ++++-- .../org/apache/storm/pacemaker/PacemakerServer.java | 4 ++-- .../pacemaker/codec/ThriftNettyServerCodec.java | 5 ++--- 7 files changed, 33 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/e6465b5d/conf/defaults.yaml ---------------------------------------------------------------------- diff --git a/conf/defaults.yaml b/conf/defaults.yaml index eb9cb24..0078323 100644 --- a/conf/defaults.yaml +++ b/conf/defaults.yaml @@ -282,6 +282,7 @@ pacemaker.thread.timeout: 10 pacemaker.childopts: "-Xmx1024m" pacemaker.auth.method: "NONE" pacemaker.kerberos.users: [] +pacemaker.thrift.message.size.max: 10485760 #default storm daemon metrics reporter plugins storm.daemon.metrics.reporter.plugins: http://git-wip-us.apache.org/repos/asf/storm/blob/e6465b5d/storm-client/src/jvm/org/apache/storm/Config.java ---------------------------------------------------------------------- diff --git a/storm-client/src/jvm/org/apache/storm/Config.java b/storm-client/src/jvm/org/apache/storm/Config.java index 5d5f723..367e7ed 100644 --- a/storm-client/src/jvm/org/apache/storm/Config.java +++ b/storm-client/src/jvm/org/apache/storm/Config.java @@ -804,6 +804,13 @@ public class Config extends HashMap<String, Object> { public static final String PACEMAKER_AUTH_METHOD = "pacemaker.auth.method"; /** + * Pacemaker Thrift Max Message Size (bytes) + */ + @isInteger + @isPositiveNumber + public static final String PACEMAKER_THRIFT_MESSAGE_SIZE_MAX = "pacemaker.thrift.message.size.max"; + + /** * Max no.of seconds group mapping service will cache user groups */ @isInteger http://git-wip-us.apache.org/repos/asf/storm/blob/e6465b5d/storm-client/src/jvm/org/apache/storm/pacemaker/PacemakerClient.java ---------------------------------------------------------------------- diff --git a/storm-client/src/jvm/org/apache/storm/pacemaker/PacemakerClient.java b/storm-client/src/jvm/org/apache/storm/pacemaker/PacemakerClient.java index be09b3e..bdfdeae 100644 --- a/storm-client/src/jvm/org/apache/storm/pacemaker/PacemakerClient.java +++ b/storm-client/src/jvm/org/apache/storm/pacemaker/PacemakerClient.java @@ -117,7 +117,8 @@ public class PacemakerClient implements ISaslClient { bootstrap.setOption("keepAlive", true); remote_addr = new InetSocketAddress(host, port); - ChannelPipelineFactory pipelineFactory = new ThriftNettyClientCodec(this, config, authMethod, host).pipelineFactory(); + int thriftMessageMaxSize = (Integer)config.get(Config.PACEMAKER_THRIFT_MESSAGE_SIZE_MAX); + ChannelPipelineFactory pipelineFactory = new ThriftNettyClientCodec(this, config, authMethod, host, thriftMessageMaxSize).pipelineFactory(); bootstrap.setPipelineFactory(pipelineFactory); bootstrap.connect(remote_addr); } http://git-wip-us.apache.org/repos/asf/storm/blob/e6465b5d/storm-client/src/jvm/org/apache/storm/pacemaker/codec/ThriftDecoder.java ---------------------------------------------------------------------- diff --git a/storm-client/src/jvm/org/apache/storm/pacemaker/codec/ThriftDecoder.java b/storm-client/src/jvm/org/apache/storm/pacemaker/codec/ThriftDecoder.java index 968ca34..58d2b1b 100644 --- a/storm-client/src/jvm/org/apache/storm/pacemaker/codec/ThriftDecoder.java +++ b/storm-client/src/jvm/org/apache/storm/pacemaker/codec/ThriftDecoder.java @@ -27,10 +27,18 @@ import org.jboss.netty.buffer.ChannelBuffer; import org.apache.storm.messaging.netty.ControlMessage; import org.apache.storm.messaging.netty.SaslMessageToken; +import java.io.IOException; + public class ThriftDecoder extends FrameDecoder { private static final int INTEGER_SIZE = 4; + private int maxLength; + + public ThriftDecoder(int maxLength) { + this.maxLength = maxLength; + } + @Override protected Object decode(ChannelHandlerContext ctx, Channel channel, ChannelBuffer buf) throws Exception { @@ -42,6 +50,13 @@ public class ThriftDecoder extends FrameDecoder { buf.markReaderIndex(); int thriftLen = buf.readInt(); + + if(thriftLen < 0 || thriftLen > maxLength) { + throw new IOException("Thrift message of length " + Integer.toString(thriftLen) + + " is greater than allowed " + maxLength + + " or less than 0."); + } + available -= INTEGER_SIZE; if(available < thriftLen) { http://git-wip-us.apache.org/repos/asf/storm/blob/e6465b5d/storm-client/src/jvm/org/apache/storm/pacemaker/codec/ThriftNettyClientCodec.java ---------------------------------------------------------------------- diff --git a/storm-client/src/jvm/org/apache/storm/pacemaker/codec/ThriftNettyClientCodec.java b/storm-client/src/jvm/org/apache/storm/pacemaker/codec/ThriftNettyClientCodec.java index 42a68fd..1ad5a6c 100644 --- a/storm-client/src/jvm/org/apache/storm/pacemaker/codec/ThriftNettyClientCodec.java +++ b/storm-client/src/jvm/org/apache/storm/pacemaker/codec/ThriftNettyClientCodec.java @@ -48,12 +48,14 @@ public class ThriftNettyClientCodec { private AuthMethod authMethod; private Map<String, Object> topoConf; private String host; + private int thriftMessageMaxSize; - public ThriftNettyClientCodec(PacemakerClient pacemaker_client, Map<String, Object> topoConf, AuthMethod authMethod, String host) { + public ThriftNettyClientCodec(PacemakerClient pacemaker_client, Map<String, Object> topoConf, AuthMethod authMethod, String host, int thriftMessageMaxSize) { client = pacemaker_client; this.authMethod = authMethod; this.topoConf = topoConf; this.host = host; + this.thriftMessageMaxSize = thriftMessageMaxSize; } public ChannelPipelineFactory pipelineFactory() { @@ -61,7 +63,7 @@ public class ThriftNettyClientCodec { public ChannelPipeline getPipeline() { ChannelPipeline pipeline = Channels.pipeline(); pipeline.addLast("encoder", new ThriftEncoder()); - pipeline.addLast("decoder", new ThriftDecoder()); + pipeline.addLast("decoder", new ThriftDecoder(thriftMessageMaxSize)); if (authMethod == AuthMethod.KERBEROS) { try { http://git-wip-us.apache.org/repos/asf/storm/blob/e6465b5d/storm-server/src/main/java/org/apache/storm/pacemaker/PacemakerServer.java ---------------------------------------------------------------------- diff --git a/storm-server/src/main/java/org/apache/storm/pacemaker/PacemakerServer.java b/storm-server/src/main/java/org/apache/storm/pacemaker/PacemakerServer.java index 315d0ac..5a68ac8 100644 --- a/storm-server/src/main/java/org/apache/storm/pacemaker/PacemakerServer.java +++ b/storm-server/src/main/java/org/apache/storm/pacemaker/PacemakerServer.java @@ -106,8 +106,8 @@ class PacemakerServer implements ISaslServer { bootstrap.setOption("tcpNoDelay", true); bootstrap.setOption("sendBufferSize", FIVE_MB_IN_BYTES); bootstrap.setOption("keepAlive", true); - - ChannelPipelineFactory pipelineFactory = new ThriftNettyServerCodec(this, config, authMethod).pipelineFactory(); + int thriftMessageMaxSize = (Integer)config.get(Config.PACEMAKER_THRIFT_MESSAGE_SIZE_MAX); + ChannelPipelineFactory pipelineFactory = new ThriftNettyServerCodec(this, config, authMethod, thriftMessageMaxSize).pipelineFactory(); bootstrap.setPipelineFactory(pipelineFactory); Channel channel = bootstrap.bind(new InetSocketAddress(port)); allChannels.add(channel); http://git-wip-us.apache.org/repos/asf/storm/blob/e6465b5d/storm-server/src/main/java/org/apache/storm/pacemaker/codec/ThriftNettyServerCodec.java ---------------------------------------------------------------------- diff --git a/storm-server/src/main/java/org/apache/storm/pacemaker/codec/ThriftNettyServerCodec.java b/storm-server/src/main/java/org/apache/storm/pacemaker/codec/ThriftNettyServerCodec.java index 8b0de60..0dc6338 100644 --- a/storm-server/src/main/java/org/apache/storm/pacemaker/codec/ThriftNettyServerCodec.java +++ b/storm-server/src/main/java/org/apache/storm/pacemaker/codec/ThriftNettyServerCodec.java @@ -17,7 +17,6 @@ */ package org.apache.storm.pacemaker.codec; -import org.apache.storm.Config; import org.apache.storm.DaemonConfig; import org.apache.storm.messaging.netty.ISaslServer; import org.apache.storm.messaging.netty.IServer; @@ -52,7 +51,7 @@ public class ThriftNettyServerCodec { private static final Logger LOG = LoggerFactory .getLogger(ThriftNettyServerCodec.class); - public ThriftNettyServerCodec(IServer server, Map<String, Object> topoConf, AuthMethod authMethod) { + public ThriftNettyServerCodec(IServer server, Map<String, Object> topoConf, AuthMethod authMethod, int thriftMessageMaxSize) { this.server = server; this.authMethod = authMethod; this.topoConf = topoConf; @@ -64,7 +63,7 @@ public class ThriftNettyServerCodec { ChannelPipeline pipeline = Channels.pipeline(); pipeline.addLast("encoder", new ThriftEncoder()); - pipeline.addLast("decoder", new ThriftDecoder()); + pipeline.addLast("decoder", new ThriftDecoder(thriftMessageMaxSize)); if(authMethod == AuthMethod.DIGEST) { try { LOG.debug("Adding SaslStormServerHandler to pacemaker server pipeline.");
