BOOKKEEPER-1008: Netty 4.1

Added more ref-count fixes from yahoo-4.3 branch on top of #116

Author: Matteo Merli <[email protected]>
Author: Matteo Merli <[email protected]>
Author: Kishore Kasi Udayashankar <[email protected]>

Reviewers: Jia Zhai <[email protected]>, Sijie Guo <[email protected]>

Closes #138 from merlimat/netty-4.1


Project: http://git-wip-us.apache.org/repos/asf/bookkeeper/repo
Commit: http://git-wip-us.apache.org/repos/asf/bookkeeper/commit/74f79513
Tree: http://git-wip-us.apache.org/repos/asf/bookkeeper/tree/74f79513
Diff: http://git-wip-us.apache.org/repos/asf/bookkeeper/diff/74f79513

Branch: refs/heads/master
Commit: 74f795136c1fff3badb29fc982d0cc2d43096b45
Parents: 811ece5
Author: Matteo Merli <[email protected]>
Authored: Mon May 15 10:52:53 2017 -0700
Committer: Sijie Guo <[email protected]>
Committed: Mon May 15 10:52:53 2017 -0700

----------------------------------------------------------------------
 bookkeeper-benchmark/pom.xml                    |  20 +-
 .../bookkeeper/benchmark/BenchBookie.java       |  38 +-
 bookkeeper-server/pom.xml                       |  46 +-
 .../LocalBookieEnsemblePlacementPolicy.java     |   5 +-
 .../apache/bookkeeper/client/BookKeeper.java    |  85 ++--
 .../bookkeeper/client/CRC32DigestManager.java   |  14 +-
 .../client/DefaultEnsemblePlacementPolicy.java  |   3 +-
 .../apache/bookkeeper/client/DigestManager.java | 116 +++--
 .../client/EnsemblePlacementPolicy.java         |   3 +-
 .../client/ExplicitLacFlushPolicy.java          |   6 +-
 .../apache/bookkeeper/client/LedgerChecker.java |   7 +-
 .../apache/bookkeeper/client/LedgerEntry.java   |   5 +-
 .../client/LedgerFragmentReplicator.java        |   6 +-
 .../apache/bookkeeper/client/LedgerHandle.java  |  31 +-
 .../bookkeeper/client/LedgerHandleAdv.java      |   6 +-
 .../bookkeeper/client/MacDigestManager.java     |  10 +-
 .../apache/bookkeeper/client/PendingAddOp.java  |  34 +-
 .../bookkeeper/client/PendingReadLacOp.java     |   9 +-
 .../apache/bookkeeper/client/PendingReadOp.java |  11 +-
 .../bookkeeper/client/PendingWriteLacOp.java    |   7 +-
 .../RackawareEnsemblePlacementPolicy.java       |   3 +-
 .../RackawareEnsemblePlacementPolicyImpl.java   |   3 +-
 .../bookkeeper/client/ReadLastConfirmedOp.java  |   8 +-
 .../RegionAwareEnsemblePlacementPolicy.java     |   2 +-
 .../client/TryReadLastConfirmedOp.java          |   5 +-
 .../bookkeeper/conf/ClientConfiguration.java    |  24 +
 .../bookkeeper/conf/ServerConfiguration.java    | 113 ++++-
 .../bookkeeper/net/BookieSocketAddress.java     |   3 +-
 .../net/StabilizeNetworkTopology.java           |   7 +-
 .../bookkeeper/processor/RequestProcessor.java  |   2 +-
 .../apache/bookkeeper/proto/AuthHandler.java    | 141 +++---
 .../apache/bookkeeper/proto/BookieClient.java   |  73 +--
 .../bookkeeper/proto/BookieNettyServer.java     | 281 ++++++++---
 .../bookkeeper/proto/BookieProtoEncoding.java   | 186 ++++----
 .../apache/bookkeeper/proto/BookieProtocol.java |  23 +-
 .../bookkeeper/proto/BookieRequestHandler.java  |  55 +--
 .../proto/BookieRequestProcessor.java           |  15 +-
 .../proto/BookkeeperInternalCallbacks.java      |   7 +-
 .../apache/bookkeeper/proto/ChannelManager.java |  46 --
 .../DefaultPerChannelBookieClientPool.java      |   5 +-
 .../proto/GetBookieInfoProcessorV3.java         |   3 +-
 .../proto/NioServerSocketChannelManager.java    |  75 ---
 .../bookkeeper/proto/PacketProcessorBase.java   |   5 +-
 .../bookkeeper/proto/PacketProcessorBaseV3.java |   5 +-
 .../proto/PerChannelBookieClient.java           | 359 +++++++-------
 .../proto/PerChannelBookieClientPool.java       |   2 +-
 .../bookkeeper/proto/ReadEntryProcessor.java    |   5 +-
 .../bookkeeper/proto/ReadEntryProcessorV3.java  |   9 +-
 .../bookkeeper/proto/ReadLacProcessorV3.java    |   5 +-
 .../bookkeeper/proto/ResponseBuilder.java       |   5 +-
 .../bookkeeper/proto/VMLocalChannelManager.java |  63 ---
 .../bookkeeper/proto/WriteEntryProcessor.java   |   6 +-
 .../bookkeeper/proto/WriteEntryProcessorV3.java |   3 +-
 .../bookkeeper/proto/WriteLacProcessorV3.java   |   5 +-
 .../apache/bookkeeper/util/DoubleByteBuf.java   | 468 +++++++++++++++++++
 .../bookie/BookieInitializationTest.java        |  11 +-
 .../bookkeeper/bookie/BookieJournalTest.java    |  48 +-
 .../apache/bookkeeper/bookie/UpgradeTest.java   |  10 +-
 .../bookkeeper/client/BookKeeperTest.java       |   2 +-
 .../bookkeeper/client/BookKeeperTestClient.java |  13 -
 .../bookkeeper/client/BookieRecoveryTest.java   |   5 +-
 .../apache/bookkeeper/client/ClientUtil.java    |   6 +-
 .../bookkeeper/client/SlowBookieTest.java       |   1 +
 .../client/TestGetBookieInfoTimeout.java        |  18 +-
 .../TestRackawareEnsemblePlacementPolicy.java   |   4 +-
 ...awareEnsemblePlacementPolicyUsingScript.java |   3 +-
 .../TestRegionAwareEnsemblePlacementPolicy.java |   4 +-
 .../bookkeeper/proto/NetworkLessBookieTest.java |   8 +-
 .../proto/TestBackwardCompatCMS42.java          |  63 +--
 .../proto/TestPerChannelBookieClient.java       |  51 +-
 .../test/BookKeeperClusterTestCase.java         |   1 +
 .../bookkeeper/test/BookieClientTest.java       |  50 +-
 .../bookkeeper/test/BookieReadWriteTest.java    |   2 +-
 .../apache/bookkeeper/test/LoopbackClient.java  |  19 +-
 .../bookkeeper/util/DoubleByteBufTest.java      | 121 +++++
 pom.xml                                         |   2 +-
 76 files changed, 1865 insertions(+), 1059 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/74f79513/bookkeeper-benchmark/pom.xml
----------------------------------------------------------------------
diff --git a/bookkeeper-benchmark/pom.xml b/bookkeeper-benchmark/pom.xml
index dac28b0..4354656 100644
--- a/bookkeeper-benchmark/pom.xml
+++ b/bookkeeper-benchmark/pom.xml
@@ -87,10 +87,28 @@
       <version>${zookeeper.version}</version>
       <type>test-jar</type>
       <scope>test</scope>
+      <exclusions>
+        <exclusion>
+          <groupId>org.slf4j</groupId>
+          <artifactId>slf4j-log4j12</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.slf4j</groupId>
+          <artifactId>slf4j-api</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>log4j</groupId>
+          <artifactId>log4j</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>io.netty</groupId>
+          <artifactId>netty</artifactId>
+        </exclusion>
+      </exclusions>
     </dependency>
     <dependency>
       <groupId>io.netty</groupId>
-      <artifactId>netty</artifactId>
+      <artifactId>netty-all</artifactId>
       <version>${netty.version}</version>
       <scope>compile</scope>
     </dependency>

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/74f79513/bookkeeper-benchmark/src/main/java/org/apache/bookkeeper/benchmark/BenchBookie.java
----------------------------------------------------------------------
diff --git 
a/bookkeeper-benchmark/src/main/java/org/apache/bookkeeper/benchmark/BenchBookie.java
 
b/bookkeeper-benchmark/src/main/java/org/apache/bookkeeper/benchmark/BenchBookie.java
index 94ffd02..7ccd034 100644
--- 
a/bookkeeper-benchmark/src/main/java/org/apache/bookkeeper/benchmark/BenchBookie.java
+++ 
b/bookkeeper-benchmark/src/main/java/org/apache/bookkeeper/benchmark/BenchBookie.java
@@ -20,7 +20,6 @@
 package org.apache.bookkeeper.benchmark;
 
 import java.io.IOException;
-import java.util.concurrent.Executors;
 
 import org.apache.bookkeeper.net.BookieSocketAddress;
 import org.apache.bookkeeper.proto.BookieClient;
@@ -32,21 +31,23 @@ import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.client.LedgerHandle;
 import org.apache.bookkeeper.conf.ClientConfiguration;
 import org.apache.commons.cli.HelpFormatter;
