Repository: storm
Updated Branches:
  refs/heads/master 8814bfaa1 -> 5d93a8644


Adding in maximum/minimum object size


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/e6465b5d
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/e6465b5d
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/e6465b5d

Branch: refs/heads/master
Commit: e6465b5dc5c7e31e0b92a18de578a13d05a5abf2
Parents: 559053d
Author: Kyle Nusbaum <[email protected]>
Authored: Mon Jul 17 14:40:15 2017 -0500
Committer: Kyle Nusbaum <[email protected]>
Committed: Mon Jul 17 14:40:15 2017 -0500

----------------------------------------------------------------------
 conf/defaults.yaml                                   |  1 +
 storm-client/src/jvm/org/apache/storm/Config.java    |  7 +++++++
 .../org/apache/storm/pacemaker/PacemakerClient.java  |  3 ++-
 .../apache/storm/pacemaker/codec/ThriftDecoder.java  | 15 +++++++++++++++
 .../pacemaker/codec/ThriftNettyClientCodec.java      |  6 ++++--
 .../org/apache/storm/pacemaker/PacemakerServer.java  |  4 ++--
 .../pacemaker/codec/ThriftNettyServerCodec.java      |  5 ++---
 7 files changed, 33 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/e6465b5d/conf/defaults.yaml
----------------------------------------------------------------------
diff --git a/conf/defaults.yaml b/conf/defaults.yaml
index eb9cb24..0078323 100644
--- a/conf/defaults.yaml
+++ b/conf/defaults.yaml
@@ -282,6 +282,7 @@ pacemaker.thread.timeout: 10
 pacemaker.childopts: "-Xmx1024m"
 pacemaker.auth.method: "NONE"
 pacemaker.kerberos.users: []
+pacemaker.thrift.message.size.max: 10485760
 
 #default storm daemon metrics reporter plugins
 storm.daemon.metrics.reporter.plugins:

