Github user revans2 commented on a diff in the pull request:
https://github.com/apache/storm/pull/2704#discussion_r193463579
--- Diff:
storm-client/src/jvm/org/apache/storm/pacemaker/codec/ThriftEncoder.java ---
@@ -12,59 +12,54 @@
package org.apache.storm.pacemaker.codec;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.MessageToMessageEncoder;
import java.io.IOException;
+import java.util.List;
import org.apache.storm.generated.HBMessage;
import org.apache.storm.generated.HBMessageData;
import org.apache.storm.generated.HBServerMessageType;
import org.apache.storm.messaging.netty.ControlMessage;
import org.apache.storm.messaging.netty.INettySerializable;
import org.apache.storm.messaging.netty.SaslMessageToken;
import org.apache.storm.utils.Utils;
-import org.jboss.netty.buffer.ChannelBuffer;
-import org.jboss.netty.buffer.ChannelBuffers;
-import org.jboss.netty.channel.Channel;
-import org.jboss.netty.channel.ChannelHandlerContext;
-import org.jboss.netty.handler.codec.oneone.OneToOneEncoder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class ThriftEncoder extends OneToOneEncoder {
+public class ThriftEncoder extends MessageToMessageEncoder<Object> {
private static final Logger LOG = LoggerFactory
.getLogger(ThriftEncoder.class);
- private HBMessage encodeNettySerializable(INettySerializable netty_message,
- HBServerMessageType mType) {
+ private HBMessage encodeNettySerializable(ByteBufAllocator alloc,
+ INettySerializable netty_message, HBServerMessageType mType) {
HBMessageData message_data = new HBMessageData();
HBMessage m = new HBMessage();
+ ByteBuf messageBuffer = alloc.heapBuffer(netty_message.encodeLength());
try {
- ChannelBuffer cbuffer = netty_message.buffer();
- if (cbuffer.hasArray()) {
- message_data.set_message_blob(cbuffer.array());
- } else {
- byte buff[] = new byte[netty_message.encodeLength()];
- cbuffer.readBytes(buff, 0, netty_message.encodeLength());
- message_data.set_message_blob(buff);
- }
+ netty_message.write(messageBuffer);
+ message_data.set_message_blob(messageBuffer.array());
--- End diff --
I don't think this is going to work.
If alloc is a pooled allocator, it only guarantees a buffer at least as
large as the requested length, not one of exactly that size. `.array()`
returns the whole backing array, so we could end up setting a message blob
with garbage at the end.
Also, because this is a pooled heapBuffer, we release it before we serialize
the HBMessage. The buffer could then be reused and overwritten with
different data before the HBMessage is serialized.
I think if we want to use this model for serializing the netty message, we
need to either allocate a new buffer each time or return the fully
serialized HBMessage before releasing the buffer.
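
To illustrate both concerns without depending on Netty, here is a minimal
sketch that uses `java.nio.ByteBuffer` as a stand-in for a pooled heap
buffer. Everything in it is hypothetical (`BackingArrayDemo`, `allocate`,
`unsafeBlob`, `safeBlob`, the fixed 16-byte pool); it is not the Storm or
Netty code, only an analogue of the behavior described above.

```java
import java.nio.ByteBuffer;

class BackingArrayDemo {
    // Hypothetical one-slot "pool": always hands back the same 16-byte
    // buffer, regardless of how few bytes the caller asked for.
    private static final ByteBuffer POOLED = ByteBuffer.allocate(16);

    static ByteBuffer allocate(int minCapacity) {
        if (minCapacity > POOLED.capacity()) {
            throw new IllegalArgumentException("demo pool is only 16 bytes");
        }
        POOLED.clear(); // resets position/limit, does NOT erase old content
        return POOLED;
    }

    // Mirrors the pattern in the diff: hand out the raw backing array.
    static byte[] unsafeBlob(ByteBuffer buf) {
        return buf.array();
    }

    // Copies exactly the bytes written so far into a fresh array.
    static byte[] safeBlob(ByteBuffer buf) {
        ByteBuffer view = buf.duplicate(); // independent position/limit
        view.flip();                       // limit = bytes written, position = 0
        byte[] out = new byte[view.remaining()];
        view.get(out);
        return out;
    }

    public static void main(String[] args) {
        ByteBuffer first = allocate(4);
        first.put(new byte[] {1, 2, 3, 4});
        byte[] unsafe = unsafeBlob(first);
        byte[] safe = safeBlob(first);
        System.out.println("unsafe length: " + unsafe.length);
        System.out.println("safe length:   " + safe.length);

        // Simulate the buffer being "released" and reused by someone else.
        ByteBuffer second = allocate(4);
        second.put(new byte[] {9, 9, 9, 9});
        System.out.println("unsafe[0] after reuse: " + unsafe[0]);
        System.out.println("safe[0] after reuse:   " + safe[0]);
    }
}
```

In this sketch the array from `unsafeBlob` is longer than the four bytes
written and changes when the pooled buffer is reused, while the copy from
`safeBlob` is unaffected. In the Netty case, the analogous fix would be to
copy only the written bytes out of the `ByteBuf` before releasing it, or to
keep the buffer unreleased until the HBMessage has been serialized.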
---