Repository: storm
Updated Branches:
  refs/heads/master 6ff3fb252 -> a558a1f07
Test Pacemaker Fix

Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/9c2ac7df
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/9c2ac7df
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/9c2ac7df

Branch: refs/heads/master
Commit: 9c2ac7df68753835454a8abb5c6efcabb4c9370c
Parents: a536937
Author: Kyle Nusbaum <[email protected]>
Authored: Mon May 2 14:29:06 2016 -0500
Committer: Kyle Nusbaum <[email protected]>
Committed: Wed May 4 10:47:14 2016 -0500

----------------------------------------------------------------------
 .../apache/storm/pacemaker/PacemakerServer.java |  9 +++++++--
 .../storm/pacemaker/codec/ThriftDecoder.java    | 21 ++++++++------------
 2 files changed, 15 insertions(+), 15 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/storm/blob/9c2ac7df/storm-core/src/jvm/org/apache/storm/pacemaker/PacemakerServer.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/pacemaker/PacemakerServer.java b/storm-core/src/jvm/org/apache/storm/pacemaker/PacemakerServer.java
index 46ba364..fa73d96 100644
--- a/storm-core/src/jvm/org/apache/storm/pacemaker/PacemakerServer.java
+++ b/storm-core/src/jvm/org/apache/storm/pacemaker/PacemakerServer.java
@@ -138,8 +138,13 @@ class PacemakerServer implements ISaslServer {
         LOG.debug("received message. Passing to handler. {} : {} : {}",
                   handler.toString(), m.toString(), channel.toString());
         HBMessage response = handler.handleMessage(m, authenticated);
-        LOG.debug("Got Response from handler: {}", response.toString());
-        channel.write(response);
+        if(response != null) {
+            LOG.debug("Got Response from handler: {}", response);
+            channel.write(response);
+        }
+        else {
+            LOG.info("Got null response from handler handling message: {}", m);
+        }
     }
 
     public void closeChannel(Channel c) {

http://git-wip-us.apache.org/repos/asf/storm/blob/9c2ac7df/storm-core/src/jvm/org/apache/storm/pacemaker/codec/ThriftDecoder.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/pacemaker/codec/ThriftDecoder.java b/storm-core/src/jvm/org/apache/storm/pacemaker/codec/ThriftDecoder.java
index 25edefe..7f21cf8 100644
--- a/storm-core/src/jvm/org/apache/storm/pacemaker/codec/ThriftDecoder.java
+++ b/storm-core/src/jvm/org/apache/storm/pacemaker/codec/ThriftDecoder.java
@@ -29,18 +29,20 @@ import org.apache.storm.messaging.netty.SaslMessageToken;
 
 public class ThriftDecoder extends FrameDecoder {
 
+    private static final int INTEGER_SIZE = 4;
+
     @Override
     protected Object decode(ChannelHandlerContext ctx, Channel channel, ChannelBuffer buf) throws Exception {
 
         long available = buf.readableBytes();
-        if(available < 2) {
+        if(available < INTEGER_SIZE) {
             return null;
         }
 
         buf.markReaderIndex();
 
         int thriftLen = buf.readInt();
-        available -= 4;
+        available -= INTEGER_SIZE;
 
         if(available < thriftLen) {
             // We haven't received the entire object yet, return and wait for more bytes.
@@ -48,18 +50,11 @@ public class ThriftDecoder extends FrameDecoder {
             return null;
         }
 
-        buf.discardReadBytes();
+
+        byte serialized[] = new byte[thriftLen];
+        buf.readBytes(serialized, 0, thriftLen);
+        HBMessage m = (HBMessage)Utils.thriftDeserialize(HBMessage.class, serialized);
 
-        HBMessage m;
-        if(buf.hasArray()) {
-            m = Utils.thriftDeserialize(HBMessage.class, buf.array(), 0, thriftLen);
-            buf.readerIndex(buf.readerIndex() + thriftLen);
-        }
-        else {
-            byte serialized[] = new byte[thriftLen];
-            buf.readBytes(serialized, 0, thriftLen);
-            m = Utils.thriftDeserialize(HBMessage.class, serialized);
-        }
 
         if(m.get_type() == HBServerMessageType.CONTROL_MESSAGE) {
             ControlMessage cm = ControlMessage.read(m.get_data().get_message_blob());
