http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/74f79513/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
----------------------------------------------------------------------
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
index 7db620d..99d257b 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
@@ -17,9 +17,36 @@
  */
 package org.apache.bookkeeper.proto;
 
+import io.netty.bootstrap.Bootstrap;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.PooledByteBufAllocator;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelFutureListener;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.ChannelPipeline;
+import io.netty.channel.DefaultEventLoopGroup;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.WriteBufferWaterMark;
+import io.netty.channel.ChannelHandler.Sharable;
+import io.netty.channel.epoll.EpollEventLoopGroup;
+import io.netty.channel.epoll.EpollSocketChannel;
+import io.netty.channel.local.LocalChannel;
+import io.netty.channel.socket.nio.NioSocketChannel;
+import io.netty.handler.codec.CorruptedFrameException;
+import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
+import io.netty.handler.codec.LengthFieldPrepender;
+import io.netty.handler.codec.TooLongFrameException;
+import io.netty.util.HashedWheelTimer;
+import io.netty.util.Timeout;
+import io.netty.util.TimerTask;
+
+
 import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.nio.MappedByteBuffer;
 import java.nio.channels.ClosedChannelException;
 import java.util.ArrayDeque;
 import java.util.Collections;
@@ -43,6 +70,8 @@ import 
org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
 import 
org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ReadEntryCallback;
 import 
org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GetBookieInfoCallback;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback;
+import 
org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteLacCallback;
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ReadLacCallback;
 import org.apache.bookkeeper.proto.BookkeeperProtocol.AddRequest;
 import org.apache.bookkeeper.proto.BookkeeperProtocol.AddResponse;
 import org.apache.bookkeeper.proto.BookkeeperProtocol.BKPacketHeader;
@@ -59,52 +88,22 @@ import 
org.apache.bookkeeper.proto.BookkeeperProtocol.Response;
 import org.apache.bookkeeper.proto.BookkeeperProtocol.StatusCode;
 import org.apache.bookkeeper.proto.BookkeeperProtocol.WriteLacRequest;
 import org.apache.bookkeeper.proto.BookkeeperProtocol.WriteLacResponse;
-import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
-import 
org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ReadEntryCallback;
-import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback;
-import 
org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteLacCallback;
-import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ReadLacCallback;
+
 import org.apache.bookkeeper.stats.NullStatsLogger;
 import org.apache.bookkeeper.stats.OpStatsLogger;
 import org.apache.bookkeeper.stats.StatsLogger;
 import org.apache.bookkeeper.util.MathUtils;
 import org.apache.bookkeeper.util.OrderedSafeExecutor;
 import org.apache.bookkeeper.util.SafeRunnable;
-import org.jboss.netty.bootstrap.ClientBootstrap;
-import org.jboss.netty.buffer.ChannelBuffer;
-import org.jboss.netty.buffer.ChannelBuffers;
-import org.jboss.netty.channel.Channel;
-import org.jboss.netty.channel.ChannelFuture;
-import org.jboss.netty.channel.ChannelFutureListener;
-import org.jboss.netty.channel.ChannelHandlerContext;
-import org.jboss.netty.channel.ChannelPipeline;
-import org.jboss.netty.channel.ChannelPipelineFactory;
-import org.jboss.netty.channel.ChannelStateEvent;
-import org.jboss.netty.channel.Channels;
-import org.jboss.netty.channel.ExceptionEvent;
-import org.jboss.netty.channel.MessageEvent;
-import org.jboss.netty.channel.SimpleChannelHandler;
-import org.jboss.netty.channel.local.DefaultLocalClientChannelFactory;
-import org.jboss.netty.channel.local.LocalClientChannelFactory;
-import org.jboss.netty.channel.socket.ClientSocketChannelFactory;
-import org.jboss.netty.handler.codec.frame.CorruptedFrameException;
-import org.jboss.netty.handler.codec.frame.LengthFieldBasedFrameDecoder;
-import org.jboss.netty.handler.codec.frame.LengthFieldPrepender;
-import org.jboss.netty.handler.codec.frame.TooLongFrameException;
-import org.jboss.netty.handler.timeout.ReadTimeoutHandler;
-import org.jboss.netty.util.HashedWheelTimer;
-import org.jboss.netty.util.Timeout;
-import org.jboss.netty.util.TimerTask;
+import org.apache.commons.lang.SystemUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.common.collect.Sets;
-import com.google.protobuf.ByteString;
 import com.google.protobuf.ExtensionRegistry;
 import java.net.SocketAddress;
 
 import org.apache.bookkeeper.auth.BookKeeperPrincipal;
-import org.jboss.netty.channel.ChannelFactory;
 import org.apache.bookkeeper.client.ClientConnectionPeer;
 
 /**
@@ -112,7 +111,8 @@ import org.apache.bookkeeper.client.ClientConnectionPeer;
  * has reconnect logic if a connection to a bookie fails.
  *
  */
