Repository: storm Updated Branches: refs/heads/1.0.x-branch eead21327 -> 490fb9637
Test Pacemaker Fix Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/bef6f70a Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/bef6f70a Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/bef6f70a Branch: refs/heads/1.0.x-branch Commit: bef6f70a8474688bdb5b6156fd8519ead4b6f785 Parents: eead213 Author: Kyle Nusbaum <[email protected]> Authored: Mon May 2 14:29:06 2016 -0500 Committer: Jungtaek Lim <[email protected]> Committed: Mon May 16 18:23:43 2016 +0900 ---------------------------------------------------------------------- .../apache/storm/pacemaker/PacemakerServer.java | 9 +++++++-- .../storm/pacemaker/codec/ThriftDecoder.java | 21 ++++++++------------ 2 files changed, 15 insertions(+), 15 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/bef6f70a/storm-core/src/jvm/org/apache/storm/pacemaker/PacemakerServer.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/pacemaker/PacemakerServer.java b/storm-core/src/jvm/org/apache/storm/pacemaker/PacemakerServer.java index 46ba364..fa73d96 100644 --- a/storm-core/src/jvm/org/apache/storm/pacemaker/PacemakerServer.java +++ b/storm-core/src/jvm/org/apache/storm/pacemaker/PacemakerServer.java @@ -138,8 +138,13 @@ class PacemakerServer implements ISaslServer { LOG.debug("received message. Passing to handler. {} : {} : {}", handler.toString(), m.toString(), channel.toString()); HBMessage response = handler.handleMessage(m, authenticated); - LOG.debug("Got Response from handler: {}", response.toString()); - channel.write(response); + if(response != null) { + LOG.debug("Got Response from handler: {}", response); + channel.write(response); + } + else { + LOG.info("Got null response from handler handling message: {}", m); + } } public void closeChannel(Channel c) { http://git-wip-us.apache.org/repos/asf/storm/blob/bef6f70a/storm-core/src/jvm/org/apache/storm/pacemaker/codec/ThriftDecoder.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/pacemaker/codec/ThriftDecoder.java b/storm-core/src/jvm/org/apache/storm/pacemaker/codec/ThriftDecoder.java index 25edefe..7f21cf8 100644 --- a/storm-core/src/jvm/org/apache/storm/pacemaker/codec/ThriftDecoder.java +++ b/storm-core/src/jvm/org/apache/storm/pacemaker/codec/ThriftDecoder.java @@ -29,18 +29,20 @@ import org.apache.storm.messaging.netty.SaslMessageToken; public class ThriftDecoder extends FrameDecoder { + private static final int INTEGER_SIZE = 4; + @Override protected Object decode(ChannelHandlerContext ctx, Channel channel, ChannelBuffer buf) throws Exception { long available = buf.readableBytes(); - if(available < 2) { + if(available < INTEGER_SIZE) { return null; } buf.markReaderIndex(); int thriftLen = buf.readInt(); - available -= 4; + available -= INTEGER_SIZE; if(available < thriftLen) { // We haven't received the entire object yet, return and wait for more bytes. @@ -48,18 +50,11 @@ public class ThriftDecoder extends FrameDecoder { return null; } - buf.discardReadBytes(); + + byte serialized[] = new byte[thriftLen]; + buf.readBytes(serialized, 0, thriftLen); + HBMessage m = (HBMessage)Utils.thriftDeserialize(HBMessage.class, serialized); - HBMessage m; - if(buf.hasArray()) { - m = Utils.thriftDeserialize(HBMessage.class, buf.array(), 0, thriftLen); - buf.readerIndex(buf.readerIndex() + thriftLen); - } - else { - byte serialized[] = new byte[thriftLen]; - buf.readBytes(serialized, 0, thriftLen); - m = Utils.thriftDeserialize(HBMessage.class, serialized); - } if(m.get_type() == HBServerMessageType.CONTROL_MESSAGE) { ControlMessage cm = ControlMessage.read(m.get_data().get_message_blob());