http://git-wip-us.apache.org/repos/asf/storm/blob/e6465b5d/storm-client/src/jvm/org/apache/storm/Config.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/Config.java 
b/storm-client/src/jvm/org/apache/storm/Config.java
index 5d5f723..367e7ed 100644
--- a/storm-client/src/jvm/org/apache/storm/Config.java
+++ b/storm-client/src/jvm/org/apache/storm/Config.java
@@ -804,6 +804,13 @@ public class Config extends HashMap<String, Object> {
     public static final String PACEMAKER_AUTH_METHOD = "pacemaker.auth.method";
 
     /**
+     * Pacemaker Thrift Max Message Size (bytes)
+     */
+    @isInteger
+    @isPositiveNumber
+    public static final String PACEMAKER_THRIFT_MESSAGE_SIZE_MAX = 
"pacemaker.thrift.message.size.max";
+
+    /**
      * Max no.of seconds group mapping service will cache user groups
      */
     @isInteger

http://git-wip-us.apache.org/repos/asf/storm/blob/e6465b5d/storm-client/src/jvm/org/apache/storm/pacemaker/PacemakerClient.java
----------------------------------------------------------------------
diff --git 
a/storm-client/src/jvm/org/apache/storm/pacemaker/PacemakerClient.java 
b/storm-client/src/jvm/org/apache/storm/pacemaker/PacemakerClient.java
index be09b3e..bdfdeae 100644
--- a/storm-client/src/jvm/org/apache/storm/pacemaker/PacemakerClient.java
+++ b/storm-client/src/jvm/org/apache/storm/pacemaker/PacemakerClient.java
@@ -117,7 +117,8 @@ public class PacemakerClient implements ISaslClient {
         bootstrap.setOption("keepAlive", true);
 
         remote_addr = new InetSocketAddress(host, port);
-        ChannelPipelineFactory pipelineFactory = new 
ThriftNettyClientCodec(this, config, authMethod, host).pipelineFactory();
+        int thriftMessageMaxSize = 
(Integer)config.get(Config.PACEMAKER_THRIFT_MESSAGE_SIZE_MAX);
+        ChannelPipelineFactory pipelineFactory = new 
ThriftNettyClientCodec(this, config, authMethod, host, 
thriftMessageMaxSize).pipelineFactory();
         bootstrap.setPipelineFactory(pipelineFactory);
         bootstrap.connect(remote_addr);
     }

http://git-wip-us.apache.org/repos/asf/storm/blob/e6465b5d/storm-client/src/jvm/org/apache/storm/pacemaker/codec/ThriftDecoder.java
----------------------------------------------------------------------
diff --git 
a/storm-client/src/jvm/org/apache/storm/pacemaker/codec/ThriftDecoder.java 
b/storm-client/src/jvm/org/apache/storm/pacemaker/codec/ThriftDecoder.java
index 968ca34..58d2b1b 100644
--- a/storm-client/src/jvm/org/apache/storm/pacemaker/codec/ThriftDecoder.java
+++ b/storm-client/src/jvm/org/apache/storm/pacemaker/codec/ThriftDecoder.java
@@ -27,10 +27,18 @@ import org.jboss.netty.buffer.ChannelBuffer;
 import org.apache.storm.messaging.netty.ControlMessage;
 import org.apache.storm.messaging.netty.SaslMessageToken;
 
+import java.io.IOException;
+
 public class ThriftDecoder extends FrameDecoder {
 
     private static final int INTEGER_SIZE = 4;
 
+    private int maxLength;
+
+    public ThriftDecoder(int maxLength) {
+        this.maxLength = maxLength;
+    }
+
     @Override
     protected Object decode(ChannelHandlerContext ctx, Channel channel, 
ChannelBuffer buf) throws Exception {
 
@@ -42,6 +50,13 @@ public class ThriftDecoder extends FrameDecoder {
         buf.markReaderIndex();
 
         int thriftLen = buf.readInt();
+
+        if(thriftLen < 0 || thriftLen > maxLength) {
+            throw new IOException("Thrift message of length " + 
Integer.toString(thriftLen)
+                                  + " is greater than allowed " + maxLength
+                                  + " or less than 0.");
+        }
+
         available -= INTEGER_SIZE;
 
         if(available < thriftLen) {

http://git-wip-us.apache.org/repos/asf/storm/blob/e6465b5d/storm-client/src/jvm/org/apache/storm/pacemaker/codec/ThriftNettyClientCodec.java
----------------------------------------------------------------------
diff --git 
a/storm-client/src/jvm/org/apache/storm/pacemaker/codec/ThriftNettyClientCodec.java
 
b/storm-client/src/jvm/org/apache/storm/pacemaker/codec/ThriftNettyClientCodec.java
index 42a68fd..1ad5a6c 100644
--- 
a/storm-client/src/jvm/org/apache/storm/pacemaker/codec/ThriftNettyClientCodec.java
+++ 
b/storm-client/src/jvm/org/apache/storm/pacemaker/codec/ThriftNettyClientCodec.java
@@ -48,12 +48,14 @@ public class ThriftNettyClientCodec {
     private AuthMethod authMethod;
     private Map<String, Object> topoConf;
     private String host;
+    private int thriftMessageMaxSize;
 
-    public ThriftNettyClientCodec(PacemakerClient pacemaker_client, 
Map<String, Object> topoConf, AuthMethod authMethod, String host) {
+    public ThriftNettyClientCodec(PacemakerClient pacemaker_client, 
Map<String, Object> topoConf, AuthMethod authMethod, String host, int 
thriftMessageMaxSize) {
         client = pacemaker_client;
         this.authMethod = authMethod;
         this.topoConf = topoConf;
         this.host = host;
+        this.thriftMessageMaxSize = thriftMessageMaxSize;
     }
 
     public ChannelPipelineFactory pipelineFactory() {
@@ -61,7 +63,7 @@ public class ThriftNettyClientCodec {
             public ChannelPipeline getPipeline() {
                 ChannelPipeline pipeline = Channels.pipeline();
                 pipeline.addLast("encoder", new ThriftEncoder());
-                pipeline.addLast("decoder", new ThriftDecoder());
+                pipeline.addLast("decoder", new 
ThriftDecoder(thriftMessageMaxSize));
 
                 if (authMethod == AuthMethod.KERBEROS) {
                     try {

http://git-wip-us.apache.org/repos/asf/storm/blob/e6465b5d/storm-server/src/main/java/org/apache/storm/pacemaker/PacemakerServer.java
----------------------------------------------------------------------
diff --git 
a/storm-server/src/main/java/org/apache/storm/pacemaker/PacemakerServer.java 
b/storm-server/src/main/java/org/apache/storm/pacemaker/PacemakerServer.java
index 315d0ac..5a68ac8 100644
--- a/storm-server/src/main/java/org/apache/storm/pacemaker/PacemakerServer.java
+++ b/storm-server/src/main/java/org/apache/storm/pacemaker/PacemakerServer.java
@@ -106,8 +106,8 @@ class PacemakerServer implements ISaslServer {
         bootstrap.setOption("tcpNoDelay", true);
         bootstrap.setOption("sendBufferSize", FIVE_MB_IN_BYTES);
         bootstrap.setOption("keepAlive", true);
-
-        ChannelPipelineFactory pipelineFactory = new 
ThriftNettyServerCodec(this, config, authMethod).pipelineFactory();
+        int thriftMessageMaxSize = 
(Integer)config.get(Config.PACEMAKER_THRIFT_MESSAGE_SIZE_MAX);
+        ChannelPipelineFactory pipelineFactory = new 
ThriftNettyServerCodec(this, config, authMethod, 
thriftMessageMaxSize).pipelineFactory();
         bootstrap.setPipelineFactory(pipelineFactory);
         Channel channel = bootstrap.bind(new InetSocketAddress(port));
         allChannels.add(channel);

http://git-wip-us.apache.org/repos/asf/storm/blob/e6465b5d/storm-server/src/main/java/org/apache/storm/pacemaker/codec/ThriftNettyServerCodec.java
----------------------------------------------------------------------
diff --git 
a/storm-server/src/main/java/org/apache/storm/pacemaker/codec/ThriftNettyServerCodec.java
 
b/storm-server/src/main/java/org/apache/storm/pacemaker/codec/ThriftNettyServerCodec.java
index 8b0de60..0dc6338 100644
--- 
a/storm-server/src/main/java/org/apache/storm/pacemaker/codec/ThriftNettyServerCodec.java
+++ 
b/storm-server/src/main/java/org/apache/storm/pacemaker/codec/ThriftNettyServerCodec.java
@@ -17,7 +17,6 @@
  */
 package org.apache.storm.pacemaker.codec;
 
-import org.apache.storm.Config;
 import org.apache.storm.DaemonConfig;
 import org.apache.storm.messaging.netty.ISaslServer;
 import org.apache.storm.messaging.netty.IServer;
@@ -52,7 +51,7 @@ public class ThriftNettyServerCodec {
     private static final Logger LOG = LoggerFactory
         .getLogger(ThriftNettyServerCodec.class);
 
-    public ThriftNettyServerCodec(IServer server, Map<String, Object> 
topoConf, AuthMethod authMethod) {
+    public ThriftNettyServerCodec(IServer server, Map<String, Object> 
topoConf, AuthMethod authMethod, int thriftMessageMaxSize) {
         this.server = server;
         this.authMethod = authMethod;
         this.topoConf = topoConf;
@@ -64,7 +63,7 @@ public class ThriftNettyServerCodec {
 
                 ChannelPipeline pipeline = Channels.pipeline();
                 pipeline.addLast("encoder", new ThriftEncoder());
-                pipeline.addLast("decoder", new ThriftDecoder());
+                pipeline.addLast("decoder", new 
ThriftDecoder(thriftMessageMaxSize));
                 if(authMethod == AuthMethod.DIGEST) {
                     try {
                         LOG.debug("Adding SaslStormServerHandler to pacemaker 
server pipeline.");

Reply via email to