-public class PerChannelBookieClient extends SimpleChannelHandler implements 
ChannelPipelineFactory {
+@Sharable
+public class PerChannelBookieClient extends ChannelInboundHandlerAdapter {
 
     static final Logger LOG = 
LoggerFactory.getLogger(PerChannelBookieClient.class);
 
@@ -129,7 +129,7 @@ public class PerChannelBookieClient extends 
SimpleChannelHandler implements Chan
     public static final AtomicLong txnIdGenerator = new AtomicLong(0);
 
     final BookieSocketAddress addr;
-    final ChannelFactory channelFactory;
+    final EventLoopGroup eventLoopGroup;
     final OrderedSafeExecutor executor;
     final HashedWheelTimer requestTimer;
     final int addEntryTimeout;
@@ -175,21 +175,22 @@ public class PerChannelBookieClient extends 
SimpleChannelHandler implements Chan
     private final ClientAuthProvider.Factory authProviderFactory;
     private final ExtensionRegistry extRegistry;
 
-    public PerChannelBookieClient(OrderedSafeExecutor executor, 
ClientSocketChannelFactory channelFactory,
+    public PerChannelBookieClient(OrderedSafeExecutor executor, EventLoopGroup 
eventLoopGroup,
                                   BookieSocketAddress addr) {
-        this(new ClientConfiguration(), executor, channelFactory, addr, null, 
NullStatsLogger.INSTANCE, null, null, null);
+        this(new ClientConfiguration(), executor, eventLoopGroup, addr, null, 
NullStatsLogger.INSTANCE, null, null,
+                null);
     }
 
-    public PerChannelBookieClient(OrderedSafeExecutor executor, 
ClientSocketChannelFactory channelFactory,
+    public PerChannelBookieClient(OrderedSafeExecutor executor, EventLoopGroup 
eventLoopGroup,
                                   BookieSocketAddress addr,
                                   ClientAuthProvider.Factory 
authProviderFactory,
                                   ExtensionRegistry extRegistry) {
-        this(new ClientConfiguration(), executor, channelFactory, addr, null, 
NullStatsLogger.INSTANCE,
+        this(new ClientConfiguration(), executor, eventLoopGroup, addr, null, 
NullStatsLogger.INSTANCE,
                 authProviderFactory, extRegistry, null);
     }
 
     public PerChannelBookieClient(ClientConfiguration conf, 
OrderedSafeExecutor executor,
-                                  ClientSocketChannelFactory channelFactory, 
BookieSocketAddress addr,
+            EventLoopGroup eventLoopGroup, BookieSocketAddress addr,
                                   HashedWheelTimer requestTimer, StatsLogger 
parentStatsLogger,
                                   ClientAuthProvider.Factory 
authProviderFactory,
                                   ExtensionRegistry extRegistry,
@@ -198,10 +199,10 @@ public class PerChannelBookieClient extends 
SimpleChannelHandler implements Chan
         this.conf = conf;
         this.addr = addr;
         this.executor = executor;
-        if (LocalBookiesRegistry.isLocalBookie(addr)){
-            this.channelFactory = new DefaultLocalClientChannelFactory();
+        if (LocalBookiesRegistry.isLocalBookie(addr)) {
+            this.eventLoopGroup = new DefaultEventLoopGroup();
         } else {
-            this.channelFactory = channelFactory;
+            this.eventLoopGroup = eventLoopGroup;
         }
         this.state = ConnectionState.DISCONNECTED;
         this.requestTimer = requestTimer;
@@ -239,7 +240,7 @@ public class PerChannelBookieClient extends 
SimpleChannelHandler implements Chan
             public SocketAddress getRemoteAddr() {
                 Channel c = channel;
                 if (c != null) {
-                    return c.getRemoteAddress();
+                    return c.remoteAddress();
                 } else {
                     return null;
                 }
@@ -287,56 +288,94 @@ public class PerChannelBookieClient extends 
SimpleChannelHandler implements Chan
         }
     }
 
-    private void connect() {
+    protected ChannelFuture connect() {
         LOG.debug("Connecting to bookie: {}", addr);
 
-        // Set up the ClientBootStrap so we can create a new Channel connection
-        // to the bookie.
-        ClientBootstrap bootstrap = new ClientBootstrap(channelFactory);
-        bootstrap.setPipelineFactory(this);
-        bootstrap.setOption("tcpNoDelay", conf.getClientTcpNoDelay());
-        bootstrap.setOption("keepAlive", true);
-        bootstrap.setOption("connectTimeoutMillis", 
conf.getClientConnectTimeoutMillis());
-        bootstrap.setOption("child.sendBufferSize", 
conf.getClientSendBufferSize());
-        bootstrap.setOption("child.receiveBufferSize", 
conf.getClientReceiveBufferSize());
-        bootstrap.setOption("writeBufferLowWaterMark", 
conf.getClientWriteBufferLowWaterMark());
-        bootstrap.setOption("writeBufferHighWaterMark", 
conf.getClientWriteBufferHighWaterMark());
-        SocketAddress bookieAddr = addr.getSocketAddress();        
-        if (channelFactory instanceof LocalClientChannelFactory) {
+        // Set up the ClientBootStrap so we can create a new Channel 
connection to the bookie.
+        Bootstrap bootstrap = new Bootstrap();
+        bootstrap.group(eventLoopGroup);
+        if (eventLoopGroup instanceof EpollEventLoopGroup) {
+            bootstrap.channel(EpollSocketChannel.class);
+        } else if (eventLoopGroup instanceof DefaultEventLoopGroup) {
+            bootstrap.channel(LocalChannel.class);
+        } else {
+            bootstrap.channel(NioSocketChannel.class);
+        }
+
+        bootstrap.option(ChannelOption.ALLOCATOR, 
PooledByteBufAllocator.DEFAULT);
+        bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 
conf.getClientConnectTimeoutMillis());
+        bootstrap.option(ChannelOption.WRITE_BUFFER_WATER_MARK, new 
WriteBufferWaterMark(
+                conf.getClientWriteBufferLowWaterMark(), 
conf.getClientWriteBufferHighWaterMark()));
+
+        if (!(eventLoopGroup instanceof DefaultEventLoopGroup)) {
+            bootstrap.option(ChannelOption.TCP_NODELAY, 
conf.getClientTcpNoDelay());
+            bootstrap.option(ChannelOption.SO_KEEPALIVE, 
conf.getClientSockKeepalive());
+
+            // if buffer sizes are 0, let OS auto-tune it
+            if (conf.getClientSendBufferSize() > 0) {
+                bootstrap.option(ChannelOption.SO_SNDBUF, 
conf.getClientSendBufferSize());
+            }
+
+            if (conf.getClientReceiveBufferSize() > 0) {
+                bootstrap.option(ChannelOption.SO_RCVBUF, 
conf.getClientReceiveBufferSize());
+            }
+        }
+
+        // In the netty pipeline, we need to split packets based on length, so 
we
+        // use the {@link LengthFieldBasedFramDecoder}. Other than that all 
actions
+        // are carried out in this class, e.g., making sense of received 
messages,
+        // prepending the length to outgoing packets etc.
+        bootstrap.handler(new ChannelInitializer<Channel>() {
+            @Override
+            protected void initChannel(Channel ch) throws Exception {
+                ChannelPipeline pipeline = ch.pipeline();
+
+                pipeline.addLast("lengthbasedframedecoder",
+                        new LengthFieldBasedFrameDecoder(maxFrameSize, 0, 4, 
0, 4));
+                pipeline.addLast("lengthprepender", new 
LengthFieldPrepender(4));
+                pipeline.addLast("bookieProtoEncoder", new 
BookieProtoEncoding.RequestEncoder(extRegistry));
+                pipeline.addLast("bookieProtoDecoder", new 
BookieProtoEncoding.ResponseDecoder(extRegistry));
+                pipeline.addLast("authHandler", new 
AuthHandler.ClientSideHandler(authProviderFactory, txnIdGenerator, 
connectionPeer));
+                pipeline.addLast("mainhandler", PerChannelBookieClient.this);
+            }
+        });
+
+        SocketAddress bookieAddr = addr.getSocketAddress();
+        if (eventLoopGroup instanceof DefaultEventLoopGroup) {
             bookieAddr = addr.getLocalAddress();
         }
+
         ChannelFuture future = bootstrap.connect(bookieAddr);
         future.addListener(new ChannelFutureListener() {
             @Override
             public void operationComplete(ChannelFuture future) throws 
Exception {
-                LOG.debug("Channel connected ({}) {}", future.isSuccess(), 
future.getChannel());
+                LOG.debug("Channel connected ({}) {}", future.isSuccess(), 
future.channel());
                 int rc;
                 Queue<GenericCallback<PerChannelBookieClient>> oldPendingOps;
 
                 synchronized (PerChannelBookieClient.this) {
                     if (future.isSuccess() && state == 
ConnectionState.CONNECTING) {
-                        LOG.info("Successfully connected to bookie: {}", 
future.getChannel());
+                        LOG.info("Successfully connected to bookie: {}", 
future.channel());
                         rc = BKException.Code.OK;
-                        channel = future.getChannel();
+                        channel = future.channel();
                         state = ConnectionState.CONNECTED;
                     } else if (future.isSuccess() && (state == 
ConnectionState.CLOSED
                                                       || state == 
ConnectionState.DISCONNECTED)) {
                         LOG.warn("Closed before connection completed, clean 
up: {}, current state {}",
-                                 future.getChannel(), state);
-                        closeChannel(future.getChannel());
+                                future.channel(), state);
+                        closeChannel(future.channel());
                         rc = 
BKException.Code.BookieHandleNotAvailableException;
                         channel = null;
                     } else if (future.isSuccess() && state == 
ConnectionState.CONNECTED) {
                         LOG.debug("Already connected with another channel({}), 
so close the new channel({})",
-                                  channel, future.getChannel());
-                        closeChannel(future.getChannel());
+                                channel, future.channel());
+                        closeChannel(future.channel());
                         return; // pendingOps should have been completed when 
other channel connected
                     } else {
                         LOG.error("Could not connect to bookie: {}/{}, current 
state {} : ",
-                                  new Object[] { future.getChannel(), addr,
-                                                 state, future.getCause() });
+                                new Object[] { future.channel(), addr, state, 
future.cause() });
                         rc = 
BKException.Code.BookieHandleNotAvailableException;
-                        closeChannel(future.getChannel());
+                        closeChannel(future.channel());
                         channel = null;
                         if (state != ConnectionState.CLOSED) {
                             state = ConnectionState.DISCONNECTED;
@@ -356,6 +395,8 @@ public class PerChannelBookieClient extends 
SimpleChannelHandler implements Chan
                 }
             }
         });
+
+        return future;
     }
 
     void connectIfNeededAndDoOp(GenericCallback<PerChannelBookieClient> op) {
@@ -402,7 +443,8 @@ public class PerChannelBookieClient extends 
SimpleChannelHandler implements Chan
 
     }
 
-    void writeLac(final long ledgerId, final byte[] masterKey, final long lac, 
ChannelBuffer toSend, WriteLacCallback cb, Object ctx) {
+    void writeLac(final long ledgerId, final byte[] masterKey, final long lac, 
ByteBuf toSend, WriteLacCallback cb,
+            Object ctx) {
         final long txnId = getTxnId();
         final CompletionKey completionKey = new V3CompletionKey(txnId, 
OperationType.WRITE_LAC);
         // writeLac is mostly like addEntry hence uses addEntryTimeout
@@ -418,7 +460,7 @@ public class PerChannelBookieClient extends 
SimpleChannelHandler implements Chan
                 .setLedgerId(ledgerId)
                 .setLac(lac)
                 .setMasterKey(ByteString.copyFrom(masterKey))
-                .setBody(ByteString.copyFrom(toSend.toByteBuffer()));
+                .setBody(ByteString.copyFrom(toSend.nioBuffer()));
 
         final Request writeLacRequest = Request.newBuilder()
                 .setHeader(headerBuilder)
@@ -431,19 +473,19 @@ public class PerChannelBookieClient extends 
SimpleChannelHandler implements Chan
             return;
         }
         try {
-            ChannelFuture future = c.write(writeLacRequest);
+            ChannelFuture future = c.writeAndFlush(writeLacRequest);
             future.addListener(new ChannelFutureListener() {
                 @Override
                 public void operationComplete(ChannelFuture future) throws 
Exception {
                     if (future.isSuccess()) {
                         if (LOG.isDebugEnabled()) {
                             LOG.debug("Successfully wrote request for writeLac 
LedgerId: {} bookie: {}",
-                                    ledgerId, c.getRemoteAddress());
+                                    ledgerId, c.remoteAddress());
                         }
                     } else {
-                        if (!(future.getCause() instanceof 
ClosedChannelException)) {
+                        if (!(future.cause() instanceof 
ClosedChannelException)) {
                             LOG.warn("Writing Lac(lid={} to channel {} failed 
: ",
-                                    new Object[] { ledgerId, c, 
future.getCause() });
+                                    new Object[] { ledgerId, c, future.cause() 
});
                         }
                         errorOutWriteLacKey(completionKey);
                     }
@@ -474,7 +516,7 @@ public class PerChannelBookieClient extends 
SimpleChannelHandler implements Chan
      * @param options
      *          Add options
      */
-    void addEntry(final long ledgerId, byte[] masterKey, final long entryId, 
ChannelBuffer toSend, WriteCallback cb,
+    void addEntry(final long ledgerId, byte[] masterKey, final long entryId, 
ByteBuf toSend, WriteCallback cb,
                   Object ctx, final int options) {
         Object request = null;
         CompletionKey completion = null;
@@ -482,8 +524,6 @@ public class PerChannelBookieClient extends 
SimpleChannelHandler implements Chan
             completion = new V2CompletionKey(ledgerId, entryId, 
OperationType.ADD_ENTRY);
             request = new 
BookieProtocol.AddRequest(BookieProtocol.CURRENT_PROTOCOL_VERSION, ledgerId, 
entryId,
                     (short) options, masterKey, toSend);
-
-
         } else {
             final long txnId = getTxnId();
             completion = new V3CompletionKey(txnId, OperationType.ADD_ENTRY);
@@ -493,15 +533,18 @@ public class PerChannelBookieClient extends 
SimpleChannelHandler implements Chan
                     .setOperation(OperationType.ADD_ENTRY)
                     .setTxnId(txnId);
 
+            byte[] toSendArray = new byte[toSend.readableBytes()];
+            toSend.getBytes(toSend.readerIndex(), toSendArray);
             AddRequest.Builder addBuilder = AddRequest.newBuilder()
                     .setLedgerId(ledgerId)
                     .setEntryId(entryId)
                     .setMasterKey(ByteString.copyFrom(masterKey))
-                    .setBody(ByteString.copyFrom(toSend.toByteBuffer()));
+                    .setBody(ByteString.copyFrom(toSendArray));
 
             if (((short) options & BookieProtocol.FLAG_RECOVERY_ADD) == 
BookieProtocol.FLAG_RECOVERY_ADD) {
                 addBuilder.setFlag(AddRequest.Flag.RECOVERY_ADD);
             }
+
             request = Request.newBuilder()
                     .setHeader(headerBuilder)
                     .setAddRequest(addBuilder)
@@ -519,23 +562,24 @@ public class PerChannelBookieClient extends 
SimpleChannelHandler implements Chan
         final Channel c = channel;
         if (c == null) {
             errorOutAddKey(completionKey);
+            toSend.release();
             return;
         }
         try {
-            ChannelFuture future = c.write(addRequest);
+            ChannelFuture future = c.writeAndFlush(addRequest);
             future.addListener(new ChannelFutureListener() {
                 @Override
                 public void operationComplete(ChannelFuture future) throws 
Exception {
                     if (future.isSuccess()) {
                         if (LOG.isDebugEnabled()) {
                             LOG.debug("Successfully wrote request for adding 
entry: " + entryId + " ledger-id: " + ledgerId
-                                                            + " bookie: " + 
c.getRemoteAddress() + " entry length: " + entrySize);
+                                                            + " bookie: " + 
c.remoteAddress() + " entry length: " + entrySize);
                         }
                         // totalBytesOutstanding.addAndGet(entrySize);
                     } else {
-                        if (!(future.getCause() instanceof 
ClosedChannelException)) {
+                        if (!(future.cause() instanceof 
ClosedChannelException)) {
                             LOG.warn("Writing addEntry(lid={}, eid={}) to 
channel {} failed : ",
-                                    new Object[] { ledgerId, entryId, c, 
future.getCause() });
+                                    new Object[] { ledgerId, entryId, c, 
future.cause() });
                         }
                         errorOutAddKey(completionKey);
                     }
@@ -594,19 +638,19 @@ public class PerChannelBookieClient extends 
SimpleChannelHandler implements Chan
 
         final Object readRequest = request;
         try {
-            ChannelFuture future = c.write(readRequest);
+            ChannelFuture future = c.writeAndFlush(readRequest);
             future.addListener(new ChannelFutureListener() {
                     @Override
                     public void operationComplete(ChannelFuture future) throws 
Exception {
                         if (future.isSuccess()) {
                             if (LOG.isDebugEnabled()) {
                                 LOG.debug("Successfully wrote request {} to 
{}",
-                                          readRequest, c.getRemoteAddress());
+                                          readRequest, c.remoteAddress());
                             }
                         } else {
-                            if (!(future.getCause() instanceof 
ClosedChannelException)) {
+                            if (!(future.cause() instanceof 
ClosedChannelException)) {
                                 LOG.warn("Writing 
readEntryAndFenceLedger(lid={}, eid={}) to channel {} failed : ",
-                                        new Object[] { ledgerId, entryId, c, 
future.getCause() });
+                                        new Object[] { ledgerId, entryId, c, 
future.cause() });
                             }
                             errorOutReadKey(completionKey);
                         }
@@ -623,8 +667,8 @@ public class PerChannelBookieClient extends 
SimpleChannelHandler implements Chan
         CompletionKey completion = null;
         if (useV2WireProtocol) {
             request = new 
BookieProtocol.ReadRequest(BookieProtocol.CURRENT_PROTOCOL_VERSION,
-                    ledgerId, (long) 0, (short) 0);
-            completion = new V2CompletionKey(ledgerId, (long) 0, 
OperationType.READ_LAC);
+                    ledgerId, 0, (short) 0);
+            completion = new V2CompletionKey(ledgerId, 0, 
OperationType.READ_LAC);
         } else {
             final long txnId = getTxnId();
             completion = new V3CompletionKey(txnId, OperationType.READ_LAC);
@@ -654,17 +698,17 @@ public class PerChannelBookieClient extends 
SimpleChannelHandler implements Chan
         }
 
         try {
-            ChannelFuture future = c.write(readLacRequest);
+            ChannelFuture future = c.writeAndFlush(readLacRequest);
             future.addListener(new ChannelFutureListener() {
                 @Override
                 public void operationComplete(ChannelFuture future) throws 
Exception {
                     if (future.isSuccess()) {
                         LOG.debug("Succssfully wrote request {} to {}",
-                                    readLacRequest, c.getRemoteAddress());
+                                readLacRequest, c.remoteAddress());
                     } else {
-                        if (!(future.getCause() instanceof 
ClosedChannelException)) {
+                        if (!(future.cause() instanceof 
ClosedChannelException)) {
                             LOG.warn("Writing readLac(lid = {}) to channel {} 
failed : ",
-                                    new Object[] { ledgerId, c, 
future.getCause() });
+                                    new Object[] { ledgerId, c, future.cause() 
});
                         }
                         errorOutReadLacKey(completionKey);
                     }
@@ -715,19 +759,19 @@ public class PerChannelBookieClient extends 
SimpleChannelHandler implements Chan
         }
 
         try{
-            ChannelFuture future = c.write(readRequest);
+            ChannelFuture future = c.writeAndFlush(readRequest);
             future.addListener(new ChannelFutureListener() {
                 @Override
                 public void operationComplete(ChannelFuture future) throws 
Exception {
                     if (future.isSuccess()) {
                         if (LOG.isDebugEnabled()) {
                             LOG.debug("Successfully wrote request {} to {}",
-                                      readRequest, c.getRemoteAddress());
+                                      readRequest, c.remoteAddress());
                         }
                     } else {
-                        if (!(future.getCause() instanceof 
ClosedChannelException)) {
+                        if (!(future.cause() instanceof 
ClosedChannelException)) {
                             LOG.warn("Writing readEntry(lid={}, eid={}) to 
channel {} failed : ",
-                                    new Object[] { ledgerId, entryId, c, 
future.getCause() });
+                                    new Object[] { ledgerId, entryId, c, 
future.cause() });
                         }
                         errorOutReadKey(completionKey);
                     }
@@ -767,19 +811,19 @@ public class PerChannelBookieClient extends 
SimpleChannelHandler implements Chan
         }
 
         try{
-            ChannelFuture future = c.write(getBookieInfoRequest);
+            ChannelFuture future = c.writeAndFlush(getBookieInfoRequest);
             future.addListener(new ChannelFutureListener() {
                 @Override
                 public void operationComplete(ChannelFuture future) throws 
Exception {
                     if (future.isSuccess()) {
                         if (LOG.isDebugEnabled()) {
                             LOG.debug("Successfully wrote request {} to {}",
-                                    getBookieInfoRequest, 
c.getRemoteAddress());
+                                    getBookieInfoRequest, c.remoteAddress());
                         }
                     } else {
-                        if (!(future.getCause() instanceof 
ClosedChannelException)) {
+                        if (!(future.cause() instanceof 
ClosedChannelException)) {
                             LOG.warn("Writing GetBookieInfoRequest(flags={}) 
to channel {} failed : ",
-                                    new Object[] { requested, c, 
future.getCause() });
+                                    new Object[] { requested, c, 
future.cause() });
                         }
                         errorOutReadKey(completionKey);
                     }
@@ -846,10 +890,6 @@ public class PerChannelBookieClient extends 
SimpleChannelHandler implements Chan
 
     private ChannelFuture closeChannel(Channel c) {
         LOG.debug("Closing channel {}", c);
-        ReadTimeoutHandler timeout = 
c.getPipeline().get(ReadTimeoutHandler.class);
-        if (timeout != null) {
-            timeout.releaseExternalResources();
-        }
         return c.close();
     }
 
@@ -868,8 +908,8 @@ public class PerChannelBookieClient extends 
SimpleChannelHandler implements Chan
             public void safeRun() {
                 String bAddress = "null";
                 Channel c = channel;
-                if (c != null && c.getRemoteAddress() != null) {
-                    bAddress = c.getRemoteAddress().toString();
+                if (c != null && c.remoteAddress() != null) {
+                    bAddress = c.remoteAddress().toString();
                 }
 
                 LOG.debug("Could not write request for reading entry: {} 
ledger-id: {} bookie: {} rc: {}",
@@ -902,7 +942,7 @@ public class PerChannelBookieClient extends 
SimpleChannelHandler implements Chan
                 String bAddress = "null";
                 Channel c = channel;
                 if (c != null) {
-                    bAddress = c.getRemoteAddress().toString();
+                    bAddress = c.remoteAddress().toString();
                 }
                 LOG.debug("Could not write request writeLac for ledgerId: {} 
bookie: {}",
                           new Object[] { writeLacCompletion.ledgerId, 
bAddress});
@@ -927,7 +967,7 @@ public class PerChannelBookieClient extends 
SimpleChannelHandler implements Chan
                 String bAddress = "null";
                 Channel c = channel;
                 if (c != null) {
-                    bAddress = c.getRemoteAddress().toString();
+                    bAddress = c.remoteAddress().toString();
                 }
                 LOG.debug("Could not write request readLac for ledgerId: {} 
bookie: {}",
                           new Object[] { readLacCompletion.ledgerId, 
bAddress});
@@ -950,8 +990,8 @@ public class PerChannelBookieClient extends 
SimpleChannelHandler implements Chan
             public void safeRun() {
                 String bAddress = "null";
                 Channel c = channel;
-                if(c != null && c.getRemoteAddress() != null) {
-                    bAddress = c.getRemoteAddress().toString();
+                if (c != null && c.remoteAddress() != null) {
+                    bAddress = c.remoteAddress().toString();
                 }
                 LOG.debug("Could not write request for adding entry: {} 
ledger-id: {} bookie: {} rc: {}",
                           new Object[] { addCompletion.entryId, 
addCompletion.ledgerId, bAddress, rc });
@@ -983,7 +1023,7 @@ public class PerChannelBookieClient extends 
SimpleChannelHandler implements Chan
                 String bAddress = "null";
                 Channel c = channel;
                 if (c != null) {
-                    bAddress = c.getRemoteAddress().toString();
+                    bAddress = c.remoteAddress().toString();
                 }
                 LOG.debug("Could not write getBookieInfo request for bookie: 
{}", new Object[] {bAddress});
                 getBookieInfoCompletion.cb.getBookieInfoComplete(rc, new 
BookieInfo(), getBookieInfoCompletion.ctx);
@@ -1027,40 +1067,19 @@ public class PerChannelBookieClient extends 
SimpleChannelHandler implements Chan
     }
 
     /**
-     * In the netty pipeline, we need to split packets based on length, so we
-     * use the {@link LengthFieldBasedFrameDecoder}. Other than that all 
actions
-     * are carried out in this class, e.g., making sense of received messages,
-     * prepending the length to outgoing packets etc.
-     */
-    @Override
-    public ChannelPipeline getPipeline() throws Exception {
-        ChannelPipeline pipeline = Channels.pipeline();
-
-        pipeline.addLast("lengthbasedframedecoder", new 
LengthFieldBasedFrameDecoder(maxFrameSize, 0, 4, 0, 4));
-        pipeline.addLast("lengthprepender", new LengthFieldPrepender(4));
-        pipeline.addLast("bookieProtoEncoder", new 
BookieProtoEncoding.RequestEncoder(extRegistry));
-        pipeline.addLast("bookieProtoDecoder", new 
BookieProtoEncoding.ResponseDecoder(extRegistry));
-        pipeline.addLast("authHandler", new 
AuthHandler.ClientSideHandler(authProviderFactory, txnIdGenerator,
-            connectionPeer));
-        pipeline.addLast("mainhandler", this);
-        return pipeline;
-    }
-
-    /**
      * If our channel has disconnected, we just error out the pending entries
      */
     @Override
-    public void channelDisconnected(ChannelHandlerContext ctx, 
ChannelStateEvent e) throws Exception {
-        Channel c = ctx.getChannel();
-        LOG.info("Disconnected from bookie channel {}", c);
-        if (c != null) {
-            closeChannel(c);
+    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
+        LOG.info("Disconnected from bookie channel {}", ctx.channel());
+        if (ctx.channel() != null) {
+            closeChannel(ctx.channel());
         }
 
         
errorOutOutstandingEntries(BKException.Code.BookieHandleNotAvailableException);
 
         synchronized (this) {
-            if (this.channel == c
+            if (this.channel == ctx.channel()
                 && state != ConnectionState.CLOSED) {
                 state = ConnectionState.DISCONNECTED;
             }
@@ -1075,56 +1094,60 @@ public class PerChannelBookieClient extends 
SimpleChannelHandler implements Chan
      * (mostly due to what we do in the netty threads)
      */
     @Override
-    public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) 
throws Exception {
-        Throwable t = e.getCause();
-        if (t instanceof CorruptedFrameException || t instanceof 
TooLongFrameException) {
-            LOG.error("Corrupted frame received from bookie: {}",
-                      e.getChannel().getRemoteAddress());
+    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) 
throws Exception {
+        if (cause instanceof CorruptedFrameException || cause instanceof 
TooLongFrameException) {
+            LOG.error("Corrupted frame received from bookie: {}", 
ctx.channel().remoteAddress());
+            ctx.close();
             return;
         }
 
-        if (t instanceof AuthHandler.AuthenticationException) {
-            LOG.error("Error authenticating connection", t);
+        if (cause instanceof AuthHandler.AuthenticationException) {
+            LOG.error("Error authenticating connection", cause);
             
errorOutOutstandingEntries(BKException.Code.UnauthorizedAccessException);
-            Channel c = ctx.getChannel();
+            Channel c = ctx.channel();
             if (c != null) {
                 closeChannel(c);
             }
             return;
         }
 
-        if (t instanceof IOException) {
+        if (cause instanceof IOException) {
             // these are thrown when a bookie fails, logging them just pollutes
             // the logs (the failure is logged from the listeners on the write
             // operation), so I'll just ignore it here.
+            ctx.close();
             return;
         }
 
         synchronized (this) {
             if (state == ConnectionState.CLOSED) {
-                LOG.debug("Unexpected exception caught by bookie client 
channel handler, "
-                          + "but the client is closed, so it isn't important", 
t);
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("Unexpected exception caught by bookie client 
channel handler, "
+                            + "but the client is closed, so it isn't 
important", cause);
+                }
             } else {
-                LOG.error("Unexpected exception caught by bookie client 
channel handler", t);
+                LOG.error("Unexpected exception caught by bookie client 
channel handler", cause);
             }
         }
+
         // Since we are a library, cant terminate App here, can we?
+        ctx.close();
     }
 
     /**
      * Called by netty when a message is received on a channel
      */
     @Override
-    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) 
throws Exception {
+       public void channelRead(ChannelHandlerContext ctx, Object msg) throws 
Exception {
 
-        if (e.getMessage() instanceof BookieProtocol.Response) {
-            BookieProtocol.Response response = (BookieProtocol.Response) 
e.getMessage();
+        if (msg instanceof BookieProtocol.Response) {
+            BookieProtocol.Response response = (BookieProtocol.Response) msg;
             readV2Response(response);
-        } else if (e.getMessage() instanceof Response) {
-            Response response = (Response) e.getMessage();
+        } else if (msg instanceof Response) {
+            Response response = (Response) msg;
             readV3Response(response);
         } else {
-            ctx.sendUpstream(e);
+               ctx.fireChannelRead(msg);
         }
     }
 
@@ -1156,7 +1179,7 @@ public class PerChannelBookieClient extends 
SimpleChannelHandler implements Chan
                         }
                         case READ_ENTRY: {
                             BookieProtocol.ReadResponse readResponse = 
(BookieProtocol.ReadResponse) response;
-                            ChannelBuffer data = null;
+                            ByteBuf data = null;
                             if (readResponse.hasData()) {
                               data = readResponse.getData();
                             }
@@ -1244,9 +1267,9 @@ public class PerChannelBookieClient extends 
SimpleChannelHandler implements Chan
                         case READ_ENTRY: {
                             ReadResponse readResponse = 
response.getReadResponse();
                             StatusCode status = response.getStatus() == 
StatusCode.EOK ? readResponse.getStatus() : response.getStatus();
-                            ChannelBuffer buffer = ChannelBuffers.buffer(0);
+                            ByteBuf buffer = Unpooled.EMPTY_BUFFER;
                             if (readResponse.hasBody()) {
-                                buffer = 
ChannelBuffers.copiedBuffer(readResponse.getBody().asReadOnlyByteBuffer());
+                                buffer = 
Unpooled.wrappedBuffer(readResponse.getBody().asReadOnlyByteBuffer());
                             }
                             handleReadResponse(readResponse.getLedgerId(), 
readResponse.getEntryId(), status, buffer, completionValue);
                             break;
@@ -1259,17 +1282,17 @@ public class PerChannelBookieClient extends 
SimpleChannelHandler implements Chan
                         }
                         case READ_LAC: {
                             ReadLacResponse readLacResponse = 
response.getReadLacResponse();
-                            ChannelBuffer lacBuffer = ChannelBuffers.buffer(0);
-                            ChannelBuffer lastEntryBuffer = 
ChannelBuffers.buffer(0);
+                            ByteBuf lacBuffer = Unpooled.EMPTY_BUFFER;
+                            ByteBuf lastEntryBuffer = Unpooled.EMPTY_BUFFER;
                             StatusCode status = response.getStatus() == 
StatusCode.EOK ? readLacResponse.getStatus() : response.getStatus();
                             // Thread.dumpStack();
 
                             if (readLacResponse.hasLacBody()) {
-                                lacBuffer = 
ChannelBuffers.copiedBuffer(readLacResponse.getLacBody().asReadOnlyByteBuffer());
+                                lacBuffer = 
Unpooled.wrappedBuffer(readLacResponse.getLacBody().asReadOnlyByteBuffer());
                             }
 
                             if (readLacResponse.hasLastEntryBody()) {
-                                lastEntryBuffer = 
ChannelBuffers.copiedBuffer(readLacResponse.getLastEntryBody().asReadOnlyByteBuffer());
+                                lastEntryBuffer = 
Unpooled.wrappedBuffer(readLacResponse.getLastEntryBody().asReadOnlyByteBuffer());
                             }
                             
handleReadLacResponse(readLacResponse.getLedgerId(), status, lacBuffer, 
lastEntryBuffer, completionValue);
                             break;
@@ -1335,7 +1358,7 @@ public class PerChannelBookieClient extends 
SimpleChannelHandler implements Chan
         ac.cb.writeComplete(rcToRet, ledgerId, entryId, addr, ac.ctx);
     }
 
-    void handleReadLacResponse(long ledgerId, StatusCode status, ChannelBuffer 
lacBuffer, ChannelBuffer lastEntryBuffer, CompletionValue completionValue) {
+    void handleReadLacResponse(long ledgerId, StatusCode status, ByteBuf 
lacBuffer, ByteBuf lastEntryBuffer, CompletionValue completionValue) {
         // The completion value should always be an instance of an 
WriteLacCompletion object when we reach here.
         ReadLacCompletion glac = (ReadLacCompletion)completionValue;
 
@@ -1350,11 +1373,10 @@ public class PerChannelBookieClient extends 
SimpleChannelHandler implements Chan
         glac.cb.readLacComplete(rcToRet, ledgerId, lacBuffer.slice(), 
lastEntryBuffer.slice(), glac.ctx);
     }
 
-    void handleReadResponse(long ledgerId, long entryId, StatusCode status, 
ChannelBuffer buffer, CompletionValue completionValue) {
+    void handleReadResponse(long ledgerId, long entryId, StatusCode status, 
ByteBuf buffer, CompletionValue completionValue) {
         // The completion value should always be an instance of a 
ReadCompletion object when we reach here.
         ReadCompletion rc = (ReadCompletion)completionValue;
 
-
         if (LOG.isDebugEnabled()) {
             LOG.debug("Got response for read request from bookie: " + addr + " 
for ledger: " + ledgerId + " entry: "
                     + entryId + " rc: " + rc + " entry length: " + 
buffer.readableBytes());
@@ -1466,7 +1488,8 @@ public class PerChannelBookieClient extends 
SimpleChannelHandler implements Chan
             final long startTime = MathUtils.nowInNano();
             this.cb = null == readLacOpLogger ? originalCallback : new 
ReadLacCallback() {
                 @Override
-                public void readLacComplete(int rc, long ledgerId, 
ChannelBuffer lacBuffer, ChannelBuffer lastEntryBuffer, Object ctx) {
+                public void readLacComplete(int rc, long ledgerId, ByteBuf 
lacBuffer, ByteBuf lastEntryBuffer,
+                        Object ctx) {
                     cancelTimeout();
                     long latency = MathUtils.elapsedNanos(startTime);
                     if (rc != BKException.Code.OK) {
@@ -1497,7 +1520,7 @@ public class PerChannelBookieClient extends 
SimpleChannelHandler implements Chan
             final long startTime = MathUtils.nowInNano();
             this.cb = new ReadEntryCallback() {
                 @Override
-                public void readEntryComplete(int rc, long ledgerId, long 
entryId, ChannelBuffer buffer, Object ctx) {
+                public void readEntryComplete(int rc, long ledgerId, long 
entryId, ByteBuf buffer, Object ctx) {
                     cancelTimeout();
                     if (readEntryOpLogger != null) {
                         long latency = MathUtils.elapsedNanos(startTime);

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/74f79513/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClientPool.java
----------------------------------------------------------------------
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClientPool.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClientPool.java
index 4a735f9..b7618e3 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClientPool.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClientPool.java
@@ -38,7 +38,7 @@ interface PerChannelBookieClientPool {
      * @param callback
      *          callback to return channel from channel pool.
      */
-    void obtain(GenericCallback<PerChannelBookieClient> callback);
+    void obtain(GenericCallback<PerChannelBookieClient> callback, long key);
 
     /**
      * record any read/write error on {@link PerChannelBookieClientPool}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/74f79513/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessor.java
----------------------------------------------------------------------
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessor.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessor.java
index 43360fa..bd98374 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessor.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessor.java
@@ -17,6 +17,8 @@
  */
 package org.apache.bookkeeper.proto;
 
+import io.netty.channel.Channel;
+
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.concurrent.ExecutionException;
@@ -28,7 +30,6 @@ import org.apache.bookkeeper.bookie.Bookie;
 import org.apache.bookkeeper.bookie.BookieException;
 import org.apache.bookkeeper.proto.BookieProtocol.Request;
 import org.apache.bookkeeper.util.MathUtils;
-import org.jboss.netty.channel.Channel;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -52,7 +53,7 @@ class ReadEntryProcessor extends PacketProcessorBase {
         try {
             Future<Boolean> fenceResult = null;
             if (read.isFencingRequest()) {
-                LOG.warn("Ledger " + request.getLedgerId() + " fenced by " + 
channel.getRemoteAddress());
+                LOG.warn("Ledger: {}  fenced by: {}", request.getLedgerId(), 
channel.remoteAddress());
 
                 if (read.hasMasterKey()) {
                     fenceResult = 
requestProcessor.bookie.fenceLedger(read.getLedgerId(), read.getMasterKey());

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/74f79513/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessorV3.java
----------------------------------------------------------------------
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessorV3.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessorV3.java
index b9037c1..fbfa71f 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessorV3.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessorV3.java
@@ -17,6 +17,8 @@
  */
 package org.apache.bookkeeper.proto;
 
+import io.netty.channel.Channel;
+
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.concurrent.ExecutionException;
@@ -32,7 +34,6 @@ import org.apache.bookkeeper.proto.BookkeeperProtocol.Request;
 import org.apache.bookkeeper.proto.BookkeeperProtocol.Response;
 import org.apache.bookkeeper.proto.BookkeeperProtocol.StatusCode;
 import org.apache.bookkeeper.util.MathUtils;
-import org.jboss.netty.channel.Channel;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -69,14 +70,14 @@ class ReadEntryProcessorV3 extends PacketProcessorBaseV3 {
             Future<Boolean> fenceResult = null;
             if (readRequest.hasFlag() && 
readRequest.getFlag().equals(ReadRequest.Flag.FENCE_LEDGER)) {
                 LOG.warn("Ledger fence request received for ledger: {} from 
address: {}", ledgerId,
-                         channel.getRemoteAddress());
+                         channel.remoteAddress());
 
                 if (readRequest.hasMasterKey()) {
                     byte[] masterKey = 
readRequest.getMasterKey().toByteArray();
                     fenceResult = 
requestProcessor.bookie.fenceLedger(ledgerId, masterKey);
                 } else {
                     LOG.error("Fence ledger request received without master 
key for ledger:{} from address: {}",
-                              ledgerId, channel.getRemoteAddress());
+                              ledgerId, channel.remoteAddress());
                     throw 
BookieException.create(BookieException.Code.UnauthorizedAccessException);
                 }
             }
@@ -129,7 +130,7 @@ class ReadEntryProcessorV3 extends PacketProcessorBaseV3 {
             LOG.error("IOException while reading entry:{} from ledger:{}", 
entryId, ledgerId);
         } catch (BookieException e) {
             LOG.error("Unauthorized access to ledger:{} while reading entry:{} 
in request from address: {}",
-                    new Object[]{ledgerId, entryId, 
channel.getRemoteAddress()});
+                    new Object[]{ledgerId, entryId, channel.remoteAddress()});
             status = StatusCode.EUA;
         }
 

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/74f79513/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadLacProcessorV3.java
----------------------------------------------------------------------
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadLacProcessorV3.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadLacProcessorV3.java
index e9a4c13..0fbdbec 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadLacProcessorV3.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadLacProcessorV3.java
@@ -31,13 +31,14 @@ import 
org.apache.bookkeeper.proto.BookkeeperProtocol.Request;
 import org.apache.bookkeeper.proto.BookkeeperProtocol.Response;
 import org.apache.bookkeeper.proto.BookkeeperProtocol.StatusCode;
 import org.apache.bookkeeper.util.MathUtils;
-import org.jboss.netty.channel.Channel;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.protobuf.ByteString;
 
-class ReadLacProcessorV3 extends PacketProcessorBaseV3 {
+import io.netty.channel.Channel;
+
+class ReadLacProcessorV3 extends PacketProcessorBaseV3 implements Runnable {
     private final static Logger logger = 
LoggerFactory.getLogger(ReadLacProcessorV3.class);
 
     public ReadLacProcessorV3(Request request, Channel channel,

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/74f79513/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ResponseBuilder.java
----------------------------------------------------------------------
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ResponseBuilder.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ResponseBuilder.java
index 651c118..1418437 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ResponseBuilder.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ResponseBuilder.java
@@ -20,8 +20,9 @@
  */
 package org.apache.bookkeeper.proto;
 
+import io.netty.buffer.Unpooled;
+
 import java.nio.ByteBuffer;
-import org.jboss.netty.buffer.ChannelBuffers;
 
 class ResponseBuilder {
     static BookieProtocol.Response buildErrorResponse(int errorCode, 
BookieProtocol.Request r) {
@@ -42,6 +43,6 @@ class ResponseBuilder {
 
     static BookieProtocol.Response buildReadResponse(ByteBuffer data, 
BookieProtocol.Request r) {
         return new BookieProtocol.ReadResponse(r.getProtocolVersion(), 
BookieProtocol.EOK,
-                r.getLedgerId(), r.getEntryId(), 
ChannelBuffers.wrappedBuffer(data));
+                r.getLedgerId(), r.getEntryId(), Unpooled.wrappedBuffer(data));
     }
 }

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/74f79513/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/VMLocalChannelManager.java
----------------------------------------------------------------------
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/VMLocalChannelManager.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/VMLocalChannelManager.java
deleted file mode 100644
index ab19ce0..0000000
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/VMLocalChannelManager.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.bookkeeper.proto;
-
-import java.io.IOException;
-import org.apache.bookkeeper.bookie.Bookie;
-import org.apache.bookkeeper.conf.ServerConfiguration;
-import org.apache.bookkeeper.net.BookieSocketAddress;
-import org.jboss.netty.bootstrap.ServerBootstrap;
-import org.jboss.netty.channel.Channel;
-import org.jboss.netty.channel.ChannelFactory;
-import org.jboss.netty.channel.ChannelPipelineFactory;
-import org.jboss.netty.channel.local.DefaultLocalServerChannelFactory;
-
-/**
- * Manages VM-local channels
- *
- * @author enrico.olivelli
- */
-public class VMLocalChannelManager extends ChannelManager {
-
-    private ChannelFactory channelFactory;
-    private BookieSocketAddress bookieAddress;
-
-    @Override
-    public Channel start(ServerConfiguration conf, ChannelPipelineFactory 
bookiePipelineFactory) throws IOException {
-        BookieSocketAddress bookieAddress = Bookie.getBookieAddress(conf);
-        this.channelFactory = new DefaultLocalServerChannelFactory();
-        this.bookieAddress = bookieAddress;
-        ServerBootstrap jvmbootstrap = new ServerBootstrap(channelFactory);
-        jvmbootstrap.setPipelineFactory(bookiePipelineFactory);
-
-        // use the same address 'name', so clients can find local Bookie still 
discovering them using ZK
-        Channel jvmlisten = jvmbootstrap.bind(bookieAddress.getLocalAddress());
-        LocalBookiesRegistry.registerLocalBookieAddress(bookieAddress);
-        return jvmlisten;
-    }
-
-    @Override
-    public void close() {
-        LocalBookiesRegistry.unregisterLocalBookieAddress(bookieAddress);
-        if (channelFactory != null) {
-            channelFactory.releaseExternalResources();
-        }
-        channelFactory = null;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/74f79513/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessor.java
----------------------------------------------------------------------
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessor.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessor.java
index b314998..46f7f7d 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessor.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessor.java
@@ -17,6 +17,8 @@
  */
 package org.apache.bookkeeper.proto;
 
+import io.netty.channel.Channel;
+
 import java.io.IOException;
 import java.util.concurrent.TimeUnit;
 
@@ -25,7 +27,6 @@ import org.apache.bookkeeper.net.BookieSocketAddress;
 import org.apache.bookkeeper.proto.BookieProtocol.Request;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback;
 import org.apache.bookkeeper.util.MathUtils;
-import org.jboss.netty.channel.Channel;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -76,7 +77,10 @@ class WriteEntryProcessor extends PacketProcessorBase 
implements WriteCallback {
         } catch (BookieException e) {
             LOG.error("Unauthorized access to ledger " + add.getLedgerId(), e);
             rc = BookieProtocol.EUA;
+        } finally {
+            add.release();
         }
+
         if (rc != BookieProtocol.EOK) {
             
requestProcessor.addEntryStats.registerFailedEvent(MathUtils.elapsedNanos(startTimeNanos),
                     TimeUnit.NANOSECONDS);

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/74f79513/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessorV3.java
----------------------------------------------------------------------
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessorV3.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessorV3.java
index 242ed81..e34e894 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessorV3.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessorV3.java
@@ -20,6 +20,8 @@
  */
 package org.apache.bookkeeper.proto;
 
+import io.netty.channel.Channel;
+
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.concurrent.TimeUnit;
@@ -32,7 +34,6 @@ import org.apache.bookkeeper.proto.BookkeeperProtocol.Request;
 import org.apache.bookkeeper.proto.BookkeeperProtocol.Response;
 import org.apache.bookkeeper.proto.BookkeeperProtocol.StatusCode;
 import org.apache.bookkeeper.util.MathUtils;
-import org.jboss.netty.channel.Channel;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/74f79513/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteLacProcessorV3.java
----------------------------------------------------------------------
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteLacProcessorV3.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteLacProcessorV3.java
index 104f561..097a573 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteLacProcessorV3.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteLacProcessorV3.java
@@ -31,11 +31,12 @@ import 
org.apache.bookkeeper.proto.BookkeeperProtocol.Request;
 import org.apache.bookkeeper.proto.BookkeeperProtocol.Response;
 import org.apache.bookkeeper.proto.BookkeeperProtocol.StatusCode;
 import org.apache.bookkeeper.util.MathUtils;
-import org.jboss.netty.channel.Channel;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-class WriteLacProcessorV3 extends PacketProcessorBaseV3 {
+import io.netty.channel.Channel;
+
+class WriteLacProcessorV3 extends PacketProcessorBaseV3 implements Runnable {
     private final static Logger logger = 
LoggerFactory.getLogger(WriteLacProcessorV3.class);
 
     public WriteLacProcessorV3(Request request, Channel channel,

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/74f79513/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/DoubleByteBuf.java
----------------------------------------------------------------------
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/DoubleByteBuf.java 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/DoubleByteBuf.java
new file mode 100644
index 0000000..3b4c83a
--- /dev/null
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/DoubleByteBuf.java
@@ -0,0 +1,468 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.bookkeeper.util;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.lang.reflect.Constructor;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.nio.channels.FileChannel;
+import java.nio.channels.GatheringByteChannel;
+import java.nio.channels.ScatteringByteChannel;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import io.netty.buffer.AbstractReferenceCountedByteBuf;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
+import io.netty.buffer.PooledByteBufAllocator;
+import io.netty.buffer.Unpooled;
+import io.netty.util.Recycler;
+import io.netty.util.Recycler.Handle;
+import io.netty.util.ResourceLeakDetector;
+import io.netty.util.ResourceLeakDetectorFactory;
+import io.netty.util.ResourceLeakTracker;
+
+/**
+ * ByteBuf that holds 2 buffers. Similar to {@see CompositeByteBuf} but 
doesn't allocate list to hold them.
+ */
+@SuppressWarnings("unchecked")
+public final class DoubleByteBuf extends AbstractReferenceCountedByteBuf {
+
+    private ByteBuf b1;
+    private ByteBuf b2;
+    private final Handle<DoubleByteBuf> recyclerHandle;
+
+    private static final Recycler<DoubleByteBuf> RECYCLER = new 
Recycler<DoubleByteBuf>() {
+        @Override
+        protected DoubleByteBuf newObject(Recycler.Handle<DoubleByteBuf> 
handle) {
+            return new DoubleByteBuf(handle);
+        }
+    };
+
+    private DoubleByteBuf(Handle<DoubleByteBuf> recyclerHandle) {
+        super(Integer.MAX_VALUE);
+        this.recyclerHandle = recyclerHandle;
+    }
+
+    public static ByteBuf get(ByteBuf b1, ByteBuf b2) {
+        DoubleByteBuf buf = RECYCLER.get();
+        buf.setRefCnt(1);
+
+        // Make sure the buffers are not deallocated as long as we hold them. 
Also, buffers can get retained/releases
+        // outside of DoubleByteBuf scope
+        buf.b1 = b1.retain();
+        buf.b2 = b2.retain();
+        buf.setIndex(0, b1.readableBytes() + b2.readableBytes());
+        return toLeakAwareBuffer(buf);
+    }
+
+    public ByteBuf getFirst() {
+        return b1;
+    }
+
+    public ByteBuf getSecond() {
+        return b2;
+    }
+
+    @Override
+    public boolean isDirect() {
+        return b1.isDirect() && b2.isDirect();
+    }
+
+    @Override
+    public boolean hasArray() {
+        // There's no single array available
+        return false;
+    }
+
+    @Override
+    public byte[] array() {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public int arrayOffset() {
+
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public boolean hasMemoryAddress() {
+        return false;
+    }
+
+    @Override
+    public long memoryAddress() {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public int capacity() {
+        return b1.capacity() + b2.capacity();
+    }
+
+    @Override
+    public int readableBytes() {
+        return b1.readableBytes() + b2.readableBytes();
+    }
+
+    @Override
+    public int writableBytes() {
+        return 0;
+    }
+
+    @Override
+    public DoubleByteBuf capacity(int newCapacity) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public ByteBufAllocator alloc() {
+        return PooledByteBufAllocator.DEFAULT;
+    }
+
+    @Override
+    @Deprecated
+    public ByteOrder order() {
+        return ByteOrder.BIG_ENDIAN;
+    }
+
+    @Override
+    public byte getByte(int index) {
+        if (index < b1.writerIndex()) {
+            return b1.getByte(index);
+        } else {
+            return b2.getByte(index - b1.writerIndex());
+        }
+    }
+
+    @Override
+    protected byte _getByte(int index) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    protected short _getShort(int index) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    protected short _getShortLE(int index) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    protected int _getUnsignedMediumLE(int index) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    protected int _getIntLE(int index) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    protected long _getLongLE(int index) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    protected void _setShortLE(int index, int value) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    protected void _setMediumLE(int index, int value) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    protected void _setIntLE(int index, int value) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    protected void _setLongLE(int index, long value) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public int getBytes(int index, FileChannel out, long position, int length) 
throws IOException {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public int setBytes(int index, FileChannel in, long position, int length) 
throws IOException {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    protected int _getUnsignedMedium(int index) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    protected int _getInt(int index) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    protected long _getLong(int index) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public DoubleByteBuf getBytes(int index, byte[] dst, int dstIndex, int 
length) {
+        return getBytes(index, Unpooled.wrappedBuffer(dst), dstIndex, length);
+    }
+
+    @Override
+    public ByteBuf getBytes(int index, ByteBuffer dst) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public DoubleByteBuf getBytes(int index, ByteBuf dst, int dstIndex, int 
length) {
+        checkDstIndex(index, length, dstIndex, dst.capacity());
+        if (length == 0) {
+            return this;
+        }
+
+        int b1Length = Math.min(length, b1.readableBytes() - index);
+        if (b1Length > 0) {
+            b1.getBytes(b1.readerIndex() + index, dst, dstIndex, b1Length);
+            dstIndex += b1Length;
+            length -= b1Length;
+            index = 0;
+        } else {
+            index -= b1.readableBytes();
+        }
+
+        if (length > 0) {
+            int b2Length = Math.min(length, b2.readableBytes() - index);
+            b2.getBytes(b2.readerIndex() + index, dst, dstIndex, b2Length);
+        }
+        return this;
+    }
+
+    @Override
+    public int getBytes(int index, GatheringByteChannel out, int length) 
throws IOException {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public DoubleByteBuf getBytes(int index, OutputStream out, int length) 
throws IOException {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public DoubleByteBuf setByte(int index, int value) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    protected void _setByte(int index, int value) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public DoubleByteBuf setShort(int index, int value) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    protected void _setShort(int index, int value) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public DoubleByteBuf setMedium(int index, int value) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    protected void _setMedium(int index, int value) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public DoubleByteBuf setInt(int index, int value) {
+        return (DoubleByteBuf) super.setInt(index, value);
+    }
+
+    @Override
+    protected void _setInt(int index, int value) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public DoubleByteBuf setLong(int index, long value) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    protected void _setLong(int index, long value) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public DoubleByteBuf setBytes(int index, byte[] src, int srcIndex, int 
length) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public DoubleByteBuf setBytes(int index, ByteBuffer src) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public DoubleByteBuf setBytes(int index, ByteBuf src, int srcIndex, int 
length) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public int setBytes(int index, InputStream in, int length) throws 
IOException {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public int setBytes(int index, ScatteringByteChannel in, int length) 
throws IOException {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public ByteBuf copy(int index, int length) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public int nioBufferCount() {
+        return b1.nioBufferCount() + b2.nioBufferCount();
+    }
+
+    @Override
+    public ByteBuffer internalNioBuffer(int index, int length) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public ByteBuffer nioBuffer(int index, int length) {
+        ByteBuffer dst = ByteBuffer.allocate(length);
+        ByteBuf b = Unpooled.wrappedBuffer(dst);
+        b.writerIndex(0);
+        getBytes(index, b, length);
+        return dst;
+    }
+
+    @Override
+    public ByteBuffer[] nioBuffers(int index, int length) {
+        return new ByteBuffer[] { nioBuffer(index, length) };
+    }
+
+    @Override
+    public DoubleByteBuf discardReadBytes() {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public String toString() {
+        String result = super.toString();
+        result = result.substring(0, result.length() - 1);
+        return result + ", components=2)";
+    }
+
+    @Override
+    public ByteBuffer[] nioBuffers() {
+        return nioBuffers(readerIndex(), readableBytes());
+    }
+
+    @Override
+    protected void deallocate() {
+        // Double release of buffer for the initial ref-count and the internal 
retain() when the DoubleByteBuf was
+        // created
+        b1.release(2);
+        b2.release(2);
+        b1 = b2 = null;
+        recyclerHandle.recycle(this);
+    }
+
+    @Override
+    public ByteBuf unwrap() {
+        return null;
+    }
+
+    private static final Logger log = 
LoggerFactory.getLogger(DoubleByteBuf.class);
+
+    private static final ResourceLeakDetector<DoubleByteBuf> leakDetector = 
ResourceLeakDetectorFactory.instance()
+            .newResourceLeakDetector(DoubleByteBuf.class);
+    private static final Constructor<ByteBuf> 
simpleLeakAwareByteBufConstructor;
+    private static final Constructor<ByteBuf> 
advancedLeakAwareByteBufConstructor;
+
+    static {
+        Constructor<ByteBuf> _simpleLeakAwareByteBufConstructor = null;
+        Constructor<ByteBuf> _advancedLeakAwareByteBufConstructor = null;
+        try {
+            Class<?> simpleLeakAwareByteBufClass = 
Class.forName("io.netty.buffer.SimpleLeakAwareByteBuf");
+            _simpleLeakAwareByteBufConstructor = (Constructor<ByteBuf>) 
simpleLeakAwareByteBufClass
+                    .getDeclaredConstructor(ByteBuf.class, 
ResourceLeakTracker.class);
+            _simpleLeakAwareByteBufConstructor.setAccessible(true);
+
+            Class<?> advancedLeakAwareByteBufClass = 
Class.forName("io.netty.buffer.AdvancedLeakAwareByteBuf");
+            _advancedLeakAwareByteBufConstructor = (Constructor<ByteBuf>) 
advancedLeakAwareByteBufClass
+                    .getDeclaredConstructor(ByteBuf.class, 
ResourceLeakTracker.class);
+            _advancedLeakAwareByteBufConstructor.setAccessible(true);
+        } catch (Throwable t) {
+            log.error("Failed to use reflection to enable leak detection", t);
+        } finally {
+            simpleLeakAwareByteBufConstructor = 
_simpleLeakAwareByteBufConstructor;
+            advancedLeakAwareByteBufConstructor = 
_advancedLeakAwareByteBufConstructor;
+        }
+    }
+
+    private static ByteBuf toLeakAwareBuffer(DoubleByteBuf buf) {
+        try {
+            ResourceLeakTracker<DoubleByteBuf> leak;
+            switch (ResourceLeakDetector.getLevel()) {
+            case DISABLED:
+                break;
+
+            case SIMPLE:
+                leak = leakDetector.track(buf);
+                if (leak != null) {
+                    return simpleLeakAwareByteBufConstructor.newInstance(buf, 
leak);
+                }
+                break;
+            case ADVANCED:
+            case PARANOID:
+                leak = leakDetector.track(buf);
+                if (leak != null) {
+                    return 
advancedLeakAwareByteBufConstructor.newInstance(buf, leak);
+                }
+                break;
+            }
+            return buf;
+        } catch (Throwable t) {
+            // Catch reflection exception
+            throw new RuntimeException(t);
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/74f79513/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieInitializationTest.java
----------------------------------------------------------------------
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieInitializationTest.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieInitializationTest.java
index 5c028f0..e2b3c47 100644
--- 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieInitializationTest.java
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieInitializationTest.java
@@ -38,7 +38,6 @@ import org.apache.bookkeeper.zookeeper.ZooKeeperClient;
 import org.apache.zookeeper.ZooKeeper;
 import org.apache.zookeeper.data.Stat;
 import org.apache.zookeeper.KeeperException;
-import org.jboss.netty.channel.ChannelException;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -268,9 +267,11 @@ public class BookieInitializationTest extends 
BookKeeperClusterTestCase {
             BookieServer bs2 = new BookieServer(conf);
             bs2.start();
             fail("Should throw BindException, as the bk server is already 
running!");
-        } catch (ChannelException ce) {
-            Assert.assertTrue("Should be caused by a bind exception",
-                              ce.getCause() instanceof BindException);         
   
+        } catch (BindException e) {
+            // Ok
+        } catch (IOException e) {
+            Assert.assertTrue("BKServer allowed duplicate Startups!",
+                    e.getMessage().contains("bind"));
         }
     }
 
@@ -292,7 +293,7 @@ public class BookieInitializationTest extends 
BookKeeperClusterTestCase {
             fail("Should throw ConnectionLossException as ZKServer is not 
running!");
         } catch (KeeperException.ConnectionLossException e) {
             // expected behaviour
-        } 
+        }
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/74f79513/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieJournalTest.java
----------------------------------------------------------------------
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieJournalTest.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieJournalTest.java
index 9baa4e7..1dfa32e 100644
--- 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieJournalTest.java
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieJournalTest.java
@@ -23,6 +23,8 @@ package org.apache.bookkeeper.bookie;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.fail;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
 
 import java.io.File;
 import java.io.IOException;
@@ -116,14 +118,14 @@ public class BookieJournalTest {
     /**
      * Generate meta entry with given master key
      */
-    private ByteBuffer generateMetaEntry(long ledgerId, byte[] masterKey) {
+    private ByteBuf generateMetaEntry(long ledgerId, byte[] masterKey) {
         ByteBuffer bb = ByteBuffer.allocate(8 + 8 + 4 + masterKey.length);
         bb.putLong(ledgerId);
         bb.putLong(Bookie.METAENTRY_ID_LEDGER_KEY);
         bb.putInt(masterKey.length);
         bb.put(masterKey);
         bb.flip();
-        return bb;
+        return Unpooled.wrappedBuffer(bb);
     }
 
     private void writeJunkJournal(File journalDir) throws Exception {
@@ -154,14 +156,15 @@ public class BookieJournalTest {
         byte[] data = "JournalTestData".getBytes();
         long lastConfirmed = LedgerHandle.INVALID_ENTRY_ID;
         for (int i = 1; i <= numEntries; i++) {
-            ByteBuffer packet = ClientUtil.generatePacket(1, i, lastConfirmed, 
i*data.length, data).toByteBuffer();
+            ByteBuf packet = ClientUtil.generatePacket(1, i, lastConfirmed, 
i*data.length, data);
             lastConfirmed = i;
             ByteBuffer lenBuff = ByteBuffer.allocate(4);
-            lenBuff.putInt(packet.remaining());
+            lenBuff.putInt(packet.readableBytes());
             lenBuff.flip();
 
             fc.write(lenBuff);
-            fc.write(packet);
+            fc.write(packet.nioBuffer());
+            packet.release();
         }
     }
 
@@ -197,14 +200,15 @@ public class BookieJournalTest {
         Arrays.fill(data, (byte)'X');
         long lastConfirmed = LedgerHandle.INVALID_ENTRY_ID;
         for (int i = 1; i <= numEntries; i++) {
-            ByteBuffer packet = ClientUtil.generatePacket(1, i, lastConfirmed, 
i*data.length, data).toByteBuffer();
+            ByteBuf packet = ClientUtil.generatePacket(1, i, lastConfirmed, 
i*data.length, data);
             lastConfirmed = i;
             ByteBuffer lenBuff = ByteBuffer.allocate(4);
-            lenBuff.putInt(packet.remaining());
+            lenBuff.putInt(packet.readableBytes());
             lenBuff.flip();
 
             bc.write(lenBuff);
-            bc.write(packet);
+            bc.write(packet.nioBuffer());
+            packet.release();
         }
         bc.flush(true);
 
@@ -225,19 +229,20 @@ public class BookieJournalTest {
         Arrays.fill(data, (byte)'X');
         long lastConfirmed = LedgerHandle.INVALID_ENTRY_ID;
         for (int i = 0; i <= numEntries; i++) {
-            ByteBuffer packet;
+            ByteBuf packet;
             if (i == 0) {
                 packet = generateMetaEntry(1, masterKey);
             } else {
-                packet = ClientUtil.generatePacket(1, i, lastConfirmed, 
i*data.length, data).toByteBuffer();
+                packet = ClientUtil.generatePacket(1, i, lastConfirmed, 
i*data.length, data);
             }
             lastConfirmed = i;
             ByteBuffer lenBuff = ByteBuffer.allocate(4);
-            lenBuff.putInt(packet.remaining());
+            lenBuff.putInt(packet.readableBytes());
             lenBuff.flip();
 
             bc.write(lenBuff);
-            bc.write(packet);
+            bc.write(packet.nioBuffer());
+            packet.release();
         }
         bc.flush(true);
 
@@ -258,18 +263,19 @@ public class BookieJournalTest {
         Arrays.fill(data, (byte)'X');
         long lastConfirmed = LedgerHandle.INVALID_ENTRY_ID;
         for (int i = 0; i <= numEntries; i++) {
-            ByteBuffer packet;
+            ByteBuf packet;
             if (i == 0) {
                 packet = generateMetaEntry(1, masterKey);
             } else {
-                packet = ClientUtil.generatePacket(1, i, lastConfirmed, i * 
data.length, data).toByteBuffer();
+                packet = ClientUtil.generatePacket(1, i, lastConfirmed, i * 
data.length, data);
             }
             lastConfirmed = i;
             ByteBuffer lenBuff = ByteBuffer.allocate(4);
-            lenBuff.putInt(packet.remaining());
+            lenBuff.putInt(packet.readableBytes());
             lenBuff.flip();
             bc.write(lenBuff);
-            bc.write(packet);
+            bc.write(packet.nioBuffer());
+            packet.release();
         }
         // write fence key
         ByteBuffer packet = generateFenceEntry(1);
@@ -296,20 +302,20 @@ public class BookieJournalTest {
         long lastConfirmed = LedgerHandle.INVALID_ENTRY_ID;
         long length = 0;
         for (int i = 0; i <= numEntries; i++) {
-            ByteBuffer packet;
+            ByteBuf packet;
             if (i == 0) {
                 packet = generateMetaEntry(1, masterKey);
             } else {
-                packet = ClientUtil.generatePacket(1, i, lastConfirmed,
-                        length, data, 0, i).toByteBuffer();
+                packet = ClientUtil.generatePacket(1, i, lastConfirmed, 
length, data, 0, i);
             }
             lastConfirmed = i;
             length += i;
             ByteBuffer lenBuff = ByteBuffer.allocate(4);
-            lenBuff.putInt(packet.remaining());
+            lenBuff.putInt(packet.readableBytes());
             lenBuff.flip();
             bc.write(lenBuff);
-            bc.write(packet);
+            bc.write(packet.nioBuffer());
+            packet.release();
             Journal.writePaddingBytes(jc, paddingBuff, 
JournalChannel.SECTOR_SIZE);
         }
         // write fence key

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/74f79513/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/UpgradeTest.java
----------------------------------------------------------------------
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/UpgradeTest.java 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/UpgradeTest.java
index 84af7a7..f47e745 100644
--- 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/UpgradeTest.java
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/UpgradeTest.java
@@ -23,6 +23,7 @@ package org.apache.bookkeeper.bookie;
 
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
+import io.netty.buffer.ByteBuf;
 
 import java.io.BufferedWriter;
 import java.io.File;
@@ -88,15 +89,16 @@ public class UpgradeTest extends BookKeeperClusterTestCase {
         long lastConfirmed = LedgerHandle.INVALID_ENTRY_ID;
 
         for (int i = 1; i <= numEntries; i++) {
-            ByteBuffer packet = ClientUtil.generatePacket(ledgerId, i, 
lastConfirmed,
-                                                          i*data.length, 
data).toByteBuffer();
+            ByteBuf packet = ClientUtil.generatePacket(ledgerId, i, 
lastConfirmed,
+                                                          i*data.length, data);
             lastConfirmed = i;
             ByteBuffer lenBuff = ByteBuffer.allocate(4);
-            lenBuff.putInt(packet.remaining());
+            lenBuff.putInt(packet.readableBytes());
             lenBuff.flip();
 
             bc.write(lenBuff);
-            bc.write(packet);
+            bc.write(packet.nioBuffer());
+            packet.release();
         }
         bc.flush(true);
 

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/74f79513/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperTest.java
----------------------------------------------------------------------
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperTest.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperTest.java
index 17d63b3..9ec835d 100644
--- 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperTest.java
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperTest.java
@@ -194,7 +194,7 @@ public class BookKeeperTest extends BaseTestCase {
     public void testCloseDuringOp() throws Exception {
         ClientConfiguration conf = new ClientConfiguration()
             .setZkServers(zkUtil.getZooKeeperConnectString());
-        for (int i = 0; i < 100; i++) {
+        for (int i = 0; i < 10; i++) {
             final BookKeeper client = new BookKeeper(conf);
             final CountDownLatch l = new CountDownLatch(1);
             final AtomicBoolean success = new AtomicBoolean(false);

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/74f79513/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperTestClient.java
----------------------------------------------------------------------
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperTestClient.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperTestClient.java
index b394dc2..3fc86ad 100644
--- 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperTestClient.java
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperTestClient.java
@@ -22,23 +22,10 @@ package org.apache.bookkeeper.client;
  */
 
 import java.io.IOException;
-import java.util.concurrent.Executors;
 
 import org.apache.bookkeeper.conf.ClientConfiguration;
-import org.apache.bookkeeper.client.AsyncCallback.CreateCallback;
-import org.apache.bookkeeper.client.AsyncCallback.DeleteCallback;
-import org.apache.bookkeeper.client.AsyncCallback.OpenCallback;
-import org.apache.bookkeeper.client.BKException.Code;
-import org.apache.bookkeeper.proto.BookieClient;
-import org.apache.bookkeeper.util.OrderedSafeExecutor;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.WatchedEvent;
-import org.apache.zookeeper.Watcher;
 import org.apache.zookeeper.ZooKeeper;
-import org.jboss.netty.channel.socket.ClientSocketChannelFactory;
-import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
 
 /**
  * Test BookKeeperClient which allows access to members we don't

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/74f79513/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieRecoveryTest.java
----------------------------------------------------------------------
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieRecoveryTest.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieRecoveryTest.java
index d25bd70..ae0a07f 100644
--- 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieRecoveryTest.java
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieRecoveryTest.java
@@ -30,13 +30,14 @@ import org.apache.bookkeeper.net.BookieSocketAddress;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
 import 
org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ReadEntryCallback;
 import org.apache.bookkeeper.test.MultiLedgerManagerMultiDigestTestCase;
-import org.jboss.netty.buffer.ChannelBuffer;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import io.netty.buffer.ByteBuf;
+
 import java.io.IOException;
 import java.net.InetAddress;
 import java.util.ArrayList;
@@ -483,7 +484,7 @@ public class BookieRecoveryTest extends 
MultiLedgerManagerMultiDigestTestCase {
         }
 
         @Override
-        public void readEntryComplete(int rc, long ledgerId, long entryId, 
ChannelBuffer buffer, Object ctx) {
+        public void readEntryComplete(int rc, long ledgerId, long entryId, 
ByteBuf buffer, Object ctx) {
             if (LOG.isDebugEnabled()) {
                 LOG.debug("Got " + rc + " for ledger " + ledgerId + " entry " 
+ entryId + " from " + ctx);
             }

Reply via email to