Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2704#discussion_r193463579
  
    --- Diff: storm-client/src/jvm/org/apache/storm/pacemaker/codec/ThriftEncoder.java ---
    @@ -12,59 +12,54 @@
     
     package org.apache.storm.pacemaker.codec;
     
    +import io.netty.buffer.ByteBuf;
    +import io.netty.buffer.ByteBufAllocator;
    +import io.netty.channel.ChannelHandlerContext;
    +import io.netty.handler.codec.MessageToMessageEncoder;
     import java.io.IOException;
    +import java.util.List;
     import org.apache.storm.generated.HBMessage;
     import org.apache.storm.generated.HBMessageData;
     import org.apache.storm.generated.HBServerMessageType;
     import org.apache.storm.messaging.netty.ControlMessage;
     import org.apache.storm.messaging.netty.INettySerializable;
     import org.apache.storm.messaging.netty.SaslMessageToken;
     import org.apache.storm.utils.Utils;
    -import org.jboss.netty.buffer.ChannelBuffer;
    -import org.jboss.netty.buffer.ChannelBuffers;
    -import org.jboss.netty.channel.Channel;
    -import org.jboss.netty.channel.ChannelHandlerContext;
    -import org.jboss.netty.handler.codec.oneone.OneToOneEncoder;
     import org.slf4j.Logger;
     import org.slf4j.LoggerFactory;
     
    -public class ThriftEncoder extends OneToOneEncoder {
    +public class ThriftEncoder extends MessageToMessageEncoder<Object> {
     
         private static final Logger LOG = LoggerFactory
             .getLogger(ThriftEncoder.class);
     
    -    private HBMessage encodeNettySerializable(INettySerializable netty_message,
    -                                              HBServerMessageType mType) {
    +    private HBMessage encodeNettySerializable(ByteBufAllocator alloc,
    +        INettySerializable netty_message, HBServerMessageType mType) {
     
             HBMessageData message_data = new HBMessageData();
             HBMessage m = new HBMessage();
    +        ByteBuf messageBuffer = alloc.heapBuffer(netty_message.encodeLength());
             try {
    -            ChannelBuffer cbuffer = netty_message.buffer();
    -            if (cbuffer.hasArray()) {
    -                message_data.set_message_blob(cbuffer.array());
    -            } else {
    -                byte buff[] = new byte[netty_message.encodeLength()];
    -                cbuffer.readBytes(buff, 0, netty_message.encodeLength());
    -                message_data.set_message_blob(buff);
    -            }
    +            netty_message.write(messageBuffer);
    +            message_data.set_message_blob(messageBuffer.array());
    --- End diff --
    
    I don't think this is going to work.
    
    If alloc is a pooled allocator, it only guarantees a buffer at least as
    large as the requested length, not one of exactly that size. `.array()`
    returns the whole backing array regardless of how many bytes were written,
    so we could end up setting a message blob with garbage at the end.
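
    To make the first concern concrete, here is a minimal sketch in plain
    Java. It does not use Netty at all: the 16-byte array stands in for an
    oversized backing array, and `BackingArraySketch` and `copyWritten` are
    hypothetical names used only for illustration.

```java
import java.util.Arrays;

// Plain-Java stand-in for the problem; no Netty types are used here.
public class BackingArraySketch {

    // Hypothetical helper: copy only the bytes that were actually written.
    static byte[] copyWritten(byte[] backing, int offset, int length) {
        return Arrays.copyOfRange(backing, offset, offset + length);
    }

    public static void main(String[] args) {
        // Assumption: the allocator handed back 16 bytes for a 5-byte request,
        // and the array still holds stale data from an earlier use.
        byte[] backing = new byte[16];
        Arrays.fill(backing, (byte) 0x7f);

        byte[] payload = {1, 2, 3, 4, 5};
        System.arraycopy(payload, 0, backing, 0, payload.length);

        // Passing the whole backing array along would include 11 stale bytes.
        System.out.println("backing length: " + backing.length);

        // Copying just the written range yields exactly the payload.
        byte[] blob = copyWritten(backing, 0, payload.length);
        System.out.println("blob: " + Arrays.toString(blob));
    }
}
```

    With a real `ByteBuf` the equivalent would be to copy only the readable
    bytes (I believe `ByteBufUtil.getBytes(ByteBuf)` in Netty 4.1 does this)
    instead of handing the raw backing array to `set_message_blob`.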
    
    Also, because this is a pooled heap buffer, we release it before the
    HBMessage is serialized. The buffer could then be reused and overwritten
    with different data while the message still refers to its array.
    
    I think if we want to use this model for serializing the netty message, we
    need to either allocate a new buffer each time or produce the fully
    serialized HBMessage before releasing the buffer.
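
    The release-ordering problem can be sketched the same way. This is again
    plain Java rather than Netty: `TinyPool` is a hypothetical one-slot-style
    pool that only mimics buffer reuse, and `encodeAliased` / `encodeCopied`
    are illustrative names, not anything in the patch.

```java
import java.util.ArrayDeque;
import java.util.Arrays;

// Plain-Java stand-in for a pooled allocator; all names are hypothetical.
public class ReleaseOrderSketch {

    static class TinyPool {
        private final ArrayDeque<byte[]> free = new ArrayDeque<>();

        // Reuse a released array when it is big enough, else allocate.
        byte[] acquire(int size) {
            byte[] b = free.poll();
            return (b != null && b.length >= size) ? b : new byte[size];
        }

        void release(byte[] b) {
            free.push(b);
        }
    }

    // Unsafe: the returned array is still owned by the pool after release.
    static byte[] encodeAliased(TinyPool pool, byte[] payload) {
        byte[] buf = pool.acquire(payload.length);
        System.arraycopy(payload, 0, buf, 0, payload.length);
        pool.release(buf);
        return buf;
    }

    // Safer: take a private copy of the written bytes, then release.
    static byte[] encodeCopied(TinyPool pool, byte[] payload) {
        byte[] buf = pool.acquire(payload.length);
        try {
            System.arraycopy(payload, 0, buf, 0, payload.length);
            return Arrays.copyOf(buf, payload.length);
        } finally {
            pool.release(buf);
        }
    }

    public static void main(String[] args) {
        TinyPool pool = new TinyPool();
        byte[] first = encodeAliased(pool, new byte[] {1, 2, 3});
        encodeAliased(pool, new byte[] {9, 9, 9});
        // The first result was overwritten by the second encode.
        System.out.println("aliased: " + Arrays.toString(first));

        byte[] safe = encodeCopied(pool, new byte[] {1, 2, 3});
        encodeCopied(pool, new byte[] {7, 7, 7});
        // The copied result is unaffected by later reuse of the buffer.
        System.out.println("copied: " + Arrays.toString(safe));
    }
}
```

    Copying before release costs one extra allocation per message; the other
    option, serializing the HBMessage while the buffer is still held, avoids
    the copy but keeps the pooled buffer checked out for longer.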

