This is an automated email from the ASF dual-hosted git repository.

granthenke pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git


The following commit(s) were added to refs/heads/master by this push:
     new 57dda5d  KUDU-1438: [java] Upgrade to Netty 4
57dda5d is described below

commit 57dda5d4868d29f68de4aa0ac516ca390333e6be
Author: Grant Henke <[email protected]>
AuthorDate: Wed Jan 22 14:24:03 2020 -0600

    KUDU-1438: [java] Upgrade to Netty 4
    
    This patch upgrades from Netty 3 to the latest Netty 4.
    Below is a list of some of the changes required:
    - Changed org.jboss.netty imports to io.netty
    - Adjust shading to relocate io.netty
    - Removed outdated reflection in Bytes.java
    - Changed ChannelBuffer to ByteBuf
    - Changed OneToOneDecoder to ByteToMessageDecoder
    - Changed OneToOneEncoder to MessageToByteEncoder
    - Changed DecoderEmbedder to EmbeddedChannel
    - Changed Negotiator and Connection to extend
      SimpleChannelInboundHandler instead of
      SimpleChannelUpstreamHandler
    - Worked around non-sharable SslHandler
    - Migrated from NioClientSocketChannelFactory to Netty Bootstrap
    - Removed boss threads and deprecated setter methods in the
      AsyncKuduClient
    - Removed ShutdownThread in AsyncKuduClient
    - Updated Negotiator tests to unwrap SSL messages
    - Added build flag to run tests with paranoid level leak detection
    - Suppressed errorprone FutureReturnValueIgnored warnings
    
    I used YCSB to performance test this change and ensure there
    isn’t a large performance regression. On an m4.2xlarge instance
    I ran a test of 5 iterations of workload a, b, and c using 1M records
    and 1M operations. The difference between runs was negligible
    (< 1%) and within test variance.
    
    Change-Id: Ic75bd15a982187039ff2e1510af9390d304f7626
    Reviewed-on: http://gerrit.cloudera.org:8080/15136
    Tested-by: Kudu Jenkins
    Reviewed-by: Adar Dembo <[email protected]>
---
 java/gradle/dependencies.gradle                    |   4 +-
 java/gradle/shadow.gradle                          |   2 +-
 java/gradle/tests.gradle                           |   5 +
 .../org/apache/kudu/client/AlterTableRequest.java  |   2 +-
 .../org/apache/kudu/client/AsyncKuduClient.java    | 116 ++++++-----
 .../org/apache/kudu/client/AsyncKuduSession.java   |   4 +-
 .../main/java/org/apache/kudu/client/Batch.java    |   2 +-
 .../main/java/org/apache/kudu/client/Bytes.java    |  93 +--------
 .../java/org/apache/kudu/client/CallResponse.java  |  40 ++--
 .../org/apache/kudu/client/ConnectToCluster.java   |   2 +-
 .../apache/kudu/client/ConnectToMasterRequest.java |   2 +-
 .../java/org/apache/kudu/client/Connection.java    | 198 +++++++++----------
 .../org/apache/kudu/client/ConnectionCache.java    |  19 +-
 .../org/apache/kudu/client/CreateTableRequest.java |   2 +-
 .../org/apache/kudu/client/DeleteTableRequest.java |   2 +-
 .../kudu/client/GetTableLocationsRequest.java      |   2 +-
 .../apache/kudu/client/GetTableSchemaRequest.java  |   2 +-
 .../kudu/client/GetTableStatisticsRequest.java     |   2 +-
 .../kudu/client/IsAlterTableDoneRequest.java       |   2 +-
 .../kudu/client/IsCreateTableDoneRequest.java      |   2 +-
 .../java/org/apache/kudu/client/KuduClient.java    |  36 +++-
 .../main/java/org/apache/kudu/client/KuduRpc.java  |  38 ++--
 .../org/apache/kudu/client/ListTablesRequest.java  |   2 +-
 .../kudu/client/ListTabletServersRequest.java      |   2 +-
 .../org/apache/kudu/client/ListTabletsRequest.java |   2 +-
 .../java/org/apache/kudu/client/Negotiator.java    | 218 ++++++++++++---------
 .../java/org/apache/kudu/client/Operation.java     |   2 +-
 .../java/org/apache/kudu/client/PingRequest.java   |   2 +-
 .../org/apache/kudu/client/RpcOutboundMessage.java |  24 +--
 .../apache/kudu/client/SplitKeyRangeRequest.java   |   2 +-
 .../main/java/org/apache/kudu/util/NetUtil.java    |   1 -
 .../test/java/org/apache/kudu/client/ITClient.java |   1 +
 .../apache/kudu/client/ITScannerMultiTablet.java   |   1 +
 .../apache/kudu/client/TestAsyncKuduClient.java    |   1 +
 .../apache/kudu/client/TestAuthTokenReacquire.java |   3 +-
 .../kudu/client/TestAuthnTokenReacquireOpen.java   |   1 +
 .../apache/kudu/client/TestConnectToCluster.java   |   4 +-
 .../apache/kudu/client/TestConnectionCache.java    |   1 +
 .../org/apache/kudu/client/TestNegotiator.java     | 142 +++++++++-----
 39 files changed, 501 insertions(+), 485 deletions(-)

