Repository: storm
Updated Branches:
  refs/heads/1.x-branch 673326f26 -> 5975ccfd5


Test Pacemaker Fix


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/3280fcf7
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/3280fcf7
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/3280fcf7

Branch: refs/heads/1.x-branch
Commit: 3280fcf763e548679973156dadaf26189ba98b03
Parents: 673326f
Author: Kyle Nusbaum <[email protected]>
Authored: Mon May 2 14:29:06 2016 -0500
Committer: Jungtaek Lim <[email protected]>
Committed: Mon May 16 18:17:57 2016 +0900

----------------------------------------------------------------------
 .../apache/storm/pacemaker/PacemakerServer.java |  9 +++++++--
 .../storm/pacemaker/codec/ThriftDecoder.java    | 21 ++++++++------------
 2 files changed, 15 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/3280fcf7/storm-core/src/jvm/org/apache/storm/pacemaker/PacemakerServer.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/pacemaker/PacemakerServer.java b/storm-core/src/jvm/org/apache/storm/pacemaker/PacemakerServer.java
index 46ba364..fa73d96 100644
--- a/storm-core/src/jvm/org/apache/storm/pacemaker/PacemakerServer.java
+++ b/storm-core/src/jvm/org/apache/storm/pacemaker/PacemakerServer.java
@@ -138,8 +138,13 @@ class PacemakerServer implements ISaslServer {
         LOG.debug("received message. Passing to handler. {} : {} : {}",
                   handler.toString(), m.toString(), channel.toString());
         HBMessage response = handler.handleMessage(m, authenticated);
-        LOG.debug("Got Response from handler: {}", response.toString());
-        channel.write(response);
+        if(response != null) {
+            LOG.debug("Got Response from handler: {}", response);
+            channel.write(response);
+        }
+        else {
+            LOG.info("Got null response from handler handling message: {}", m);
+        }
     }
 
     public void closeChannel(Channel c) {

http://git-wip-us.apache.org/repos/asf/storm/blob/3280fcf7/storm-core/src/jvm/org/apache/storm/pacemaker/codec/ThriftDecoder.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/pacemaker/codec/ThriftDecoder.java b/storm-core/src/jvm/org/apache/storm/pacemaker/codec/ThriftDecoder.java
index 25edefe..7f21cf8 100644
--- a/storm-core/src/jvm/org/apache/storm/pacemaker/codec/ThriftDecoder.java
+++ b/storm-core/src/jvm/org/apache/storm/pacemaker/codec/ThriftDecoder.java
@@ -29,18 +29,20 @@ import org.apache.storm.messaging.netty.SaslMessageToken;
 
 public class ThriftDecoder extends FrameDecoder {
 
+    private static final int INTEGER_SIZE = 4;
+
     @Override
     protected Object decode(ChannelHandlerContext ctx, Channel channel, ChannelBuffer buf) throws Exception {
 
         long available = buf.readableBytes();
-        if(available < 2) {
+        if(available < INTEGER_SIZE) {
             return null;
         }
 
         buf.markReaderIndex();
 
         int thriftLen = buf.readInt();
-        available -= 4;
+        available -= INTEGER_SIZE;
 
         if(available < thriftLen) {
             // We haven't received the entire object yet, return and wait for more bytes.
@@ -48,18 +50,11 @@ public class ThriftDecoder extends FrameDecoder {
             return null;
         }
 
-        buf.discardReadBytes();
+        
+        byte serialized[] = new byte[thriftLen];
+        buf.readBytes(serialized, 0, thriftLen);
+        HBMessage m = (HBMessage)Utils.thriftDeserialize(HBMessage.class, serialized);
 
-        HBMessage m;
-        if(buf.hasArray()) {
-            m = Utils.thriftDeserialize(HBMessage.class, buf.array(), 0, thriftLen);
-            buf.readerIndex(buf.readerIndex() + thriftLen);
-        }
-        else {
-            byte serialized[] = new byte[thriftLen];
-            buf.readBytes(serialized, 0, thriftLen);
-            m = Utils.thriftDeserialize(HBMessage.class, serialized);
-        }
 
         if(m.get_type() == HBServerMessageType.CONTROL_MESSAGE) {
             ControlMessage cm = ControlMessage.read(m.get_data().get_message_blob());

Reply via email to