-import org.apache.commons.cli.Option;
 import org.apache.commons.cli.Options;
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.CommandLineParser;
 import org.apache.commons.cli.PosixParser;
+import org.apache.commons.lang.SystemUtils;
 import org.apache.commons.cli.ParseException;
 import org.apache.zookeeper.KeeperException;
-import org.jboss.netty.buffer.ChannelBuffer;
-import org.jboss.netty.buffer.ChannelBuffers;
-import org.jboss.netty.channel.socket.ClientSocketChannelFactory;
-import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.epoll.EpollEventLoopGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+
 public class BenchBookie {
     static final Logger LOG = LoggerFactory.getLogger(BenchBookie.class);
 
@@ -137,18 +138,25 @@ public class BenchBookie {
         int size = Integer.parseInt(cmd.getOptionValue("size", "1024"));
         String servers = cmd.getOptionValue("zookeeper", "localhost:2181");
 
+        EventLoopGroup eventLoop;
+        if (SystemUtils.IS_OS_LINUX) {
+            try {
+                eventLoop = new EpollEventLoopGroup();
+            } catch (Throwable t) {
+                LOG.warn("Could not use Netty Epoll event loop for benchmark 
{}", t.getMessage());
+                eventLoop = new NioEventLoopGroup();
+            }
+        } else {
+            eventLoop = new NioEventLoopGroup();
+        }
 
-
-        ClientSocketChannelFactory channelFactory
-            = new 
NioClientSocketChannelFactory(Executors.newCachedThreadPool(), Executors
-                                                .newCachedThreadPool());
         OrderedSafeExecutor executor = OrderedSafeExecutor.newBuilder()
                 .name("BenchBookieClientScheduler")
                 .numThreads(1)
                 .build();
 
         ClientConfiguration conf = new ClientConfiguration();
-        BookieClient bc = new BookieClient(conf, channelFactory, executor);
+        BookieClient bc = new BookieClient(conf, eventLoop, executor);
         LatencyCallback lc = new LatencyCallback();
 
         ThroughputCallback tc = new ThroughputCallback();
@@ -156,7 +164,7 @@ public class BenchBookie {
 
         long ledger = getValidLedgerId(servers);
         for(long entry = 0; entry < warmUpCount; entry++) {
-            ChannelBuffer toSend = ChannelBuffers.buffer(size);
+            ByteBuf toSend = Unpooled.buffer(size);
             toSend.resetReaderIndex();
             toSend.resetWriterIndex();
             toSend.writeLong(ledger);
@@ -173,7 +181,7 @@ public class BenchBookie {
         int entryCount = 5000;
         long startTime = System.nanoTime();
         for(long entry = 0; entry < entryCount; entry++) {
-            ChannelBuffer toSend = ChannelBuffers.buffer(size);
+            ByteBuf toSend = Unpooled.buffer(size);
             toSend.resetReaderIndex();
             toSend.resetWriterIndex();
             toSend.writeLong(ledger);
@@ -194,7 +202,7 @@ public class BenchBookie {
         startTime = System.currentTimeMillis();
         tc = new ThroughputCallback();
         for(long entry = 0; entry < entryCount; entry++) {
-            ChannelBuffer toSend = ChannelBuffers.buffer(size);
+            ByteBuf toSend = Unpooled.buffer(size);
             toSend.resetReaderIndex();
             toSend.resetWriterIndex();
             toSend.writeLong(ledger);
@@ -208,7 +216,7 @@ public class BenchBookie {
         LOG.info("Throughput: " + ((long)entryCount)*1000/(endTime-startTime));
 
         bc.close();
-        channelFactory.releaseExternalResources();
+        eventLoop.shutdownGracefully();
         executor.shutdown();
     }
 

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/74f79513/bookkeeper-server/pom.xml
----------------------------------------------------------------------
diff --git a/bookkeeper-server/pom.xml b/bookkeeper-server/pom.xml
index be4249f..99705f4 100644
--- a/bookkeeper-server/pom.xml
+++ b/bookkeeper-server/pom.xml
@@ -23,7 +23,6 @@
     <groupId>org.apache.bookkeeper</groupId>
     <version>4.5.0-SNAPSHOT</version>
   </parent>
-  <groupId>org.apache.bookkeeper</groupId>
   <artifactId>bookkeeper-server</artifactId>
   <name>bookkeeper-server</name>
   <url>http://maven.apache.org</url>
@@ -74,6 +73,22 @@
           <groupId>net.java.dev.javacc</groupId>
           <artifactId>javacc</artifactId>
         </exclusion>
+        <exclusion>
+          <groupId>org.slf4j</groupId>
+          <artifactId>slf4j-log4j12</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.slf4j</groupId>
+          <artifactId>slf4j-api</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>log4j</groupId>
+          <artifactId>log4j</artifactId>
+               </exclusion>
+        <exclusion>
+          <groupId>io.netty</groupId>
+          <artifactId>netty</artifactId>
+        </exclusion>
       </exclusions>
     </dependency>
     <dependency>
@@ -82,12 +97,24 @@
       <version>${zookeeper.version}</version>
       <type>test-jar</type>
       <scope>test</scope>
-    </dependency>
-    <dependency>
-      <groupId>io.netty</groupId>
-      <artifactId>netty</artifactId>
-      <version>${netty.version}</version>
-      <scope>compile</scope>
+      <exclusions>
+        <exclusion>
+          <groupId>org.slf4j</groupId>
+          <artifactId>slf4j-log4j12</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.slf4j</groupId>
+          <artifactId>slf4j-api</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>log4j</groupId>
+          <artifactId>log4j</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>io.netty</groupId>
+          <artifactId>netty</artifactId>
+        </exclusion>
+      </exclusions>
     </dependency>
     <dependency>
       <groupId>org.apache.commons</groupId>
@@ -184,6 +211,11 @@
         </exclusion>
       </exclusions>
     </dependency>
+    <dependency>
+      <groupId>io.netty</groupId>
+      <artifactId>netty-all</artifactId>
+      <version>${netty.version}</version>
+    </dependency>
   </dependencies>
   <build>
     <plugins>

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/74f79513/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LocalBookieEnsemblePlacementPolicy.java
----------------------------------------------------------------------
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LocalBookieEnsemblePlacementPolicy.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LocalBookieEnsemblePlacementPolicy.java
index 00ac0d0..3969f41 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LocalBookieEnsemblePlacementPolicy.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LocalBookieEnsemblePlacementPolicy.java
@@ -22,7 +22,6 @@ import com.google.common.base.Optional;
 import java.net.UnknownHostException;
 import java.util.*;
 
-import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.client.BookieInfoReader.BookieInfo;
 import org.apache.bookkeeper.client.EnsemblePlacementPolicy;
 import org.apache.bookkeeper.client.BKException.BKNotEnoughBookiesException;
@@ -32,14 +31,14 @@ import org.apache.bookkeeper.feature.FeatureProvider;
 import org.apache.bookkeeper.net.BookieSocketAddress;
 import org.apache.bookkeeper.net.DNSToSwitchMapping;
 import org.apache.bookkeeper.stats.StatsLogger;
-import org.apache.commons.configuration.Configuration;
 import com.google.common.collect.Lists;
-import org.jboss.netty.util.HashedWheelTimer;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 
+import io.netty.util.HashedWheelTimer;
+
 /**
  * Special ensemble placement policy that always return local bookie. Only 
works with ledgers with ensemble=1.
  */

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/74f79513/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java
----------------------------------------------------------------------
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java
index a42db17..fd25361 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java
@@ -20,12 +20,18 @@
  */
 package org.apache.bookkeeper.client;
 
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.epoll.EpollEventLoopGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.util.HashedWheelTimer;
+
 import java.io.IOException;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
@@ -57,11 +63,9 @@ import org.apache.bookkeeper.util.SafeRunnable;
 import org.apache.bookkeeper.zookeeper.BoundExponentialBackoffRetryPolicy;
 import org.apache.bookkeeper.zookeeper.ZooKeeperClient;
 import org.apache.commons.configuration.ConfigurationException;
+import org.apache.commons.lang.SystemUtils;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.ZooKeeper;
-import org.jboss.netty.channel.socket.ClientSocketChannelFactory;
-import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
-import org.jboss.netty.util.HashedWheelTimer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -88,7 +92,7 @@ public class BookKeeper implements AutoCloseable {
     static final Logger LOG = LoggerFactory.getLogger(BookKeeper.class);
 
     final ZooKeeper zk;
-    final ClientSocketChannelFactory channelFactory;
+    final EventLoopGroup eventLoopGroup;
 
     // The stats logger for this client.
     private final StatsLogger statsLogger;
@@ -101,9 +105,9 @@ public class BookKeeper implements AutoCloseable {
     private OpStatsLogger readLacOpLogger;
 
 
-    // whether the socket factory is one we created, or is owned by whoever
+    // whether the event loop group is one we created, or is owned by whoever
     // instantiated us
-    boolean ownChannelFactory = false;
+    boolean ownEventLoopGroup = false;
     // whether the zk handle is one we created, or is owned by whoever
     // instantiated us
     boolean ownZKHandle = false;
@@ -138,7 +142,7 @@ public class BookKeeper implements AutoCloseable {
         final ClientConfiguration conf;
 
         ZooKeeper zk = null;
-        ClientSocketChannelFactory channelFactory = null;
+        EventLoopGroup eventLoopGroup = null;
         StatsLogger statsLogger = NullStatsLogger.INSTANCE;
         DNSToSwitchMapping dnsResolver = null;
         HashedWheelTimer requestTimer = null;
@@ -148,8 +152,8 @@ public class BookKeeper implements AutoCloseable {
             this.conf = conf;
         }
 
-        public Builder setChannelFactory(ClientSocketChannelFactory f) {
-            channelFactory = f;
+        public Builder setEventLoopGroup(EventLoopGroup f) {
+            eventLoopGroup = f;
             return this;
         }
 
@@ -181,7 +185,7 @@ public class BookKeeper implements AutoCloseable {
 
         public BookKeeper build() throws IOException, InterruptedException, 
KeeperException {
             Preconditions.checkNotNull(statsLogger, "No stats logger 
provided");
-            return new BookKeeper(conf, zk, channelFactory, statsLogger, 
dnsResolver, requestTimer, featureProvider);
+            return new BookKeeper(conf, zk, eventLoopGroup, statsLogger, 
dnsResolver, requestTimer, featureProvider);
         }
     }
 
@@ -190,7 +194,7 @@ public class BookKeeper implements AutoCloseable {
     }
 
     /**
-     * Create a bookkeeper client. A zookeeper client and a client socket 
factory
+     * Create a bookkeeper client. A zookeeper client and a client event loop 
group
      * will be instantiated as part of this constructor.
      *
      * @param servers
@@ -209,7 +213,7 @@ public class BookKeeper implements AutoCloseable {
 
     /**
      * Create a bookkeeper client using a configuration object.
-     * A zookeeper client and a client socket factory will be
+     * A zookeeper client and a client event loop group will be
      * instantiated as part of this constructor.
      *
      * @param conf
@@ -229,10 +233,10 @@ public class BookKeeper implements AutoCloseable {
         return zk;
     }
 
-    private static ClientSocketChannelFactory 
validateChannelFactory(ClientSocketChannelFactory factory)
+    private static EventLoopGroup validateEventLoopGroup(EventLoopGroup 
eventLoopGroup)
             throws NullPointerException {
-        Preconditions.checkNotNull(factory, "No Channel Factory provided");
-        return factory;
+        Preconditions.checkNotNull(eventLoopGroup, "No Event Loop Group 
provided");
+        return eventLoopGroup;
     }
 
     /**
@@ -257,7 +261,7 @@ public class BookKeeper implements AutoCloseable {
 
     /**
      * Create a bookkeeper client but use the passed in zookeeper client and
-     * client socket channel factory instead of instantiating those.
+     * client event loop group instead of instantiating those.
      *
      * @param conf
      *          Client Configuration Object
@@ -266,15 +270,15 @@ public class BookKeeper implements AutoCloseable {
      *          Zookeeper client instance connected to the zookeeper with which
      *          the bookies have registered. The ZooKeeper client must be 
connected
      *          before it is passed to BookKeeper. Otherwise a KeeperException 
is thrown.
-     * @param channelFactory
-     *          A factory that will be used to create connections to the 
bookies
+     * @param eventLoopGroup
+     *          An event loop group that will be used to create connections to 
the bookies
      * @throws IOException
      * @throws InterruptedException
      * @throws KeeperException if the passed zk handle is not connected
      */
-    public BookKeeper(ClientConfiguration conf, ZooKeeper zk, 
ClientSocketChannelFactory channelFactory)
+    public BookKeeper(ClientConfiguration conf, ZooKeeper zk, EventLoopGroup 
eventLoopGroup)
             throws IOException, InterruptedException, KeeperException {
-        this(conf, validateZooKeeper(zk), 
validateChannelFactory(channelFactory), NullStatsLogger.INSTANCE,
+        this(conf, validateZooKeeper(zk), 
validateEventLoopGroup(eventLoopGroup), NullStatsLogger.INSTANCE,
                 null, null, null);
     }
 
@@ -283,7 +287,7 @@ public class BookKeeper implements AutoCloseable {
      */
     private BookKeeper(ClientConfiguration conf,
                        ZooKeeper zkc,
-                       ClientSocketChannelFactory channelFactory,
+                       EventLoopGroup eventLoopGroup,
                        StatsLogger statsLogger,
                        DNSToSwitchMapping dnsResolver,
                        HashedWheelTimer requestTimer,
@@ -310,18 +314,13 @@ public class BookKeeper implements AutoCloseable {
             this.ownZKHandle = false;
         }
 
-        // initialize channel factory
-        if (null == channelFactory) {
-            ThreadFactoryBuilder tfb = new ThreadFactoryBuilder();
-            this.channelFactory = new NioClientSocketChannelFactory(
-                    Executors.newCachedThreadPool(tfb.setNameFormat(
-                            "BookKeeper-NIOBoss-%d").build()),
-                    Executors.newCachedThreadPool(tfb.setNameFormat(
-                            "BookKeeper-NIOWorker-%d").build()));
-            this.ownChannelFactory = true;
+        // initialize event loop group
+        if (null == eventLoopGroup) {
+            this.eventLoopGroup = getDefaultEventLoopGroup();
+            this.ownEventLoopGroup = true;
         } else {
-            this.channelFactory = channelFactory;
-            this.ownChannelFactory = false;
+            this.eventLoopGroup = eventLoopGroup;
+            this.ownEventLoopGroup = false;
         }
 
         if (null == requestTimer) {
@@ -365,7 +364,7 @@ public class BookKeeper implements AutoCloseable {
                 .build();
 
         // initialize bookie client
-        this.bookieClient = new BookieClient(conf, this.channelFactory, 
this.mainWorkerPool, statsLogger);
+        this.bookieClient = new BookieClient(conf, this.eventLoopGroup, 
this.mainWorkerPool, statsLogger);
         this.bookieWatcher = new BookieWatcher(conf, this.scheduler, 
this.placementPolicy, this);
         if (conf.getDiskWeightBasedPlacementEnabled()) {
             LOG.info("Weighted ledger placement enabled");
@@ -1178,8 +1177,8 @@ public class BookKeeper implements AutoCloseable {
         if (ownTimer) {
             requestTimer.stop();
         }
-        if (ownChannelFactory) {
-            channelFactory.releaseExternalResources();
+        if (ownEventLoopGroup) {
+            eventLoopGroup.shutdownGracefully();
         }
         if (ownZKHandle) {
             zk.close();
@@ -1255,4 +1254,20 @@ public class BookKeeper implements AutoCloseable {
     OpStatsLogger getAddOpLogger() { return addOpLogger; }
     OpStatsLogger getWriteLacOpLogger() { return writeLacOpLogger; }
     OpStatsLogger getReadLacOpLogger() { return readLacOpLogger; }
+
+    static EventLoopGroup getDefaultEventLoopGroup() {
+        ThreadFactory threadFactory = new 
ThreadFactoryBuilder().setNameFormat("bookkeeper-io-%s").build();
+        final int numThreads = Runtime.getRuntime().availableProcessors() * 2;
+
+        if (SystemUtils.IS_OS_LINUX) {
+            try {
+                return new EpollEventLoopGroup(numThreads, threadFactory);
+            } catch (Throwable t) {
+                LOG.warn("Could not use Netty Epoll event loop for bookie 
server: {}", t.getMessage());
+                return new NioEventLoopGroup(numThreads, threadFactory);
+            }
+        } else {
+            return new NioEventLoopGroup(numThreads, threadFactory);
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/74f79513/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/CRC32DigestManager.java
----------------------------------------------------------------------
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/CRC32DigestManager.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/CRC32DigestManager.java
index 9194bf9..25a0f61 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/CRC32DigestManager.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/CRC32DigestManager.java
@@ -19,7 +19,8 @@ package org.apache.bookkeeper.client;
 */
 
 
-import java.nio.ByteBuffer;
+import io.netty.buffer.ByteBuf;
+
 import java.util.zip.CRC32;
 
 class CRC32DigestManager extends DigestManager {
@@ -40,16 +41,13 @@ class CRC32DigestManager extends DigestManager {
     }
 
     @Override
-    byte[] getValueAndReset() {
-        byte[] value = new byte[8];
-        ByteBuffer buf = ByteBuffer.wrap(value);
-        buf.putLong(crc.get().getValue());
+    void populateValueAndReset(ByteBuf buf) {
+        buf.writeLong(crc.get().getValue());
         crc.get().reset();
-        return value;
     }
 
     @Override
-    void update(byte[] data, int offset, int length) {
-        crc.get().update(data, offset, length);
+    void update(ByteBuf data) {
+        crc.get().update(data.nioBuffer());
     }
 }

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/74f79513/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/DefaultEnsemblePlacementPolicy.java
----------------------------------------------------------------------
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/DefaultEnsemblePlacementPolicy.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/DefaultEnsemblePlacementPolicy.java
index 2b13a29..7c3d46f 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/DefaultEnsemblePlacementPolicy.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/DefaultEnsemblePlacementPolicy.java
@@ -28,6 +28,8 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import com.google.common.base.Optional;
 
+import io.netty.util.HashedWheelTimer;
+
 import org.apache.bookkeeper.client.BKException.BKNotEnoughBookiesException;
 import org.apache.bookkeeper.client.BookieInfoReader.BookieInfo;
 import org.apache.bookkeeper.client.WeightedRandomSelection.WeightedObject;
@@ -37,7 +39,6 @@ import org.apache.bookkeeper.net.BookieSocketAddress;
 import org.apache.bookkeeper.net.DNSToSwitchMapping;
 import org.apache.bookkeeper.stats.StatsLogger;
 import org.apache.commons.collections.CollectionUtils;
-import org.jboss.netty.util.HashedWheelTimer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/74f79513/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/DigestManager.java
----------------------------------------------------------------------
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/DigestManager.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/DigestManager.java
index c72f31a..396e6d9 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/DigestManager.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/DigestManager.java
@@ -18,16 +18,18 @@ package org.apache.bookkeeper.client;
  * limitations under the License.
  */
 
-import java.nio.ByteBuffer;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufInputStream;
+import io.netty.buffer.PooledByteBufAllocator;
+import io.netty.buffer.Unpooled;
+
 import java.security.GeneralSecurityException;
 
 import org.apache.bookkeeper.client.BKException.BKDigestMatchException;
 import org.apache.bookkeeper.client.BookKeeper.DigestType;
+import org.apache.bookkeeper.util.DoubleByteBuf;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.jboss.netty.buffer.ChannelBuffer;
-import org.jboss.netty.buffer.ChannelBufferInputStream;
-import org.jboss.netty.buffer.ChannelBuffers;
 
 /**
  * This class takes an entry, attaches a digest to it and packages it with 
relevant
@@ -47,11 +49,12 @@ abstract class DigestManager {
     abstract int getMacCodeLength();
 
     void update(byte[] data) {
-        update(data, 0, data.length);
+        update(Unpooled.wrappedBuffer(data, 0, data.length));
     }
 
-    abstract void update(byte[] data, int offset, int length);
-    abstract byte[] getValueAndReset();
+    abstract void update(ByteBuf buffer);
+
+    abstract void populateValueAndReset(ByteBuf buffer);
 
     final int macCodeLength;
 
@@ -81,26 +84,21 @@ abstract class DigestManager {
      * @return
      */
 
-    public ChannelBuffer computeDigestAndPackageForSending(long entryId, long 
lastAddConfirmed, long length, byte[] data, int doffset, int dlength) {
-
-        byte[] bufferArray = new byte[METADATA_LENGTH + macCodeLength];
-        ByteBuffer buffer = ByteBuffer.wrap(bufferArray);
-        buffer.putLong(ledgerId);
-        buffer.putLong(entryId);
-        buffer.putLong(lastAddConfirmed);
-        buffer.putLong(length);
-        buffer.flip();
+    public ByteBuf computeDigestAndPackageForSending(long entryId, long 
lastAddConfirmed, long length, byte[] data,
+            int doffset, int dlength) {
+        ByteBuf headersBuffer = 
PooledByteBufAllocator.DEFAULT.buffer(METADATA_LENGTH + macCodeLength);
+        headersBuffer.writeLong(ledgerId);
+        headersBuffer.writeLong(entryId);
+        headersBuffer.writeLong(lastAddConfirmed);
+        headersBuffer.writeLong(length);
 
-        update(buffer.array(), 0, METADATA_LENGTH);
-        update(data, doffset, dlength);
-        byte[] digest = getValueAndReset();
+        ByteBuf dataBuffer = Unpooled.wrappedBuffer(data, doffset, dlength);
 
-        buffer.limit(buffer.capacity());
-        buffer.position(METADATA_LENGTH);
-        buffer.put(digest);
-        buffer.flip();
+        update(headersBuffer);
+        update(dataBuffer);
+        populateValueAndReset(headersBuffer);
 
-        return 
ChannelBuffers.wrappedBuffer(ChannelBuffers.wrappedBuffer(buffer), 
ChannelBuffers.wrappedBuffer(data, doffset, dlength));
+        return DoubleByteBuf.get(headersBuffer, dataBuffer);
     }
 
     /**
@@ -110,39 +108,28 @@ abstract class DigestManager {
      * @return
      */
 
-    public ChannelBuffer computeDigestAndPackageForSendingLac(long lac) {
+    public ByteBuf computeDigestAndPackageForSendingLac(long lac) {
+        ByteBuf headersBuffer = 
PooledByteBufAllocator.DEFAULT.buffer(LAC_METADATA_LENGTH + macCodeLength);
+        headersBuffer.writeLong(ledgerId);
+        headersBuffer.writeLong(lac);
 
-        byte[] bufferArray = new byte[LAC_METADATA_LENGTH + macCodeLength];
-        ByteBuffer buffer = ByteBuffer.wrap(bufferArray);
-        buffer.putLong(ledgerId);
-        buffer.putLong(lac);
-        buffer.flip();
+        update(headersBuffer);
+        populateValueAndReset(headersBuffer);
 
-        update(buffer.array(), 0, LAC_METADATA_LENGTH);
-        byte[] digest = getValueAndReset();
-
-        buffer.limit(buffer.capacity());
-        buffer.position(LAC_METADATA_LENGTH);
-        buffer.put(digest);
-        buffer.flip();
-
-        return 
ChannelBuffers.wrappedBuffer(ChannelBuffers.wrappedBuffer(buffer));
+        return headersBuffer;
     }
 
-    private void verifyDigest(ChannelBuffer dataReceived) throws 
BKDigestMatchException {
+    private void verifyDigest(ByteBuf dataReceived) throws 
BKDigestMatchException {
         verifyDigest(LedgerHandle.INVALID_ENTRY_ID, dataReceived, true);
     }
 
-    private void verifyDigest(long entryId, ChannelBuffer dataReceived) throws 
BKDigestMatchException {
+    private void verifyDigest(long entryId, ByteBuf dataReceived) throws 
BKDigestMatchException {
         verifyDigest(entryId, dataReceived, false);
     }
 
-    private void verifyDigest(long entryId, ChannelBuffer dataReceived, 
boolean skipEntryIdCheck)
+    private void verifyDigest(long entryId, ByteBuf dataReceived, boolean 
skipEntryIdCheck)
             throws BKDigestMatchException {
 
-        ByteBuffer dataReceivedBuffer = dataReceived.toByteBuffer();
-        byte[] digest;
-
         if ((METADATA_LENGTH + macCodeLength) > dataReceived.readableBytes()) {
             logger.error("Data received is smaller than the minimum for this 
digest type. "
                     + " Either the packet it corrupt, or the wrong digest is 
configured. "
@@ -150,17 +137,21 @@ abstract class DigestManager {
                     this.getClass().getName(), dataReceived.readableBytes());
             throw new BKDigestMatchException();
         }
-        update(dataReceivedBuffer.array(), dataReceivedBuffer.position(), 
METADATA_LENGTH);
+        update(dataReceived.slice(0, METADATA_LENGTH));
 
         int offset = METADATA_LENGTH + macCodeLength;
-        update(dataReceivedBuffer.array(), dataReceivedBuffer.position() + 
offset, dataReceived.readableBytes() - offset);
-        digest = getValueAndReset();
+        update(dataReceived.slice(offset, dataReceived.readableBytes() - 
offset));
 
-        for (int i = 0; i < digest.length; i++) {
-            if (digest[i] != dataReceived.getByte(METADATA_LENGTH + i)) {
+        ByteBuf digest = PooledByteBufAllocator.DEFAULT.buffer(macCodeLength);
+        populateValueAndReset(digest);
+
+        try {
+            if (digest.compareTo(dataReceived.slice(METADATA_LENGTH, 
macCodeLength)) != 0) {
                 logger.error("Mac mismatch for ledger-id: " + ledgerId + ", 
entry-id: " + entryId);
                 throw new BKDigestMatchException();
             }
+        } finally {
+            digest.release();
         }
 
         long actualLedgerId = dataReceived.readLong();
@@ -180,9 +171,7 @@ abstract class DigestManager {
 
     }
 
-    long verifyDigestAndReturnLac(ChannelBuffer dataReceived) throws 
BKDigestMatchException{
-        ByteBuffer dataReceivedBuffer = dataReceived.toByteBuffer();
-        byte[] digest;
+    long verifyDigestAndReturnLac(ByteBuf dataReceived) throws 
BKDigestMatchException{
         if ((LAC_METADATA_LENGTH + macCodeLength) > 
dataReceived.readableBytes()) {
             logger.error("Data received is smaller than the minimum for this 
digest type."
                     + " Either the packet it corrupt, or the wrong digest is 
configured. "
@@ -190,14 +179,21 @@ abstract class DigestManager {
                     this.getClass().getName(), dataReceived.readableBytes());
             throw new BKDigestMatchException();
         }
-        update(dataReceivedBuffer.array(), dataReceivedBuffer.position(), 
LAC_METADATA_LENGTH);
-        digest = getValueAndReset();
-        for (int i = 0; i < digest.length; i++) {
-            if (digest[i] != dataReceived.getByte(LAC_METADATA_LENGTH + i)) {
+
+        update(dataReceived.slice(0, LAC_METADATA_LENGTH));
+
+        ByteBuf digest = PooledByteBufAllocator.DEFAULT.buffer(macCodeLength);
+        try {
+            populateValueAndReset(digest);
+
+            if (digest.compareTo(dataReceived.slice(LAC_METADATA_LENGTH, 
macCodeLength)) != 0) {
                 logger.error("Mac mismatch for ledger-id LAC: " + ledgerId);
                 throw new BKDigestMatchException();
             }
+        } finally {
+            digest.release();
         }
+
         long actualLedgerId = dataReceived.readLong();
         long lac = dataReceived.readLong();
         if (actualLedgerId != ledgerId) {
@@ -216,11 +212,11 @@ abstract class DigestManager {
      * @return
      * @throws BKDigestMatchException
      */
-    ChannelBufferInputStream verifyDigestAndReturnData(long entryId, 
ChannelBuffer dataReceived)
+    ByteBufInputStream verifyDigestAndReturnData(long entryId, ByteBuf 
dataReceived)
             throws BKDigestMatchException {
         verifyDigest(entryId, dataReceived);
         dataReceived.readerIndex(METADATA_LENGTH + macCodeLength);
-        return new ChannelBufferInputStream(dataReceived);
+        return new ByteBufInputStream(dataReceived);
     }
 
     static class RecoveryData {
@@ -234,7 +230,7 @@ abstract class DigestManager {
 
     }
 
-    RecoveryData verifyDigestAndReturnLastConfirmed(ChannelBuffer 
dataReceived) throws BKDigestMatchException {
+    RecoveryData verifyDigestAndReturnLastConfirmed(ByteBuf dataReceived) 
throws BKDigestMatchException {
         verifyDigest(dataReceived);
         dataReceived.readerIndex(8);
 

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/74f79513/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/EnsemblePlacementPolicy.java
----------------------------------------------------------------------
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/EnsemblePlacementPolicy.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/EnsemblePlacementPolicy.java
index d2e16e8..af49fa1 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/EnsemblePlacementPolicy.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/EnsemblePlacementPolicy.java
@@ -25,6 +25,8 @@ import java.util.Set;
 
 import com.google.common.base.Optional;
 
+import io.netty.util.HashedWheelTimer;
+
 import org.apache.bookkeeper.client.BKException.BKNotEnoughBookiesException;
 import org.apache.bookkeeper.client.BookieInfoReader.BookieInfo;
 import org.apache.bookkeeper.conf.ClientConfiguration;
@@ -32,7 +34,6 @@ import org.apache.bookkeeper.feature.FeatureProvider;
 import org.apache.bookkeeper.net.BookieSocketAddress;
 import org.apache.bookkeeper.net.DNSToSwitchMapping;
 import org.apache.bookkeeper.stats.StatsLogger;
-import org.jboss.netty.util.HashedWheelTimer;
 
 /**
  * Encapsulation of the algorithm that selects a number of bookies from the 
cluster as an ensemble for storing

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/74f79513/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ExplicitLacFlushPolicy.java
----------------------------------------------------------------------
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ExplicitLacFlushPolicy.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ExplicitLacFlushPolicy.java
index e452b24..d4a12e6 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ExplicitLacFlushPolicy.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ExplicitLacFlushPolicy.java
@@ -26,10 +26,11 @@ import java.util.concurrent.TimeUnit;
 
 import org.apache.bookkeeper.client.LedgerHandle.LastAddConfirmedCallback;
 import org.apache.bookkeeper.util.SafeRunnable;
-import org.jboss.netty.buffer.ChannelBuffer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import io.netty.buffer.ByteBuf;
+
 interface ExplicitLacFlushPolicy {
     void stopExplicitLacFlush();
 
@@ -129,8 +130,7 @@ interface ExplicitLacFlushPolicy {
                 lh.bk.mainWorkerPool.submit(new SafeRunnable() {
                     @Override
                     public void safeRun() {
-                        ChannelBuffer toSend = lh.macManager
-                                
.computeDigestAndPackageForSendingLac(lh.getLastAddConfirmed());
+                        ByteBuf toSend = 
lh.macManager.computeDigestAndPackageForSendingLac(lh.getLastAddConfirmed());
                         op.initiate(toSend);
                     }
                 });

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/74f79513/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerChecker.java
----------------------------------------------------------------------
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerChecker.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerChecker.java
index 3f2580f..4266c90 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerChecker.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerChecker.java
@@ -19,6 +19,8 @@
  */
 package org.apache.bookkeeper.client;
 
+import io.netty.buffer.ByteBuf;
+
 import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.Map;
@@ -31,7 +33,6 @@ import org.apache.bookkeeper.net.BookieSocketAddress;
 import org.apache.bookkeeper.proto.BookieClient;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
 import 
org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ReadEntryCallback;
-import org.jboss.netty.buffer.ChannelBuffer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -66,7 +67,7 @@ public class LedgerChecker {
         }
 
         public void readEntryComplete(int rc, long ledgerId, long entryId,
-                ChannelBuffer buffer, Object ctx) {
+                ByteBuf buffer, Object ctx) {
             if (rc == BKException.Code.OK) {
                 if (numEntries.decrementAndGet() == 0
                         && !completed.getAndSet(true)) {
@@ -126,7 +127,7 @@ public class LedgerChecker {
         }
 
         public void readEntryComplete(int rc, long ledgerId, long entryId,
-                                      ChannelBuffer buffer, Object ctx) {
+                                      ByteBuf buffer, Object ctx) {
             if (BKException.Code.NoSuchEntryException != rc &&
                 BKException.Code.NoSuchLedgerExistsException != rc) {
                 entryMayExist.set(true);

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/74f79513/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerEntry.java
----------------------------------------------------------------------
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerEntry.java 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerEntry.java
index 91f897c..6502e05 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerEntry.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerEntry.java
@@ -21,12 +21,13 @@ package org.apache.bookkeeper.client;
  *
  */
 
+import io.netty.buffer.ByteBufInputStream;
+
 import java.io.IOException;
 import java.io.InputStream;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.jboss.netty.buffer.ChannelBufferInputStream;
 
 /**
  * Ledger entry. Its a simple tuple containing the ledger id, the entry-id, and
@@ -40,7 +41,7 @@ public class LedgerEntry {
     long ledgerId;
     long entryId;
     long length;
-    ChannelBufferInputStream entryDataStream;
+    ByteBufInputStream entryDataStream;
 
     LedgerEntry(long lId, long eId) {
         this.ledgerId = lId;

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/74f79513/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerFragmentReplicator.java
----------------------------------------------------------------------
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerFragmentReplicator.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerFragmentReplicator.java
index 22241e6..0522d50 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerFragmentReplicator.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerFragmentReplicator.java
@@ -19,6 +19,8 @@
  */
 package org.apache.bookkeeper.client;
 
+import io.netty.buffer.ByteBuf;
+
 import java.util.ArrayList;
 import java.util.Enumeration;
 import java.util.HashSet;
@@ -40,8 +42,6 @@ import org.apache.bookkeeper.stats.OpStatsLogger;
 import org.apache.bookkeeper.stats.StatsLogger;
 import 
org.apache.bookkeeper.util.OrderedSafeExecutor.OrderedSafeGenericCallback;
 import org.apache.zookeeper.AsyncCallback;
-import org.apache.zookeeper.KeeperException.Code;
-import org.jboss.netty.buffer.ChannelBuffer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -281,7 +281,7 @@ public class LedgerFragmentReplicator {
                 final long dataLength = data.length;
                 numEntriesRead.inc();
                 numBytesRead.registerSuccessfulValue(dataLength);
-                ChannelBuffer toSend = lh.getDigestManager()
+                ByteBuf toSend = lh.getDigestManager()
                         .computeDigestAndPackageForSending(entryId,
                                 lh.getLastAddConfirmed(), entry.getLength(),
                                 data, 0, data.length);

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/74f79513/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
----------------------------------------------------------------------
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
index d1e5540..4ae4f48 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
@@ -21,6 +21,7 @@
 package org.apache.bookkeeper.client;
 
 import static com.google.common.base.Charsets.UTF_8;
+import io.netty.buffer.ByteBuf;
 
 import java.security.GeneralSecurityException;
 import java.util.ArrayList;
@@ -51,7 +52,6 @@ import org.apache.bookkeeper.stats.Counter;
 import org.apache.bookkeeper.stats.Gauge;
 import 
org.apache.bookkeeper.util.OrderedSafeExecutor.OrderedSafeGenericCallback;
 import org.apache.bookkeeper.util.SafeRunnable;
-import org.jboss.netty.buffer.ChannelBuffer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -110,7 +110,7 @@ public class LedgerHandle implements AutoCloseable {
 
         this.ledgerId = ledgerId;
 
-        if (bk.getConf().getThrottleValue() > 0) { 
+        if (bk.getConf().getThrottleValue() > 0) {
             this.throttler = 
RateLimiter.create(bk.getConf().getThrottleValue());
         } else {
             this.throttler = null;
@@ -285,7 +285,7 @@ public class LedgerHandle implements AutoCloseable {
         asyncClose(new SyncCloseCallback(), counter);
 
         explicitLacFlushPolicy.stopExplicitLacFlush();
-        
+
         SynchCallbackUtils.waitForResult(counter);
     }
 
@@ -811,12 +811,16 @@ public class LedgerHandle implements AutoCloseable {
         }
 
         try {
-            bk.mainWorkerPool.submit(new SafeRunnable() {
+            bk.mainWorkerPool.submitOrdered(ledgerId, new SafeRunnable() {
                 @Override
                 public void safeRun() {
-                    ChannelBuffer toSend = 
macManager.computeDigestAndPackageForSending(
-                                               entryId, lastAddConfirmed, 
currentLength, data, offset, length);
-                    op.initiate(toSend, length);
+                    ByteBuf toSend = 
macManager.computeDigestAndPackageForSending(entryId, lastAddConfirmed,
+                            currentLength, data, offset, length);
+                    try {
+                        op.initiate(toSend, length);
+                    } finally {
+                        toSend.release();
+                    }
                 }
                 @Override
                 public String toString() {
@@ -1024,7 +1028,7 @@ public class LedgerHandle implements AutoCloseable {
      * returns the value of the last add confirmed from the metadata.
      *
      * @see #getLastAddConfirmed()
-     * 
+     *
      * @param cb
      *          callback to return read explicit last confirmed
      * @param ctx
@@ -1049,7 +1053,7 @@ public class LedgerHandle implements AutoCloseable {
             @Override
             public void getLacComplete(int rc, long lac) {
                 if (rc == BKException.Code.OK) {
-                    // here we are trying to update lac only but not length 
+                    // here we are trying to update lac only but not length
                     updateLastConfirmed(lac, 0);
                     cb.readLastConfirmedComplete(rc, lac, ctx);
                 } else {
@@ -1133,6 +1137,7 @@ public class LedgerHandle implements AutoCloseable {
         while ((pendingAddOp = pendingAddOps.peek()) != null
                && blockAddCompletions.get() == 0) {
             if (!pendingAddOp.completed) {
+                LOG.debug("pending add not completed: {}", pendingAddOp);
                 return;
             }
             // Check if it is the next entry in the sequence.
@@ -1183,9 +1188,9 @@ public class LedgerHandle implements AutoCloseable {
 
     void handleBookieFailure(final BookieSocketAddress addr, final int 
bookieIndex) {
         // If this is the first failure,
-        // try to submit completed pendingAddOps before this failure. 
+        // try to submit completed pendingAddOps before this failure.
         if (0 == blockAddCompletions.get()) {
-            sendAddSuccessCallbacks(); 
+            sendAddSuccessCallbacks();
         }
 
         blockAddCompletions.incrementAndGet();
@@ -1197,9 +1202,9 @@ public class LedgerHandle implements AutoCloseable {
                          addr, bookieIndex);
                 blockAddCompletions.decrementAndGet();
 
-                // Try to submit completed pendingAddOps, pending by this fix. 
+                // Try to submit completed pendingAddOps, pending by this fix.
                 if (0 == blockAddCompletions.get()) {
-                    sendAddSuccessCallbacks(); 
+                    sendAddSuccessCallbacks();
                 }
 
                 return;

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/74f79513/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandleAdv.java
----------------------------------------------------------------------
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandleAdv.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandleAdv.java
index 4cdcdca..ffc469e 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandleAdv.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandleAdv.java
@@ -32,10 +32,11 @@ import java.util.concurrent.RejectedExecutionException;
 import org.apache.bookkeeper.client.AsyncCallback.AddCallback;
 import org.apache.bookkeeper.client.BookKeeper.DigestType;
 import org.apache.bookkeeper.util.SafeRunnable;
-import org.jboss.netty.buffer.ChannelBuffer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import io.netty.buffer.ByteBuf;
+
 /**
  * Ledger Advanced handle extends {@link LedgerHandle} to provide API to add 
entries with
  * user supplied entryIds. Through this interface Ledger Length may not be 
accurate wile the
@@ -213,9 +214,10 @@ public class LedgerHandleAdv extends LedgerHandle {
             bk.mainWorkerPool.submit(new SafeRunnable() {
                 @Override
                 public void safeRun() {
-                    ChannelBuffer toSend = 
macManager.computeDigestAndPackageForSending(
+                    ByteBuf toSend = 
macManager.computeDigestAndPackageForSending(
                                                op.getEntryId(), 
lastAddConfirmed, currentLength, data, offset, length);
                     op.initiate(toSend, length);
+                    toSend.release();
                 }
             });
         } catch (RejectedExecutionException e) {

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/74f79513/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/MacDigestManager.java
----------------------------------------------------------------------
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/MacDigestManager.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/MacDigestManager.java
index df09d52..920143f 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/MacDigestManager.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/MacDigestManager.java
@@ -18,6 +18,8 @@ package org.apache.bookkeeper.client;
 * limitations under the License.
 */
 
+import io.netty.buffer.ByteBuf;
+
 import java.security.GeneralSecurityException;
 import java.security.MessageDigest;
 import java.security.NoSuchAlgorithmException;
@@ -73,13 +75,13 @@ class MacDigestManager extends DigestManager {
 
 
     @Override
-    byte[] getValueAndReset() {
-        return mac.get().doFinal();
+    void populateValueAndReset(ByteBuf buffer) {
+        buffer.writeBytes(mac.get().doFinal());
     }
 
     @Override
-    void update(byte[] data, int offset, int length) {
-        mac.get().update(data, offset, length);
+    void update(ByteBuf data) {
+        mac.get().update(data.nioBuffer());
     }
 
 

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/74f79513/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java
----------------------------------------------------------------------
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java
index 1946069..9407189 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java
@@ -17,6 +17,11 @@
  */
 package org.apache.bookkeeper.client;
 
+import io.netty.buffer.ByteBuf;
+import io.netty.util.ReferenceCountUtil;
+import io.netty.util.Timeout;
+import io.netty.util.TimerTask;
+
 import java.util.HashSet;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
@@ -28,9 +33,6 @@ import 
org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback;
 import org.apache.bookkeeper.stats.OpStatsLogger;
 import org.apache.bookkeeper.util.MathUtils;
 import org.apache.bookkeeper.util.SafeRunnable;
-import org.jboss.netty.buffer.ChannelBuffer;
-import org.jboss.netty.util.Timeout;
-import org.jboss.netty.util.TimerTask;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import java.util.concurrent.RejectedExecutionException;
@@ -48,7 +50,7 @@ import java.util.concurrent.TimeUnit;
 class PendingAddOp implements WriteCallback, TimerTask {
     private final static Logger LOG = 
LoggerFactory.getLogger(PendingAddOp.class);
 
-    ChannelBuffer toSend;
+    ByteBuf toSend;
     AddCallback cb;
     Object ctx;
     long entryId;
@@ -66,6 +68,7 @@ class PendingAddOp implements WriteCallback, TimerTask {
     Timeout timeout = null;
 
     OpStatsLogger addOpLogger;
+    boolean callbackTriggered = false;
 
     PendingAddOp(LedgerHandle lh, AddCallback cb, Object ctx) {
         this.lh = lh;
@@ -153,6 +156,10 @@ class PendingAddOp implements WriteCallback, TimerTask {
             return;
         }
 
+        if (callbackTriggered) {
+            return;
+        }
+
         if (LOG.isDebugEnabled()) {
             LOG.debug("Unsetting success for ledger: " + lh.ledgerId + " 
entry: " + entryId + " bookie index: "
                       + bookieIndex);
@@ -167,12 +174,20 @@ class PendingAddOp implements WriteCallback, TimerTask {
         sendWriteRequest(bookieIndex);
     }
 
-    void initiate(ChannelBuffer toSend, int entryLength) {
+    void initiate(ByteBuf toSend, int entryLength) {
+        if (callbackTriggered) {
+            // this should only be true if the request was failed due to 
another request ahead in the pending queue,
+            // so we can just ignore this request
+            return;
+        }
+
         if (timeoutSec > -1) {
             this.timeout = lh.bk.bookieClient.scheduleTimeout(this, 
timeoutSec, TimeUnit.SECONDS);
         }
         this.requestTimeNanos = MathUtils.nowInNano();
         this.toSend = toSend;
+        // Retain the buffer until all writes are complete
+        this.toSend.retain();
         this.entryLength = entryLength;
         for (int bookieIndex : writeSet) {
             sendWriteRequest(bookieIndex);
@@ -233,6 +248,14 @@ class PendingAddOp implements WriteCallback, TimerTask {
         if (null != timeout) {
             timeout.cancel();
         }
+
+
+        ReferenceCountUtil.release(toSend);
+
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Submit callback (lid:{}, eid: {}). rc:{}", new Object[] 
{ lh.getId(), entryId, rc });
+        }
+
         long latencyNanos = MathUtils.elapsedNanos(requestTimeNanos);
         if (rc != BKException.Code.OK) {
             addOpLogger.registerFailedEvent(latencyNanos, 
TimeUnit.NANOSECONDS);
@@ -242,6 +265,7 @@ class PendingAddOp implements WriteCallback, TimerTask {
             addOpLogger.registerSuccessfulEvent(latencyNanos, 
TimeUnit.NANOSECONDS);
         }
         cb.addComplete(rc, lh, entryId, ctx);
+        callbackTriggered = true;
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/74f79513/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadLacOp.java
----------------------------------------------------------------------
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadLacOp.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadLacOp.java
index 64e266f..5b48461 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadLacOp.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadLacOp.java
@@ -19,13 +19,11 @@ package org.apache.bookkeeper.client;
 
 import org.apache.bookkeeper.client.BKException.BKDigestMatchException;
 import org.apache.bookkeeper.client.DigestManager.RecoveryData;
-import org.apache.bookkeeper.net.BookieSocketAddress;
-import 
org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ReadEntryCallback;
-import org.apache.bookkeeper.proto.BookieProtocol;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ReadLacCallback;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.jboss.netty.buffer.ChannelBuffer;
+
+import io.netty.buffer.ByteBuf;
 
 /**
  * This represents a pending ReadLac operation.
@@ -74,7 +72,8 @@ class PendingReadLacOp implements ReadLacCallback {
     }
 
     @Override
-    public void readLacComplete(int rc, long ledgerId, final ChannelBuffer 
lacBuffer, final ChannelBuffer lastEntryBuffer, Object ctx) {
+    public void readLacComplete(int rc, long ledgerId, final ByteBuf 
lacBuffer, final ByteBuf lastEntryBuffer,
+            Object ctx) {
         int bookieIndex = (Integer) ctx;
         numResponsesPending--;
         boolean heardValidResponse = false;

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/74f79513/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java
----------------------------------------------------------------------
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java
index cafe8f7..57a84f8 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java
@@ -20,6 +20,9 @@
  */
 package org.apache.bookkeeper.client;
 
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufInputStream;
+
 import java.util.ArrayList;
 import java.util.BitSet;
 import java.util.Enumeration;
@@ -41,8 +44,6 @@ import org.apache.bookkeeper.net.BookieSocketAddress;
 import 
org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ReadEntryCallback;
 import org.apache.bookkeeper.stats.OpStatsLogger;
 import org.apache.bookkeeper.util.MathUtils;
-import org.jboss.netty.buffer.ChannelBuffer;
-import org.jboss.netty.buffer.ChannelBufferInputStream;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -221,8 +222,8 @@ class PendingReadOp implements Enumeration<LedgerEntry>, 
ReadEntryCallback {
 
         // return true if we managed to complete the entry
         // return false if the read entry is not complete or it is already 
completed before
-        boolean complete(BookieSocketAddress host, final ChannelBuffer buffer) 
{
-            ChannelBufferInputStream is;
+        boolean complete(BookieSocketAddress host, final ByteBuf buffer) {
+            ByteBufInputStream is;
             try {
                 is = lh.macManager.verifyDigestAndReturnData(entryId, buffer);
             } catch (BKDigestMatchException e) {
@@ -352,7 +353,7 @@ class PendingReadOp implements Enumeration<LedgerEntry>, 
ReadEntryCallback {
     }
 
     @Override
-    public void readEntryComplete(int rc, long ledgerId, final long entryId, 
final ChannelBuffer buffer, Object ctx) {
+    public void readEntryComplete(int rc, long ledgerId, final long entryId, 
final ByteBuf buffer, Object ctx) {
         final ReadContext rctx = (ReadContext)ctx;
         final LedgerEntryRequest entry = rctx.entry;
 

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/74f79513/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingWriteLacOp.java
----------------------------------------------------------------------
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingWriteLacOp.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingWriteLacOp.java
index dc7368b..45c3898 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingWriteLacOp.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingWriteLacOp.java
@@ -25,10 +25,11 @@ import 
org.apache.bookkeeper.client.AsyncCallback.AddLacCallback;
 import org.apache.bookkeeper.net.BookieSocketAddress;
 import 
org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteLacCallback;
 import org.apache.bookkeeper.stats.OpStatsLogger;
-import org.jboss.netty.buffer.ChannelBuffer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import io.netty.buffer.ByteBuf;
+
 /**
  * This represents a pending WriteLac operation. When it has got
  * success from Ack Quorum bookies, sends success back to the application,
@@ -40,7 +41,7 @@ import org.slf4j.LoggerFactory;
  */
 class PendingWriteLacOp implements WriteLacCallback {
     private final static Logger LOG = 
LoggerFactory.getLogger(PendingWriteLacOp.class);
-    ChannelBuffer toSend;
+    ByteBuf toSend;
     AddLacCallback cb;
     long lac;
     Object ctx;
@@ -74,7 +75,7 @@ class PendingWriteLacOp implements WriteLacCallback {
                 lac, toSend, this, bookieIndex);
     }
 
-    void initiate(ChannelBuffer toSend) {
+    void initiate(ByteBuf toSend) {
         this.toSend = toSend;
         for (int bookieIndex: writeSet) {
             sendWriteLacRequest(bookieIndex);

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/74f79513/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicy.java
----------------------------------------------------------------------
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicy.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicy.java
index 7272447..a72e2ca 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicy.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicy.java
@@ -26,7 +26,8 @@ import org.apache.bookkeeper.net.BookieSocketAddress;
 import org.apache.bookkeeper.net.DNSToSwitchMapping;
 import org.apache.bookkeeper.net.Node;
 import org.apache.bookkeeper.stats.StatsLogger;
-import org.jboss.netty.util.HashedWheelTimer;
+
+import io.netty.util.HashedWheelTimer;
 
 public class RackawareEnsemblePlacementPolicy extends 
RackawareEnsemblePlacementPolicyImpl
         implements 
ITopologyAwareEnsemblePlacementPolicy<TopologyAwareEnsemblePlacementPolicy.BookieNode>
 {

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/74f79513/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicyImpl.java
----------------------------------------------------------------------
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicyImpl.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicyImpl.java
index 8d56f7a..b5fdfed 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicyImpl.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicyImpl.java
@@ -48,7 +48,6 @@ import org.apache.bookkeeper.net.StabilizeNetworkTopology;
 import org.apache.bookkeeper.stats.StatsLogger;
 import org.apache.bookkeeper.util.ReflectionUtils;
 import org.apache.commons.collections.CollectionUtils;
-import org.jboss.netty.util.HashedWheelTimer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -56,6 +55,8 @@ import com.google.common.base.Optional;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Sets;
 
+import io.netty.util.HashedWheelTimer;
+
 /**
  * Simple rackware ensemble placement policy.
  *

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/74f79513/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadLastConfirmedOp.java
----------------------------------------------------------------------
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadLastConfirmedOp.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadLastConfirmedOp.java
index af21f44..e95a527 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadLastConfirmedOp.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadLastConfirmedOp.java
@@ -17,13 +17,15 @@
  */
 package org.apache.bookkeeper.client;
 
+import io.netty.buffer.ByteBuf;
+import io.netty.util.ReferenceCountUtil;
+
 import org.apache.bookkeeper.client.BKException.BKDigestMatchException;
 import org.apache.bookkeeper.client.DigestManager.RecoveryData;
 import 
org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ReadEntryCallback;
 import org.apache.bookkeeper.proto.BookieProtocol;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.jboss.netty.buffer.ChannelBuffer;
 
 /**
  * This class encapsulated the read last confirmed operation.
@@ -75,7 +77,7 @@ class ReadLastConfirmedOp implements ReadEntryCallback {
     }
 
     public synchronized void readEntryComplete(final int rc, final long 
ledgerId, final long entryId,
-            final ChannelBuffer buffer, final Object ctx) {
+            final ByteBuf buffer, final Object ctx) {
         int bookieIndex = (Integer) ctx;
 
         numResponsesPending--;
@@ -96,6 +98,8 @@ class ReadLastConfirmedOp implements ReadEntryCallback {
             }
         }
 
+        ReferenceCountUtil.release(buffer);
+
         if (rc == BKException.Code.NoSuchLedgerExistsException || rc == 
BKException.Code.NoSuchEntryException) {
             // this still counts as a valid response, e.g., if the client 
crashed without writing any entry
             heardValidResponse = true;

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/74f79513/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RegionAwareEnsemblePlacementPolicy.java
----------------------------------------------------------------------
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RegionAwareEnsemblePlacementPolicy.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RegionAwareEnsemblePlacementPolicy.java
index ed9985f..787a5e3 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RegionAwareEnsemblePlacementPolicy.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RegionAwareEnsemblePlacementPolicy.java
@@ -30,6 +30,7 @@ import java.util.concurrent.ConcurrentMap;
 
 import com.google.common.base.Optional;
 
+import io.netty.util.HashedWheelTimer;
 
 import org.apache.bookkeeper.conf.ClientConfiguration;
 import org.apache.bookkeeper.feature.Feature;
@@ -42,7 +43,6 @@ import org.apache.bookkeeper.net.NodeBase;
 import org.apache.bookkeeper.stats.StatsLogger;
 import org.apache.bookkeeper.util.BookKeeperConstants;
 import org.apache.commons.lang3.tuple.Pair;
-import org.jboss.netty.util.HashedWheelTimer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/74f79513/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/TryReadLastConfirmedOp.java
----------------------------------------------------------------------
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/TryReadLastConfirmedOp.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/TryReadLastConfirmedOp.java
index 01b81c9..c896e93 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/TryReadLastConfirmedOp.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/TryReadLastConfirmedOp.java
@@ -21,10 +21,11 @@ import 
org.apache.bookkeeper.client.DigestManager.RecoveryData;
 import 
org.apache.bookkeeper.client.ReadLastConfirmedOp.LastConfirmedDataCallback;
 import org.apache.bookkeeper.proto.BookieProtocol;
 import 
org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ReadEntryCallback;
-import org.jboss.netty.buffer.ChannelBuffer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import io.netty.buffer.ByteBuf;
+
 import java.util.List;
 
 /**
@@ -60,7 +61,7 @@ class TryReadLastConfirmedOp implements ReadEntryCallback {
     }
 
     @Override
-    public void readEntryComplete(int rc, long ledgerId, long entryId, 
ChannelBuffer buffer, Object ctx) {
+    public void readEntryComplete(int rc, long ledgerId, long entryId, ByteBuf 
buffer, Object ctx) {
         if (LOG.isTraceEnabled()) {
             LOG.trace("TryReadLastConfirmed received response for (lid={}, 
eid={}) : {}",
                     new Object[] { ledgerId, entryId, rc });

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/74f79513/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java
----------------------------------------------------------------------
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java
index 6b913d4..ad025f2 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java
@@ -51,6 +51,7 @@ public class ClientConfiguration extends 
AbstractConfiguration {
 
     // NIO Parameters
     protected final static String CLIENT_TCP_NODELAY = "clientTcpNoDelay";
+    protected final static String CLIENT_SOCK_KEEPALIVE = 
"clientSockKeepalive";
     protected final static String CLIENT_SENDBUFFER_SIZE = 
"clientSendBufferSize";
     protected final static String CLIENT_RECEIVEBUFFER_SIZE = 
"clientReceiveBufferSize";
     protected final static String CLIENT_WRITEBUFFER_LOW_WATER_MARK = 
"clientWriteBufferLowWaterMark";
@@ -267,6 +268,29 @@ public class ClientConfiguration extends 
AbstractConfiguration {
     }
 
     /**
+     * get socket keepalive
+     * 
+     * @return socket keepalive setting
+     */
+    public boolean getClientSockKeepalive() {
+        return getBoolean(CLIENT_SOCK_KEEPALIVE, true);
+    }
+
+    /**
+     * Set socket keepalive setting.
+     * 
+     * This setting is used to send keep-alive messages on connection-oriented 
sockets.
+     * 
+     * @param keepalive
+     *            KeepAlive setting
+     * @return client configuration
+     */
+    public ClientConfiguration setClientSockKeepalive(boolean keepalive) {
+        setProperty(CLIENT_SOCK_KEEPALIVE, Boolean.toString(keepalive));
+        return this;
+    }
+
+    /**
      * Get client netty channel send buffer size.
      *
      * @return client netty channel send buffer size

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/74f79513/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
----------------------------------------------------------------------
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
index 2f8bb9a..ab8bbaa 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
@@ -23,7 +23,6 @@ import java.util.concurrent.TimeUnit;
 
 import com.google.common.annotations.Beta;
 
-import com.google.common.collect.Lists;
 import org.apache.bookkeeper.stats.NullStatsProvider;
 import org.apache.bookkeeper.stats.StatsProvider;
 import org.apache.bookkeeper.util.BookKeeperConstants;
@@ -90,6 +89,8 @@ public class ServerConfiguration extends 
AbstractConfiguration {
     protected final static String ALLOW_STORAGE_EXPANSION = 
"allowStorageExpansion";
     // NIO Parameters
     protected final static String SERVER_TCP_NODELAY = "serverTcpNoDelay";
+    protected final static String SERVER_SOCK_KEEPALIVE = 
"serverSockKeepalive";
+    protected final static String SERVER_SOCK_LINGER = "serverTcpLinger";
 
     // Zookeeper Parameters
     protected final static String ZK_TIMEOUT = "zkTimeout";
@@ -133,6 +134,11 @@ public class ServerConfiguration extends 
AbstractConfiguration {
 
     protected final static String LEDGER_STORAGE_CLASS = "ledgerStorageClass";
 
+    // Rx adaptive ByteBuf allocator parameters
+    protected final static String BYTEBUF_ALLOCATOR_SIZE_INITIAL = 
"byteBufAllocatorSizeInitial";
+    protected final static String BYTEBUF_ALLOCATOR_SIZE_MIN = 
"byteBufAllocatorSizeMin";
+    protected final static String BYTEBUF_ALLOCATOR_SIZE_MAX = 
"byteBufAllocatorSizeMax";
+
     // Bookie auth provider factory class name
     protected final static String BOOKIE_AUTH_PROVIDER_FACTORY_CLASS = 
"bookieAuthProviderFactoryClass";
 
@@ -741,6 +747,54 @@ public class ServerConfiguration extends 
AbstractConfiguration {
     }
 
     /**
+     * Timeout to drain the socket on close.
+     *
+     * @return socket linger setting
+     */
+    public int getServerSockLinger() {
+        return getInt(SERVER_SOCK_LINGER, 0);
+    }
+
+    /**
+     * Set socket linger timeout on close.
+     * 
+     * When enabled, a close or shutdown will not return until all queued 
messages for the socket have been successfully
+     * sent or the linger timeout has been reached. Otherwise, the call 
returns immediately and the closing is done in
+     * the background.
+     *
+     * @param noDelay
+     *            NoDelay setting
+     * @return server configuration
+     */
+    public ServerConfiguration setServerSockLinger(int linger) {
+        setProperty(SERVER_SOCK_LINGER, Integer.toString(linger));
+        return this;
+    }
+
+    /**
+     * get socket keepalive
+     * 
+     * @return socket keepalive setting
+     */
+    public boolean getServerSockKeepalive() {
+        return getBoolean(SERVER_SOCK_KEEPALIVE, true);
+    }
+
+    /**
+     * Set socket keepalive setting.
+     * 
+     * This setting is used to send keep-alive messages on connection-oriented 
sockets.
+     * 
+     * @param keepalive
+     *            KeepAlive setting
+     * @return server configuration
+     */
+    public ServerConfiguration setServerSockKeepalive(boolean keepalive) {
+        setProperty(SERVER_SOCK_KEEPALIVE, Boolean.toString(keepalive));
+        return this;
+    }
+
+    /**
      * Get zookeeper servers to connect
      *
      * @return zookeeper servers
@@ -1749,6 +1803,63 @@ public class ServerConfiguration extends 
AbstractConfiguration {
         }
     }
 
+    /**
+     * Get Recv ByteBuf allocator initial buf size
+     * 
+     * @return initial byteBuf size
+     */
+    public int getRecvByteBufAllocatorSizeInitial() {
+        return getInt(BYTEBUF_ALLOCATOR_SIZE_INITIAL, 64 * 1024);
+    }
+
+    /**
+     * Set Recv ByteBuf allocator initial buf size
+     * 
+     * @param size
+     *            buffer size
+     */
+    public void setRecvByteBufAllocatorSizeInitial(int size) {
+        setProperty(BYTEBUF_ALLOCATOR_SIZE_INITIAL, size);
+    }
+
+    /**
+     * Get Recv ByteBuf allocator min buf size
+     * 
+     * @return min byteBuf size
+     */
+    public int getRecvByteBufAllocatorSizeMin() {
+        return getInt(BYTEBUF_ALLOCATOR_SIZE_MIN, 64 * 1024);
+    }
+
+    /**
+     * Set Recv ByteBuf allocator min buf size
+     * 
+     * @param size
+     *            buffer size
+     */
+    public void setRecvByteBufAllocatorSizeMin(int size) {
+        setProperty(BYTEBUF_ALLOCATOR_SIZE_MIN, size);
+    }
+
+    /**
+     * Get Recv ByteBuf allocator max buf size
+     * 
+     * @return max byteBuf size
+     */
+    public int getRecvByteBufAllocatorSizeMax() {
+        return getInt(BYTEBUF_ALLOCATOR_SIZE_MAX, 1 * 1024 * 1024);
+    }
+
+    /**
+     * Set Recv ByteBuf allocator max buf size
+     * 
+     * @param size
+     *            buffer size
+     */
+    public void setRecvByteBufAllocatorSizeMax(int size) {
+        setProperty(BYTEBUF_ALLOCATOR_SIZE_MAX, size);
+    }
+
     /*
      * Set the bookie authentication provider factory class name.
      * If this is not set, no authentication will be used

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/74f79513/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/BookieSocketAddress.java
----------------------------------------------------------------------
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/BookieSocketAddress.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/BookieSocketAddress.java
index 8947abf..382c221 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/BookieSocketAddress.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/BookieSocketAddress.java
@@ -23,8 +23,9 @@ package org.apache.bookkeeper.net;
 import java.net.InetSocketAddress;
 import java.net.UnknownHostException;
 
+import io.netty.channel.local.LocalAddress;
+
 import static org.apache.bookkeeper.util.BookKeeperConstants.COLON;
-import org.jboss.netty.channel.local.LocalAddress;
 
 /**
  * This is a data wrapper class that is an InetSocketAddress, it would use the 
hostname

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/74f79513/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/StabilizeNetworkTopology.java
----------------------------------------------------------------------
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/StabilizeNetworkTopology.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/StabilizeNetworkTopology.java
index 5dce906..e438634 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/StabilizeNetworkTopology.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/StabilizeNetworkTopology.java
@@ -18,12 +18,13 @@
 package org.apache.bookkeeper.net;
 
 import org.apache.bookkeeper.util.MathUtils;
-import org.jboss.netty.util.HashedWheelTimer;
-import org.jboss.netty.util.Timeout;
-import org.jboss.netty.util.TimerTask;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import io.netty.util.HashedWheelTimer;
+import io.netty.util.Timeout;
+import io.netty.util.TimerTask;
+
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/74f79513/bookkeeper-server/src/main/java/org/apache/bookkeeper/processor/RequestProcessor.java
----------------------------------------------------------------------
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/processor/RequestProcessor.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/processor/RequestProcessor.java
index d785d29..658753c 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/processor/RequestProcessor.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/processor/RequestProcessor.java
@@ -20,7 +20,7 @@
  */
 package org.apache.bookkeeper.processor;
 
-import org.jboss.netty.channel.Channel;
+import io.netty.channel.Channel;
 
 public interface RequestProcessor {
 

Reply via email to