diff --git a/java/gradle/dependencies.gradle b/java/gradle/dependencies.gradle
index 67b42a7..3eaf428 100755
--- a/java/gradle/dependencies.gradle
+++ b/java/gradle/dependencies.gradle
@@ -47,7 +47,7 @@ versions += [
     log4j          : "2.11.2",
     mockito        : "3.1.0",
     murmur         : "1.0.0",
-    netty          : "3.10.6.Final",
+    netty          : "4.1.44.Final",
     osdetector     : "1.6.2",
     parquet        : "1.10.1",
     protobuf       : "3.10.0",
@@ -100,7 +100,7 @@ libs += [
     log4jSlf4jImpl       : 
"org.apache.logging.log4j:log4j-slf4j-impl:$versions.log4j",
     mockitoCore          : "org.mockito:mockito-core:$versions.mockito",
     murmur               : "com.sangupta:murmur:$versions.murmur",
-    netty                : "io.netty:netty:$versions.netty",
+    netty                : "io.netty:netty-all:$versions.netty",
     osdetector           : 
"com.google.gradle:osdetector-gradle-plugin:$versions.osdetector",
     parquetHadoop        : 
"org.apache.parquet:parquet-hadoop:$versions.parquet",
     protobufJava         : 
"com.google.protobuf:protobuf-java:$versions.protobuf",
diff --git a/java/gradle/shadow.gradle b/java/gradle/shadow.gradle
index bb414e5..c09b06f 100644
--- a/java/gradle/shadow.gradle
+++ b/java/gradle/shadow.gradle
@@ -58,7 +58,7 @@ shadowJar {
   relocate "org.checkerframework", 
"org.apache.kudu.shaded.org.checkerframework"
   relocate "org.hamcrest", "org.apache.kudu.shaded.org.hamcrest"
   relocate "org.HdrHistogram", "org.apache.kudu.shaded.org.HdrHistogram"
-  relocate "org.jboss.netty", "org.apache.kudu.shaded.org.jboss.netty"
+  relocate "io.netty", "org.apache.kudu.shaded.io.netty"
   relocate "scopt", "org.apache.kudu.shaded.scopt"
 }
 
diff --git a/java/gradle/tests.gradle b/java/gradle/tests.gradle
index 94a0d49..eaf083b 100644
--- a/java/gradle/tests.gradle
+++ b/java/gradle/tests.gradle
@@ -56,6 +56,11 @@ tasks.withType(Test) {
       jvmArgs += "--add-opens=$module=ALL-UNNAMED"
     }
   }
+  // Enable paranoid Netty leak detection during tests.
+  // https://netty.io/wiki/reference-counted-objects.html#leak-detection-levels
+  if (propertyExists("nettyLeakDetection")) {
+    jvmArgs += "-Dio.netty.leakDetection.level=paranoid"
+  }
 
   // Set a few system properties.
   systemProperty "java.awt.headless", true
diff --git 
a/java/kudu-client/src/main/java/org/apache/kudu/client/AlterTableRequest.java 
b/java/kudu-client/src/main/java/org/apache/kudu/client/AlterTableRequest.java
index 0ebb6ba..c323c95 100644
--- 
a/java/kudu-client/src/main/java/org/apache/kudu/client/AlterTableRequest.java
+++ 
b/java/kudu-client/src/main/java/org/apache/kudu/client/AlterTableRequest.java
@@ -27,8 +27,8 @@ import java.util.List;
 
 import com.google.common.collect.ImmutableList;
 import com.google.protobuf.Message;
+import io.netty.util.Timer;
 import org.apache.yetus.audience.InterfaceAudience;
-import org.jboss.netty.util.Timer;
 
 import org.apache.kudu.util.Pair;
 
diff --git 
a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java 
b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java
index e41e6c9..4b40a39 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java
@@ -43,6 +43,7 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Executor;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
 import java.util.function.Consumer;
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
@@ -56,15 +57,18 @@ import com.google.protobuf.ByteString;
 import com.google.protobuf.Message;
 import com.stumbleupon.async.Callback;
 import com.stumbleupon.async.Deferred;
+import io.netty.bootstrap.Bootstrap;
+import io.netty.buffer.PooledByteBufAllocator;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.nio.NioSocketChannel;
+import io.netty.util.HashedWheelTimer;
+import io.netty.util.Timeout;
+import io.netty.util.Timer;
+import io.netty.util.TimerTask;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.yetus.audience.InterfaceStability;
-import org.jboss.netty.channel.socket.ClientSocketChannelFactory;
-import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
-import org.jboss.netty.channel.socket.nio.NioWorkerPool;
-import org.jboss.netty.util.HashedWheelTimer;
-import org.jboss.netty.util.Timeout;
-import org.jboss.netty.util.Timer;
-import org.jboss.netty.util.TimerTask;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -282,7 +286,7 @@ public class AsyncKuduClient implements AutoCloseable {
    */
   static int FETCH_TABLETS_PER_RANGE_LOOKUP = 1000;
 
-  private final ClientSocketChannelFactory channelFactory;
+  private final Bootstrap bootstrap;
 
   /**
    * This map contains data cached from calls to the master's
@@ -365,7 +369,7 @@ public class AsyncKuduClient implements AutoCloseable {
   private volatile boolean closed;
 
   private AsyncKuduClient(AsyncKuduClientBuilder b) {
-    this.channelFactory = b.createChannelFactory();
+    this.bootstrap = b.createBootstrap();
     this.masterAddresses = b.masterAddresses;
     this.masterTable = new KuduTable(this, MASTER_TABLE_NAME_PLACEHOLDER,
         MASTER_TABLE_NAME_PLACEHOLDER, null, null, 1, null);
@@ -377,8 +381,7 @@ public class AsyncKuduClient implements AutoCloseable {
     this.requestTracker = new 
RequestTracker(UUID.randomUUID().toString().replace("-", ""));
 
     this.securityContext = new SecurityContext();
-    this.connectionCache = new ConnectionCache(
-        securityContext, timer, channelFactory);
+    this.connectionCache = new ConnectionCache(securityContext, bootstrap);
     this.tokenReacquirer = new AuthnTokenReacquirer(this);
     this.authzTokenCache = new AuthzTokenCache(this);
   }
@@ -2495,30 +2498,17 @@ public class AsyncKuduClient implements AutoCloseable {
   public Deferred<ArrayList<Void>> shutdown() {
     checkIsClosed();
     closed = true;
-    // This is part of step 3.  We need to execute this in its own thread
-    // because Netty gets stuck in an infinite loop if you try to shut it
-    // down from within a thread of its own thread pool.  They don't want
-    // to fix this so as a workaround we always shut Netty's thread pool
-    // down from another thread.
-    final class ShutdownThread extends Thread {
-      ShutdownThread() {
-        super("AsyncKuduClient@" + AsyncKuduClient.super.hashCode() + " 
shutdown");
-      }
-
-      @Override
-      public void run() {
-        // This terminates the Executor.
-        channelFactory.releaseExternalResources();
-      }
-    }
 
     // 3. Release all other resources.
     final class ReleaseResourcesCB implements Callback<ArrayList<Void>, 
ArrayList<Void>> {
       @Override
-      public ArrayList<Void> call(final ArrayList<Void> arg) {
+      @SuppressWarnings("FutureReturnValueIgnored")
+      public ArrayList<Void> call(final ArrayList<Void> arg) throws 
InterruptedException {
         LOG.debug("Releasing all remaining resources");
         timer.stop();
-        new ShutdownThread().start();
+        // AbstractEventExecutor sets a default `quietPeriod` of 2 seconds and 
a 15 second timeout.
+        // We disable to quiet period to prevent resource leaks due to clients 
running forever.
+        bootstrap.config().group().shutdownGracefully(0, 15, TimeUnit.SECONDS);
         return arg;
       }
 
@@ -2620,7 +2610,6 @@ public class AsyncKuduClient implements AutoCloseable {
   @InterfaceStability.Evolving
   public static final class AsyncKuduClientBuilder {
     private static final int DEFAULT_MASTER_PORT = 7051;
-    private static final int DEFAULT_BOSS_COUNT = 1;
     private static final int DEFAULT_WORKER_COUNT = 2 * 
Runtime.getRuntime().availableProcessors();
 
     private final List<HostAndPort> masterAddresses;
@@ -2629,9 +2618,7 @@ public class AsyncKuduClient implements AutoCloseable {
 
     private final HashedWheelTimer timer =
         new HashedWheelTimer(new 
ThreadFactoryBuilder().setDaemon(true).build(), 20, MILLISECONDS);
-    private Executor bossExecutor;
     private Executor workerExecutor;
-    private int bossCount = DEFAULT_BOSS_COUNT;
     private int workerCount = DEFAULT_WORKER_COUNT;
     private boolean statisticsDisabled = false;
 
@@ -2707,33 +2694,43 @@ public class AsyncKuduClient implements AutoCloseable {
     }
 
     /**
-     * Set the executors which will be used for the embedded Netty boss and 
workers.
+     * @deprecated the bossExecutor is no longer used and will have no effect 
if provided
+     */
+    @Deprecated
+    public AsyncKuduClientBuilder nioExecutors(Executor bossExecutor, Executor 
workerExecutor) {
+      this.workerExecutor = workerExecutor;
+      return this;
+    }
+
+    /**
+     * Set the executor which will be used for the embedded Netty workers.
+     *
      * Optional.
-     * If not provided, uses a simple cached threadpool. If either argument is 
null,
-     * then such a thread pool will be used in place of that argument.
+     * If not provided, uses a simple cached threadpool. If workerExecutor is 
null,
+     * then such a thread pool will be used.
      * Note: executor's max thread number must be greater or equal to 
corresponding
      * worker count, or netty cannot start enough threads, and client will get 
stuck.
      * If not sure, please just use CachedThreadPool.
      */
-    public AsyncKuduClientBuilder nioExecutors(Executor bossExecutor, Executor 
workerExecutor) {
-      this.bossExecutor = bossExecutor;
+    public AsyncKuduClientBuilder nioExecutor(Executor workerExecutor) {
       this.workerExecutor = workerExecutor;
       return this;
     }
 
     /**
-     * Set the maximum number of boss threads.
-     * Optional.
-     * If not provided, 1 is used.
+     * @deprecated the bossExecutor is no longer used and will have no effect 
if provided
      */
+    @Deprecated
     public AsyncKuduClientBuilder bossCount(int bossCount) {
-      Preconditions.checkArgument(bossCount > 0, "bossCount should be greater 
than 0");
-      this.bossCount = bossCount;
+      LOG.info("bossCount is deprecated");
       return this;
     }
 
     /**
-     * Set the maximum number of worker threads.
+     * Set the maximum number of Netty worker threads.
+     * A worker thread performs non-blocking read and write for one or more
+     * Netty Channels in a non-blocking mode.
+     *
      * Optional.
      * If not provided, (2 * the number of available processors) is used. If
      * this client instance will be used on a machine running many client
@@ -2748,31 +2745,30 @@ public class AsyncKuduClient implements AutoCloseable {
     }
 
     /**
-     * Creates the channel factory for Netty. The user can specify the 
executors, but
+     * Creates the client bootstrap for Netty. The user can specify the 
executor, but
      * if they don't, we'll use a simple thread pool.
      */
-    private NioClientSocketChannelFactory createChannelFactory() {
-      Executor boss = bossExecutor;
+    private Bootstrap createBootstrap() {
       Executor worker = workerExecutor;
-      if (boss == null || worker == null) {
-        Executor defaultExec = Executors.newCachedThreadPool(
+      if (worker == null) {
+        worker = Executors.newCachedThreadPool(
             new ThreadFactoryBuilder()
                 .setNameFormat("kudu-nio-%d")
                 .setDaemon(true)
                 .build());
-        if (boss == null) {
-          boss = defaultExec;
-        }
-        if (worker == null) {
-          worker = defaultExec;
-        }
       }
-      // Share the timer with the socket channel factory so that it does not
-      // create an internal timer with a non-daemon thread.
-      return new NioClientSocketChannelFactory(boss,
-                                               bossCount,
-                                               new NioWorkerPool(worker, 
workerCount),
-                                               timer);
+      EventLoopGroup workerGroup = new NioEventLoopGroup(workerCount, worker);
+      Bootstrap b = new Bootstrap();
+      b.group(workerGroup);
+      b.channel(NioSocketChannel.class);
+      b.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 60000);
+      b.option(ChannelOption.TCP_NODELAY, true);
+      // Unfortunately there is no way to override the keep-alive timeout in
+      // Java since the JRE doesn't expose any way to call setsockopt() with
+      // TCP_KEEPIDLE. And of course the default timeout is >2h. Sigh.
+      b.option(ChannelOption.SO_KEEPALIVE, true);
+      b.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
+      return b;
     }
 
     /**
diff --git 
a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduSession.java 
b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduSession.java
index b9da766..5456c37 100644
--- 
a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduSession.java
+++ 
b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduSession.java
@@ -37,10 +37,10 @@ import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
 import com.stumbleupon.async.Callback;
 import com.stumbleupon.async.Deferred;
+import io.netty.util.Timeout;
+import io.netty.util.TimerTask;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.yetus.audience.InterfaceStability;
-import org.jboss.netty.util.Timeout;
-import org.jboss.netty.util.TimerTask;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/Batch.java 
b/java/kudu-client/src/main/java/org/apache/kudu/client/Batch.java
index 9e2c086..124a8ab 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/Batch.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/Batch.java
@@ -27,8 +27,8 @@ import com.google.common.base.MoreObjects;
 import com.google.common.collect.Iterables;
 import com.google.protobuf.Message;
 import com.google.protobuf.UnsafeByteOperations;
+import io.netty.util.Timer;
 import org.apache.yetus.audience.InterfaceAudience;
-import org.jboss.netty.util.Timer;
 
 import org.apache.kudu.WireProtocol.AppStatusPB.ErrorCode;
 import org.apache.kudu.client.Statistics.Statistic;
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/Bytes.java 
b/java/kudu-client/src/main/java/org/apache/kudu/client/Bytes.java
index 8574abf..d67a440 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/Bytes.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/Bytes.java
@@ -30,22 +30,17 @@ import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
 import java.io.Serializable;
-import java.lang.reflect.Field;
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
 import java.math.BigDecimal;
 import java.math.BigInteger;
 import java.nio.ByteBuffer;
-import java.security.AccessController;
-import java.security.PrivilegedAction;
 import java.util.Arrays;
 import java.util.BitSet;
 import java.util.Comparator;
 
 import com.google.common.io.BaseEncoding;
+import io.netty.buffer.ByteBuf;
+import io.netty.util.CharsetUtil;
 import org.apache.yetus.audience.InterfaceAudience;
-import org.jboss.netty.buffer.ChannelBuffer;
-import org.jboss.netty.util.CharsetUtil;
 
 import org.apache.kudu.util.DecimalUtil;
 import org.apache.kudu.util.Slice;
@@ -404,7 +399,7 @@ public final class Bytes {
    * @param buf The buffer to read from.
    * @return The integer read.
    */
-  static int readVarInt32(final ChannelBuffer buf) {
+  static int readVarInt32(final ByteBuf buf) {
     int result = buf.readByte();
     if (result >= 0) {
       return result;
@@ -984,26 +979,12 @@ public final class Bytes {
    * @param buf The (possibly {@code null}) buffer to pretty-print.
    * @return The buffer in a pretty-printed string.
    */
-  public static String pretty(final ChannelBuffer buf) {
+  public static String pretty(final ByteBuf buf) {
     if (buf == null) {
       return "null";
     }
-    byte[] array;
-    try {
-      if (buf.getClass() != ReplayingDecoderBuffer) {
-        array = buf.array();
-      } else if (RDB_buf != null) {  // Netty 3.5.1 and above.
-        array = ((ChannelBuffer) RDB_buf.invoke(buf)).array();
-      } else {  // Netty 3.5.0 and before.
-        final ChannelBuffer wrapped_buf = (ChannelBuffer) RDB_buffer.get(buf);
-        array = wrapped_buf.array();
-      }
-    } catch (UnsupportedOperationException e) {
-      return "(failed to extract content of buffer of type " +
-          buf.getClass().getName() + ')';
-    } catch (IllegalAccessException | InvocationTargetException e) {
-      throw new AssertionError("Should not happen: " + e);
-    }
+    byte[] array = new byte[buf.readableBytes()];
+    buf.getBytes(buf.readerIndex(), array);
     return pretty(array);
   }
 
@@ -1020,68 +1001,6 @@ public final class Bytes {
     return sb.toString();
   }
 
-  // Ugly stuff
-  // ----------
-  // Background: when using ReplayingDecoder (which makes it easy to deal with
-  // unframed RPC responses), the ChannelBuffer we manipulate is in fact a
-  // ReplayingDecoderBuffer, a package-private class that Netty uses.  This
-  // class, for some reason, throws UnsupportedOperationException on its
-  // array() method.  This method is unfortunately the only way to easily dump
-  // the contents of a ChannelBuffer, which is useful for debugging or logging
-  // unexpected buffers.  An issue (NETTY-346) has been filed to get access to
-  // the buffer, but the resolution was useless: instead of making the array()
-  // method work, a new internalBuffer() method was added on ReplayingDecoder,
-  // which would require that we keep a reference on the ReplayingDecoder all
-  // along in order to properly convert the buffer to a string.
-  // So we instead use ugly reflection to gain access to the underlying buffer
-  // while taking into account that the implementation of Netty has changed
-  // over time, so depending which version of Netty we're working with, we do
-  // a different hack.  Yes this is horrible, but it's for the greater good as
-  // this is what allows us to debug unexpected buffers when deserializing RPCs
-  // and what's more important than being able to debug unexpected stuff?
-  private static final Class<?> ReplayingDecoderBuffer;
-  private static final Field RDB_buffer;  // For Netty 3.5.0 and before.
-  private static final Method RDB_buf;    // For Netty 3.5.1 and above.
-
-  static {
-    try {
-      ReplayingDecoderBuffer = Class.forName("org.jboss.netty.handler.codec." +
-          "replay.ReplayingDecoderBuffer");
-      Field field = AccessController.doPrivileged(new 
PrivilegedAction<Field>() {
-        @Override
-        public Field run() {
-          try {
-            Field bufferField = 
ReplayingDecoderBuffer.getDeclaredField("buffer");
-            bufferField.setAccessible(true);
-            return bufferField;
-          } catch (NoSuchFieldException e) {
-            // Ignore.  Field has been removed in Netty 3.5.1.
-            return null;
-          }
-        }
-      });
-      RDB_buffer = field;
-      if (field != null) {  // Netty 3.5.0 or before.
-        RDB_buf = null;
-      } else {
-        RDB_buf = AccessController.doPrivileged(new PrivilegedAction<Method>() 
{
-          @Override
-          public Method run() {
-            try {
-              Method bufMethod = 
ReplayingDecoderBuffer.getDeclaredMethod("buf");
-              bufMethod.setAccessible(true);
-              return bufMethod;
-            } catch (NoSuchMethodException e) {
-              throw new RuntimeException(e);
-            }
-          }
-        });
-      }
-    } catch (ReflectiveOperationException | RuntimeException e) {
-      throw new RuntimeException("static initializer failed", e);
-    }
-  }
-
   // ---------------------- //
   // Comparing byte arrays. //
   // ---------------------- //
diff --git 
a/java/kudu-client/src/main/java/org/apache/kudu/client/CallResponse.java 
b/java/kudu-client/src/main/java/org/apache/kudu/client/CallResponse.java
index a042622..41fa35d 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/CallResponse.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/CallResponse.java
@@ -19,11 +19,11 @@ package org.apache.kudu.client;
 
 import java.util.List;
 
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.DefaultByteBufHolder;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.ByteToMessageDecoder;
 import org.apache.yetus.audience.InterfaceAudience;
-import org.jboss.netty.buffer.ChannelBuffer;
-import org.jboss.netty.channel.Channel;
-import org.jboss.netty.channel.ChannelHandlerContext;
-import org.jboss.netty.handler.codec.oneone.OneToOneDecoder;
 
 import org.apache.kudu.rpc.RpcHeader;
 import org.apache.kudu.util.Slice;
@@ -33,8 +33,8 @@ import org.apache.kudu.util.Slice;
  * access to sidecars and decoded protobufs from the message.
  */
 @InterfaceAudience.Private
-final class CallResponse {
-  private final ChannelBuffer buf;
+final class CallResponse extends DefaultByteBufHolder {
+  private final ByteBuf buf;
   private final RpcHeader.ResponseHeader header;
   private final int totalResponseSize;
 
@@ -47,11 +47,12 @@ final class CallResponse {
    * read from yet, and will only be accessed by this class.
    *
    * Afterwards, this constructs the RpcHeader from the buffer.
-   * @param buf Channel buffer which call response reads from.
+   * @param buf Byte buffer which call response reads from.
    * @throws IndexOutOfBoundsException if any length prefix inside the
    * response points outside the bounds of the buffer.
    */
-  CallResponse(final ChannelBuffer buf) {
+  CallResponse(final ByteBuf buf) {
+    super(buf);
     this.buf = buf;
 
     this.totalResponseSize = buf.readableBytes();
@@ -142,7 +143,7 @@ final class CallResponse {
 
   // After checking the length, generates a slice for the next 'length'
   // bytes of 'buf'. Advances the buffer's read index by 'length' bytes.
-  private static Slice nextBytes(final ChannelBuffer buf, final int length) {
+  private static Slice nextBytes(final ByteBuf buf, final int length) {
     byte[] payload;
     int offset;
     if (buf.hasArray()) {  // Zero copy.
@@ -158,17 +159,22 @@ final class CallResponse {
   }
 
   /**
-   * Netty channel handler which receives incoming frames (ChannelBuffers)
+   * Netty decoder which receives incoming frames (ByteBuf)
    * and constructs CallResponse objects.
    */
-  static class Decoder extends OneToOneDecoder {
+  static class Decoder extends ByteToMessageDecoder {
+
+    Decoder() {
+      // Only one message is decoded on each read.
+      setSingleDecode(true);
+    }
+
     @Override
-    protected Object decode(ChannelHandlerContext ctx, Channel channel, Object 
message)
-        throws Exception {
-      if (!(message instanceof ChannelBuffer)) {
-        return message;
-      }
-      return new CallResponse((ChannelBuffer)message);
+    protected void decode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> 
out) {
+      // Increase the reference count because CallResponse holds onto and uses 
the ByteBuf.
+      // https://netty.io/wiki/reference-counted-objects.html
+      msg.retain();
+      out.add(new CallResponse(msg));
     }
   }
 
diff --git 
a/java/kudu-client/src/main/java/org/apache/kudu/client/ConnectToCluster.java 
b/java/kudu-client/src/main/java/org/apache/kudu/client/ConnectToCluster.java
index 217b768..429e34e 100644
--- 
a/java/kudu-client/src/main/java/org/apache/kudu/client/ConnectToCluster.java
+++ 
b/java/kudu-client/src/main/java/org/apache/kudu/client/ConnectToCluster.java
@@ -31,8 +31,8 @@ import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import com.stumbleupon.async.Callback;
 import com.stumbleupon.async.Deferred;
+import io.netty.util.Timer;
 import org.apache.yetus.audience.InterfaceAudience;
-import org.jboss.netty.util.Timer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
diff --git 
a/java/kudu-client/src/main/java/org/apache/kudu/client/ConnectToMasterRequest.java
 
b/java/kudu-client/src/main/java/org/apache/kudu/client/ConnectToMasterRequest.java
index 2fc41ef..8a0c713 100644
--- 
a/java/kudu-client/src/main/java/org/apache/kudu/client/ConnectToMasterRequest.java
+++ 
b/java/kudu-client/src/main/java/org/apache/kudu/client/ConnectToMasterRequest.java
@@ -24,8 +24,8 @@ import java.util.Collection;
 import java.util.Collections;
 
 import com.google.protobuf.Message;
+import io.netty.util.Timer;
 import org.apache.yetus.audience.InterfaceAudience;
-import org.jboss.netty.util.Timer;
 
 import org.apache.kudu.master.Master.ConnectToMasterResponsePB;
 import org.apache.kudu.master.Master.MasterFeatures;
diff --git 
a/java/kudu-client/src/main/java/org/apache/kudu/client/Connection.java 
b/java/kudu-client/src/main/java/org/apache/kudu/client/Connection.java
index 69767d1..e712475 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/Connection.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/Connection.java
@@ -36,27 +36,24 @@ import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import com.stumbleupon.async.Callback;
 import com.stumbleupon.async.Deferred;
+import io.netty.bootstrap.Bootstrap;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelFutureListener;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelPipeline;
+import io.netty.channel.SimpleChannelInboundHandler;
+import io.netty.channel.embedded.EmbeddedChannel;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
+import io.netty.handler.timeout.ReadTimeoutException;
+import io.netty.handler.timeout.ReadTimeoutHandler;
+import io.netty.util.concurrent.Future;
+import io.netty.util.concurrent.GenericFutureListener;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.yetus.audience.InterfaceStability;
-import org.jboss.netty.buffer.ChannelBuffers;
-import org.jboss.netty.channel.Channel;
-import org.jboss.netty.channel.ChannelEvent;
-import org.jboss.netty.channel.ChannelFuture;
-import org.jboss.netty.channel.ChannelFutureListener;
-import org.jboss.netty.channel.ChannelHandlerContext;
-import org.jboss.netty.channel.ChannelStateEvent;
-import org.jboss.netty.channel.Channels;
-import org.jboss.netty.channel.DefaultChannelPipeline;
-import org.jboss.netty.channel.ExceptionEvent;
-import org.jboss.netty.channel.MessageEvent;
-import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
-import org.jboss.netty.channel.socket.ClientSocketChannelFactory;
-import org.jboss.netty.channel.socket.SocketChannel;
-import org.jboss.netty.channel.socket.SocketChannelConfig;
-import org.jboss.netty.handler.codec.frame.LengthFieldBasedFrameDecoder;
-import org.jboss.netty.handler.timeout.ReadTimeoutException;
-import org.jboss.netty.handler.timeout.ReadTimeoutHandler;
-import org.jboss.netty.util.HashedWheelTimer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -79,7 +76,7 @@ import org.apache.kudu.rpc.RpcHeader.RpcFeatureFlag;
  */
 @InterfaceAudience.Private
 @InterfaceStability.Unstable
-class Connection extends SimpleChannelUpstreamHandler {
+class Connection extends SimpleChannelInboundHandler<Object> {
   /**
    * Authentication credentials policy for negotiating outbound connections. 
Some requests
    * (e.g. {@link ConnectToMasterRequest}) behave differently depending on the 
type of credentials
@@ -105,14 +102,14 @@ class Connection extends SimpleChannelUpstreamHandler {
   /** Security context to use for connection negotiation. */
   private final SecurityContext securityContext;
 
-  /** Timer to monitor read timeouts for the connection (used by Netty's 
ReadTimeoutHandler). */
-  private final HashedWheelTimer timer;
-
   /** Credentials policy to use when authenticating. */
   private final CredentialsPolicy credentialsPolicy;
 
+  /** The Netty client bootstrap used to configure and initialize a connected 
channel. */
+  private final Bootstrap bootstrap;
+
   /** The underlying Netty's socket channel. */
-  private final SocketChannel channel;
+  private SocketChannel channel;
 
   /**
    * Set to true when disconnect initiated explicitly from the client side. 
The channelDisconnected
@@ -167,9 +164,9 @@ class Connection extends SimpleChannelUpstreamHandler {
   @GuardedBy("lock")
   private int nextCallId = 0;
 
+  /** The future for the connection attempt. Set only once connect() is 
called. */
   @Nullable
   @GuardedBy("lock")
-  /** The future for the connection attempt. Set only once connect() is 
called. */
   private ChannelFuture connectFuture;
 
   /**
@@ -177,8 +174,7 @@ class Connection extends SimpleChannelUpstreamHandler {
    *
    * @param serverInfo the destination server
    * @param securityContext security context to use for connection negotiation
-   * @param timer timer to set up read timeout on the corresponding Netty 
channel
-   * @param channelFactory Netty factory to create corresponding Netty channel
+   * @param bootstrap Netty bootstrap to create corresponding Netty channel
    * @param credentialsPolicy policy controlling which credentials to use 
while negotiating on the
    *                          connection to the target server:
    *                          if {@link 
CredentialsPolicy#PRIMARY_CREDENTIALS}, the authentication
@@ -186,32 +182,20 @@ class Connection extends SimpleChannelUpstreamHandler {
    */
   Connection(ServerInfo serverInfo,
              SecurityContext securityContext,
-             HashedWheelTimer timer,
-             ClientSocketChannelFactory channelFactory,
+             Bootstrap bootstrap,
              CredentialsPolicy credentialsPolicy) {
     this.serverInfo = serverInfo;
     this.securityContext = securityContext;
     this.state = State.NEW;
-    this.timer = timer;
     this.credentialsPolicy = credentialsPolicy;
-
-    final ConnectionPipeline pipeline = new ConnectionPipeline();
-    pipeline.init();
-
-    channel = channelFactory.newChannel(pipeline);
-    SocketChannelConfig config = channel.getConfig();
-    config.setConnectTimeoutMillis(60000);
-    config.setTcpNoDelay(true);
-    // Unfortunately there is no way to override the keep-alive timeout in
-    // Java since the JRE doesn't expose any way to call setsockopt() with
-    // TCP_KEEPIDLE. And of course the default timeout is >2h. Sigh.
-    config.setKeepAlive(true);
+    this.bootstrap = bootstrap.clone();
+    this.bootstrap.handler(new ConnectionChannelInitializer());
   }
 
   /** {@inheritDoc} */
   @Override
-  public void channelConnected(final ChannelHandlerContext ctx,
-                               final ChannelStateEvent e) {
+  @SuppressWarnings("FutureReturnValueIgnored")
+  public void channelActive(final ChannelHandlerContext ctx) {
     lock.lock();
     try {
       if (state == State.TERMINATED) {
@@ -222,56 +206,38 @@ class Connection extends SimpleChannelUpstreamHandler {
     } finally {
       lock.unlock();
     }
-    Channels.write(channel, ChannelBuffers.wrappedBuffer(CONNECTION_HEADER));
+    ctx.writeAndFlush(Unpooled.wrappedBuffer(CONNECTION_HEADER), 
ctx.voidPromise());
     Negotiator negotiator = new 
Negotiator(serverInfo.getAndCanonicalizeHostname(), securityContext,
         (credentialsPolicy == CredentialsPolicy.PRIMARY_CREDENTIALS));
-    ctx.getPipeline().addBefore(ctx.getName(), "negotiation", negotiator);
-    negotiator.sendHello(channel);
-  }
-
-  /** {@inheritDoc} */
-  @Override
-  public void handleUpstream(final ChannelHandlerContext ctx,
-                             final ChannelEvent e) throws Exception {
-    if (LOG.isTraceEnabled()) {
-      LOG.trace("{} upstream event {}", getLogPrefix(), e);
-    }
-    super.handleUpstream(ctx, e);
-  }
-
-  /** {@inheritDoc} */
-  @Override
-  public void channelDisconnected(final ChannelHandlerContext ctx, final 
ChannelStateEvent e) {
-    // No need to call super.channelDisconnected(ctx, e) -- there should be 
nobody in the upstream
-    // pipeline after Connection itself. So, just handle the disconnection 
event ourselves.
-    cleanup(new RecoverableException(Status.NetworkError("connection 
disconnected")));
+    ctx.pipeline().addBefore(ctx.name(), "negotiation", negotiator);
+    negotiator.sendHello(ctx);
   }
 
   /** {@inheritDoc} */
   @Override
-  public void channelClosed(final ChannelHandlerContext ctx, final 
ChannelStateEvent e) {
+  public void channelInactive(final ChannelHandlerContext ctx) {
+    LOG.debug("{} handling channelInactive", getLogPrefix());
     String msg = "connection closed";
     // Connection failures are reported as channelClosed() before 
exceptionCaught() is called.
     // We can detect this case by looking at whether connectFuture has been 
marked complete
     // and grabbing the exception from there.
     lock.lock();
     try {
-      if (connectFuture != null && connectFuture.getCause() != null) {
-        msg = connectFuture.getCause().toString();
+      if (connectFuture != null && connectFuture.cause() != null) {
+        msg = connectFuture.cause().toString();
       }
     } finally {
       lock.unlock();
     }
-    // No need to call super.channelClosed(ctx, e) -- there should be nobody 
in the upstream
+    // No need to call super.channelInactive(ctx) -- there should be nobody 
in the upstream
     // pipeline after Connection itself. So, just handle the close event 
ourselves.
     cleanup(new RecoverableException(Status.NetworkError(msg)));
   }
 
   /** {@inheritDoc} */
   @Override
-  public void messageReceived(ChannelHandlerContext ctx, MessageEvent evt) 
throws Exception {
-    Object m = evt.getMessage();
-
+  @SuppressWarnings("FutureReturnValueIgnored")
+  public void channelRead0(ChannelHandlerContext ctx, Object m) throws 
Exception {
     // Process the results of a successful negotiation.
     if (m instanceof Negotiator.Success) {
       lock.lock();
@@ -317,7 +283,7 @@ class Connection extends SimpleChannelUpstreamHandler {
         queuedMessages = null;
 
         // Drop the negotiation timeout handler from the pipeline.
-        ctx.getPipeline().remove(NEGOTIATION_TIMEOUT_HANDLER);
+        ctx.pipeline().remove(NEGOTIATION_TIMEOUT_HANDLER);
 
         // Set the state to READY -- that means the incoming messages should 
be no longer put into
         // the queuedMessages, but sent to wire right away (see the 
enqueueMessage() for details).
@@ -343,15 +309,15 @@ class Connection extends SimpleChannelUpstreamHandler {
       } finally {
         lock.unlock();
       }
-      // Calling Channels.close() triggers the cleanup() which will handle the 
negotiation
+      // Calling close() triggers the cleanup() which will handle the 
negotiation
       // failure appropriately.
-      Channels.close(evt.getChannel());
+      ctx.close();
       return;
     }
 
     // Some other event which the connection does not handle.
     if (!(m instanceof CallResponse)) {
-      ctx.sendUpstream(evt);
+      ctx.fireChannelRead(m);
       return;
     }
 
@@ -416,10 +382,8 @@ class Connection extends SimpleChannelUpstreamHandler {
 
   /** {@inheritDoc} */
   @Override
-  public void exceptionCaught(final ChannelHandlerContext ctx, final 
ExceptionEvent event) {
-    Throwable e = event.getCause();
-    Channel c = event.getChannel();
-
+  @SuppressWarnings("FutureReturnValueIgnored")
+  public void exceptionCaught(ChannelHandlerContext ctx, Throwable e) throws 
Exception {
     KuduException error;
     if (e instanceof KuduException) {
       error = (KuduException) e;
@@ -462,14 +426,16 @@ class Connection extends SimpleChannelUpstreamHandler {
       // have either gotten a ClosedChannelException or an SSLException.
       assert !explicitlyDisconnected;
       String message = String.format("%s unexpected exception from downstream 
on %s",
-                                     getLogPrefix(), c);
+                                     getLogPrefix(), ctx.channel());
       error = new RecoverableException(Status.NetworkError(message), e);
       LOG.error(message, e);
     }
 
     cleanup(error);
-    if (c.isOpen()) {
-      Channels.close(c);
+    // `ctx` is null when `exceptionCaught` is called from the `connectFuture`
+    // listener in `connect()`.
+    if (ctx != null) {
+      ctx.close();
     }
   }
 
@@ -583,8 +549,20 @@ class Connection extends SimpleChannelUpstreamHandler {
    * @return future object to wait on the disconnect completion, if necessary
    */
   ChannelFuture disconnect() {
-    explicitlyDisconnected = true;
-    return Channels.disconnect(channel);
+    lock.lock();
+    try {
+      LOG.debug("{} disconnecting while in state {}", getLogPrefix(), state);
+      explicitlyDisconnected = true;
+      // No connection has been made yet.
+      if (state == State.NEW) {
+        // Use an EmbeddedChannel to return a valid and immediately completed 
ChannelFuture.
+        return new EmbeddedChannel().disconnect();
+      } else {
+        return connectFuture.channel().disconnect();
+      }
+    } finally {
+      lock.unlock();
+    }
   }
 
   /**
@@ -617,7 +595,7 @@ class Connection extends SimpleChannelUpstreamHandler {
         deferred.callback(null);
         return;
       }
-      final Throwable t = future.getCause();
+      final Throwable t = future.cause();
       if (t instanceof Exception) {
         deferred.callback(t);
       } else {
@@ -671,12 +649,13 @@ class Connection extends SimpleChannelUpstreamHandler {
    * Start sending the message to the server over the wire. It's crucial to 
not hold the lock
    * while doing so: see enqueueMessage() and KUDU-1894 for details.
    */
+  @SuppressWarnings("FutureReturnValueIgnored")
   private void sendCallToWire(final RpcOutboundMessage msg) {
     assert !lock.isHeldByCurrentThread();
     if (LOG.isTraceEnabled()) {
       LOG.trace("{} sending {}", getLogPrefix(), msg);
     }
-    Channels.write(channel, msg);
+    channel.writeAndFlush(msg, channel.voidPromise());
   }
 
   /**
@@ -745,10 +724,24 @@ class Connection extends SimpleChannelUpstreamHandler {
   /** Initiate opening TCP connection to the server. */
   @GuardedBy("lock")
   private void connect() {
+    LOG.debug("{} connecting to peer", getLogPrefix());
     Preconditions.checkState(lock.isHeldByCurrentThread());
     Preconditions.checkState(state == State.NEW);
     state = State.CONNECTING;
-    connectFuture = channel.connect(serverInfo.getResolvedAddress());
+    connectFuture = bootstrap.connect(serverInfo.getResolvedAddress());
+    connectFuture.addListener(new GenericFutureListener<Future<? super 
Void>>() {
+      @Override
+      public void operationComplete(Future<? super Void> future) throws 
Exception {
+        if (future.isSuccess()) {
+          LOG.debug("{} Successfully connected to peer", getLogPrefix());
+          return;
+        }
+        // If the connection failed, pass the exception to exceptionCaught to 
be handled.
+        final Throwable t = future.cause();
+        exceptionCaught(null, t);
+      }
+    });
+    channel = (SocketChannel) connectFuture.channel();
   }
 
   /** Enumeration to represent the internal state of the Connection object. */
@@ -811,22 +804,23 @@ class Connection extends SimpleChannelUpstreamHandler {
     }
   }
 
-  /** The helper class to build the Netty's connection pipeline. */
-  private final class ConnectionPipeline extends DefaultChannelPipeline {
-    void init() {
-      super.addFirst("decode-frames", new LengthFieldBasedFrameDecoder(
-          KuduRpc.MAX_RPC_SIZE,
-          0, // length comes at offset 0
-          4, // length prefix is 4 bytes long
-          0, // no "length adjustment"
-          4 /* strip the length prefix */));
-      super.addLast("decode-inbound", new CallResponse.Decoder());
-      super.addLast("encode-outbound", new RpcOutboundMessage.Encoder());
+  private final class ConnectionChannelInitializer extends 
ChannelInitializer<Channel> {
+    @Override
+    public void initChannel(Channel ch) throws Exception {
+      ChannelPipeline pipeline = ch.pipeline();
+      pipeline.addFirst("decode-frames", new LengthFieldBasedFrameDecoder(
+              KuduRpc.MAX_RPC_SIZE,
+              0, // length comes at offset 0
+              4, // length prefix is 4 bytes long
+              0, // no "length adjustment"
+              4 /* strip the length prefix */));
+      pipeline.addLast("decode-inbound", new CallResponse.Decoder());
+      pipeline.addLast("encode-outbound", new RpcOutboundMessage.Encoder());
       // Add a socket read timeout handler to function as a timeout for 
negotiation.
       // The handler will be removed once the connection is negotiated.
-      super.addLast(NEGOTIATION_TIMEOUT_HANDLER, new ReadTimeoutHandler(
-          Connection.this.timer, NEGOTIATION_TIMEOUT_MS, 
TimeUnit.MILLISECONDS));
-      super.addLast("kudu-handler", Connection.this);
+      pipeline.addLast(NEGOTIATION_TIMEOUT_HANDLER,
+              new ReadTimeoutHandler(NEGOTIATION_TIMEOUT_MS, 
TimeUnit.MILLISECONDS));
+      pipeline.addLast("kudu-handler", Connection.this);
     }
   }
 }
diff --git 
a/java/kudu-client/src/main/java/org/apache/kudu/client/ConnectionCache.java 
b/java/kudu-client/src/main/java/org/apache/kudu/client/ConnectionCache.java
index 6da15b5..83bff27 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/ConnectionCache.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/ConnectionCache.java
@@ -27,10 +27,9 @@ import javax.annotation.concurrent.GuardedBy;
 import com.google.common.collect.HashMultimap;
 import com.google.common.collect.ImmutableList;
 import com.stumbleupon.async.Deferred;
+import io.netty.bootstrap.Bootstrap;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.yetus.audience.InterfaceStability;
-import org.jboss.netty.channel.socket.ClientSocketChannelFactory;
-import org.jboss.netty.util.HashedWheelTimer;
 
 /**
  * The ConnectionCache is responsible for managing connections to Kudu masters 
and tablet servers.
@@ -54,11 +53,8 @@ class ConnectionCache {
   /** Security context to use for connection negotiation. */
   private final SecurityContext securityContext;
 
-  /** Timer to monitor read timeouts for connections (used by Netty's 
ReadTimeoutHandler) */
-  private final HashedWheelTimer timer;
-
-  /** Netty's channel factory to use by connections. */
-  private final ClientSocketChannelFactory channelFactory;
+  /** Netty's bootstrap to use by connections. */
+  private final Bootstrap bootstrap;
 
   /**
    * Container mapping server IP/port into the established connection from the 
client to the
@@ -71,11 +67,9 @@ class ConnectionCache {
 
   /** Create a new empty ConnectionCache given the specified parameters. */
   ConnectionCache(SecurityContext securityContext,
-                  HashedWheelTimer timer,
-                  ClientSocketChannelFactory channelFactory) {
+                  Bootstrap bootstrap) {
     this.securityContext = securityContext;
-    this.timer = timer;
-    this.channelFactory = channelFactory;
+    this.bootstrap = bootstrap;
   }
 
   /**
@@ -127,8 +121,7 @@ class ConnectionCache {
       if (result == null) {
         result = new Connection(serverInfo,
                                 securityContext,
-                                timer,
-                                channelFactory,
+                                bootstrap,
                                 credentialsPolicy);
         connections.add(result);
         // There can be at most 2 connections to the same destination: one 
with primary and another
diff --git 
a/java/kudu-client/src/main/java/org/apache/kudu/client/CreateTableRequest.java 
b/java/kudu-client/src/main/java/org/apache/kudu/client/CreateTableRequest.java
index f5c5eb5..0885fc2 100644
--- 
a/java/kudu-client/src/main/java/org/apache/kudu/client/CreateTableRequest.java
+++ 
b/java/kudu-client/src/main/java/org/apache/kudu/client/CreateTableRequest.java
@@ -22,8 +22,8 @@ import java.util.EnumSet;
 import java.util.List;
 
 import com.google.protobuf.Message;
+import io.netty.util.Timer;
 import org.apache.yetus.audience.InterfaceAudience;
-import org.jboss.netty.util.Timer;
 
 import org.apache.kudu.Schema;
 import org.apache.kudu.client.ProtobufHelper.SchemaPBConversionFlags;
diff --git 
a/java/kudu-client/src/main/java/org/apache/kudu/client/DeleteTableRequest.java 
b/java/kudu-client/src/main/java/org/apache/kudu/client/DeleteTableRequest.java
index dd8f1fc..127ec74 100644
--- 
a/java/kudu-client/src/main/java/org/apache/kudu/client/DeleteTableRequest.java
+++ 
b/java/kudu-client/src/main/java/org/apache/kudu/client/DeleteTableRequest.java
@@ -18,8 +18,8 @@
 package org.apache.kudu.client;
 
 import com.google.protobuf.Message;
+import io.netty.util.Timer;
 import org.apache.yetus.audience.InterfaceAudience;
-import org.jboss.netty.util.Timer;
 
 import org.apache.kudu.master.Master;
 import org.apache.kudu.util.Pair;
diff --git 
a/java/kudu-client/src/main/java/org/apache/kudu/client/GetTableLocationsRequest.java
 
b/java/kudu-client/src/main/java/org/apache/kudu/client/GetTableLocationsRequest.java
index 3be2770..7542e10 100644
--- 
a/java/kudu-client/src/main/java/org/apache/kudu/client/GetTableLocationsRequest.java
+++ 
b/java/kudu-client/src/main/java/org/apache/kudu/client/GetTableLocationsRequest.java
@@ -20,8 +20,8 @@ package org.apache.kudu.client;
 import com.google.protobuf.ByteString;
 import com.google.protobuf.Message;
 import com.google.protobuf.UnsafeByteOperations;
+import io.netty.util.Timer;
 import org.apache.yetus.audience.InterfaceAudience;
-import org.jboss.netty.util.Timer;
 
 import org.apache.kudu.master.Master;
 import org.apache.kudu.util.Pair;
diff --git 
a/java/kudu-client/src/main/java/org/apache/kudu/client/GetTableSchemaRequest.java
 
b/java/kudu-client/src/main/java/org/apache/kudu/client/GetTableSchemaRequest.java
index acd5744..4aac027 100644
--- 
a/java/kudu-client/src/main/java/org/apache/kudu/client/GetTableSchemaRequest.java
+++ 
b/java/kudu-client/src/main/java/org/apache/kudu/client/GetTableSchemaRequest.java
@@ -28,8 +28,8 @@ import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
 import com.google.protobuf.ByteString;
 import com.google.protobuf.Message;
+import io.netty.util.Timer;
 import org.apache.yetus.audience.InterfaceAudience;
-import org.jboss.netty.util.Timer;
 
 import org.apache.kudu.Schema;
 import org.apache.kudu.master.Master;
diff --git 
a/java/kudu-client/src/main/java/org/apache/kudu/client/GetTableStatisticsRequest.java
 
b/java/kudu-client/src/main/java/org/apache/kudu/client/GetTableStatisticsRequest.java
index e541f39..6a0baf4 100644
--- 
a/java/kudu-client/src/main/java/org/apache/kudu/client/GetTableStatisticsRequest.java
+++ 
b/java/kudu-client/src/main/java/org/apache/kudu/client/GetTableStatisticsRequest.java
@@ -18,8 +18,8 @@
 package org.apache.kudu.client;
 
 import com.google.protobuf.Message;
+import io.netty.util.Timer;
 import org.apache.yetus.audience.InterfaceAudience;
-import org.jboss.netty.util.Timer;
 
 import org.apache.kudu.master.Master;
 import org.apache.kudu.util.Pair;
diff --git 
a/java/kudu-client/src/main/java/org/apache/kudu/client/IsAlterTableDoneRequest.java
 
b/java/kudu-client/src/main/java/org/apache/kudu/client/IsAlterTableDoneRequest.java
index 2bce8a1..3a8a321 100644
--- 
a/java/kudu-client/src/main/java/org/apache/kudu/client/IsAlterTableDoneRequest.java
+++ 
b/java/kudu-client/src/main/java/org/apache/kudu/client/IsAlterTableDoneRequest.java
@@ -22,8 +22,8 @@ import static 
org.apache.kudu.master.Master.IsAlterTableDoneResponsePB;
 import static org.apache.kudu.master.Master.TableIdentifierPB;
 
 import com.google.protobuf.Message;
+import io.netty.util.Timer;
 import org.apache.yetus.audience.InterfaceAudience;
-import org.jboss.netty.util.Timer;
 
 import org.apache.kudu.util.Pair;
 
diff --git 
a/java/kudu-client/src/main/java/org/apache/kudu/client/IsCreateTableDoneRequest.java
 
b/java/kudu-client/src/main/java/org/apache/kudu/client/IsCreateTableDoneRequest.java
index 76ca70a..2eb0d7c 100644
--- 
a/java/kudu-client/src/main/java/org/apache/kudu/client/IsCreateTableDoneRequest.java
+++ 
b/java/kudu-client/src/main/java/org/apache/kudu/client/IsCreateTableDoneRequest.java
@@ -18,8 +18,8 @@
 package org.apache.kudu.client;
 
 import com.google.protobuf.Message;
+import io.netty.util.Timer;
 import org.apache.yetus.audience.InterfaceAudience;
-import org.jboss.netty.util.Timer;
 
 import org.apache.kudu.master.Master.IsCreateTableDoneRequestPB;
 import org.apache.kudu.master.Master.IsCreateTableDoneResponsePB;
diff --git 
a/java/kudu-client/src/main/java/org/apache/kudu/client/KuduClient.java 
b/java/kudu-client/src/main/java/org/apache/kudu/client/KuduClient.java
index fc47700..7c84373 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/KuduClient.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/KuduClient.java
@@ -513,33 +513,49 @@ public class KuduClient implements AutoCloseable {
     }
 
     /**
-     * Set the executors which will be used for the embedded Netty boss and 
workers.
+     * @deprecated the bossExecutor is no longer used and will have no effect 
if provided
+     */
+    @Deprecated
+    public KuduClientBuilder nioExecutors(Executor bossExecutor, Executor 
workerExecutor) {
+      clientBuilder.nioExecutors(bossExecutor, workerExecutor);
+      return this;
+    }
+
+    /**
+     * Set the executor which will be used for the embedded Netty workers.
+     *
      * Optional.
-     * If not provided, uses a simple cached threadpool. If either argument is 
null,
-     * then such a thread pool will be used in place of that argument.
+     * If not provided, uses a simple cached threadpool. If workerExecutor is 
null,
+     * then such a thread pool will be used.
      * Note: executor's max thread number must be greater or equal to 
corresponding
      * worker count, or netty cannot start enough threads, and client will get 
stuck.
      * If not sure, please just use CachedThreadPool.
      */
-    public KuduClientBuilder nioExecutors(Executor bossExecutor, Executor 
workerExecutor) {
-      clientBuilder.nioExecutors(bossExecutor, workerExecutor);
+    public KuduClientBuilder nioExecutor(Executor workerExecutor) {
+      clientBuilder.nioExecutor(workerExecutor);
       return this;
     }
 
     /**
-     * Set the maximum number of boss threads.
-     * Optional.
-     * If not provided, 1 is used.
+     * @deprecated the bossCount is no longer used and will have no effect 
if provided
      */
+    @Deprecated
     public KuduClientBuilder bossCount(int bossCount) {
-      clientBuilder.bossCount(bossCount);
+      LOG.info("bossCount is deprecated");
       return this;
     }
 
     /**
      * Set the maximum number of worker threads.
+     * A worker thread performs non-blocking reads and writes for one or more
+     * Netty Channels.
+     *
      * Optional.
-     * If not provided, (2 * the number of available processors) is used.
+     * If not provided, (2 * the number of available processors) is used. If
+     * this client instance will be used on a machine running many client
+     * instances, it may be wise to lower this count, for example to avoid
+     * resource limits, at the possible cost of some performance of this client
+     * instance.
      */
     public KuduClientBuilder workerCount(int workerCount) {
       clientBuilder.workerCount(workerCount);
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/KuduRpc.java 
b/java/kudu-client/src/main/java/org/apache/kudu/client/KuduRpc.java
index 56a626a..a224c65 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/KuduRpc.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/KuduRpc.java
@@ -39,12 +39,12 @@ import com.google.protobuf.CodedOutputStream;
 import com.google.protobuf.InvalidProtocolBufferException;
 import com.google.protobuf.Message;
 import com.stumbleupon.async.Deferred;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufOutputStream;
+import io.netty.util.Timeout;
+import io.netty.util.Timer;
+import io.netty.util.TimerTask;
 import org.apache.yetus.audience.InterfaceAudience;
-import org.jboss.netty.buffer.ChannelBuffer;
-import org.jboss.netty.buffer.ChannelBuffers;
-import org.jboss.netty.util.Timeout;
-import org.jboss.netty.util.Timer;
-import org.jboss.netty.util.TimerTask;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -426,26 +426,22 @@ public abstract class KuduRpc<R> {
   }
 
   // TODO(todd): make this private and have all RPCs send RpcOutboundMessage
-  // instances instead of ChannelBuffers
-  static ChannelBuffer toChannelBuffer(Message header, Message pb) {
+  //  instances instead of ByteBuf
+  static void toByteBuf(ByteBuf out, Message header, Message pb) {
     int totalSize = IPCUtil.getTotalSizeWhenWrittenDelimited(header, pb);
-    byte[] buf = new byte[totalSize + 4];
-    ChannelBuffer chanBuf = ChannelBuffers.wrappedBuffer(buf);
-    chanBuf.clear();
-    chanBuf.writeInt(totalSize);
-    final CodedOutputStream out = CodedOutputStream.newInstance(buf, 4, 
totalSize);
-    try {
-      out.writeUInt32NoTag(header.getSerializedSize());
-      header.writeTo(out);
-
-      out.writeUInt32NoTag(pb.getSerializedSize());
-      pb.writeTo(out);
-      out.checkNoSpaceLeft();
+    out.capacity(totalSize + 4);
+    out.writeInt(totalSize);
+    try (ByteBufOutputStream bos = new ByteBufOutputStream(out)) {
+      CodedOutputStream cos = CodedOutputStream.newInstance(bos, totalSize);
+      cos.writeUInt32NoTag(header.getSerializedSize());
+      header.writeTo(cos);
+
+      cos.writeUInt32NoTag(pb.getSerializedSize());
+      pb.writeTo(cos);
+      cos.flush();
     } catch (IOException e) {
       throw new RuntimeException("Cannot serialize the following message " + 
pb);
     }
-    chanBuf.writerIndex(buf.length);
-    return chanBuf;
   }
 
   /**
diff --git 
a/java/kudu-client/src/main/java/org/apache/kudu/client/ListTablesRequest.java 
b/java/kudu-client/src/main/java/org/apache/kudu/client/ListTablesRequest.java
index 9c7161f..a462ba1 100644
--- 
a/java/kudu-client/src/main/java/org/apache/kudu/client/ListTablesRequest.java
+++ 
b/java/kudu-client/src/main/java/org/apache/kudu/client/ListTablesRequest.java
@@ -21,8 +21,8 @@ import java.util.ArrayList;
 import java.util.List;
 
 import com.google.protobuf.Message;
+import io.netty.util.Timer;
 import org.apache.yetus.audience.InterfaceAudience;
-import org.jboss.netty.util.Timer;
 
 import org.apache.kudu.client.ListTablesResponse.TableInfo;
 import org.apache.kudu.master.Master;
diff --git 
a/java/kudu-client/src/main/java/org/apache/kudu/client/ListTabletServersRequest.java
 
b/java/kudu-client/src/main/java/org/apache/kudu/client/ListTabletServersRequest.java
index 92b1ebb..9451c79 100644
--- 
a/java/kudu-client/src/main/java/org/apache/kudu/client/ListTabletServersRequest.java
+++ 
b/java/kudu-client/src/main/java/org/apache/kudu/client/ListTabletServersRequest.java
@@ -24,8 +24,8 @@ import java.util.ArrayList;
 import java.util.List;
 
 import com.google.protobuf.Message;
+import io.netty.util.Timer;
 import org.apache.yetus.audience.InterfaceAudience;
-import org.jboss.netty.util.Timer;
 
 import org.apache.kudu.util.Pair;
 
diff --git 
a/java/kudu-client/src/main/java/org/apache/kudu/client/ListTabletsRequest.java 
b/java/kudu-client/src/main/java/org/apache/kudu/client/ListTabletsRequest.java
index 277837a..7010bf3 100644
--- 
a/java/kudu-client/src/main/java/org/apache/kudu/client/ListTabletsRequest.java
+++ 
b/java/kudu-client/src/main/java/org/apache/kudu/client/ListTabletsRequest.java
@@ -21,8 +21,8 @@ import java.util.ArrayList;
 import java.util.List;
 
 import com.google.protobuf.Message;
+import io.netty.util.Timer;
 import org.apache.yetus.audience.InterfaceAudience;
-import org.jboss.netty.util.Timer;
 
 import org.apache.kudu.tserver.Tserver;
 import org.apache.kudu.util.Pair;
diff --git 
a/java/kudu-client/src/main/java/org/apache/kudu/client/Negotiator.java 
b/java/kudu-client/src/main/java/org/apache/kudu/client/Negotiator.java
index 84a982d..75b1489 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/Negotiator.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/Negotiator.java
@@ -27,10 +27,13 @@
 package org.apache.kudu.client;
 
 import java.io.IOException;
+import java.lang.reflect.Field;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.nio.ByteBuffer;
 import java.nio.ByteOrder;
+import java.security.AccessController;
+import java.security.PrivilegedAction;
 import java.security.PrivilegedActionException;
 import java.security.PrivilegedExceptionAction;
 import java.security.cert.Certificate;
@@ -60,18 +63,17 @@ import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 import com.google.protobuf.ByteString;
 import com.google.protobuf.UnsafeByteOperations;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelHandlerAdapter;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.SimpleChannelInboundHandler;
+import io.netty.channel.embedded.EmbeddedChannel;
+import io.netty.handler.ssl.SslHandler;
+import io.netty.util.concurrent.Future;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.ietf.jgss.GSSException;
-import org.jboss.netty.buffer.ChannelBuffer;
-import org.jboss.netty.buffer.ChannelBuffers;
-import org.jboss.netty.channel.Channel;
-import org.jboss.netty.channel.ChannelFuture;
-import org.jboss.netty.channel.ChannelHandlerContext;
-import org.jboss.netty.channel.Channels;
-import org.jboss.netty.channel.MessageEvent;
-import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
-import org.jboss.netty.handler.codec.embedder.DecoderEmbedder;
-import org.jboss.netty.handler.ssl.SslHandler;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -89,7 +91,7 @@ import org.apache.kudu.util.SecurityUtil;
  * from the pipeline and fires a Negotiator.Success or Negotiator.Failure 
upstream.
  */
 @InterfaceAudience.Private
-public class Negotiator extends SimpleChannelUpstreamHandler {
+public class Negotiator extends SimpleChannelInboundHandler<CallResponse> {
   private static final Logger LOG = LoggerFactory.getLogger(Negotiator.class);
 
   private final SaslClientCallbackHandler saslCallback = new 
SaslClientCallbackHandler();
@@ -190,7 +192,7 @@ public class Negotiator extends 
SimpleChannelUpstreamHandler {
    * and add the handler directly to the real ChannelPipeline.
    * Only non-null once TLS is initiated.
    */
-  private DecoderEmbedder<ChannelBuffer> sslEmbedder;
+  private EmbeddedChannel sslEmbedder;
 
   /**
    * The nonce sent from the server to the client, or null if negotiation has
@@ -202,7 +204,7 @@ public class Negotiator extends 
SimpleChannelUpstreamHandler {
    * Future indicating whether the embedded handshake has completed.
    * Only non-null once TLS is initiated.
    */
-  private ChannelFuture sslHandshakeFuture;
+  private Future<Channel> sslHandshakeFuture;
 
   private Certificate peerCert;
 
@@ -231,11 +233,11 @@ public class Negotiator extends 
SimpleChannelUpstreamHandler {
     }
   }
 
-  public void sendHello(Channel channel) {
-    sendNegotiateMessage(channel);
+  public void sendHello(ChannelHandlerContext ctx) {
+    sendNegotiateMessage(ctx);
   }
 
-  private void sendNegotiateMessage(Channel channel) {
+  private void sendNegotiateMessage(ChannelHandlerContext ctx) {
     RpcHeader.NegotiatePB.Builder builder = RpcHeader.NegotiatePB.newBuilder()
         .setStep(RpcHeader.NegotiatePB.NegotiateStep.NEGOTIATE);
 
@@ -244,7 +246,7 @@ public class Negotiator extends 
SimpleChannelUpstreamHandler {
     for (RpcHeader.RpcFeatureFlag flag : SUPPORTED_RPC_FEATURES) {
       builder.addSupportedFeatures(flag);
     }
-    if (isLoopbackConnection(channel)) {
+    if (isLoopbackConnection(ctx.channel())) {
       builder.addSupportedFeatures(RpcFeatureFlag.TLS_AUTHENTICATION_ONLY);
     }
 
@@ -265,57 +267,48 @@ public class Negotiator extends 
SimpleChannelUpstreamHandler {
     // Java client.
 
     state = State.AWAIT_NEGOTIATE;
-    sendSaslMessage(channel, builder.build());
+    sendSaslMessage(ctx, builder.build());
   }
 
-  private void sendSaslMessage(Channel channel, RpcHeader.NegotiatePB msg) {
-    Preconditions.checkNotNull(channel);
+  @SuppressWarnings("FutureReturnValueIgnored")
+  private void sendSaslMessage(ChannelHandlerContext ctx, 
RpcHeader.NegotiatePB msg) {
     RpcHeader.RequestHeader.Builder builder = 
RpcHeader.RequestHeader.newBuilder();
     builder.setCallId(SASL_CALL_ID);
-    Channels.write(channel, new RpcOutboundMessage(builder, msg));
+    ctx.writeAndFlush(new RpcOutboundMessage(builder, msg), ctx.voidPromise());
   }
 
   @Override
-  public void messageReceived(ChannelHandlerContext ctx, MessageEvent evt) 
throws IOException {
-    Object m = evt.getMessage();
-    if (!(m instanceof CallResponse)) {
-      ctx.sendUpstream(evt);
-      return;
-    }
-    handleResponse(ctx.getChannel(), (CallResponse)m);
-  }
-
-  private void handleResponse(Channel chan, CallResponse callResponse) throws 
IOException {
-    final RpcHeader.ResponseHeader header = callResponse.getHeader();
+  public void channelRead0(ChannelHandlerContext ctx, CallResponse msg) throws 
IOException {
+    final RpcHeader.ResponseHeader header = msg.getHeader();
     if (header.getIsError()) {
       final RpcHeader.ErrorStatusPB.Builder errBuilder = 
RpcHeader.ErrorStatusPB.newBuilder();
-      KuduRpc.readProtobuf(callResponse.getPBMessage(), errBuilder);
+      KuduRpc.readProtobuf(msg.getPBMessage(), errBuilder);
       final RpcHeader.ErrorStatusPB error = errBuilder.build();
       LOG.debug("peer {} sent connection negotiation error: {}",
-          chan.getRemoteAddress(), error.getMessage());
+          ctx.channel().remoteAddress(), error.getMessage());
 
       // The upstream code should handle the negotiation failure.
       state = State.FINISHED;
-      chan.getPipeline().remove(this);
-      Channels.fireMessageReceived(chan, new Failure(error));
+      ctx.pipeline().remove(this);
+      ctx.fireChannelRead(new Failure(error));
       return;
     }
 
-    RpcHeader.NegotiatePB response = parseSaslMsgResponse(callResponse);
+    RpcHeader.NegotiatePB response = parseSaslMsgResponse(msg);
     // TODO: check that the message type matches the expected one in all
     // of the below implementations.
     switch (state) {
       case AWAIT_NEGOTIATE:
-        handleNegotiateResponse(chan, response);
+        handleNegotiateResponse(ctx, response);
         break;
       case AWAIT_SASL:
-        handleSaslMessage(chan, response);
+        handleSaslMessage(ctx, response);
         break;
       case AWAIT_TOKEN_EXCHANGE:
-        handleTokenExchangeResponse(chan, response);
+        handleTokenExchangeResponse(ctx, response);
         break;
       case AWAIT_TLS_HANDSHAKE:
-        handleTlsMessage(chan, response);
+        handleTlsMessage(ctx, response);
         break;
       default:
         throw new IllegalStateException("received a message in unexpected 
state: " +
@@ -323,13 +316,14 @@ public class Negotiator extends 
SimpleChannelUpstreamHandler {
     }
   }
 
-  private void handleSaslMessage(Channel chan, NegotiatePB response) throws 
IOException {
+  private void handleSaslMessage(ChannelHandlerContext ctx, NegotiatePB 
response)
+          throws IOException {
     switch (response.getStep()) {
       case SASL_CHALLENGE:
-        handleChallengeResponse(chan, response);
+        handleChallengeResponse(ctx, response);
         break;
       case SASL_SUCCESS:
-        handleSuccessResponse(chan, response);
+        handleSuccessResponse(ctx, response);
         break;
       default:
         throw new IllegalStateException("Wrong negotiation step: " +
@@ -350,7 +344,7 @@ public class Negotiator extends 
SimpleChannelUpstreamHandler {
     return saslBuilder.build();
   }
 
-  private void handleNegotiateResponse(Channel chan,
+  private void handleNegotiateResponse(ChannelHandlerContext ctx,
                                        RpcHeader.NegotiatePB response) throws 
IOException {
     Preconditions.checkState(response.getStep() == NegotiateStep.NEGOTIATE,
         "Expected NEGOTIATE message, got {}", response.getStep());
@@ -370,9 +364,9 @@ public class Negotiator extends 
SimpleChannelUpstreamHandler {
     // If we negotiated TLS, then we want to start the TLS handshake; 
otherwise,
     // we can move directly to the authentication phase.
     if (negotiatedTls) {
-      startTlsHandshake(chan);
+      startTlsHandshake(ctx);
     } else {
-      startAuthentication(chan);
+      startAuthentication(ctx);
     }
   }
 
@@ -385,8 +379,8 @@ public class Negotiator extends 
SimpleChannelUpstreamHandler {
       return true;
     }
     try {
-      InetAddress local = 
((InetSocketAddress)channel.getLocalAddress()).getAddress();
-      InetAddress remote = 
((InetSocketAddress)channel.getRemoteAddress()).getAddress();
+      InetAddress local = 
((InetSocketAddress)channel.localAddress()).getAddress();
+      InetAddress remote = 
((InetSocketAddress)channel.remoteAddress()).getAddress();
       return local.equals(remote);
     } catch (ClassCastException cce) {
       // In the off chance that we have some other type of local/remote 
address,
@@ -538,7 +532,7 @@ public class Negotiator extends 
SimpleChannelUpstreamHandler {
   /**
    * Send the initial TLS "ClientHello" message.
    */
-  private void startTlsHandshake(Channel chan) throws SSLException {
+  private void startTlsHandshake(ChannelHandlerContext ctx) throws 
SSLException {
     SSLEngine engine;
     switch (chosenAuthnType) {
       case SASL:
@@ -566,12 +560,12 @@ public class Negotiator extends 
SimpleChannelUpstreamHandler {
           "Supported suites: " + Joiner.on(',').join(supported));
     }
     engine.setEnabledCipherSuites(toEnable.toArray(new String[0]));
-    SslHandler handler = new SslHandler(engine);
-    handler.setEnableRenegotiation(false);
-    sslEmbedder = new DecoderEmbedder<>(handler);
-    sslHandshakeFuture = handler.handshake();
+    SharableSslHandler handler = new SharableSslHandler(engine);
+
+    sslEmbedder = new EmbeddedChannel(handler);
+    sslHandshakeFuture = handler.handshakeFuture();
     state = State.AWAIT_TLS_HANDSHAKE;
-    boolean sent = sendPendingOutboundTls(chan);
+    boolean sent = sendPendingOutboundTls(ctx);
     assert sent;
   }
 
@@ -579,15 +573,17 @@ public class Negotiator extends 
SimpleChannelUpstreamHandler {
    * Handle an inbound message during the TLS handshake. If this message
    * causes the handshake to complete, triggers the beginning of SASL 
initiation.
    */
-  private void handleTlsMessage(Channel chan, NegotiatePB response) throws 
IOException {
+  private void handleTlsMessage(ChannelHandlerContext ctx, NegotiatePB 
response)
+          throws IOException {
     Preconditions.checkState(response.getStep() == 
NegotiateStep.TLS_HANDSHAKE);
     Preconditions.checkArgument(!response.getTlsHandshake().isEmpty(),
         "empty TLS message from server");
 
     // Pass the TLS message into our embedded SslHandler.
-    sslEmbedder.offer(ChannelBuffers.copiedBuffer(
+    sslEmbedder.writeInbound(Unpooled.copiedBuffer(
         response.getTlsHandshake().asReadOnlyByteBuffer()));
-    if (sendPendingOutboundTls(chan)) {
+    sslEmbedder.flush();
+    if (sendPendingOutboundTls(ctx)) {
       // Data was sent -- we must continue the handshake process.
       return;
     }
@@ -598,8 +594,9 @@ public class Negotiator extends 
SimpleChannelUpstreamHandler {
     //
     // NOTE: this takes effect immediately (i.e. the following SASL initiation
     // sequence is encrypted).
-    SslHandler handler = (SslHandler)sslEmbedder.getPipeline().getFirst();
-    Certificate[] certs = 
handler.getEngine().getSession().getPeerCertificates();
+    SharableSslHandler handler = (SharableSslHandler) 
sslEmbedder.pipeline().first();
+    handler.resetAdded();
+    Certificate[] certs = handler.engine().getSession().getPeerCertificates();
     if (certs.length == 0) {
       throw new SSLPeerUnverifiedException("no peer cert found");
     }
@@ -609,11 +606,11 @@ public class Negotiator extends 
SimpleChannelUpstreamHandler {
 
     // Don't wrap the TLS socket if we are using TLS for authentication only.
     boolean isAuthOnly = 
serverFeatures.contains(RpcFeatureFlag.TLS_AUTHENTICATION_ONLY) &&
-        isLoopbackConnection(chan);
+        isLoopbackConnection(ctx.channel());
     if (!isAuthOnly) {
-      chan.getPipeline().addFirst("tls", handler);
+      ctx.pipeline().addFirst("tls", handler);
     }
-    startAuthentication(chan);
+    startAuthentication(ctx);
   }
 
   /**
@@ -622,13 +619,17 @@ public class Negotiator extends 
SimpleChannelUpstreamHandler {
    *
    * Otherwise, indicates that the handshake is complete by returning false.
    */
-  private boolean sendPendingOutboundTls(Channel chan) {
+  private boolean sendPendingOutboundTls(ChannelHandlerContext ctx) {
     // The SslHandler can generate multiple TLS messages in response
     // (e.g. ClientKeyExchange, ChangeCipherSpec, ClientFinished).
     // We poll the handler until it stops giving us buffers.
     List<ByteString> bufs = Lists.newArrayList();
-    while (sslEmbedder.peek() != null) {
-      bufs.add(ByteString.copyFrom(sslEmbedder.poll().toByteBuffer()));
+    while (!sslEmbedder.outboundMessages().isEmpty()) {
+      ByteBuf msg = sslEmbedder.readOutbound();
+      bufs.add(ByteString.copyFrom(msg.nioBuffer()));
+      // Release the reference counted ByteBuf to avoid leaks now that we are 
done with it.
+      // https://netty.io/wiki/reference-counted-objects.html
+      msg.release();
     }
     ByteString data = ByteString.copyFrom(bufs);
     if (sslHandshakeFuture.isDone()) {
@@ -640,7 +641,7 @@ public class Negotiator extends 
SimpleChannelUpstreamHandler {
       return false;
     } else {
       assert data.size() > 0;
-      sendTunneledTls(chan, data);
+      sendTunneledTls(ctx, data);
       return true;
     }
   }
@@ -649,27 +650,28 @@ public class Negotiator extends 
SimpleChannelUpstreamHandler {
    * Send a buffer of data for the TLS handshake, encapsulated in the
    * appropriate TLS_HANDSHAKE negotiation message.
    */
-  private void sendTunneledTls(Channel chan, ByteString buf) {
-    sendSaslMessage(chan, NegotiatePB.newBuilder()
+  private void sendTunneledTls(ChannelHandlerContext ctx, ByteString buf) {
+    sendSaslMessage(ctx, NegotiatePB.newBuilder()
         .setStep(NegotiateStep.TLS_HANDSHAKE)
         .setTlsHandshake(buf)
         .build());
   }
 
-  private void startAuthentication(Channel chan) throws SaslException, 
NonRecoverableException {
+  private void startAuthentication(ChannelHandlerContext ctx)
+          throws SaslException, NonRecoverableException {
     switch (chosenAuthnType) {
       case SASL:
-        sendSaslInitiate(chan);
+        sendSaslInitiate(ctx);
         break;
       case TOKEN:
-        sendTokenExchange(chan);
+        sendTokenExchange(ctx);
         break;
       default:
         throw new AssertionError("unreachable");
     }
   }
 
-  private void sendTokenExchange(Channel chan) {
+  private void sendTokenExchange(ChannelHandlerContext ctx) {
     // We must not send a token unless we have successfully finished
     // authenticating via TLS.
     Preconditions.checkNotNull(authnToken);
@@ -680,19 +682,20 @@ public class Negotiator extends 
SimpleChannelUpstreamHandler {
         .setStep(NegotiateStep.TOKEN_EXCHANGE)
         .setAuthnToken(authnToken);
     state = State.AWAIT_TOKEN_EXCHANGE;
-    sendSaslMessage(chan, builder.build());
+    sendSaslMessage(ctx, builder.build());
   }
 
-  private void handleTokenExchangeResponse(Channel chan, NegotiatePB response)
+  private void handleTokenExchangeResponse(ChannelHandlerContext ctx, 
NegotiatePB response)
       throws SaslException {
     Preconditions.checkArgument(response.getStep() == 
NegotiateStep.TOKEN_EXCHANGE,
         "expected TOKEN_EXCHANGE, got step: {}", response.getStep());
 
     // The token response doesn't have any actual data in it, so we can just 
move on.
-    finish(chan);
+    finish(ctx);
   }
 
-  private void sendSaslInitiate(Channel chan) throws SaslException, 
NonRecoverableException {
+  private void sendSaslInitiate(ChannelHandlerContext ctx)
+          throws SaslException, NonRecoverableException {
     RpcHeader.NegotiatePB.Builder builder = RpcHeader.NegotiatePB.newBuilder();
     if (saslClient.hasInitialResponse()) {
       byte[] initialResponse = evaluateChallenge(new byte[0]);
@@ -701,10 +704,10 @@ public class Negotiator extends 
SimpleChannelUpstreamHandler {
     builder.setStep(RpcHeader.NegotiatePB.NegotiateStep.SASL_INITIATE);
     builder.addSaslMechanismsBuilder().setMechanism(chosenMech.name());
     state = State.AWAIT_SASL;
-    sendSaslMessage(chan, builder.build());
+    sendSaslMessage(ctx, builder.build());
   }
 
-  private void handleChallengeResponse(Channel chan, RpcHeader.NegotiatePB 
response)
+  private void handleChallengeResponse(ChannelHandlerContext ctx, 
RpcHeader.NegotiatePB response)
       throws SaslException, NonRecoverableException {
     byte[] saslToken = evaluateChallenge(response.getToken().toByteArray());
     if (saslToken == null) {
@@ -713,7 +716,7 @@ public class Negotiator extends 
SimpleChannelUpstreamHandler {
     RpcHeader.NegotiatePB.Builder builder = RpcHeader.NegotiatePB.newBuilder();
     builder.setToken(UnsafeByteOperations.unsafeWrap(saslToken));
     builder.setStep(RpcHeader.NegotiatePB.NegotiateStep.SASL_RESPONSE);
-    sendSaslMessage(chan, builder.build());
+    sendSaslMessage(ctx, builder.build());
   }
 
   /**
@@ -739,9 +742,10 @@ public class Negotiator extends 
SimpleChannelUpstreamHandler {
     }
   }
 
-  private void handleSuccessResponse(Channel chan, NegotiatePB response) 
throws IOException {
+  private void handleSuccessResponse(ChannelHandlerContext ctx, NegotiatePB 
response)
+          throws IOException {
     Preconditions.checkState(saslClient.isComplete(),
-                             "server sent SASL_SUCCESS step, but SASL 
negotiation is not complete");
+            "server sent SASL_SUCCESS step, but SASL negotiation is not 
complete");
     if (chosenMech == SaslMechanism.GSSAPI) {
       if (response.hasNonce()) {
         // Grab the nonce from the server, if it has sent one. We'll send it 
back
@@ -755,21 +759,22 @@ public class Negotiator extends 
SimpleChannelUpstreamHandler {
       }
     }
 
-    finish(chan);
+    finish(ctx);
   }
 
   /**
    * Marks the negotiation as finished, and sends the connection context to 
the server.
-   * @param chan the connection channel
+   * @param ctx the connection context
    */
-  private void finish(Channel chan) throws SaslException {
+  @SuppressWarnings("FutureReturnValueIgnored")
+  private void finish(ChannelHandlerContext ctx) throws SaslException {
     state = State.FINISHED;
-    chan.getPipeline().remove(this);
+    ctx.pipeline().remove(this);
 
-    Channels.write(chan, makeConnectionContext());
+    ctx.writeAndFlush(makeConnectionContext(), ctx.voidPromise());
     LOG.debug("Authenticated connection {} using {}/{}",
-        chan, chosenAuthnType, chosenMech);
-    Channels.fireMessageReceived(chan, new Success(serverFeatures));
+            ctx.channel(), chosenAuthnType, chosenMech);
+    ctx.fireChannelRead(new Success(serverFeatures));
   }
 
   private RpcOutboundMessage makeConnectionContext() throws SaslException {
@@ -871,4 +876,39 @@ public class Negotiator extends 
SimpleChannelUpstreamHandler {
       this.status = status;
     }
   }
+
+  /**
+   * A hack to allow sharing the SslHandler even though it's not annotated as 
"Sharable".
+   * We aren't technically sharing it, but when we move it from the 
EmbeddedChannel to
+   * the actual channel above the sharing validation runs and throws an 
exception.
+   *
+   * 
https://netty.io/wiki/new-and-noteworthy-in-4.0.html#well-defined-thread-model
+   * https://netty.io/4.0/api/io/netty/channel/ChannelHandler.Sharable.html
+   *
+   * TODO (ghenke): Remove the need for this reflection.
+   */
+  static class SharableSslHandler extends SslHandler {
+
+    public SharableSslHandler(SSLEngine engine) {
+      super(engine);
+    }
+
+    void resetAdded() {
+      Field addedField = 
AccessController.doPrivileged((PrivilegedAction<Field>) () -> {
+        try {
+          Class<?> c = ChannelHandlerAdapter.class;
+          Field added = c.getDeclaredField("added");
+          added.setAccessible(true);
+          return added;
+        } catch (NoSuchFieldException e) {
+          throw new RuntimeException(e);
+        }
+      });
+      try {
+        addedField.setBoolean(this, false);
+      } catch (IllegalAccessException e) {
+        throw new RuntimeException(e);
+      }
+    }
+  }
 }
diff --git 
a/java/kudu-client/src/main/java/org/apache/kudu/client/Operation.java 
b/java/kudu-client/src/main/java/org/apache/kudu/client/Operation.java
index ec6b768..531c776 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/Operation.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/Operation.java
@@ -29,9 +29,9 @@ import com.google.common.collect.ImmutableList;
 import com.google.protobuf.ByteString;
 import com.google.protobuf.Message;
 import com.google.protobuf.UnsafeByteOperations;
+import io.netty.util.Timer;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.yetus.audience.InterfaceStability;
-import org.jboss.netty.util.Timer;
 
 import org.apache.kudu.ColumnSchema;
 import org.apache.kudu.Schema;
diff --git 
a/java/kudu-client/src/main/java/org/apache/kudu/client/PingRequest.java 
b/java/kudu-client/src/main/java/org/apache/kudu/client/PingRequest.java
index 877a571..cbb8e3c 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/PingRequest.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/PingRequest.java
@@ -18,9 +18,9 @@
 package org.apache.kudu.client;
 
 import com.google.protobuf.Message;
+import io.netty.util.Timer;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.yetus.audience.InterfaceStability;
-import org.jboss.netty.util.Timer;
 
 import org.apache.kudu.master.Master;
 import org.apache.kudu.util.Pair;
diff --git 
a/java/kudu-client/src/main/java/org/apache/kudu/client/RpcOutboundMessage.java 
b/java/kudu-client/src/main/java/org/apache/kudu/client/RpcOutboundMessage.java
index ab1ce40..9fcd221 100644
--- 
a/java/kudu-client/src/main/java/org/apache/kudu/client/RpcOutboundMessage.java
+++ 
b/java/kudu-client/src/main/java/org/apache/kudu/client/RpcOutboundMessage.java
@@ -19,9 +19,9 @@ package org.apache.kudu.client;
 
 import com.google.protobuf.Message;
 import com.google.protobuf.TextFormat;
-import org.jboss.netty.channel.Channel;
-import org.jboss.netty.channel.ChannelHandlerContext;
-import org.jboss.netty.handler.codec.oneone.OneToOneEncoder;
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.MessageToByteEncoder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -61,21 +61,15 @@ class RpcOutboundMessage {
   /**
    * Netty encoder implementation to serialize outbound messages.
    */
-  static class Encoder extends OneToOneEncoder {
+  static class Encoder extends MessageToByteEncoder<RpcOutboundMessage> {
+
     @Override
-    protected Object encode(ChannelHandlerContext ctx, Channel chan,
-        Object obj) throws Exception {
-      if (!(obj instanceof RpcOutboundMessage)) {
-        return obj;
-      }
-      RpcOutboundMessage msg = (RpcOutboundMessage)obj;
+    protected void encode(ChannelHandlerContext ctx, RpcOutboundMessage msg, 
ByteBuf out) {
       if (LOG.isTraceEnabled()) {
-        LOG.trace("{}: sending RPC {}", chan, msg);
+        LOG.trace("{}: sending RPC {}", ctx.channel(), msg);
       }
-      // TODO(todd): move this impl into this class and remove external
-      // callers.
-      return KuduRpc.toChannelBuffer(msg.getHeaderBuilder().build(),
-          msg.getBody());
+      // TODO(todd): move this impl into this class and remove external 
callers.
+      KuduRpc.toByteBuf(out, msg.getHeaderBuilder().build(), msg.getBody());
     }
   }
 }
diff --git 
a/java/kudu-client/src/main/java/org/apache/kudu/client/SplitKeyRangeRequest.java
 
b/java/kudu-client/src/main/java/org/apache/kudu/client/SplitKeyRangeRequest.java
index 3a82e56..b167736 100644
--- 
a/java/kudu-client/src/main/java/org/apache/kudu/client/SplitKeyRangeRequest.java
+++ 
b/java/kudu-client/src/main/java/org/apache/kudu/client/SplitKeyRangeRequest.java
@@ -22,8 +22,8 @@ import java.util.List;
 
 import com.google.protobuf.Message;
 import com.google.protobuf.UnsafeByteOperations;
+import io.netty.util.Timer;
 import org.apache.yetus.audience.InterfaceAudience;
-import org.jboss.netty.util.Timer;
 
 import org.apache.kudu.Common.KeyRangePB;
 import org.apache.kudu.security.Token;
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/util/NetUtil.java 
b/java/kudu-client/src/main/java/org/apache/kudu/util/NetUtil.java
index dd54a4e..2da927a 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/util/NetUtil.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/util/NetUtil.java
@@ -21,7 +21,6 @@ import java.net.InetAddress;
 import java.net.NetworkInterface;
 import java.net.SocketException;
 import java.net.UnknownHostException;
-import java.util.ArrayList;
 import java.util.List;
 
 import com.google.common.base.Functions;
diff --git 
a/java/kudu-client/src/test/java/org/apache/kudu/client/ITClient.java 
b/java/kudu-client/src/test/java/org/apache/kudu/client/ITClient.java
index 5cc72e8..5e5943f 100644
--- a/java/kudu-client/src/test/java/org/apache/kudu/client/ITClient.java
+++ b/java/kudu-client/src/test/java/org/apache/kudu/client/ITClient.java
@@ -189,6 +189,7 @@ public class ITClient {
      * @return true if successfully completed or didn't find a server to 
disconnect, false it it
      * encountered a failure
      */
+    @SuppressWarnings("FutureReturnValueIgnored")
     private boolean disconnectNode() {
       try {
         final List<Connection> connections = 
harness.getAsyncClient().getConnectionListCopy();
diff --git 
a/java/kudu-client/src/test/java/org/apache/kudu/client/ITScannerMultiTablet.java
 
b/java/kudu-client/src/test/java/org/apache/kudu/client/ITScannerMultiTablet.java
index 76d9471..c3c2d2d 100644
--- 
a/java/kudu-client/src/test/java/org/apache/kudu/client/ITScannerMultiTablet.java
+++ 
b/java/kudu-client/src/test/java/org/apache/kudu/client/ITScannerMultiTablet.java
@@ -160,6 +160,7 @@ public class ITScannerMultiTablet {
    * @param isFaultTolerant if true use fault-tolerant scanner, otherwise use 
non-fault-tolerant one
    * @throws Exception
    */
+  @SuppressWarnings("FutureReturnValueIgnored")
   void clientFaultInjection(boolean isFaultTolerant) throws KuduException {
     KuduScanner scanner = harness.getClient().newScannerBuilder(table)
         .setFaultTolerant(isFaultTolerant)
diff --git 
a/java/kudu-client/src/test/java/org/apache/kudu/client/TestAsyncKuduClient.java
 
b/java/kudu-client/src/test/java/org/apache/kudu/client/TestAsyncKuduClient.java
index 17d3a15..a7fdb9b 100644
--- 
a/java/kudu-client/src/test/java/org/apache/kudu/client/TestAsyncKuduClient.java
+++ 
b/java/kudu-client/src/test/java/org/apache/kudu/client/TestAsyncKuduClient.java
@@ -111,6 +111,7 @@ public class TestAsyncKuduClient {
     assertEquals(rowCount - numRows, countRowsInScan(scanner));
   }
 
+  @SuppressWarnings("FutureReturnValueIgnored")
   private void disconnectAndWait() throws InterruptedException {
     for (Connection c : asyncClient.getConnectionListCopy()) {
       c.disconnect();
diff --git 
a/java/kudu-client/src/test/java/org/apache/kudu/client/TestAuthTokenReacquire.java
 
b/java/kudu-client/src/test/java/org/apache/kudu/client/TestAuthTokenReacquire.java
index 24aef14..82b2035 100644
--- 
a/java/kudu-client/src/test/java/org/apache/kudu/client/TestAuthTokenReacquire.java
+++ 
b/java/kudu-client/src/test/java/org/apache/kudu/client/TestAuthTokenReacquire.java
@@ -84,6 +84,7 @@ public class TestAuthTokenReacquire {
     asyncClient = harness.getAsyncClient();
   }
 
+  @SuppressWarnings("FutureReturnValueIgnored")
   private void dropConnections() {
     for (Connection c : asyncClient.getConnectionListCopy()) {
       c.disconnect();
@@ -172,7 +173,7 @@ public class TestAuthTokenReacquire {
     }
     if (!exceptions.isEmpty()) {
       for (Map.Entry<Integer, Throwable> e : exceptions.entrySet()) {
-        LOG.error("exception in thread {}: {}", e.getKey(), e.getValue());
+        LOG.error(String.format("exception in thread %s:", e.getKey()), 
e.getValue());
       }
       fail("test failed: unexpected errors");
     }
diff --git 
a/java/kudu-client/src/test/java/org/apache/kudu/client/TestAuthnTokenReacquireOpen.java
 
b/java/kudu-client/src/test/java/org/apache/kudu/client/TestAuthnTokenReacquireOpen.java
index dc98a65..f7d23b4 100644
--- 
a/java/kudu-client/src/test/java/org/apache/kudu/client/TestAuthnTokenReacquireOpen.java
+++ 
b/java/kudu-client/src/test/java/org/apache/kudu/client/TestAuthnTokenReacquireOpen.java
@@ -78,6 +78,7 @@ public class TestAuthnTokenReacquireOpen {
     asyncClient = harness.getAsyncClient();
   }
 
+  @SuppressWarnings("FutureReturnValueIgnored")
   private void dropConnections() {
     for (Connection c : asyncClient.getConnectionListCopy()) {
       c.disconnect();
diff --git 
a/java/kudu-client/src/test/java/org/apache/kudu/client/TestConnectToCluster.java
 
b/java/kudu-client/src/test/java/org/apache/kudu/client/TestConnectToCluster.java
index 492b5a3..4b1d4a3 100644
--- 
a/java/kudu-client/src/test/java/org/apache/kudu/client/TestConnectToCluster.java
+++ 
b/java/kudu-client/src/test/java/org/apache/kudu/client/TestConnectToCluster.java
@@ -17,13 +17,13 @@
 
 package org.apache.kudu.client;
 
-import java.util.List;
-
 import static org.apache.kudu.consensus.Metadata.RaftPeerPB.Role.FOLLOWER;
 import static org.apache.kudu.consensus.Metadata.RaftPeerPB.Role.LEADER;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.fail;
 
+import java.util.List;
+
 import com.google.common.base.Joiner;
 import com.google.common.collect.ImmutableList;
 import com.stumbleupon.async.Callback;
diff --git 
a/java/kudu-client/src/test/java/org/apache/kudu/client/TestConnectionCache.java
 
b/java/kudu-client/src/test/java/org/apache/kudu/client/TestConnectionCache.java
index f202435..19ebfc3 100644
--- 
a/java/kudu-client/src/test/java/org/apache/kudu/client/TestConnectionCache.java
+++ 
b/java/kudu-client/src/test/java/org/apache/kudu/client/TestConnectionCache.java
@@ -37,6 +37,7 @@ public class TestConnectionCache {
   public RetryRule retryRule = new RetryRule();
 
   @Test(timeout = 50000)
+  @SuppressWarnings("FutureReturnValueIgnored")
   public void test() throws Exception {
     try (MiniKuduCluster cluster = new MiniKuduCluster.MiniKuduClusterBuilder()
                                                       .numMasterServers(3)
diff --git 
a/java/kudu-client/src/test/java/org/apache/kudu/client/TestNegotiator.java 
b/java/kudu-client/src/test/java/org/apache/kudu/client/TestNegotiator.java
index c60578d..d73bb44 100644
--- a/java/kudu-client/src/test/java/org/apache/kudu/client/TestNegotiator.java
+++ b/java/kudu-client/src/test/java/org/apache/kudu/client/TestNegotiator.java
@@ -37,11 +37,14 @@ import javax.net.ssl.SSLException;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
 import com.google.protobuf.ByteString;
+import com.google.protobuf.CodedInputStream;
+import com.google.protobuf.ExtensionRegistry;
 import com.google.protobuf.Message;
 import com.google.protobuf.TextFormat;
-import org.jboss.netty.buffer.ChannelBuffer;
-import org.jboss.netty.handler.codec.embedder.DecoderEmbedder;
-import org.jboss.netty.handler.ssl.SslHandler;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.embedded.EmbeddedChannel;
+import io.netty.handler.ssl.SslHandler;
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
@@ -49,8 +52,8 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.kudu.client.Negotiator.Success;
+import org.apache.kudu.rpc.RpcHeader;
 import org.apache.kudu.rpc.RpcHeader.AuthenticationTypePB;
-import org.apache.kudu.rpc.RpcHeader.ConnectionContextPB;
 import org.apache.kudu.rpc.RpcHeader.NegotiatePB;
 import org.apache.kudu.rpc.RpcHeader.NegotiatePB.NegotiateStep;
 import org.apache.kudu.rpc.RpcHeader.NegotiatePB.SaslMechanism;
@@ -63,7 +66,7 @@ import org.apache.kudu.util.SecurityUtil;
 public class TestNegotiator {
   static final Logger LOG = LoggerFactory.getLogger(TestNegotiator.class);
 
-  private DecoderEmbedder<Object> embedder;
+  private EmbeddedChannel embedder;
   private SecurityContext secContext;
   private SSLEngine serverEngine;
 
@@ -107,19 +110,21 @@ public class TestNegotiator {
   private void startNegotiation(boolean fakeLoopback) {
     Negotiator negotiator = new Negotiator("127.0.0.1", secContext, false);
     negotiator.overrideLoopbackForTests = fakeLoopback;
-    embedder = new DecoderEmbedder<>(negotiator);
-    negotiator.sendHello(embedder.getPipeline().getChannel());
+    embedder = new EmbeddedChannel(negotiator);
+    negotiator.sendHello(embedder.pipeline().firstContext());
   }
 
   static CallResponse fakeResponse(ResponseHeader header, Message body) {
-    ChannelBuffer buf = KuduRpc.toChannelBuffer(header, body);
+    ByteBuf buf = Unpooled.buffer();
+    KuduRpc.toByteBuf(buf, header, body);
     buf = buf.slice(4, buf.readableBytes() - 4);
     return new CallResponse(buf);
   }
 
   KeyStore loadTestKeystore() throws Exception {
     KeyStore ks = KeyStore.getInstance("JKS");
-    try (InputStream stream = 
TestNegotiator.class.getResourceAsStream("/test-key-and-cert.jks")) {
+    try (InputStream stream =
+                 
TestNegotiator.class.getResourceAsStream("/test-key-and-cert.jks")) {
       ks.load(stream, KEYSTORE_PASSWORD);
     }
     return ks;
@@ -142,14 +147,17 @@ public class TestNegotiator {
    * a Negotiation.Success to the pipeline.
    * @return the result
    */
-  private Success assertComplete() {
-    RpcOutboundMessage msg = (RpcOutboundMessage)embedder.poll();
-    ConnectionContextPB connCtx = (ConnectionContextPB)msg.getBody();
+  private Success assertComplete(boolean isTls) throws Exception {
+    RpcOutboundMessage msg = isTls ?
+            unwrapOutboundMessage(embedder.readOutbound(),
+                    RpcHeader.ConnectionContextPB.newBuilder()) :
+            embedder.readOutbound();
+    RpcHeader.ConnectionContextPB connCtx = (RpcHeader.ConnectionContextPB) 
msg.getBody();
     assertEquals(Negotiator.CONNECTION_CTX_CALL_ID, 
msg.getHeaderBuilder().getCallId());
     assertEquals(System.getProperty("user.name"), 
connCtx.getDEPRECATEDUserInfo().getRealUser());
 
     // Expect the client to also emit a negotiation Success.
-    Success success = (Success)embedder.poll();
+    Success success = embedder.readInbound();
     assertNotNull(success);
     return success;
   }
@@ -166,25 +174,26 @@ public class TestNegotiator {
    * Simple test case for a PLAIN negotiation.
    */
   @Test
-  public void testNegotiation() {
+  public void testNegotiation() throws Exception {
     startNegotiation(false);
 
     // Expect client->server: NEGOTIATE.
-    RpcOutboundMessage msg = (RpcOutboundMessage) embedder.poll();
+    RpcOutboundMessage msg = embedder.readOutbound();
     NegotiatePB body = (NegotiatePB) msg.getBody();
     assertEquals(Negotiator.SASL_CALL_ID, msg.getHeaderBuilder().getCallId());
     assertEquals(NegotiateStep.NEGOTIATE, body.getStep());
 
     // Respond with NEGOTIATE.
-    embedder.offer(fakeResponse(
+    embedder.writeInbound(fakeResponse(
         ResponseHeader.newBuilder().setCallId(Negotiator.SASL_CALL_ID).build(),
         NegotiatePB.newBuilder()
           .addSaslMechanisms(SaslMechanism.newBuilder().setMechanism("PLAIN"))
           .setStep(NegotiateStep.NEGOTIATE)
           .build()));
+    embedder.flushInbound();
 
     // Expect client->server: SASL_INITIATE (PLAIN)
-    msg = (RpcOutboundMessage)embedder.poll();
+    msg = embedder.readOutbound();
     body = (NegotiatePB) msg.getBody();
 
     assertEquals(Negotiator.SASL_CALL_ID, msg.getHeaderBuilder().getCallId());
@@ -194,14 +203,15 @@ public class TestNegotiator {
     assertTrue(body.hasToken());
 
     // Respond with SASL_SUCCESS:
-    embedder.offer(fakeResponse(
+    embedder.writeInbound(fakeResponse(
         ResponseHeader.newBuilder().setCallId(Negotiator.SASL_CALL_ID).build(),
         NegotiatePB.newBuilder()
           .setStep(NegotiateStep.SASL_SUCCESS)
           .build()));
+    embedder.flushInbound();
 
     // Expect client->server: ConnectionContext
-    assertComplete();
+    assertComplete(/*isTls*/ false);
   }
 
   private static void runTasks(SSLEngineResult result,
@@ -253,21 +263,30 @@ public class TestNegotiator {
    * Completes the 3-step TLS handshake, assuming that the client is
    * about to generate the first of the messages.
    */
-  private void runTlsHandshake() throws SSLException {
-    RpcOutboundMessage msg = (RpcOutboundMessage) embedder.poll();
+  private void runTlsHandshake(boolean isAuthOnly) throws SSLException {
+    RpcOutboundMessage msg = embedder.readOutbound();
     NegotiatePB body = (NegotiatePB) msg.getBody();
     assertEquals(NegotiateStep.TLS_HANDSHAKE, body.getStep());
 
     // Consume the ClientHello in our fake server, which should generate 
ServerHello.
-    embedder.offer(runServerStep(serverEngine, body.getTlsHandshake()));
+    embedder.writeInbound(runServerStep(serverEngine, body.getTlsHandshake()));
+    embedder.flushInbound();
 
     // Expect client to generate ClientKeyExchange, ChangeCipherSpec, Finished.
-    msg = (RpcOutboundMessage) embedder.poll();
+    msg = embedder.readOutbound();
     body = (NegotiatePB) msg.getBody();
     assertEquals(NegotiateStep.TLS_HANDSHAKE, body.getStep());
 
+    // Now that the handshake is complete, we need to encode 
RpcOutboundMessages
+    // to ByteBuf to be accepted by the SslHandler.
+    // This encoder is added to the pipeline by the Connection in normal 
Negotiator usage.
+    if (!isAuthOnly) {
+      embedder.pipeline().addFirst("encode-outbound", new 
RpcOutboundMessage.Encoder());
+    }
+
     // Server consumes the above. Should send the TLS "Finished" message.
-    embedder.offer(runServerStep(serverEngine, body.getTlsHandshake()));
+    embedder.writeInbound(runServerStep(serverEngine, body.getTlsHandshake()));
+    embedder.flushInbound();
   }
 
   @Test
@@ -275,32 +294,29 @@ public class TestNegotiator {
     startNegotiation(false);
 
     // Expect client->server: NEGOTIATE, TLS included.
-    RpcOutboundMessage msg = (RpcOutboundMessage) embedder.poll();
+    RpcOutboundMessage msg = embedder.readOutbound();
     NegotiatePB body = (NegotiatePB) msg.getBody();
     assertEquals(NegotiateStep.NEGOTIATE, body.getStep());
     assertTrue(body.getSupportedFeaturesList().contains(RpcFeatureFlag.TLS));
 
     // Fake a server response with TLS enabled.
-    embedder.offer(fakeResponse(
+    embedder.writeInbound(fakeResponse(
         ResponseHeader.newBuilder().setCallId(Negotiator.SASL_CALL_ID).build(),
         NegotiatePB.newBuilder()
           
.addSaslMechanisms(NegotiatePB.SaslMechanism.newBuilder().setMechanism("PLAIN"))
           .addSupportedFeatures(RpcFeatureFlag.TLS)
           .setStep(NegotiateStep.NEGOTIATE)
           .build()));
+    embedder.flushInbound();
 
     // Expect client->server: TLS_HANDSHAKE.
-    runTlsHandshake();
+    runTlsHandshake(/*isAuthOnly*/ false);
 
     // The pipeline should now have an SSL handler as the first handler.
-    assertTrue(embedder.getPipeline().getFirst() instanceof SslHandler);
+    assertTrue(embedder.pipeline().first() instanceof SslHandler);
 
     // The Negotiator should have sent the SASL_INITIATE at this point.
-    // NOTE: in a non-mock environment, this message would now be encrypted
-    // by the newly-added TLS handler. But, with the DecoderEmbedder that we're
-    // using, we don't actually end up processing outbound events. Upgrading
-    // to Netty 4 and using EmbeddedChannel instead would make this more 
realistic.
-    msg = (RpcOutboundMessage) embedder.poll();
+    msg = unwrapOutboundMessage(embedder.readOutbound(), 
RpcHeader.NegotiatePB.newBuilder());
     body = (NegotiatePB) msg.getBody();
     assertEquals(NegotiateStep.SASL_INITIATE, body.getStep());
   }
@@ -310,7 +326,7 @@ public class TestNegotiator {
     startNegotiation(true);
 
     // Expect client->server: NEGOTIATE, TLS and TLS_AUTHENTICATION_ONLY 
included.
-    RpcOutboundMessage msg = (RpcOutboundMessage) embedder.poll();
+    RpcOutboundMessage msg = embedder.readOutbound();
     NegotiatePB body = (NegotiatePB) msg.getBody();
     assertEquals(NegotiateStep.NEGOTIATE, body.getStep());
     assertTrue(body.getSupportedFeaturesList().contains(RpcFeatureFlag.TLS));
@@ -318,7 +334,7 @@ public class TestNegotiator {
         RpcFeatureFlag.TLS_AUTHENTICATION_ONLY));
 
     // Fake a server response with TLS and TLS_AUTHENTICATION_ONLY enabled.
-    embedder.offer(fakeResponse(
+    embedder.writeInbound(fakeResponse(
         ResponseHeader.newBuilder().setCallId(Negotiator.SASL_CALL_ID).build(),
         NegotiatePB.newBuilder()
           
.addSaslMechanisms(NegotiatePB.SaslMechanism.newBuilder().setMechanism("PLAIN"))
@@ -326,16 +342,17 @@ public class TestNegotiator {
           .addSupportedFeatures(RpcFeatureFlag.TLS_AUTHENTICATION_ONLY)
           .setStep(NegotiateStep.NEGOTIATE)
           .build()));
+    embedder.flushInbound();
 
     // Expect client->server: TLS_HANDSHAKE.
-    runTlsHandshake();
+    runTlsHandshake(/*isAuthOnly*/ true);
 
     // The pipeline should *not* have an SSL handler as the first handler,
     // since we used TLS for authentication only.
-    assertFalse(embedder.getPipeline().getFirst() instanceof SslHandler);
+    assertFalse(embedder.pipeline().first() instanceof SslHandler);
 
     // The Negotiator should have sent the SASL_INITIATE at this point.
-    msg = (RpcOutboundMessage) embedder.poll();
+    msg = embedder.readOutbound();
     body = (NegotiatePB) msg.getBody();
     assertEquals(NegotiateStep.SASL_INITIATE, body.getStep());
   }
@@ -350,7 +367,7 @@ public class TestNegotiator {
     startNegotiation(false);
 
     // Expect client->server: NEGOTIATE, TLS included, Token not included.
-    RpcOutboundMessage msg = (RpcOutboundMessage) embedder.poll();
+    RpcOutboundMessage msg = embedder.readOutbound();
     NegotiatePB body = (NegotiatePB) msg.getBody();
     assertEquals("supported_features: APPLICATION_FEATURE_FLAGS " +
         "supported_features: TLS " +
@@ -369,7 +386,7 @@ public class TestNegotiator {
     startNegotiation(false);
 
     // Expect client->server: NEGOTIATE, TLS included, Token included.
-    RpcOutboundMessage msg = (RpcOutboundMessage) embedder.poll();
+    RpcOutboundMessage msg = embedder.readOutbound();
     NegotiatePB body = (NegotiatePB) msg.getBody();
     assertEquals("supported_features: APPLICATION_FEATURE_FLAGS " +
         "supported_features: TLS " +
@@ -378,7 +395,7 @@ public class TestNegotiator {
         "authn_types { token { } }", TextFormat.shortDebugString(body));
 
     // Fake a server response with TLS enabled and TOKEN chosen.
-    embedder.offer(fakeResponse(
+    embedder.writeInbound(fakeResponse(
         ResponseHeader.newBuilder().setCallId(Negotiator.SASL_CALL_ID).build(),
         NegotiatePB.newBuilder()
           .addSupportedFeatures(RpcFeatureFlag.TLS)
@@ -386,24 +403,59 @@ public class TestNegotiator {
               AuthenticationTypePB.Token.getDefaultInstance()))
           .setStep(NegotiateStep.NEGOTIATE)
           .build()));
+    embedder.flushInbound();
 
     // Expect to now run the TLS handshake
-    runTlsHandshake();
+    runTlsHandshake(/*isAuthOnly*/ false);
 
     // Expect the client to send the token.
-    msg = (RpcOutboundMessage) embedder.poll();
+    msg = unwrapOutboundMessage(embedder.readOutbound(), 
RpcHeader.NegotiatePB.newBuilder());
     body = (NegotiatePB) msg.getBody();
     assertEquals("step: TOKEN_EXCHANGE authn_token { }",
         TextFormat.shortDebugString(body));
 
     // Fake a response indicating success.
-    embedder.offer(fakeResponse(
+    embedder.writeInbound(fakeResponse(
         ResponseHeader.newBuilder().setCallId(Negotiator.SASL_CALL_ID).build(),
         NegotiatePB.newBuilder()
         .setStep(NegotiateStep.TOKEN_EXCHANGE)
           .build()));
+    embedder.flushInbound();
+
+    // TODO (ghenke): For some reason the SslHandler adds an extra empty 
message here.
+    // This should be harmless, but it would be good to understand why, or fix it.
+    ByteBuf empty = embedder.readOutbound();
+    assertEquals(0, empty.readableBytes());
 
     // Should be complete now.
-    assertComplete();
+    assertComplete(/*isTls*/ true);
+  }
+
+  private RpcOutboundMessage unwrapOutboundMessage(ByteBuf wrappedBuf,
+                                                   Message.Builder 
requestBuilder)
+          throws Exception {
+    // Create an SSL handler to handle unwrapping the SSL message.
+    SslHandler handler = new SslHandler(serverEngine);
+    EmbeddedChannel serverSSLChannel = new EmbeddedChannel(handler);
+
+    // Pass the ssl message through the channel with the ssl handler.
+    serverSSLChannel.writeInbound(wrappedBuf);
+    serverSSLChannel.flushInbound();
+    ByteBuf unwrappedbuf = serverSSLChannel.readInbound();
+
+    // Read the message size and bytes.
+    final int size = unwrappedbuf.readInt();
+    final byte [] bytes = new byte[size];
+    unwrappedbuf.getBytes(unwrappedbuf.readerIndex(), bytes);
+
+    // Parse the message header.
+    final CodedInputStream in = CodedInputStream.newInstance(bytes);
+    RpcHeader.RequestHeader.Builder header = 
RpcHeader.RequestHeader.newBuilder();
+    in.readMessage(header, ExtensionRegistry.getEmptyRegistry());
+
+    // Parse the request message.
+    in.readMessage(requestBuilder, ExtensionRegistry.getEmptyRegistry());
+
+    return new RpcOutboundMessage(header, requestBuilder.build());
   }
 }

Reply via email to